Spark Structured Streaming - check a method on every micro-batch and stop the Spark job if it returns true
I have a Spark Structured Streaming application that reads from Kafka. I am trying to implement something like this:
- Run a method once per micro-batch; if it returns true, stop the Spark context.
The code below has 2 problems:
- The check runs inside every executor, once per row, not once per micro-batch.
- If the method returns true, I don't know how to stop the Spark context from inside the ForeachWriter.
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger

class MyStreamApplication(spark: SparkSession) extends java.io.Serializable {

  // Fetch config vars from Env - START
  @transient val CG1 = {
    val props = new Properties()
    props.put("bootstrap.servers", BOOTSTRAP_SERVERS)
    props.put("group.id", CG_NAME1)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")
    new KafkaConsumer[String, String](props)
  }

  // Second consumer: same config as CG1, but with its own consumer group id
  @transient val CG2 = {
    val props = new Properties()
    props.put("bootstrap.servers", BOOTSTRAP_SERVERS)
    props.put("group.id", CG_NAME2)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")
    new KafkaConsumer[String, String](props)
  }
  val otherTopic = "topic1"
  val consumer1 = CG1
  val consumer2 = CG2

  // Setup connection to Kafka
  val kafka = spark.readStream
    .format("kafka")
    .option("maxOffsetsPerTrigger", MAX_OFFSETS_PER_TRIGGER)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) // comma separated list of broker:host
    .option("subscribe", topic) // comma separated list of topics
    .option("startingOffsets", "earliest")
    .option("checkpointLocation", CHECKPOINT_LOCATION) // note: the checkpoint location is a writer option; it has no effect here on the reader
    .option("failOnDataLoss", "false")
    .option("minPartitions", sys.env.getOrElse("MIN_PARTITIONS", "64").toInt)
    // .option("kafka.group.id", s"CatchupStream-int-102")
    .load()
  kafka.printSchema() // printSchema() prints directly and returns Unit, so there is no need to wrap it in print()
  val consoleOutput = kafka
    .selectExpr(
      "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)",
      "CAST(partition AS INTEGER)", "CAST(offset AS LONG)",
      "CAST(timestamp AS TIMESTAMP)", "CAST(timestampType AS INTEGER)")
    .writeStream
    .foreach(new ForeachWriter[Row] {
      override def open(partitionId: Long, epochId: Long): Boolean = true
      override def process(row: Row): Unit = {
        println(s"Record received in data frame is -> ${row.mkString}")
        println(
          s"munchkinId: ${row.getAs[String]("key")}, " +
          s"value: ${row.getAs[String]("value")}")
        // consumer1 and consumer2 are KafkaConsumer objects defined outside of the ForeachWriter
        val flag = runProcess(otherTopic, consumer1, consumer2)
        if (flag) {
          // stop the current Spark job here - this is the part I don't know how to do
        }
      }
      override def close(errorOrNull: Throwable): Unit = {}
    })
    .outputMode("append")
    // .format("console")
    .trigger(Trigger.ProcessingTime("2 seconds"))
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .start()

  consoleOutput.awaitTermination()
}
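
For anyone hitting the same problem: ForeachWriter.process runs once per row on the executors, which is why the check fires far more often than once per micro-batch. foreachBatch (available since Spark 2.4) runs its function exactly once per micro-batch, on the driver, so the check can live there. The query also should not be stopped from inside its own batch function; a safer pattern is to set a flag in foreachBatch and let the driver thread poll awaitTermination with a timeout and call query.stop(). Below is a minimal sketch of that pattern, assuming a hypothetical shouldStop check standing in for runProcess, and placeholder broker/checkpoint values:

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object StopOnCondition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("StopOnCondition").getOrCreate()

    // Hypothetical driver-side check; stands in for runProcess(otherTopic, consumer1, consumer2)
    def shouldStop(batchDF: DataFrame, batchId: Long): Boolean = false

    val stopRequested = new AtomicBoolean(false)

    val query = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
      .option("subscribe", "topic1")
      .load()
      .writeStream
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .option("checkpointLocation", "/tmp/checkpoint") // placeholder
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // foreachBatch runs exactly once per micro-batch, on the driver
        if (shouldStop(batchDF, batchId)) stopRequested.set(true)
      }
      .start()

    // Poll with a timeout instead of blocking forever, so this thread can stop the query
    while (query.isActive && !stopRequested.get()) {
      query.awaitTermination(1000)
    }
    if (query.isActive) query.stop()
    spark.stop()
  }
}

Note that a flag set inside the original ForeachWriter would not work: process runs in the executor JVMs, so an AtomicBoolean mutated there is a different object from the one the driver sees.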