如何解决反序列化交易一致的消费者消息
该问题也与我之前提出的问题有关。 IIDR CDC with Transaction Details to Kafka 仍未确定。
事务一致的使用者Api的示例AvroConsole在控制台中输出消息,并使用KafkaDeserializer将byte []反序列化为Object 示例TCC控制台链接https://www.ibm.com/support/knowledgecenter/SSTRGZ_11.4.0/com.ibm.cdcdoc.cdckafka.doc/tasks/kafkatccdev.html
我们尝试使用Java KafkaConsumer类执行相同的操作,并且能够使用byte []解串器将其打印出来。 ConsumerRecord键和consumerRecord值仍为序列化格式。
示例代码在下面
final Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,APPLICATION_ID + "-" + UUID.randomUUID().toString());
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
prop.put("schema.registry.url",SCHEMA_REGISTRY_URL);
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID + "-" + UUID.randomUUID().toString());
prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,ByteArrayDeserializer.class.getName());
prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,ByteArrayDeserializer.class.getName());
consumer = new KafkaConsumer<byte[],byte[]>(properties);
consumer.subscribe(Arrays.asList(TCC_TOPIC));
consumer.seekToBeginning(consumer.assignment());
Map<String,Object> keyDeserializerConfig = new HashMap<String,Object>();
keyDeserializerConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,StreamConfigurator.SCHEMA_REGISTRY_URL);
keyDeserializerConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,"false");
KafkaAvroDeserializer keyDeserializer = new KafkaAvroDeserializer ();
keyDeserializer.configure(keyDeserializerConfig,true);
Map<String,Object> valueDeserializerConfig = new HashMap<String,Object>();
valueDeserializerConfig.putAll(keyDeserializerConfig);
valueDeserializerConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,"false");
KafkaAvroDeserializer valueDeserializer = new KafkaAvroDeserializer ();
valueDeserializer.configure(valueDeserializerConfig,false);
while (true) {
ConsumerRecords<byte[],byte[]> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<byte[],byte[]> record : records)
{
System.out.println(record.topic());
System.out.println(record.partition());
System.out.println(keyDeserializer.deserialize("key",record.key()));
System.out.println(valueDeserializer.deserialize("value",record.value()));
System.out.println(record.toString());
}
}
我们在下面的代码行中遇到运行时错误
System.out.println(keyDeserializer.deserialize(“ key”,record.key()));
错误是 线程“主”中的异常org.apache.kafka.common.errors.SerializationException:反序列化ID -1的Avro消息时出错 引起原因:org.apache.kafka.common.errors.SerializationException:未知的魔术字节!
任何帮助将不胜感激。没有很多更好的文档或样本来为交易一致的消费者主题编写消费者代码。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。