如何解决如何获得Kafka使用者Alpakka返回的Map [String,String]?
我应该从Kafka使用者那里得到一个 Map [String,String] ,但我真的不知道如何。我设法配置了使用者,它工作正常,但我不知道如何获取地图。
implicit val system: ActorSystem = ActorSystem()
val consumerConfig = system.settings.config.getConfig("akka.kafka.consumer")
val = kafkaConsumerSettings =
ConsumerSettings(consumerConfig,new StringDeserializer,new StringDeserializer)
.withBootstrapServers(localhost:9094)
.withGroupId(group1)
Consumer
.plainSource(kafkaConsumerSettings,Subscriptions.topics(entity.entity_name))
.toMat(Sink.foreach(println))(DrainingControl.apply)
.run()
解决方法
Lightbend's recommendation用于处理字节数组,同时反序列化来自Kafka的传入数据
消息反序列化的一般建议是使用字节数组(或字符串)作为值,并在Akka Stream中的映射操作中进行反序列化,而不是直接在Kafka反序列化器中实现。在Akka Stream中明确处理反序列化后,如下面的示例所示,更容易实现所需的错误处理策略。
为此,您可以使用以下设置来设置使用者:
val consumerSettings = ConsumerSettings(consumerConfig,new StringDeserializer,new ByteArrayDeserializer)
然后通过从Record类调用.value()
方法获得结果。为了反序列化,我建议使用circe +颚。这段代码可以解决问题。
import io.circe.jawn
import io.circe.generic.auto._
val bytes = record.value()
val data = jawn.parseByteBuffer(ByteBuffer.wrap(bytes)).flatMap(_.as[Map[String,String]])
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。