HBase API操作
依赖的jar包
<dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> </dependency> </dependencies>View Code
public class TestHbase { //1.构建Configuration, Connection, Admin //Configuration 持有了zk的信息,进而hbase集群的信息可以间接获得 public static Configuration conf; //Connection hbase连接 借助配置信息 获得连接 public static Connection connection; public static Admin admin; static{ //为静态属性初始化,或者说辅助类初始化 conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103"); try { connection = ConnectionFactory.createConnection(conf); } catch (IOException e) { e.printstacktrace(); } //admin try { admin = connection.getAdmin(); } catch (IOException e) { e.printstacktrace(); } }
//1.创建库 public static void createNS(String namespace) throws IOException { //①构建 ns的描述器 声明库名 NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(namespace).build(); //②创建库 try{ admin.createNamespace(namespaceDescriptor); }catch (NamespaceExistException e){ System.out.println("该库已经存在!"); } //③关资源 admin.close(); }
//2.判断表是否存在 public static boolean isExists(String tableName) throws IOException { boolean exists = admin.tableExists(TableName.valueOf(tableName)); System.out.println("exits:" + exists); admin.close(); return exists; }
//3.创建表 public static void createTable(String tableName, String... info) throws IOException { //①HTableDescriptor HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); //②添加columnFamily for (String cf : info) { hTableDescriptor.addFamily(new HColumnDescriptor(cf)); } //③建表 admin.createTable(hTableDescriptor); //④释放资源 admin.close(); }
public static void deleteTable(String tableName) throws IOException { //禁用并删除表 admin.disableTable(TableName.valueOf(tableName)); admin.deleteTable(TableName.valueOf(tableName)); admin.close(); }
//5.插入数据 put 'student','1001','cf1:name','kris' public static void insertData(String tableName, String rowkey, String column, String value) throws IOException { //①获取table Table table = connection.getTable(TableName.valueOf(tableName)); //②获得put Put put = new Put(Bytes.toBytes(rowkey));//把String类型转成bytes类型 put.addColumn(Bytes.toBytes(column.split(":")[0]), Bytes.toBytes(column.split(":")[1]), Bytes.toBytes(value)); table.put(put); //③添加数据 table.close();//④释放资源 }
//6.删除数据 public static void deleteData(String tableName, String... rowkey) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); for (String rk : rowkey) { Delete del = new Delete(Bytes.toBytes(rk));//获得delete对象,其中持有要删除行的rowkey table.delete(del); } table.close(); }
//7.查询 public static void queryAll(String tableName) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Scan scan = new Scan(); ResultScanner results = table.getScanner(scan); for (Result result : results) { //result对应一行数据 Cell[] cells = result.rawCells(); //获取一行的所有cells for (Cell cell : cells) { String rowkey = Bytes.toString(CellUtil.cloneRow(cell));// String family = Bytes.toString(CellUtil.cloneFamily(cell)); String column = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println("rowkey:" + rowkey + "\t" + family + ":" + column +"\t" + value); } } }
//8.查询单行 public static void getRow(String tableName, String rowkey) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowkey)); get.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("name")); //get.addFamily(Bytes.toBytes("cf1")); //如果不追加列族,则查询所有列族 Result result = table.get(get); Cell[] cells = result.rawCells(); for (Cell cell : cells) { System.out.println("查询单行"); String row = Bytes.toString(CellUtil.cloneRow(cell)); String family = Bytes.toString(CellUtil.cloneFamily(cell)); String column = Bytes.toString(CellUtil.cloneQualifier(cell)); String value = Bytes.toString(CellUtil.cloneValue(cell)); System.out.println("row:" + row +"\t" + family + ":" + column + "\t" + value); } }
MapReduce
用MapReduce将数据从本地文件系统导入到HBase的表中,
比如从HBase中读取一些原始数据后使用MapReduce做数据分析。
结合计算型框架进行计算统计
查看HBase的MapReduce任务的执行,把jar打印出来的就是需要添加到hadoop的CLAsspATH下的jar包
$ bin/hbase mapredcp
环境变量的导入
(1)执行环境变量的导入(临时生效,在命令行执行下述操作)
$ export HBASE_HOME=/opt/module/hbase $ export HADOOP_HOME=/opt/module/hadoop-2.7.2 $ export HADOOP_CLAsspATH=`${HBASE_HOME}/bin/hbase mapredcp`
(2)永久生效
在etc/hadoop下hadoop-env.sh中配置:(注意:在for循环之后配),要重启下才生效
export HADOOP_CLAsspATH=$HADOOP_CLAsspATH:/opt/module/hbase/lib/*
运行官方的MapReduce任务 -- 案例一:统计Student表中有多少行数据; 要在hbase中先有student表 $ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student -- 案例二:使用MapReduce将hdfs数据导入到HBase 1)在本地创建一个tsv格式的文件:fruit.tsv 1001 Apple Red 1002 Pear Yellow 1003 Pineapple Yellow 2)创建HBase表 hbase(main):001:0> create 'fruit','info' 3)在HDFS中创建input_fruit文件夹并上传fruit.tsv文件 [kris@hadoop101 hadoop-2.7.2]$ hdfs dfs -mkdir /hbase/input_fruit [kris@hadoop101 datas]$ hadoop fs -put fruit.tsv /hbase/input_fruit //把表名写死了;不写死可以通过命令行run(String[] args)传参 [kris@hadoop101 hadoop-2.7.2]$ bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.atguigu.mr.ReadFromTableDriver bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.atguigu.mr2.ReadFromHdfsDriver /fruit.tsv fruit_mr //给它传参数 4)执行MapReduce到HBase的fruit表中 [kris@hadoop101 hadoop-2.7.2]$ bin/yarn jar /opt/module/hbase-1.3.1/lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit /hbase/input_fruit //hdfs://hadoop101:9000/这个可省略 5)使用scan命令查看导入后的结果 hbase(main):001:0> scan ‘fruit’
hadoop jar
yarn jar
8032rm和yarn的通信端口号
与Hive的集成
HBase与Hive的集成在最新的两个版本中无法兼容。所以,我们只能重新编译:hive-hbase-handler-1.2.2.jar!
将所需要的lib的jar包导入进行编译; apache-hive-1.2.1-src\hbase-handler\src\java
操作Hive的同时对HBase也会产生影响,所以Hive需要持有操作HBase的Jar,那么接下来拷贝Hive所依赖的jar包(或者使用软连接的形式)。
对hive添加的jar添加到这个参数,需重启;相当于hbase的客户端-hive,hive和hbase集成所依赖的类
export HIVE_AUX_JARS_PATH=/opt/module/hbase-1.3.1/lib/*
同时在hive-site.xml中修改zookeeper的属性,如下:
<property> <name>hive.zookeeper.quorum</name> <value>hadoop101,hadoop102,hadoop103</value> <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description> </property> <property> <name>hive.zookeeper.client.port</name> <value>2181</value> <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description> </property>View Code
建立Hive表,关联HBase表,插入数据到Hive表的同时能够影响HBase表。
hive表字段映射到hbase中;用stored by指定下格式类型hbase;指定映射关系;serd是序列化和反序列化;列顺序和字段一一对应;:key即rowkey
hive中表删,hbase中就没有了
0: jdbc:hive2://hadoop101:10000> create table hive_hbase_emp_table(empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") tblproperties ("hbase.table.name"="hbase_emp_table"); 完成之后,可以分别进入Hive和HBase查看,都生成了对应的表 在Hive中创建临时中间表,用于load文件中的数据; 不能将数据直接load进Hive所关联HBase的那张表中;要经过mapreduce; 向hive_hbase_emp_table表中插入数据: hive> insert into table hive_hbase_emp_table select * from emp; 查看Hive以及关联的HBase表中是否已经成功的同步插入了数据 Hive: hive> select * from hive_hbase_emp_table; HBase: hbase> scan ‘hbase_emp_table’
在HBase中已经存储了某一张表hbase_emp_table,然后在Hive中创建一个外部表来关联HBase中的hbase_emp_table这张表,使之可以借助Hive来分析HBase这张表中的数据。
创建外部表: create external table relevance_hbase_emp(empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with serdeproperties ("hbase.columns.mapping"=":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") tblproperties ("hbase.table.name"="hbase_emp_table"); 关联后就可以使用Hive函数进行一些分析操作了 hive (default)> select * from relevance_hbase_emp;
hbase中已有表,去关联它,创建外部表即可;hive中删表,数据在hbase中还是存在的;
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。