1. 引入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.1</version> </dependency>
2. 生产者
package org.study.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap; import java.util.Map; /** * 生产者 */ public class ProducerSample { public static void main(String[] args) { Map<String, Object> props = new HashMap<>(); //zookeeper的地址 props.put("zk.connect", "127.0.0.1:2181"); //用于建立与 kafka 集群连接的 host/port 组 props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); String topic = "test-topic"; Producer<String, String> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 1")); producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 2")); producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 3")); producer.close(); } }
3. 消费者
package org.study.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * 消费者 */ public class ConsumerSample { public static void main(String[] args) { String topic = "test-topic";// topic name Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。 props.put("group.id", "testGroup1");// Consumer Group Name props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自动提交 props.put("auto.commit.interval.ms", "1000");// 自动提交 offset 到 zookeeper 的时间间隔,时间是毫秒 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。