Springboot2(38)集成hbase
添加依赖
org.apache.hbase
hbase-client
1.3.0
org.slf4j
slf4j-log4j12
javax.servlet
servlet-api
com.google.guava
guava
org.elasticsearch
elasticsearch
org.springframework.data
spring-data-hadoop
${spring-data-hadoop.version}
javax.servlet
*
添加配置application.yml
hbase.zookeeper.quorum: 10.10.1.141
hbase.zookeeper.port: 2181
hbase.zookeeper.znode: /hbase
hbase.client.keyvalue.maxsize: 1572864000
配置类
@Configuration
@EnableConfigurationProperties(HbaseProperties.class)
public class HbaseConfig {
@Bean("conf")
public org.apache.hadoop.conf.Configuration configuration(
@Value("${hbase.zookeeper.quorum}") String quorum, @Value("${hbase.zookeeper.port}") String port) {
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", quorum);
conf.set("hbase.zookeeper.port", port);
return conf;
}
@Bean
public HbaseTemplate hbaseTemplate(org.apache.hadoop.conf.Configuration conf) {
HbaseTemplate hbaseTemplate = new HbaseTemplate();
hbaseTemplate.setConfiguration(conf);
hbaseTemplate.setAutoFlush(true);
return hbaseTemplate;
}
@Bean("hBaseAdmin")
public Admin createHbaseAdmin(org.apache.hadoop.conf.Configuration conf) throws IOException {
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
return admin;
}
}
工具类
package cn.myframe.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
@Component
@Slf4j
public class HbaseUtil {
@Autowired
private HbaseTemplate hbaseTemplate;
@Autowired
private Admin hBaseAdmin;
/**
* 创建表
* @param tableName
* @param familys
*/
public void createTable(String tableName,String[] familys){
try{
TableName table = TableName.valueOf(tableName);
if (hBaseAdmin.tableExists(table)) {
log.info("table[{}] already exists!",tableName);
} else {
HTableDescriptor tableDesc = new HTableDescriptor(table);
for (int i = 0; i familys.length; i++) {
tableDesc.addFamily(new HColumnDescriptor(familys[i]));
}
hBaseAdmin.createTable(tableDesc);
}
}catch (Exception e){
}
}
/**
*
* @param query
* @return
*/
public Object execute(HQuery query) {
if(StringUtils.isBlank(String.valueOf(query.getRow())) || query.getColumns().isEmpty()){
return null;
}
return hbaseTemplate.execute(query.getTable(), new TableCallbackObject>() {
@SuppressWarnings("deprecation")
@Override
public Object doInTable(HTableInterface table) throws Throwable {
try {
byte[] rowkey = String.valueOf(query.getRow()).getBytes();
Put put = new Put(rowkey);
for(HBaseColumn col:query.getColumns()){
put.addColumn(Bytes.toBytes(col.getFamily()), Bytes.toBytes(col.getQualifier()),
Bytes.toBytes(col.getValue()));
}
table.put(put);
} catch (Exception e) {
log.warn("==> hbase get object fail> "+query.getRow());
}
return null;
}
});
}
/**
* 批量插入
* @param query
* @return
*/
public Object bufferInsert(HQuery query){
return hbaseTemplate.execute(query.getTable(), new TableCallbackObject>() {
@SuppressWarnings("deprecation")
@Override
public Object doInTable(HTableInterface table) throws Throwable {
table.setAutoFlush(false);
//设置数据缓存区域
table.setWriteBufferSize(64*1024*1024);
try {
for(HBaseColumn col:query.getColumns()){
byte[] rowkey = String.valueOf(col.getRowKey()).getBytes();
Put put = new Put(rowkey);
put.addColumn(Bytes.toBytes(col.getFamily()), Bytes.toBytes(col.getQualifier()),
Bytes.toBytes(col.getValue()));
table.put(put);
}
//刷新缓存区
table.flushCommits();
//关闭表连接
table.close();
//Thread.sleep(20000);
} catch (Exception e) {
log.warn("==> hbase get object fail> "+query.getRow());
}
return null;
}
});
}
/**
* 通过表名和key获取一行数据
* @param query
* @param c
* @param
* @return
*/
public T> T get(HQuery query, ClassT> c) {
if(StringUtils.isBlank(query.getTable()) || StringUtils.isBlank(String.valueOf(query.getRow()))){
return null;
}
return hbaseTemplate.get(query.getTable(), String.valueOf(query.getRow()), new RowMapperT>() {
public T mapRow(Result result, int rowNum) throws Exception {
ListCell> ceList = result.listCells();
T item=c.newInstance();
JSONObject obj = new JSONObject();
if (ceList != null && ceList.size() > 0) {
for (Cell cell : ceList) {
obj.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength()),
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
}else{
return null;
}
System.out.println(obj.toJSONString());
item=JSON.parseObject(obj.toJSONString(), c);
return item;
}
});
}
/**
* 通过表名,开始行键和结束行键获取数据
*
* @return
*/
public T> ListT> find(HQuery query,ClassT> c) {
//如果未设置scan,设置scan
/* if (query.getScan() == null) {
//起止搜索
if(StringUtils.isNotBlank(query.getStartRow()) && StringUtils.isNotBlank(query.getStopRow())){
query.setSearchLimit(query.getStartRow(), query.getStopRow());
}
//主要配合pageFilter,指定起始点
if(StringUtils.isNotBlank(query.getStartRow())){
query.setScanStartRow(query.getStartRow());
}
//列匹配搜索
if(StringUtils.isNotBlank(query.getFamily()) &&StringUtils.isNotBlank(query.getQualifier())
&&StringUtils.isNotBlank(query.getQualifierValue())){
query.setSearchEqualFilter(query.getFamily(),query.getQualifier(),query.getQualifierValue());
}
//分页搜索
if(query.getPageFilter()!=null){
query.setFilters(query.getPageFilter());
}
if(query.getScan()==null){
query.setScan(new Scan());
}
}*/
//设置缓存
query.getScan().setCacheBlocks(false);
query.getScan().setCaching(2000);
return hbaseTemplate.find(query.getTable(), query.getScan(), new RowMapperT>() {
@Override
public T mapRow(Result result, int rowNum) throws Exception {
ListCell> ceList = result.listCells();
JSONObject obj = new JSONObject();
T item =c.newInstance();
if (ceList != null && ceList.size() > 0) {
for (Cell cell : ceList) {
String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength());
String quali = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
if(value.startsWith("[")){
obj.put(quali, JSONArray.parseArray(value));
}else{
obj.put(quali, value);
}
}
}
item =JSON.parseObject(obj.toJSONString(), c);
return item;
}
});
}
public void delete(HQuery query){
hbaseTemplate.delete(query.getTable(), String.valueOf(query.getRow()), query.getFamily());
}
/**
* 删除表
* @param table
*/
public void deleteAll(String table){
TableName tableName = TableName.valueOf(table);
try {
//在删除一个表之前,需要首先确保它是 disabled
hBaseAdmin.disableTable(tableName);
hBaseAdmin.deleteTable(tableName);
} catch (IOException e) {
e.printStackTrace();
}
}
}
service类
public class BusReceiverServiceImp implements BusReceiverService {
@Autowired
private HbaseUtil hbaseUtil;
/**
* 创建表
*/
@PostConstruct
public void createTable(){
String[] familys = {"base","extends"};
TableName tableName = TableName.valueOf("nias:bus_receiver");
hbaseUtil.createTable("nias:bus_receiver",familys);
}
@Override
public void save(BusReceiverEntity busReceiverEntity) {
HQuery hQuery = new HQuery("nias:bus_receiver",String.valueOf(busReceiverEntity.getId()));
hQuery.getColumns().add(new HBaseColumn("base","name",busReceiverEntity.getName(),busReceiverEntity.getId()));
hQuery.getColumns().add(new HBaseColumn("base","regionCode",busReceiverEntity.getRegionCode(),busReceiverEntity.getId()));
hQuery.getColumns().add(new HBaseColumn("extends","address",busReceiverEntity.getAddress(),busReceiverEntity.getId()));
hQuery.getColumns().add(new HBaseColumn("extends","memberFamily",String.valueOf(busReceiverEntity.getMemberFamily()),busReceiverEntity.getId()));
hQuery.getColumns().add(new HBaseColumn("extends","enName",String.valueOf(busReceiverEntity.getEnName()),busReceiverEntity.getId()));
hbaseUtil.bufferInsert(hQuery);
}
public void batchSave(ListBusReceiverEntity> list){
HQuery hQuery = new HQuery("nias:bus_receiver");
for(BusReceiverEntity busReceiverEntity:list){
hQuery.getColumns().add(new HBaseColumn("base","name",busReceiverEntity.getName(),busReceiverEntity.getId()));
hQuery.getColumns().add(new HBaseColumn("base","regionCode",busReceiverEntity.getRegionCode(),busReceiverEntity.getId()));
hQuery.getColumns().add(new HBaseColumn("extends","address",busReceiverEntity.getAddress(),busReceiverEntity.getId()));
hQuery.getColumns().add(new HBaseColumn("extends","memberFamily",String.valueOf(busReceiverEntity.getMemberFamily()),busReceiverEntity.getId()));
hQuery.getColumns().add(new HBaseColumn("extends","enName",String.valueOf(busReceiverEntity.getEnName()),busReceiverEntity.getId()));
}
hbaseUtil.bufferInsert(hQuery);
}
@Override
public BusReceiverEntity queryByRowId(Serializable id) {
HQuery hQuery = new HQuery("nias:bus_receiver",id);
BusReceiverEntity busReceiverEntity = hbaseUtil.get(hQuery,BusReceiverEntity.class);
return busReceiverEntity;
}
/**
* 删除表
* @param table
*/
public void deleteAll(String table){
hbaseUtil.deleteAll(table);
}
}
文章来源于互联网:Springboot2(38)集成hbase
文章评论