如何解决将Avro消息推送到kafka主题
我正在尝试使用 kafka-node-avro
库可将数据推送到现有主题。 我已经使用curl和主题将模式添加到SchemaRegistry中。 我收到以下错误:
Error: Topic name or id is needed to build Schema
at new Shema (/work/node_modules/kafka-node-avro/lib/schema.js:7:41)
at Object.Pool.getByName (/work/node_modules/kafka-node-avro/lib/schemaPool.js:70:10)
at new Promise (<anonymous>)
我的代码段如下:
const Settings = {
"kafka" : {
"kafkaHost" : config.KafkaHost
},"schema": {
"registry" : config.KafkaRegistry
}
};
console.log("settings registry: ",config.KafkaRegistry);
console.log("settings kafkaHost: ",config.KafkaHost)
KafkaAvro.init(Settings).then( kafka => {
const producer = kafka.addProducer();
let payloads = [
{
topic: 'classifier-response-test',messages: JSON.stringify(kafkaData)
}
];
producer.send(payloads).then( success => {
// Message was sent encoded with Avro Schema
console.log("message sent ! Awesome ! :) ")
},error => {
// Something wrong happen
console.log("There seems that there is a mistake ! try Again ;) ")
console.log(error)
});
},error => {
// something wrong happen
console.log("There seems that there is a global mistake ! try Again ;) ")
console.log(error)
});
解决方法
问题是我们需要将主题列表放在架构设置中,以在主题和架构注册表之间建立链接。 我们可以输入模式ID或主题名称。
const kafkaSettings = {
"kafka" : {
"kafkaHost" : config.KafkaHost
},"schema": {
"registry" : config.KafkaRegistry,"topics": [
{"name": "classifier-response-test" }
]
}
};
,
该库尚未准备好send
一条消息列表,这是为什么它试图在发送机制中将模式动态添加到模式池时抱怨的原因。最简单的解决方案是一次发送1条消息,在您的代码示例中可能就像
const Settings = {
"kafka" : {
"kafkaHost" : config.KafkaHost
},"schema": {
"registry" : config.KafkaRegistry
}
};
KafkaAvro.init(Settings).then( kafka => {
kafka.send({
topic: 'classifier-response-test',messages: JSON.stringify(kafkaData)
}).then( success => {
// Message was sent encoded with Avro Schema
console.log("message sent ! Awesome ! :) ")
},error => {
// Something wrong happen
console.log("There seems that there is a mistake ! try Again ;) ")
console.log(error)
});
},error => {
// something wrong happen
console.log("There seems that there is a global mistake ! try Again ;) ")
console.log(error)
});
感谢您使用 kafka-node-avro
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。