流程
- kafka配置
- 创建消费者
- 关注主题ct
- 获取数据
- 将数据写入HBase
consumer.properties是kafka集群的配置信息,calllog是数据封装对象。
package com.csw.ct.consumer.bean;
import com.csw.ct.common.bean.Consumer;
import com.csw.ct.common.constant.Names;
import com.csw.ct.consumer.dao.HBaseDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
/**
* 通话日志消费者对象
*/
public class CalllogConsumer implements Consumer {
/**
* 消费数据
*/
public void consume() {
try {
//创建配置对象
Properties prop = new Properties ();
prop.load (Thread.currentThread ().getContextClassLoader ().getResourceAsStream ( "consumer.properties" ));
//获取flume采集的数据
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String> (prop);
//关注主题
consumer.subscribe( Arrays.asList( Names.TOPIC.getValue ()));
//HBase数据访问对象
HBaseDao dao = new HBaseDao ();
//初始化
dao.init();
//消费数据
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll ( 100 );
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println (consumerRecord.value());
//插入数据
//dao.insertDate(consumerRecord.value());
Calllog log = new Calllog(consumerRecord.value());
dao.insertDate (log);
}
}
} catch (Exception e) {
e.printStackTrace ();
}
}
public void close() throws IOException {
}
}
写入HBase具体代码
https://www.cnblogs.com/chenshaowei/p/12736522.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。