如何解决流星Mongo集合为每个游标迭代找到并保存到ElasticSearch问题
我有连接到MongoDB的Meteor App。
在mongo中,我有一个表,该表具有约70万条记录。
每周我都有一份Cron工作,我从表中读取所有记录(使用Mongo Cursor),并且以每10k的批处理方式将它们插入Elastic Search中,以便对其进行索引。
let articles = []
Collections.Articles.find({}).forEach(function(doc) {
articles.push({
index: {_index: 'main',_type: 'article',_id: doc.id }
},doc);
if (0 === articles.length % 10000) {
client.bulk({ maxRetries: 5,index: 'main',type: 'article',body: articles })
data = []
}
})
由于每个记录都是同步的,因此在继续运行之前先遍历每个记录,并且client.bulk是异步的,这使弹性搜索服务器超载,并且由于内存不足异常而崩溃。
有没有一种方法可以在插入完成后暂停forEach?我尝试了async / await,但这似乎不起作用。
let articles = []
Collections.Articles.find({}).forEach(async function(doc) {
articles.push({
index: {_index: 'main',doc);
if (0 === articles.length % 10000) {
await client.bulk({ maxRetries: 5,body: articles })
data = []
}
})
有什么办法实现这一目标吗?
编辑:我正在尝试实现这样的目标-如果我使用诺言
let articles = []
Collections.Articles.find({}).forEach(function(doc) {
articles.push({
index: {_index: 'main',doc);
if (0 === articles.length % 10000) {
// Pause FETCHING rows with forEach
client.bulk({ maxRetries: 5,body: articles }).then(() => {
console.log('inserted')
// RESUME FETCHING rows with forEach
console.log("RESUME READING");
})
data = []
}
})
解决方法
如果您关注不受限制的迭代,则可以使用内部的Meteor._sleepForMs
方法,该方法允许您在同步样式的代码中放置异步超时:
Collections.Articles.find().forEach((doc,index) => {
console.log(index,doc._id)
Meteor._sleepForMs(timeout)
})
现在,这在Meteor环境(Meteor.startup
,Meteor.methods
,Meteor.publish
)中可以正常工作。
您的cron可能不在此环境中(= Fiber),因此您可以编写一个绑定该环境的包装器:
const bound = fct => Meteor.bindEnvironment(fct)
const iterateSlow = bound(function (timeout) {
Collections.Articles.find().forEach((doc,index) => {
console.log(index,doc._id)
Meteor._sleepForMs(timeout)
})
return true
})
iterateSlow(50) // iterates with 50ms timeout
这是一个完整的最小示例,您可以使用一个新项目来复制它:
// create a minimal collection
const MyDocs = new Mongo.Collection('myDocs')
// fill the collection
Meteor.startup(() => {
for (let i = 0; i < 100; i++) {
MyDocs.insert({})
}
})
// bind helper
const bound = fct => Meteor.bindEnvironment(fct)
// iterate docs with interval between
const iterateSlow = bound(function (timeout) {
MyDocs.find().forEach((doc,doc._id)
Meteor._sleepForMs(timeout)
})
return true
})
// simulate external environment,like when cron runs
setTimeout(() => {
iterateSlow(50)
},2000)
,
设法使其与ES2018异步迭代配合使用
有一个想法
Using async/await with a forEach loop
这是有效的代码
let articles = []
let cursor = Collections.Articles.find({})
for await (doc of cursor) {
articles.push({
index: {_index: 'main',_type: 'article',_id: doc.id }
},doc);
if (articles.length === 10000) {
await client.bulk({ maxRetries: 5,index: 'trusted',type: 'artikel',body: articles })
articles = []
}
}
这正常工作,并且设法将所有记录插入到弹性搜索中而不会崩溃。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。