如何解决如何将Protobuf数据从Flink转发到Kafka和stdout?
我想在此处添加一些代码,并从Flink中输出protobuf数据。
我正在使用Flink的Apache Kafka连接器,以便将Flink连接到Kafka。
这是我Flink的代码。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers","localhost:9092").
val producer = FlinkKafkaProducer011(topic,new myProtobufSchema,props)
env.addSink(producer)
env.execute("To Kafka")
这是我的卡夫卡代码。
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG,"protobuf-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
p
}
val builder: StreamsBuilder = new StreamsBuilder
// TODO: implement here to stdout
val streams: KafkaStreams = new KafkaStreams(builder.build(),props)
streams.start()
sys.ShutdownHookThread {
streams.close(Duration.ofSeconds(10))
}
解决方法
您需要设置https://whoer.net才能从某个主题中消费
val builder: StreamsBuilder = new StreamsBuilder()
.stream(topic)
.print(Printed.toSysOut());