java实现hbase数据导出

1. HBase-client方式实现

1.1 依赖

 <!--HBase依赖坐标-->
 <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.6</version>
        <exclusions><!--排除依赖:不加入这句会报错-->
            <exclusion>
                <groupId>*</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

1.2 配置及代码

1.2.1 get方式
public class HBaseService {
    private static final Logger logger = LoggerFactory.getLogger(HBaseService.class);

    /**
     * 配置文件读取的配置信息
     */
    static Configuration configuration = HBaseConfiguration.create();

    /**
     * 链接信息
     */
    private static Connection conn = null;

    static {
        try {
            conn = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 进行数据的查询以及写入到文件中(通过get方式查询获得数据并写入文件)
     * @param rowKey rowKey信息
     * @param tableName 表名
     * @param dirName 文件目录
     * @param fileExist 文件是否存在的标志
     */
    public static void addInfoToFile(String rowKey,String tableName,String dirName,boolean fileExist){
        Table table = null;
        ResultScanner result = null;
        try {
            Connection connection = ConnectionFactory.createConnection(configuration);
            table = connection.getTable(TableName.valueOf(tableName));
            List<Get> gets = new ArrayList<>();
            Get get = new Get(Bytes.toBytes(rowKey));
            gets.add(get);
            // result的集合
            Result[] resultArr = table.get(gets);
            Map<String,Map<String,String>> dataMap = new HashMap<>();
            for (Result r : resultArr) {
                String rowKey1 = Bytes.toString(r.getRow());
                Map<String,String> columnDataMap;
                if (dataMap.containsKey(rowKey1)){
                    columnDataMap = dataMap.get(rowKey1);
                }else {
                    columnDataMap = new HashMap<>();
                }
                for (Cell kv : r.rawCells()) {
                    String qualifire = Bytes.toString(CellUtil.cloneQualifier(kv));
                    String value = Base64Encoder.encode(CellUtil.cloneValue(kv));
                    columnDataMap.put(qualifire,value);
                    dataMap.put(rowKey1,columnDataMap);
                }
            }

             if (MapUtil.isNotEmpty(dataMap)){
                 for (String r : dataMap.keySet()) {
                     Map<String,String> columnMap = dataMap.get(r);
                     StrBuilder lineStr = new StrBuilder();
                     lineStr.append(r + "||");
                     for (String s : columnMap.keySet()) {
                         lineStr.append(s + ":" + columnMap.get(s) + "\t");
                     }
                     String fileName = dirName + File.separator + "data.txt";
                     File f = new File(fileName);
                     if (!f.exists()){
                         try {
                             f.createNewFile();
                         }catch (IOException e){
                             logger.error("创建文件失败,异常信息:{}",e.getMessage());
                         }
                     }
                     BufferedWriter writer = new BufferedWriter(
                             new FileWriter(fileName,true));

                     writer.write(lineStr.toString()  + "\n");
                     logger.info("写入rowkey:{}的波形数据到:{}",r,fileName);
                     writer.close();
                 }
            }
        }catch (Exception e){
            logger.error("写入rowkey:{}的波形数据到:{}失败,错误的信息:{}",rowKey,dirName,e.getMessage());
        }
    }
}
1.3.1 Scan方式
   /**
     * 通过scan的方式进行数据获取
     * @param rowKey rowkey
     * @param startKey 开始的rowKey
     * @param stopKey 结束的rowKey
     * @param regexStr rowKey的正则匹配表达式
     */
    public static void findRowKey(String rowKey,String startKey,String stopKey,String regexStr){
        Table table = null;
        ResultScanner result = null;
        try {
            TableName[] tbs = conn.getAdmin().listTableNames();
            FilterList filters = new FilterList();
            table = conn.getTable(TableName.valueOf("Vibration_WaveData"));
            Scan scan = new Scan();
            // 通过正则匹配的方式+rowkey进行数据过滤
            RegexStringComparator regexComparator = new RegexStringComparator(regexStr);
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,regexComparator);
            // 设置start和stop Rowkey 可以提供检索效率
            scan.setStartRow(startKey.getBytes());
            scan.setStopRow(stopKey.getBytes());
            scan.setFilter(rowFilter);
            // 每次从服务器端获取的行数
            scan.setCaching(100000);
            ResultScanner result1 = table.getScanner(scan);
            for (Result r : result1) {
                for (KeyValue kv : r.raw()) {
                    System.out.println(String.format("row:%s,family:%s,qualifier:%s,qualifiervalue:%s,timestamp:%s.",Bytes.toString(kv.getRow()),Bytes.toString(kv.getFamily()),Bytes.toString(kv.getQualifier()),Bytes.toString(kv.getValue()),kv.getTimestamp()));
                }
            }
            result1.close();
            conn.close();
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
    }

2. mapReduce实现

2.1 依赖

 <!--hadoop依赖坐标-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.6</version>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.6</version>
        </dependency>

2.2 配置文件

hbase-site.xml:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <!-- 指定 hbase 是分布式的 -->
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <!-- 指定 zk 的地址,多个用“,”分割 -->
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.1.100:2181,192.168.1.102:2181</value>
    </property>

    <!-- 开启 uber 模式,默认关闭 -->
    <property>
        <name>mapreduce.job.ubertask.enable</name>
        <value>true</value>
    </property>
    <!-- uber 模式中最大的 mapTask 数量,可向下修改 -->
    <property>
        <name>mapreduce.job.ubertask.maxmaps</name>
        <value>9</value>
    </property>
    <!-- uber 模式中最大的 reduce 数量,可向下修改 -->
    <property>
        <name>mapreduce.job.ubertask.maxreduces</name>
        <value>1</value>
    </property>
    <!-- uber 模式中最大的输入数据量,默认使用 dfs.blocksize 的值,可向下修改 -->
    <property>
        <name>mapreduce.job.ubertask.maxbytes</name>
        <value></value>
    </property>
</configuration>

2.3 导出的代码

public class ReadHbaseDataByMRToHDFS {
static Configuration configuration = HBaseConfiguration.create();
    /**
     * 进行hbase数据导出的操作
     * @param tableName 表名
     * @param dirName   文件夹名称
     * @param startRow  开始的row key
     * @param stopRow   结束的row key
     * @param regexStr  进行匹配的字符
     */
    public void exportHbaseData(String tableName,String startRow,String stopRow,String regexStr) {
    
        logger.info("开始进行HBase数据导出,tableName:{},dirName:{},startRow:{},stopRow:{},regexStr:{}",tableName,startRow,stopRow,regexStr);
        System.setProperty("HADOOP_USER_NAME","root");
        // 一次rpc请求的超时时间,如果某次RPC请求超过该值,客户端就会主动管理Socket
        configuration.set("hbase.rpc.timeout","600000");
        // ,该参数是表示HBase客户端发起一次scan操作的rpc调用至得到响应之间总的超时时间
        configuration.set("hbase.client.scanner.timeout.period","600000");
        configuration.set("mapreduce.job.ubertask.maxmaps","10");
        configuration.set("mapreduce.job.ubertask.maxreduces","1");
        configuration.set("mapreduce.task.io.sort.mb","1024");
        configuration.set("mapred.map.tasks","10");
        try {
            Job job = Job.getInstance(configuration);
            job.setJarByClass(ReadHbaseDataByMRToHDFS.class);
            //设置reduce个数
            job.setNumReduceTasks(0);
            //设置map
            Scan scan = new Scan();
// 设置start和stop rowkey以及regex提高检索效率
            RegexStringComparator regexComparator = new RegexStringComparator(regexStr);
            scan.setStartRow(startRow.getBytes()).setStopRow(stopRow.getBytes());
            RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,regexComparator);
            scan.setFilter(rowFilter);
            // 每次从服务器端获取的行数
            scan.setCaching(900000);
            //参数false,关于添加依赖jar
            TableMapReduceUtil.initTableMapperJob(tableName,scan,ReadHBaseDataByMRToHDFSMapper.class,Text.class,NullWritable.class,job,false);

            //输出目录
            FileOutputFormat.setOutputPath(job,new Path(dirName));
            //提交
            boolean isDone = job.waitForCompletion(true);
            if (isDone){
                Thread.sleep(3000);
                logger.info("进行HBase数据导出成功,tableName:{},regexStr:{},状态:{}",regexStr,isDone);
            }
          } catch (Exception e) {
            logger.error("进行HBase数据导出时出现异常,tableName:{},异常信息:{}",e.getMessage());
        }
    }

    /**
     * 参数
     * ImmutableBytesWritable
     * Result :HBase中的数据每次取出来是一个Result:就是一个rowkey做一个result
     * <p>
     * keyOut:
     * valueOut:
     */
    static class ReadHBaseDataByMRToHDFSMapper extends TableMapper<Text,NullWritable> {
        Text outKey = new Text();
        @Override
        protected void map(ImmutableBytesWritable key,Result value,Context context) throws IOException,InterruptedException {
            List<Cell> cells = value.listCells();
            Map<String,String>> cellMap = new HashMap<>();
            //一个cell一条数据 包含一个column
            for (Cell cell : cells) {
                String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                Map<String,String> columnMap = new HashMap<>();
                if (cellMap.containsKey(rowkey)){
                    columnMap = cellMap.get(rowkey);
                }
                // String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                String columnValue = Base64Encoder.encode(CellUtil.cloneValue(cell));
                columnMap.put(column,columnValue);
                cellMap.put(rowkey,columnMap);
                // long timeStamp = cell.getTimestamp();
               // outKey.set(rowkey + "\t\t" + column + "\t\t" + columnValue + "\n");
            }
            if (CollUtil.isNotEmpty(cellMap)){
                String lineStr = "";
                for (String s : cellMap.keySet()) {
                    Map<String,String> columnMap = cellMap.get(s);
                    lineStr = s + "||";
                    for (String c : columnMap.keySet()) {
                        lineStr += c + ":" + columnMap.get(c) + "\t";
                    }
                }
                outKey.set(lineStr);
                context.write(outKey,NullWritable.get());
                outKey.clear();
            }
        }
    }
}

原文地址:https://blog.csdn.net/github_38924695/article/details/134003247

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读301次。你可以使用Thrift客户端来测试HBase Thrift服务。例如,在Python中,你可以使用。请确保你的HBase伪集群已正确配置并且Thrift服务已经启动。这将在你的伪集群中启动HBase Thrift服务。库或者直接使用Thrift接口。进入HBase的安装目录,找到。请根据需要进行相应的配置。这将停止Thrift服务。_hbase 单机 thrift 配置
文章浏览阅读565次。hive和hbase数据迁移_hive转hbase
文章浏览阅读707次。基于单机版安装HBase,前置条件为Hadoop安装完成,安装Hadoop可以参考链接,Hadoop单机安装。地址:https://dlcdn.apache.org/hbase/2.4.13/hbase-2.4.13-src.tar.gz2.解压缩文件3.进入到conf目录下4.修改配置文件 hbase-env.sh示例:示例:6.修改配置文件 hbase-site.xml示例:8.访问页面访问你所以在服务器的16010端口,查看页面以上就是单机版安装HBase的内容,后续_hbase 2.4.13下载
文章浏览阅读301次。linux集群搭建-HBase_linux中在/home目录下创建目录hbase
文章浏览阅读933次。中没有库的概念,说一个数据说的是哪一个名称空间下的那一张表下的哪一个行键的哪一个列族下面的哪一个列对应的是这个数据。注意:put数据需要指定往哪个命名空间的哪个表的哪个rowKey的哪个列族的哪个列中put数据,put的值是什么。注意:put数据需要指定往哪个命名空间的哪个表的哪个rowKey的哪个列族的哪个列中put数据,put的值是什么。注意:put数据需要指定往哪个命名空间的哪个表的哪个rowKey的哪个列族的哪个列中put数据,put的值是什么。操作Hbase系统DDL,对名称空间等进行操作。_hbase中报错undefined method for main:object
文章浏览阅读1k次,点赞16次,收藏21次。整理和梳理日常hbase的监控核心指标,作为经验沉淀_hbase 对应promethus指标名
文章浏览阅读1.5k次,点赞45次,收藏20次。今天把之前学习Hbase的入门基础知识笔记翻出来了,为了不忘记也是帮助身边的小伙伴,我把他又整理了下放了出来给大家,希望对HBASE一知半解的小伙伴,能够对Hbase有一个清晰的认识,好了废话不多说,进入正题。以上内容就是初的识HBase 入门知识,包含了hbase的由来,特性,物理存储,逻辑存储模型,以及优缺点,应用场景这些内容,相信后面在使用或更深入的研究Hbase打下了良好的基础,后面的更深入的学习内容,看计划安排在后面的文章中进行更新。
文章浏览阅读655次。HDFS,适合运行在通用硬件上的分布式文件系统,是一个高度容错性的系统,适合部署在廉价的机器上。Hbase,是一个分布式的、面向列的开源数据库,适合于非结构化数据存储。MapReduce,一种编程模型,方便编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。Chukwa,是一个开源的用于监控大型分布式系统的数据收集系统。_开源非结构化数据存储
文章浏览阅读1.9k次。mongodb和hbase的区别和应用场景_hbase和mongodb的区别
文章浏览阅读1.2k次。Hbase入门篇01---基本概念和部署教程_hbase教程
文章浏览阅读1.6k次,点赞19次,收藏25次。hbase相关内容
文章浏览阅读942次,点赞16次,收藏20次。在hbase1.x中transition是令广大大数据运维人员头疼的一个话题,因为,region 的状态转移涉及到了三个核心组件,分别为:hbase master,zookeeper和hbase 的regionserver,这三个组件中的某一个region的状态都是一致的情况下,这个region 才算是正常,状态转移过程及其复杂,hbase 集群很容易出现RIT。好消息是,hbase2.x中有个工具HBCK2,这个工具可不是简单的hbase1.x中hbck 的升级,变化有点大,详细变化请参考帮助文档(
文章浏览阅读1k次。在HBase中,Region分裂是一种自动的机制,用于在Region大小达到一定阈值时将其分裂成两个Region,以便更好地管理数据。HBase中的Region大小是可以配置的,通过设置HBase表的最小和最大Region大小来控制。需要注意的是,禁止Region分裂后,当表的大小达到一定阈值时,数据将不再分裂成新的Region,因此需要根据实际需求进行调整。需要注意的是,禁止Region分裂后,当表的大小达到一定阈值时,数据将不再分裂成新的Region,因此需要根据实际需求进行调整。_hbase region大小
文章浏览阅读737次。可以看出,HBase作为数据仓库的一种补充,可以用于存储和管理大量数据,以便快速地分析和查询。是一种基于数据库的形式,用于存储和管理大量数据,以便快速地分析和查询。例如,可以使用HBase存储一些用户行为数据,然后进行分析,以便更好地了解用户行为和需求。其次,需要配置HBase相关的环境变量,例如JAVA_HOME、HBASE_HOME等。HBase可以用于存储结构化和非结构化数据,包括文本、图像、视频等。例如,可以使用HBase存储一些传感器数据,然后进行实时分析和处理。一、HBase集群环境搭建。_用hbase 搭建数仓
文章浏览阅读1.9k次。Data。_springboot整合hbase
文章浏览阅读880次,点赞23次,收藏20次。etc/abrt下的两个文件,分别是:abrt-action-save-package-data.conf 和 abrt.conf,修改内容如下。我们后面排查的时候去查看/run/cloudera-scm-agent/process/2325-hbase-REGIONSERVER下是否有。发现有个hs_err_pid15967.log JVM生成的错误日志,那么把这个日志下载查看,返现日志这么写的。接下来就等下一次hbase的节点挂了之后查看转储文件,转储文件在/var/sqool/abrt下。_regionserver 退出 没有错误日志
文章浏览阅读1.7k次。以下命令都需要在Hbase Shell中运行:Hbase信息status:服务器状态version:版本表操作查看所有表:list表基本信息:describe "表名称"查看表是否存在:exists '表名称'创建表:create '表名称', '列族1', '列族2', '列族3'删除表:首先禁用表:disable '表名称'然后删除表:drop '表名称'修改表:表数据操作查看所有数据:scan "表名称"..._hbase sehll怎么看登录的是哪个hbase
文章浏览阅读885次,点赞18次,收藏21次。在HBase中执行查询操作通常使用HBase Shell或编程语言API(如Java或Python)来执行。使用编程语言API,您可以使用相应的HBase客户端库来执行查询操作。这是一个简单的Java代码示例,演示了如何使用HBase Java API进行单行查询。这些示例仅为基本查询操作,HBase Shell还提供其他高级查询功能,如按时间戳过滤,使用正则表达式进行查询等。请注意,这只是HBase查询的基本示例,您可以根据实际需求和HBase的数据模型进行更复杂的查询操作。
文章浏览阅读7.3k次,点赞7次,收藏28次。找到hbase的bin目录并进入,执行启动hbase hmaster命令。问题原因 hmaster挂了 ,需要重新启动hmaster才行。hbase shell输入命令出现如下问题。_keepererrorcode = nonode for /hbase/master
文章浏览阅读1.3k次。三次信息化浪潮。_大数据应用开发技术笔记