Springboot2(38)集成hbase

2021年12月24日 433点热度 0条评论

Springboot2(38)集成hbase

源码地址

springboot2教程系列

HBase建模、使用以及优化

HBase系统架构

添加依赖


		
			org.apache.hbase
			hbase-client
			1.3.0
			
				
					org.slf4j
					slf4j-log4j12
				
				
					javax.servlet
					servlet-api
				
				
					com.google.guava
					guava
				
				
					org.elasticsearch
					elasticsearch
				
			
		

		
			org.springframework.data
			spring-data-hadoop
			${spring-data-hadoop.version}
			
				
					javax.servlet
					*
				
			
		

添加配置application.yml

hbase.zookeeper.quorum: 10.10.1.141
hbase.zookeeper.port: 2181
hbase.zookeeper.znode: /hbase
hbase.client.keyvalue.maxsize: 1572864000

配置类

@Configuration
@EnableConfigurationProperties(HbaseProperties.class)
public class HbaseConfig {


    @Bean("conf")
    public org.apache.hadoop.conf.Configuration configuration(
        @Value("${hbase.zookeeper.quorum}") String quorum,                                               @Value("${hbase.zookeeper.port}") String port) {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", quorum);
        conf.set("hbase.zookeeper.port", port);
        return conf;
    }

    @Bean
    public HbaseTemplate hbaseTemplate(org.apache.hadoop.conf.Configuration conf) {
        HbaseTemplate hbaseTemplate = new HbaseTemplate();
        hbaseTemplate.setConfiguration(conf);
        hbaseTemplate.setAutoFlush(true);
        return hbaseTemplate;
    }

    @Bean("hBaseAdmin")
    public Admin createHbaseAdmin(org.apache.hadoop.conf.Configuration conf) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        return admin;
    }


}

工具类

package cn.myframe.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;

@Component
@Slf4j
public class HbaseUtil {

    @Autowired
    private HbaseTemplate hbaseTemplate;

    @Autowired
    private Admin hBaseAdmin;

    /**
     * 创建表
     * @param tableName
     * @param familys
     */
    public  void createTable(String tableName,String[] familys){
        try{
            TableName table = TableName.valueOf(tableName);
            if (hBaseAdmin.tableExists(table)) {
                log.info("table[{}] already exists!",tableName);
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(table);
                for (int i = 0; i  familys.length; i++) {
                    tableDesc.addFamily(new HColumnDescriptor(familys[i]));
                }
                hBaseAdmin.createTable(tableDesc);
            }
        }catch (Exception e){
        }
    }


    /**
     *
     * @param query
     * @return
     */
    public Object execute(HQuery query) {
        if(StringUtils.isBlank(String.valueOf(query.getRow())) || query.getColumns().isEmpty()){
            return null;
        }
        return hbaseTemplate.execute(query.getTable(), new TableCallbackObject>() {
            @SuppressWarnings("deprecation")
            @Override
            public Object doInTable(HTableInterface table) throws Throwable {
                try {
                    byte[] rowkey = String.valueOf(query.getRow()).getBytes();
                    Put put = new Put(rowkey);
                    for(HBaseColumn col:query.getColumns()){
                        put.addColumn(Bytes.toBytes(col.getFamily()), Bytes.toBytes(col.getQualifier()),
                                Bytes.toBytes(col.getValue()));
                    }
                    table.put(put);
                } catch (Exception e) {
                    log.warn("==> hbase get object fail> "+query.getRow());
                }
                return null;
            }
        });
    }

    /**
     * 批量插入
     * @param query
     * @return
     */
    public Object bufferInsert(HQuery query){
        return hbaseTemplate.execute(query.getTable(), new TableCallbackObject>() {
            @SuppressWarnings("deprecation")
            @Override
            public Object doInTable(HTableInterface table) throws Throwable {
                table.setAutoFlush(false);
                //设置数据缓存区域
                table.setWriteBufferSize(64*1024*1024);
                try {
                    for(HBaseColumn col:query.getColumns()){
                        byte[] rowkey = String.valueOf(col.getRowKey()).getBytes();
                        Put put = new Put(rowkey);
                        put.addColumn(Bytes.toBytes(col.getFamily()), Bytes.toBytes(col.getQualifier()),
                                Bytes.toBytes(col.getValue()));
                        table.put(put);
                    }
                    //刷新缓存区
                    table.flushCommits();
                    //关闭表连接
                    table.close();
                    //Thread.sleep(20000);
                } catch (Exception e) {
                    log.warn("==> hbase get object fail> "+query.getRow());
                }
                return null;
            }
        });
    }


    /**
     * 通过表名和key获取一行数据
     * @param query
     * @param c
     * @param 
     * @return
     */
    public T> T get(HQuery query, ClassT> c) {
        if(StringUtils.isBlank(query.getTable()) || StringUtils.isBlank(String.valueOf(query.getRow()))){
            return null;
        }

        return hbaseTemplate.get(query.getTable(), String.valueOf(query.getRow()), new RowMapperT>() {
            public T mapRow(Result result, int rowNum) throws Exception {
                ListCell> ceList = result.listCells();
                T item=c.newInstance();
                JSONObject obj = new JSONObject();
                if (ceList != null && ceList.size() > 0) {
                    for (Cell cell : ceList) {
                        obj.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
                                cell.getQualifierLength()),
                                Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                    }
                }else{
                    return null;
                }
                System.out.println(obj.toJSONString());
                item=JSON.parseObject(obj.toJSONString(), c);
                return item;
            }
        });
    }

    /**
     * 通过表名,开始行键和结束行键获取数据
     *
     * @return
     */
    public T> ListT> find(HQuery query,ClassT> c) {
        //如果未设置scan,设置scan
       /* if (query.getScan() == null) {

            //起止搜索
            if(StringUtils.isNotBlank(query.getStartRow()) && StringUtils.isNotBlank(query.getStopRow())){
                query.setSearchLimit(query.getStartRow(), query.getStopRow());
            }

            //主要配合pageFilter,指定起始点
            if(StringUtils.isNotBlank(query.getStartRow())){
                query.setScanStartRow(query.getStartRow());
            }

            //列匹配搜索
            if(StringUtils.isNotBlank(query.getFamily()) &&StringUtils.isNotBlank(query.getQualifier())
                    &&StringUtils.isNotBlank(query.getQualifierValue())){
                query.setSearchEqualFilter(query.getFamily(),query.getQualifier(),query.getQualifierValue());
            }

            //分页搜索
            if(query.getPageFilter()!=null){
                query.setFilters(query.getPageFilter());
            }

            if(query.getScan()==null){
                query.setScan(new Scan());
            }
        }*/

        //设置缓存
        query.getScan().setCacheBlocks(false);
        query.getScan().setCaching(2000);

        return hbaseTemplate.find(query.getTable(), query.getScan(), new RowMapperT>() {
            @Override
            public T mapRow(Result result, int rowNum) throws Exception {

                ListCell> ceList = result.listCells();
                JSONObject obj = new JSONObject();
                T item =c.newInstance();
                if (ceList != null && ceList.size() > 0) {
                    for (Cell cell : ceList) {
                        String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
                                cell.getValueLength());

                        String quali = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
                                cell.getQualifierLength());
                        if(value.startsWith("[")){
                            obj.put(quali, JSONArray.parseArray(value));
                        }else{
                            obj.put(quali, value);
                        }
                    }
                }
                item =JSON.parseObject(obj.toJSONString(), c);
                return item;
            }

        });
    }

    public void delete(HQuery query){
        hbaseTemplate.delete(query.getTable(), String.valueOf(query.getRow()), query.getFamily());
    }

    /**
     * 删除表
     * @param table
     */
    public void deleteAll(String table){
        TableName tableName = TableName.valueOf(table);
        try {
            //在删除一个表之前,需要首先确保它是 disabled
            hBaseAdmin.disableTable(tableName);
            hBaseAdmin.deleteTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

service类

public class BusReceiverServiceImp implements BusReceiverService {

    @Autowired
    private HbaseUtil hbaseUtil;

    /**
     * 创建表
     */
   @PostConstruct
    public  void createTable(){
       String[] familys = {"base","extends"};
       TableName tableName = TableName.valueOf("nias:bus_receiver");
       hbaseUtil.createTable("nias:bus_receiver",familys);
    }

    @Override
    public void save(BusReceiverEntity busReceiverEntity) {
        HQuery hQuery = new HQuery("nias:bus_receiver",String.valueOf(busReceiverEntity.getId()));
        hQuery.getColumns().add(new HBaseColumn("base","name",busReceiverEntity.getName(),busReceiverEntity.getId()));
        hQuery.getColumns().add(new HBaseColumn("base","regionCode",busReceiverEntity.getRegionCode(),busReceiverEntity.getId()));
        hQuery.getColumns().add(new HBaseColumn("extends","address",busReceiverEntity.getAddress(),busReceiverEntity.getId()));
        hQuery.getColumns().add(new HBaseColumn("extends","memberFamily",String.valueOf(busReceiverEntity.getMemberFamily()),busReceiverEntity.getId()));
        hQuery.getColumns().add(new HBaseColumn("extends","enName",String.valueOf(busReceiverEntity.getEnName()),busReceiverEntity.getId()));
        hbaseUtil.bufferInsert(hQuery);
    }

    public void batchSave(ListBusReceiverEntity> list){
        HQuery hQuery = new HQuery("nias:bus_receiver");
        for(BusReceiverEntity busReceiverEntity:list){
            hQuery.getColumns().add(new HBaseColumn("base","name",busReceiverEntity.getName(),busReceiverEntity.getId()));
            hQuery.getColumns().add(new HBaseColumn("base","regionCode",busReceiverEntity.getRegionCode(),busReceiverEntity.getId()));
            hQuery.getColumns().add(new HBaseColumn("extends","address",busReceiverEntity.getAddress(),busReceiverEntity.getId()));
            hQuery.getColumns().add(new HBaseColumn("extends","memberFamily",String.valueOf(busReceiverEntity.getMemberFamily()),busReceiverEntity.getId()));
            hQuery.getColumns().add(new HBaseColumn("extends","enName",String.valueOf(busReceiverEntity.getEnName()),busReceiverEntity.getId()));
        }
        hbaseUtil.bufferInsert(hQuery);
    }

    @Override
    public BusReceiverEntity queryByRowId(Serializable id) {
        HQuery hQuery = new HQuery("nias:bus_receiver",id);
        BusReceiverEntity busReceiverEntity = hbaseUtil.get(hQuery,BusReceiverEntity.class);
        return busReceiverEntity;
    }

    /**
     * 删除表
     * @param table
     */
    public void deleteAll(String table){
        hbaseUtil.deleteAll(table);
    }


}

文章来源于互联网:Springboot2(38)集成hbase

harry

这个人很懒,什么都没留下

文章评论