如何解决卡夫卡消费者返回空值
我试图在 Java 中创建一个 kafka 使用者,但无论如何 consumer.poll(5000)
方法调用都返回空值。这是代码:
package com.apache.kafka.consumer;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public class Consumer {
public static void main(String[] args) throws Exception {
final Logger logger = Logger.getLogger(Consumer.class);
//Kafka consumer configuration settings
String topicName = "mytopic";
Properties props = new Properties();
props.put("bootstrap.servers","localhost:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.offset.reset","earliest");
props.put("auto.commit.interval.ms","1000");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy","range");
KafkaConsumer<String,String> consumer = new
KafkaConsumer<String,String>(props);
//Kafka Consumer subscribes list of topics here.
consumer.subscribe("sampletopic");
while (true) {
Map<String,ConsumerRecords<String,String>> records = consumer.poll(0);
for (ConsumerRecords<String,String> record : records.values()) {
System.out.println(records);
}
}
}
}
请帮忙!!!
我已经创建了主题并在其中添加了一些数据,并且zookeeper和kafka运行良好。我不知道为什么 poll()
方法返回 null。
解决方法
对 poll
的调用需要处于循环中,这就是文献将其称为轮询循环的原因。
如果它返回 null
,要么是轮询过早并退出 main
,要么主题中没有数据
在此处查看使用示例https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
Properties props = new Properties();
props.setProperty("bootstrap.servers","localhost:9092");
props.setProperty("group.id","test");
props.setProperty("enable.auto.commit","true");
props.setProperty("auto.commit.interval.ms","1000");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo","bar"));
while (true) {
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String,String> record : records)
System.out.printf("offset = %d,key = %s,value = %s%n",record.offset(),record.key(),record.value());
}
注意循环^
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。