Springboot整合HBase

Springboot整合HBase数据库

1、添加依赖
<!-- Spring Boot HBase 依赖 -->
<dependency>
    <groupId>com.spring4all</groupId>
    <artifactId>spring-boot-starter-hbase</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop-hbase</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>
2、添加配置
通过Yaml方式配置
spring:
  hbase:
     zookeeper:
      quorum: hbase1.xxx.org,hbase2.xxx.org,hbase3.xxx.org
      property:
         clientPort: 2181
  data:
    hbase:
      quorum: XXX
      rootDir: XXX
      nodeParent: XXX

zookeeper:
  znode:
    parent: /hbase

3、添加配置类
@Configuration
public class HBaseConfig {
    @Bean
    public HBaseService getHbaseService() {
        //设置临时的hadoop环境变量,之后程序会去这个目录下的\bin目录下找winutils.exe工具,windows连接hadoop时会用到
        //System.setProperty("hadoop.home.dir","D:\\Program Files\\Hadoop");
        //执行此步时,会去resources目录下找相应的配置文件,例如hbase-site.xml
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        return new HBaseService(conf);
    }
}
4、工具类的方式实现HBASE操作
@Service
public class HBaseService {

    private Admin admin = null;
    private Connection connection = null;

    public HBaseService(Configuration conf) {
        connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
    }

    //创建表 create <table>,{NAME => <column family>,VERSIONS => <VERSIONS>}
    public boolean creatTable(String tableName, List<String> columnFamily) {
        //列族column family
        List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
        columnFamily.forEach(cf -> {
            cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
                Bytes.toBytes(cf)).build());
        });
        //表 table
        TableDescriptor tableDesc = TableDescriptorBuilder
            .newBuilder(TableName.valueOf(tableName))
            .setColumnFamilies(cfDesc).build();
        if (admin.tableExists(TableName.valueOf(tableName))) {
            log.debug("table Exists!");
        } else {
            admin.createTable(tableDesc);
            log.debug("create table Success!");
        }
        close(admin, null, null);
        return true;
    }

    public List<String> getAllTableNames() {
        List<String> result = new ArrayList<>();
        TableName[] tableNames = admin.listTableNames();
        for (TableName tableName : tableNames) {
            result.add(tableName.getNameAsString());
        }
        close(admin, null);
        return result;
    }

    public Map<String, Map<String, String>> getResultScanner(String tableName) {
        Scan scan = new Scan();
        return this.queryData(tableName, scan);
    }

    private Map<String, String>> queryData(String tableName, Scan scan) {
        // <rowKey,对应的行数据>
        Map<String, String>> result = new HashMap<>();
        ResultScanner rs = null;
        //获取表
        Table table = null;
        table = getTable(tableName);
        rs = table.getScanner(scan);
        for (Result r : rs) {
            // 每一行数据
            Map<String, String> columnMap = new HashMap<>();
            String rowKey = null;
            // 行键,列族和列限定符一起确定一个单元(Cell)
            for (Cell cell : r.listCells()) {
                if (rowKey == null) {
                    rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                }
                columnMap.put(
                    //列限定符
                    Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                    //列族
                    Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
            }
            if (rowKey != null) {
                result.put(rowKey, columnMap);
            }
        }
        close(null, rs, table);

        return result;
    }

    public void putData(String tableName, String rowKey, String familyName, String[] columns, String[] values) {
        Table table = null;
        table = getTable(tableName);
        putData(table, rowKey, tableName, familyName, columns, values);
        close(null, table);

    }

    private void putData(Table table, String tableName, 
                         String familyName, String[] values) {
        //设置rowkey
        Put put = new Put(Bytes.toBytes(rowKey));
        if (columns != null && values != null && columns.length == values.length) {
            for (int i = 0; i < columns.length; i++) {
                if (columns[i] != null && values[i] != null) {
                    put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
                } else {
                    throw new NullPointerException(MessageFormat.format(
                        "列名和列数据都不能为空,column:{0},value:{1}", columns[i], values[i]));
                }
            }
        }
        table.put(put);
        log.debug("putData add or update data Success,rowKey:" + rowKey);
        table.close();

    }
    private Table getTable(String tableName) throws IOException {
        return connection.getTable(TableName.valueOf(tableName));
    }

    private void close(Admin admin, ResultScanner rs, Table table) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                log.error("关闭Admin失败", e);
            }

            if (rs != null) {
                rs.close();
            }

            if (table != null) {
                rs.close();
            }

            if (table != null) {
                try {
                    table.close();
                } catch (IOException e) {
                    log.error("关闭Table失败", e);
                }
            }
        }
    }
}

测试类
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
class HBaseApplicationTests {
    @Resource
    private HBaseService hbaseService;
    //测试创建表
    @Test
    public void testCreateTable() {
        hbaseService.creatTable("test_base", Arrays.asList("a", "back"));
    }
    //测试加入数据
    @Test
    public void testPutData() {
        hbaseService.putData("test_base", "000001", "a", new String[]{
                "project_id", "varName", "coefs", "pvalues", "tvalues",
                "create_time"}, new String[]{"40866", "mob_3", "0.9416",
                "0.0000", "12.2293", "null"});
        hbaseService.putData("test_base", "000002", "idno_prov", "0.9317", "9.8679", "000003", "education", "0.8984", "25.5649", "null"});
    }
    //测试遍历全表
    @Test
    public void testGetResultScanner() {
        Map<String, String>> result2 = hbaseService.getResultScanner("test_base");
        System.out.println("-----遍历查询全表内容-----");
        result2.forEach((k, value) -> {
            System.out.println(k + "--->" + value);
        });
    }
}

三、使用spring-data-hadoop-hbase

3、配置类
@Configuration
public class HBaseConfiguration {
 
    @Value("${hbase.zookeeper.quorum}")
    private String zookeeperQuorum;
 
    @Value("${hbase.zookeeper.property.clientPort}")
    private String clientPort;
 
    @Value("${zookeeper.znode.parent}")
    private String znodeParent;
 
    @Bean
    public HbaseTemplate hbaseTemplate() {
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
        conf.set("hbase.zookeeper.property.clientPort", clientPort);
        conf.set("zookeeper.znode.parent", znodeParent);
        return new HbaseTemplate(conf);
    }
}
4、业务类中使用HbaseTemplate

这个是作为工具类

@Service
@Slf4j
public class HBaseService {
 
 
    @Autowired
    private HbaseTemplate hbaseTemplate;
 	
 	//查询列簇
    public List<Result> getRowKeyAndColumn(String tableName, String startRowkey, 
                                           String stopRowkey, String column, String qualifier) {
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        if (StringUtils.isNotBlank(column)) {
            log.debug("{}", column);
            filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL,
                       new BinaryComparator(Bytes.toBytes(column))));
        }
        if (StringUtils.isNotBlank(qualifier)) {
            log.debug("{}", qualifier);
            filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, 
                       new BinaryComparator(Bytes.toBytes(qualifier))));
        }
        Scan scan = new Scan();
        if (filterList.getFilters().size() > 0) {
            scan.setFilter(filterList);
        }
        scan.setStartRow(Bytes.toBytes(startRowkey));
        scan.setStopRow(Bytes.toBytes(stopRowkey));
 
        return hbaseTemplate.find(tableName, scan, (rowMapper, rowNum) -> rowMapper);
    }
 
    public List<Result> getListRowkeyData(String tableName, List<String> rowKeys, 
                                          String familyColumn, String column) {
        return rowKeys.stream().map(rk -> {
            if (StringUtils.isNotBlank(familyColumn)) {
                if (StringUtils.isNotBlank(column)) {
                    return hbaseTemplate.get(tableName, rk, familyColumn, 
                                column, rowNum) -> rowMapper);
                } else {
                    return hbaseTemplate.get(tableName,
                                (rowMapper, rowNum) -> rowMapper);
                }
            }
            return hbaseTemplate.get(tableName, rowNum) -> rowMapper);
        }).collect(Collectors.toList());
    }
}

四、使用spring-boot-starter-data-hbase

参考:https://blog.csdn.net/cpongo1/article/details/89550486

## 下载spring-boot-starter-hbase代码
git clone https://github.com/SpringForAll/spring-boot-starter-hbase.git
## 安装
cd spring-boot-starter-hbase
mvn clean install
2、添加配置项
  • spring.data.hbase.quorum 指定 HBase 的 zk 地址
  • spring.data.hbase.rootDir 指定 HBase 在 HDFS 上存储的路径
  • spring.data.hbase.nodeParent 指定 ZK 中 HBase 的根 ZNode
3、定义好DTO
@Data
public class City {
    private Long id;
    private Integer age;
    private String cityName;  
}
4、创建对应rowMapper
public class CityRowMapper implements RowMapper<City> {
 
    private static byte[] COLUMN_FAMILY = "f".getBytes();
    private static byte[] NAME = "name".getBytes();
    private static byte[] AGE = "age".getBytes();
 
    @Override
    public City mapRow(Result result, int rowNum) throws Exception {
        String name = Bytes.toString(result.getValue(COLUMN_FAMILY, NAME));
        int age = Bytes.toInt(result.getValue(COLUMN_FAMILY, AGE));
 
        City dto = new City();
        dto.setCityName(name);
        dto.setAge(age);
        return dto;
    }
}
5、操作实现增改查
  • HbaseTemplate.find 返回 HBase 映射的 City 列表
  • HbaseTemplate.get 返回 row 对应的 City 信息
  • HbaseTemplate.saveOrUpdates 保存或者更新
    如果 HbaseTemplate 操作不满足需求,完全可以使用 hbaseTemplate 的getConnection() 方法,获取连接。进而类似 HbaseTemplate 实现的逻辑,实现更复杂的需求查询等功能
@Service
public class CityServiceImpl implements CityService {
 
    @Autowired private HbaseTemplate hbaseTemplate;
 	//查询
    public List<City> query(String startRow, String stopRow) {
        Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));
        scan.setCaching(5000);
        List<City> dtos = this.hbaseTemplate.find("people_table", new CityRowMapper());
        return dtos;
    }
 	//查询
    public City query(String row) {
        City dto = this.hbaseTemplate.get("people_table", row, new CityRowMapper());
        return dto;
    }
 	//新增或者更新
    public void saveOrUpdate() {
        List<Mutation> saveOrUpdates = new ArrayList<Mutation>();
        Put            put           = new Put(Bytes.toBytes("135xxxxxx"));
        put.addColumn(Bytes.toBytes("people"), Bytes.toBytes("name"), Bytes.toBytes("test"));
        saveOrUpdates.add(put);
        this.hbaseTemplate.saveOrUpdates("people_table", saveOrUpdates);
    }
}

Springboot整合Influxdb

中文文档:https://jasper-zhang1.gitbooks.io/influxdb/content/Introduction/installation.html

注意,项目建立在spring-boot-web基础上

1、添加依赖
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.15</version>
</dependency>
2、添加配置
spring:
  influx:
    database: my_sensor1
    password: admin
    url: http://127.0.0.1:6086
    user: admin
3、编写配置类
@Configuration
public class InfluxdbConfig {
        
    @Value("${spring.influx.url}")
    private String influxDBUrl; 

    @Value("${spring.influx.user}")
    private String userName;    

    @Value("${spring.influx.password}")
    private String password;    

    @Value("${spring.influx.database}")
    private String database;    

    @Bean("influxDB")
    public InfluxDB influxdb(){     
        InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password);
        try {
            
            /** 
             * 异步插入:
             * enableBatch这里第一个是point的个数,第二个是时间,单位毫秒    
             * point的个数和时间是联合使用的,如果满100条或者60 * 1000毫秒   
             * 满足任何一个条件就会发送一次写的请求。
             */
            influxDB.setDatabase(database).enableBatch(100,1000 * 60, TimeUnit.MILLISECONDS);
            
        } catch (Exception e) { 
            e.printStackTrace();
        } finally { 
            //设置默认策略
            influxDB.setRetentionPolicy("sensor_retention");    
        }
        //设置日志输出级别
        influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);  
        return influxDB;
    }
}
4、InfluxDB原生API实现
@SpringBootTest(classes = {MainApplication.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class InfluxdbDBTest {

    @Autowired
    private InfluxDB influxDB;
    
    //measurement
    private final String measurement = "sensor";
    
    @Value("${spring.influx.database}")
    private String database;
    
    /**
     * 批量插入第一种方式
     */
    @Test
    public void insert(){
        List<String> lines = new ArrayList<String>();       
        Point point = null;     
        for(int i=0;i<50;i++){          
            point = Point.measurement(measurement)
            .tag("deviceId", "sensor" + i)
            .addField("temp", 3)
            .addField("voltage", 145+i)
            .addField("A1", "4i")
            .addField("A2", "4i").build();
            lines.add(point.lineProtocol());
        }
        //写入
        influxDB.write(lines);
    }
    
    /**
     * 批量插入第二种方式
     */
    @Test
    public void batchInsert(){
        BatchPoints batchPoints = BatchPoints
                .database(database)
                .consistency(InfluxDB.ConsistencyLevel.ALL)
                .build();
      //遍历sqlserver获取数据
      for(int i=0;i<50;i++){
        //创建单条数据对象——表名
        Point point = Point.measurement(measurement)
          //tag属性——只能存储String类型
                .tag("deviceId", "sensor" + i)
                .addField("temp", 3)
                .addField("voltage", 145+i)
                .addField("A1", "4i")
                .addField("A2", "4i").build();
        //将单条数据存储到集合中
        batchPoints.point(point);
      }
      //批量插入
      influxDB.write(batchPoints); 
    }
    
    /**
     * 获取数据
     */
    @Test
    public void datas(@RequestParam Integer page){
        int pageSize = 10;
        // InfluxDB支持分页查询,因此可以设置分页查询条件
        String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;
        
        String queryCondition = "";  //查询条件暂且为空
        // 此处查询所有内容,如果
        String queryCmd = "SELECT * FROM "
            // 查询指定设备下的日志信息
            // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
            // + 策略name + "." + measurement
            + measurement
            // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
            + queryCondition
            // 查询结果需要按照时间排序
            + " ORDER BY time DESC"
            // 添加分页查询条件
            + pageQuery;
        
        QueryResult queryResult = influxDB.query(new Query(queryCmd, database));
        System.out.println("query result => "+queryResult);
    }
}
5、采用封装工具类
1、创建实体类
@Data
@Measurement(name = "sensor")
public class Sensor {

    @Column(name="deviceId",tag=true)
    private String deviceId;
    
    @Column(name="temp")
    private float temp;
    
    @Column(name="voltage")
    private float voltage;
    
    @Column(name="A1")
    private float A1;
    
    @Column(name="A2")
    private float A2;
    
    @Column(name="time")
    private String time;    
    
}
2、创建工具类
@Component
public class InfluxdbUtils {

    @Autowired
    private InfluxDB influxDB;
    
    @Value("${spring.influx.database}")
    private String database;    
    
    /**
     * 新增单条记录,利用java的反射机制进行新增操作
     */
    @SneakyThrows
    public void insertOne(Object obj){
        //获取度量
        Class<?> clasz = obj.getClass();
        Measurement measurement = clasz.getAnnotation(Measurement.class);
        //构建
        Point.Builder builder = Point.measurement(measurement.name());
        // 获取对象属性
        Field[] fieldArray = clasz.getDeclaredFields();
        Column column = null;
        for(Field field : fieldArray){
                column = field.getAnnotation(Column.class);
                //设置属性可操作
                field.setAccessible(true); 
                if(column.tag()){
                    //tag属性只能存储String类型
                    builder.tag(column.name(), field.get(obj).toString());
                }else{
                    //设置field
                    if(field.get(obj) != null){
                        builder.addField(column.name(), field.get(obj).toString());
                    }
                }
        }
        influxDB.write(builder.build());
    }
    
    /**
     * 批量新增,方法一
     */
    @SneakyThrows
    public void insertBatchByRecords(List<?> records){
        List<String> lines = new ArrayList<String>();   
        records.forEach(record->{
            Class<?> clasz = record.getClass();
            //获取度量
            Measurement measurement = clasz.getAnnotation(Measurement.class);
            //构建
            Point.Builder builder = Point.measurement(measurement.name());
            Field[] fieldArray = clasz.getDeclaredFields();
            Column column = null;
            for(Field field : fieldArray){
                    column = field.getAnnotation(Column.class);
                    //设置属性可操作
                    field.setAccessible(true); 
                    if(column.tag()){
                        //tag属性只能存储String类型
                        builder.tag(column.name(), field.get(record).toString());
                    }else{
                        //设置field
                        if(field.get(record) != null){
                            builder.addField(column.name(), field.get(record).toString());
                        }
                    }
            }
            lines.add(builder.build().lineProtocol());
        });
        influxDB.write(lines);
    }
    
    /**
     * 批量新增,方法二
     */
    @SneakyThrows
    public void insertBatchByPoints(List<?> records){
        BatchPoints batchPoints = BatchPoints.database(database)
                .consistency(InfluxDB.ConsistencyLevel.ALL)
                .build();
        records.forEach(record->{
            Class<?> clasz = record.getClass();
            //获取度量
            Measurement measurement = clasz.getAnnotation(Measurement.class);
            //构建
            Point.Builder builder = Point.measurement(measurement.name());
            Field[] fieldArray = clasz.getDeclaredFields();
            Column column = null;
            for(Field field : fieldArray){
                    column = field.getAnnotation(Column.class);
                    //设置属性可操作
                    field.setAccessible(true); 
                    if(column.tag()){
                        //tag属性只能存储String类型
                        builder.tag(column.name(), field.get(record).toString());
                        }
                    }
            }
            batchPoints.point(builder.build());
        });
        influxDB.write(batchPoints);
    }
    
    /**
     * 查询,返回Map集合
     * @param query 完整的查询语句
     */
    public List<Object> fetchRecords(String query){
        List<Object> results = new ArrayList<Object>();
        QueryResult queryResult = influxDB.query(new Query(query, database));
        queryResult.getResults().forEach(result->{
            result.getSeries().forEach(serial->{
                List<String> columns = serial.getColumns();
                int fieldSize = columns.size();
                serial.getValues().forEach(value->{     
                    Map<String,Object> obj = new HashMap<String,Object>();
                    for(int i=0;i<fieldSize;i++){   
                        obj.put(columns.get(i), value.get(i));
                    }
                    results.add(obj);
                });
            });
        });
        return results;
    }
    
    /**
     * 查询,返回map集合
     * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
     * @param measurement 度量,不可为空;
     */
    public List<Object> fetchRecords(String fieldKeys, String measurement){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);     
        return this.fetchRecords(query.toString());
    }
    
    /**
     * 查询,返回map集合
     * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
     * @param measurement 度量,不可为空;
     */
    public List<Object> fetchRecords(String fieldKeys, String measurement, String order){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);       
        return this.fetchRecords(query.toString());
    }
    
    /**
     * 查询,返回map集合
     * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
     * @param measurement 度量,不可为空;
     */
    public List<Object> fetchRecords(String fieldKeys, String order, String limit){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        query.append(limit);
        return this.fetchRecords(query.toString());
    }
    
    /**
     * 查询,返回对象的list集合
     */
    @SneakyThrows
    public <T> List<T> fetchResults(String query, Class<?> clasz){
        List results = new ArrayList<>();
        QueryResult queryResult = influxDB.query(new Query(query, database));
        queryResult.getResults().forEach(result->{
            result.getSeries().forEach(serial->{
                List<String> columns = serial.getColumns();
                int fieldSize = columns.size();     
                serial.getValues().forEach(value->{ 
                    Object obj = null;
                        obj = clasz.newInstance();
                        for(int i=0;i<fieldSize;i++){   
                            String fieldName = columns.get(i);
                            Field field = clasz.getDeclaredField(fieldName);
                            field.setAccessible(true);
                            Class<?> type = field.getType();
                            if(type == float.class){
                                field.set(obj, Float.valueOf(value.get(i).toString()));
                            }else{
                                field.set(obj, value.get(i));
                            }                           
                        }
                    results.add(obj);
                });
            });
        });
        return results;
    }
    
    /**
     * 查询,返回对象的list集合
     */
    public <T> List<T> fetchResults(String fieldKeys, Class<?> clasz){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);     
        return this.fetchResults(query.toString(), clasz);
    }
    
    /**
     * 查询,返回对象的list集合
     */
    public <T> List<T> fetchResults(String fieldKeys, Class<?> clasz){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        return this.fetchResults(query.toString(), String limit, Class<?> clasz){
        StringBuilder query = new StringBuilder();
        query.append("select ").append(fieldKeys).append(" from ").append(measurement);
        query.append(" order by ").append(order);
        query.append(limit);        
        return this.fetchResults(query.toString(), clasz);
    }
}
3、使用工具类的测试代码
@SpringBootTest(classes = {MainApplication.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class InfluxdbUtilTest {

    @Autowired
    private InfluxdbUtils influxdbUtils;
    
    /**
     * 插入单条记录
     */
    @Test
    public void insert(){
      Sensor sensor = new Sensor();
      sensor.setA1(10);
      sensor.setA2(10);
      sensor.setDeviceId("0002");
      sensor.setTemp(10L);
      sensor.setTime("2021-01-19");
      sensor.setVoltage(10);
      influxdbUtils.insertOne(sensor);
    }
    
    /**
     * 批量插入第一种方式
     */
    @GetMapping("/index22")
    public void batchInsert(){  
        List<Sensor> sensorList = new ArrayList<Sensor>();
        for(int i=0; i<50; i++){
            Sensor sensor = new Sensor();
            sensor.setA1(2);
            sensor.setA2(12);
            sensor.setTemp(9);
            sensor.setVoltage(12);
            sensor.setDeviceId("sensor4545-"+i);
            sensorList.add(sensor);
        }
        influxdbUtils.insertBatchByRecords(sensorList);
    }
    
    /**
     * 批量插入第二种方式
     */
    @GetMapping("/index23")
    public void batchInsert1(){ 
        List<Sensor> sensorList = new ArrayList<Sensor>();
        Sensor sensor = null;
        for(int i=0; i<50; i++){
            sensor = new Sensor();
            sensor.setA1(2);
            sensor.setA2(12);
            sensor.setTemp(9);
            sensor.setVoltage(12);
            sensor.setDeviceId("sensor4545-"+i);
            sensorList.add(sensor);
        }
        influxdbUtils.insertBatchByPoints(sensorList);
    }
        
    /**
     * 查询数据
     */
    @GetMapping("/datas2")
    public void datas(@RequestParam Integer page){
        int pageSize = 10;
        // InfluxDB支持分页查询,如果
        String queryCmd = "SELECT * FROM sensor"
            // 查询指定设备下的日志信息
            // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加;
            // + 策略name + "." + measurement
            // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
            + queryCondition
            // 查询结果需要按照时间排序
            + " ORDER BY time DESC"
            // 添加分页查询条件
            + pageQuery;
        
        List<Object> sensorList = influxdbUtils.fetchRecords(queryCmd);
        System.out.println("query result => {}"+sensorList );
    }
    
    /**
     * 获取数据
     */
    @GetMapping("/datas21")
    public void datas1(@RequestParam Integer page){
        int pageSize = 10;
        // InfluxDB支持分页查询,选择field数值会严重拖慢查询速度)
            + queryCondition
            // 查询结果需要按照时间排序
            + " ORDER BY time DESC"
            // 添加分页查询条件
            + pageQuery;
        List<Sensor> sensorList = influxdbUtils.fetchResults(queryCmd, Sensor.class);
        //List<Sensor> sensorList = influxdbUtils.fetchResults("*","sensor",Sensor.class);
        sensorList.forEach(sensor->{
            System.out.println("query result => {}"+sensorList );
        });     
    }
}
6、采用封装数据模型的方式
1、在Influxdb库中创建存储策略
CREATE RETENTION POLICY "rp_order_payment" ON "db_order" DURATION 30d REPLICATION 1 DEFAULT
2、创建数据模型
@Data
@Measurement(name = "m_order_payment",
		database = "db_order", 
		retentionPolicy = "rp_order_payment")
public class OrderPayment implements Serializable  {

    // 统计批次
    @Column(name = "batch_id", tag = true)
    private String batchId;

    // 哪个BU
    @Column(name = "bu_id", tag = true)
    private String buId;

    // BU 名称
    @Column(name = "bu_name")
    private String buName;

    // 总数
    @Column(name = "total_count", tag = true)
    private String totalCount;

    // 支付量
    @Column(name = "pay_count", tag = true)
    private String payCount;

    // 金额
    @Column(name = "total_money", tag = true)
    private String totalMoney;
}
3、创建Mapper
public class InfluxMapper extends InfluxDBMapper {

    public InfluxMapper(InfluxDB influxDB) {
        super(influxDB);
    }
}
4、配置Mapper
@Log4j2
@Configuration
public class InfluxAutoConfiguration {

    @Bean
    public InfluxMapper influxMapper(InfluxDB influxDB) {
        InfluxMapper influxMapper = new InfluxMapper(influxDB);
        return influxMapper;
    }
}
5、测试CRUD
@SpringBootTest(classes = {MainApplication.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class InfluxdbMapperTest {


    @Autowired
    private InfluxMapper influxMapper;


    @Test
    public void save(OrderPayment product) {
        influxMapper.save(product);
    }
    @Test
    public void queryAll() {
        List<OrderPayment> products = influxMapper.query(OrderPayment.class);
        System.out.println(products);
    }

    @Test
    public void queryByBu(String bu) {
        String sql = String.format("%s'%s'", "select * from m_order_payment where bu_id = ", bu);
        Query query = new Query(sql, "db_order");
        List<OrderPayment> products = influxMapper.query(query, OrderPayment.class);
        System.out.println(products);
    }
}

参考:https://blog.csdn.net/cpongo1/article/details/89550486

https://github.com/SpringForAll/spring-boot-starter-hbase

https://github.com/JeffLi1993/springboot-learning-example

原文地址:https://blog.csdn.net/QingChunBuSanChang/article/details/132596960

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


文章浏览阅读301次。你可以使用Thrift客户端来测试HBase Thrift服务。例如,在Python中,你可以使用。请确保你的HBase伪集群已正确配置并且Thrift服务已经启动。这将在你的伪集群中启动HBase Thrift服务。库或者直接使用Thrift接口。进入HBase的安装目录,找到。请根据需要进行相应的配置。这将停止Thrift服务。_hbase 单机 thrift 配置
文章浏览阅读565次。hive和hbase数据迁移_hive转hbase
文章浏览阅读707次。基于单机版安装HBase,前置条件为Hadoop安装完成,安装Hadoop可以参考链接,Hadoop单机安装。地址:https://dlcdn.apache.org/hbase/2.4.13/hbase-2.4.13-src.tar.gz2.解压缩文件3.进入到conf目录下4.修改配置文件 hbase-env.sh示例:示例:6.修改配置文件 hbase-site.xml示例:8.访问页面访问你所以在服务器的16010端口,查看页面以上就是单机版安装HBase的内容,后续_hbase 2.4.13下载
文章浏览阅读301次。linux集群搭建-HBase_linux中在/home目录下创建目录hbase
文章浏览阅读933次。中没有库的概念,说一个数据说的是哪一个名称空间下的那一张表下的哪一个行键的哪一个列族下面的哪一个列对应的是这个数据。注意:put数据需要指定往哪个命名空间的哪个表的哪个rowKey的哪个列族的哪个列中put数据,put的值是什么。注意:put数据需要指定往哪个命名空间的哪个表的哪个rowKey的哪个列族的哪个列中put数据,put的值是什么。注意:put数据需要指定往哪个命名空间的哪个表的哪个rowKey的哪个列族的哪个列中put数据,put的值是什么。操作Hbase系统DDL,对名称空间等进行操作。_hbase中报错undefined method for main:object
文章浏览阅读1k次,点赞16次,收藏21次。整理和梳理日常hbase的监控核心指标,作为经验沉淀_hbase 对应promethus指标名
文章浏览阅读1.5k次,点赞45次,收藏20次。今天把之前学习Hbase的入门基础知识笔记翻出来了,为了不忘记也是帮助身边的小伙伴,我把他又整理了下放了出来给大家,希望对HBASE一知半解的小伙伴,能够对Hbase有一个清晰的认识,好了废话不多说,进入正题。以上内容就是初的识HBase 入门知识,包含了hbase的由来,特性,物理存储,逻辑存储模型,以及优缺点,应用场景这些内容,相信后面在使用或更深入的研究Hbase打下了良好的基础,后面的更深入的学习内容,看计划安排在后面的文章中进行更新。
文章浏览阅读655次。HDFS,适合运行在通用硬件上的分布式文件系统,是一个高度容错性的系统,适合部署在廉价的机器上。Hbase,是一个分布式的、面向列的开源数据库,适合于非结构化数据存储。MapReduce,一种编程模型,方便编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。Chukwa,是一个开源的用于监控大型分布式系统的数据收集系统。_开源非结构化数据存储
文章浏览阅读1.9k次。mongodb和hbase的区别和应用场景_hbase和mongodb的区别
文章浏览阅读1.2k次。Hbase入门篇01---基本概念和部署教程_hbase教程
文章浏览阅读1.6k次,点赞19次,收藏25次。hbase相关内容
文章浏览阅读942次,点赞16次,收藏20次。在hbase1.x中transition是令广大大数据运维人员头疼的一个话题,因为,region 的状态转移涉及到了三个核心组件,分别为:hbase master,zookeeper和hbase 的regionserver,这三个组件中的某一个region的状态都是一致的情况下,这个region 才算是正常,状态转移过程及其复杂,hbase 集群很容易出现RIT。好消息是,hbase2.x中有个工具HBCK2,这个工具可不是简单的hbase1.x中hbck 的升级,变化有点大,详细变化请参考帮助文档(
文章浏览阅读1k次。在HBase中,Region分裂是一种自动的机制,用于在Region大小达到一定阈值时将其分裂成两个Region,以便更好地管理数据。HBase中的Region大小是可以配置的,通过设置HBase表的最小和最大Region大小来控制。需要注意的是,禁止Region分裂后,当表的大小达到一定阈值时,数据将不再分裂成新的Region,因此需要根据实际需求进行调整。需要注意的是,禁止Region分裂后,当表的大小达到一定阈值时,数据将不再分裂成新的Region,因此需要根据实际需求进行调整。_hbase region大小
文章浏览阅读737次。可以看出,HBase作为数据仓库的一种补充,可以用于存储和管理大量数据,以便快速地分析和查询。是一种基于数据库的形式,用于存储和管理大量数据,以便快速地分析和查询。例如,可以使用HBase存储一些用户行为数据,然后进行分析,以便更好地了解用户行为和需求。其次,需要配置HBase相关的环境变量,例如JAVA_HOME、HBASE_HOME等。HBase可以用于存储结构化和非结构化数据,包括文本、图像、视频等。例如,可以使用HBase存储一些传感器数据,然后进行实时分析和处理。一、HBase集群环境搭建。_用hbase 搭建数仓
文章浏览阅读1.9k次。Data。_springboot整合hbase
文章浏览阅读880次,点赞23次,收藏20次。etc/abrt下的两个文件,分别是:abrt-action-save-package-data.conf 和 abrt.conf,修改内容如下。我们后面排查的时候去查看/run/cloudera-scm-agent/process/2325-hbase-REGIONSERVER下是否有。发现有个hs_err_pid15967.log JVM生成的错误日志,那么把这个日志下载查看,返现日志这么写的。接下来就等下一次hbase的节点挂了之后查看转储文件,转储文件在/var/sqool/abrt下。_regionserver 退出 没有错误日志
文章浏览阅读1.7k次。以下命令都需要在Hbase Shell中运行:Hbase信息status:服务器状态version:版本表操作查看所有表:list表基本信息:describe "表名称"查看表是否存在:exists '表名称'创建表:create '表名称', '列族1', '列族2', '列族3'删除表:首先禁用表:disable '表名称'然后删除表:drop '表名称'修改表:表数据操作查看所有数据:scan "表名称"..._hbase sehll怎么看登录的是哪个hbase
文章浏览阅读885次,点赞18次,收藏21次。在HBase中执行查询操作通常使用HBase Shell或编程语言API(如Java或Python)来执行。使用编程语言API,您可以使用相应的HBase客户端库来执行查询操作。这是一个简单的Java代码示例,演示了如何使用HBase Java API进行单行查询。这些示例仅为基本查询操作,HBase Shell还提供其他高级查询功能,如按时间戳过滤,使用正则表达式进行查询等。请注意,这只是HBase查询的基本示例,您可以根据实际需求和HBase的数据模型进行更复杂的查询操作。
文章浏览阅读7.3k次,点赞7次,收藏28次。找到hbase的bin目录并进入,执行启动hbase hmaster命令。问题原因 hmaster挂了 ,需要重新启动hmaster才行。hbase shell输入命令出现如下问题。_keepererrorcode = nonode for /hbase/master
文章浏览阅读1.3k次。三次信息化浪潮。_大数据应用开发技术笔记