微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

电信客服_Kafka消费者写入HBase

流程

  • kafka配置
  • 创建消费者
  • 关注主题ct
  • 获取数据
  • 将数据写入HBase
consumer.properties是kafka集群的配置信息,calllog是数据封装对象。
package com.csw.ct.consumer.bean;

import com.csw.ct.common.bean.Consumer;
import com.csw.ct.common.constant.Names;
import com.csw.ct.consumer.dao.HBaseDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

/**
 * 通话日志消费者对象
 */
public class CalllogConsumer implements Consumer {
    /**
     * 消费数据
     */
    public void consume() {

        try {
            //创建配置对象
            Properties prop = new Properties ();
            prop.load (Thread.currentThread ().getContextClassLoader ().getResourceAsStream ( "consumer.properties" ));

            //获取flume采集的数据
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String> (prop);

            //关注主题
            consumer.subscribe( Arrays.asList( Names.TOPIC.getValue ()));

            //HBase数据访问对象
            HBaseDao dao = new HBaseDao ();
            //初始化
            dao.init();

            //消费数据
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll ( 100 );
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println (consumerRecord.value());
                    //插入数据
                    //dao.insertDate(consumerRecord.value());
                    Calllog log = new Calllog(consumerRecord.value());

                    dao.insertDate (log);
                }
            }
        } catch (Exception e) {
            e.printStackTrace ();
        }

    }


    public void close() throws IOException {

    }
}

写入HBase具体代码
https://www.cnblogs.com/chenshaowei/p/12736522.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐