How to get the number of records written to Kafka in each Spark task in Spark Structured Streaming?
I created a CustomListener by extending SparkListener (below). When I write the data to a file, it correctly prints the number of records written per Spark task, but when I write to Kafka, recordsWrittenCount is always zero.
How can I get the number of records written to Kafka in each Spark task?
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class CustomListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    synchronized {
      // Task-level I/O metrics that Spark reports when each task finishes
      val recordsWrittenCount = taskEnd.taskMetrics.outputMetrics.recordsWritten
      val recordsReadCount = taskEnd.taskMetrics.inputMetrics.recordsRead
      println("TaskId: " + taskEnd.taskInfo.taskId +
        " recordsReadCount: " + recordsReadCount +
        " recordsWrittenCount: " + recordsWrittenCount)
    }
  }
}
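As a possible alternative to task-level metrics, Structured Streaming's own progress reporting can be inspected with a StreamingQueryListener. A minimal sketch, assuming Spark 3.x, where SinkProgress exposes numOutputRows (sinks that do not report row counts return -1, and with foreachBatch the progress describes the ForeachBatchSink, so the Kafka write may still not show up here):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

class ProgressListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // numOutputRows is -1 when the sink does not report row counts (assumption: Spark 3.x API)
    println(s"batchId=${p.batchId} inputRows=${p.numInputRows} sinkRows=${p.sink.numOutputRows}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registered on the session's stream manager, e.g.:
// spark.streams.addListener(new ProgressListener)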
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object TestReader extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  spark.sparkContext.addSparkListener(new CustomListener)

  val rates = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "inTopic")
    .load()

  rates
    .writeStream
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .foreachBatch {
      (batchDF: DataFrame, batchId: Long) => {
        // Below prints the correct value for recordsWrittenCount
        batchDF
          .write
          .format("csv")
          .option("delimiter", "|")
          .mode(SaveMode.Overwrite)
          .save("/tmp/KafkaDir")

        // This always reports zero for recordsWrittenCount
        // batchDF
        //   .write
        //   .format("kafka")
        //   .option("kafka.bootstrap.servers", "localhost:9092")
        //   .option("checkpointLocation", "/tmp/test2")
        //   .option("topic", "outTopic")
        //   .save()
      }
    }
    .start()

  spark.streams.awaitAnyTermination()
}
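Since foreachBatch exposes each micro-batch as a plain DataFrame, one workaround is to count the batch yourself before writing it to Kafka. This yields a per-batch rather than per-task count; a minimal sketch that drops into the foreachBatch body of the query above (persist() is only there so the batch is not recomputed once for the count and again for the write):

.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  val cached = batchDF.persist()  // avoid recomputing the source for both count and write
  val rowCount = cached.count()   // rows this micro-batch is about to write
  cached.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "outTopic")
    .save()
  println(s"Batch $batchId wrote $rowCount records to Kafka")
  cached.unpersist()
}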