微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

HBase API操作

 

HBase API操作

依赖的jar包

<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>

</dependencies>
View Code

 

public class TestHbase {
    //1.构建Configuration, Connection, Admin
    //Configuration 持有了zk的信息,进而hbase集群的信息可以间接获得
    public static Configuration conf;
    //Connection  hbase连接  借助配置信息 获得连接
    public static Connection connection;
    public static Admin admin;
    static{  //为静态属性初始化,或者说辅助类初始化
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103");
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printstacktrace();
        }
        //admin
        try {
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printstacktrace();
        }

    }

 

//1.创建库
    public static void createNS(String namespace) throws IOException {
        //①构建  ns的描述器  声明库名
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build();
        //②创建库
        try{
            admin.createNamespace(namespaceDescriptor);
        }catch (NamespaceExistException e){
            System.out.println("该库已经存在!");
        }
        //③关资源
        admin.close();
    }

 

    //2.判断表是否存在
    public static boolean isExists(String tableName) throws IOException {
        boolean exists = admin.tableExists(TableName.valueOf(tableName));
        System.out.println("exits:" + exists);
        admin.close();
        return exists;
    }

 

    //3.创建表
    public static void createTable(String tableName, String... info) throws IOException {
        //①HTableDescriptor
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        //②添加columnFamily
        for (String cf : info) {
            hTableDescriptor.addFamily(new HColumnDescriptor(cf));
        }
        //③建表
        admin.createTable(hTableDescriptor);
        //④释放资源
        admin.close();
    }

 

    public static void deleteTable(String tableName) throws IOException {
        //禁用并删除表
        admin.disableTable(TableName.valueOf(tableName));
        admin.deleteTable(TableName.valueOf(tableName));
        admin.close();
    }

 

    //5.插入数据  put 'student','1001','cf1:name','kris'
    public static void insertData(String tableName, String rowkey, String column, String value) throws IOException {
        //①获取table
        Table table = connection.getTable(TableName.valueOf(tableName));
        //②获得put
        Put put = new Put(Bytes.toBytes(rowkey));//把String类型转成bytes类型
        put.addColumn(Bytes.toBytes(column.split(":")[0]), Bytes.toBytes(column.split(":")[1]),
                Bytes.toBytes(value));
        table.put(put); //③添加数据
        table.close();//④释放资源
    }

 

//6.删除数据
    public static void deleteData(String tableName, String... rowkey) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        for (String rk : rowkey) {
            Delete del = new Delete(Bytes.toBytes(rk));//获得delete对象,其中持有要删除行的rowkey
            table.delete(del);
        }
        table.close();
    }

 

//7.查询
    public static void  queryAll(String tableName) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) { //result对应一行数据
            Cell[] cells = result.rawCells(); //获取一行的所有cells
            for (Cell cell : cells) {
                String rowkey = Bytes.toString(CellUtil.cloneRow(cell));//
                String family = Bytes.toString(CellUtil.cloneFamily(cell));
                String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                System.out.println("rowkey:" + rowkey + "\t" + family + ":" + column
                +"\t" + value);
            }
        }
    }

 

//8.查询单行
    public static void getRow(String tableName, String rowkey) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowkey));
        get.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("name"));
        //get.addFamily(Bytes.toBytes("cf1")); //如果不追加列族,则查询所有列族
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("查询单行");
            String row = Bytes.toString(CellUtil.cloneRow(cell));
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String column = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));
            System.out.println("row:" + row +"\t" + family + ":" + column + "\t" + value);
        }
    }

 

MapReduce

用MapReduce将数据从本地文件系统导入到HBase的表中,

比如从HBase中读取一些原始数据后使用MapReduce做数据分析。

结合计算型框架进行计算统计
查看HBase的MapReduce任务的执行,把jar打印出来的就是需要添加到hadoop的CLAsspATH下的jar包

$ bin/hbase mapredcp

环境变量的导入
(1)执行环境变量的导入(临时生效,在命令行执行下述操作)

$ export HBASE_HOME=/opt/module/hbase
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLAsspATH=`${HBASE_HOME}/bin/hbase mapredcp`

 

(2)永久生效
在etc/hadoop下hadoop-env.sh中配置:(注意:在for循环之后配),要重启下才生效
export HADOOP_CLAsspATH=$HADOOP_CLAsspATH:/opt/module/hbase/lib/*

运行官方的MapReduce任务
-- 案例一:统计Student表中有多少行数据; 要在hbase中先有student表
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student

-- 案例二:使用MapReduce将hdfs数据导入到HBase
1)在本地创建一个tsv格式的文件:fruit.tsv
1001    Apple    Red
1002    Pear        Yellow
1003    Pineapple    Yellow

2)创建HBase表
hbase(main):001:0> create 'fruit','info'

3)在HDFS中创建input_fruit文件夹并上传fruit.tsv文件
[kris@hadoop101 hadoop-2.7.2]$ hdfs dfs -mkdir /hbase/input_fruit  
[kris@hadoop101 datas]$ hadoop fs -put fruit.tsv /hbase/input_fruit

//把表名写死了;不写死可以通过命令行run(String[] args)传参
[kris@hadoop101 hadoop-2.7.2]$ bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.atguigu.mr.ReadFromTableDriver
bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.atguigu.mr2.ReadFromHdfsDriver /fruit.tsv fruit_mr //给它传参数

4)执行MapReduce到HBase的fruit表中
[kris@hadoop101 hadoop-2.7.2]$ bin/yarn jar /opt/module/hbase-1.3.1/lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit /hbase/input_fruit //hdfs://hadoop101:9000/这个可省略


5)使用scan命令查看导入后的结果
hbase(main):001:0> scan ‘fruit’

hadoop jar
yarn jar
8032rm和yarn的通信端口号

与Hive的集成

 

HBase与Hive的集成在最新的两个版本中无法兼容。所以,我们只能重新编译:hive-hbase-handler-1.2.2.jar!

将所需要的lib的jar包导入进行编译; apache-hive-1.2.1-src\hbase-handler\src\java

 

操作Hive的同时对HBase也会产生影响,所以Hive需要持有操作HBase的Jar,那么接下来拷贝Hive所依赖的jar包(或者使用软连接的形式)。

 对hive添加的jar添加到这个参数,需重启;相当于hbase的客户端-hive,hive和hbase集成所依赖的类

export HIVE_AUX_JARS_PATH=/opt/module/hbase-1.3.1/lib/*

同时在hive-site.xml中修改zookeeper的属性,如下:

<property>
  <name>hive.zookeeper.quorum</name>
  <value>hadoop101,hadoop102,hadoop103</value>
  <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
  <name>hive.zookeeper.client.port</name>
  <value>2181</value>
  <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
View Code

建立Hive表,关联HBase表,插入数据到Hive表的同时能够影响HBase表。

hive表字段映射到hbase中;用stored by指定下格式类型hbase;指定映射关系;serd是序列化和反序列化;列顺序和字段一一对应;:key即rowkey

hive中表删,hbase中就没有了

0: jdbc:hive2://hadoop101:10000> create table hive_hbase_emp_table(empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
with serdeproperties ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") 
tblproperties ("hbase.table.name"="hbase_emp_table");

完成之后,可以分别进入Hive和HBase查看,都生成了对应的表
    在Hive中创建临时中间表,用于load文件中的数据; 不能将数据直接load进Hive所关联HBase的那张表中;要经过mapreduce;
向hive_hbase_emp_table表中插入数据:
hive> insert into table hive_hbase_emp_table select * from emp;
查看Hive以及关联的HBase表中是否已经成功的同步插入了数据
Hive:
hive> select * from hive_hbase_emp_table;
HBase:
hbase> scan ‘hbase_emp_table’

在HBase中已经存储了某一张表hbase_emp_table,然后在Hive中创建一个外部表来关联HBase中的hbase_emp_table这张表,使之可以借助Hive来分析HBase这张表中的数据。

创建外部表:
create external table relevance_hbase_emp(empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
with serdeproperties ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") 
tblproperties ("hbase.table.name"="hbase_emp_table");
关联后就可以使用Hive函数进行一些分析操作了
hive (default)> select * from relevance_hbase_emp;

 hbase中已有表,去关联它,创建外部表即可;hive中删表,数据在hbase中还是存在的;

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐