如何解决无法使用Spark结构的流将数据发送到MongoDB
我遵循了Unable to send data to MongoDB using Kafka-Spark Structured Streaming,将数据从Spark结构化流传输到mongoDB,并成功实现了它,但存在一个问题。 就像when函数
override def process(record: Row): Unit = {
val doc: Document = Document(record.prettyJson.trim)
// lazy opening of MongoDB connection
ensureMongoDBConnection()
val result = collection.insertOne(doc)
if (messageCountAccum != null)
messageCountAccum.add(1)
}
代码正在执行,没有任何问题,但是没有数据发送到MongoDB
但是如果我添加这样的打印语句
override def process(record: Row): Unit = {
val doc: Document = Document(record.prettyJson.trim)
// lazy opening of MongoDB connection
ensureMongoDBConnection()
val result = collection.insertOne(doc)
result.foreach(println) //print statement
if (messageCountAccum != null)
messageCountAccum.add(1)
}
数据正在插入MongoDB
我不知道为什么????
解决方法
foreach
初始化写入器接收器。如果没有 foreach,您的数据框永远不会被计算。
Try this :
val df = // your df here
df.map(r => process(r))
df.count()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。