Hbase从入门到入坑

2021年12月15日 356点热度 0条评论

Hbase从入门到入坑

  1. 本博客已迁移至微信公众号!将不再更新
  2. 关注公众号即可获得免费学习资源,获得免费指导!!!
  3. 公众号后续将会持续更新clickhouse,sparkstreaming,flink,数仓建模,用户画像,实时计算,推荐系统,实时数仓等内容,感兴趣的朋友可以关注
  4. 不定期会有朋友的面经分享

%title插图%num%title插图%num

目录

一 什么是HBASE

二 安装HBASE

三 hbase初体验

四 HBASE客户端API操作

五 HBASE运行原理

5.1 master职责

5.2 Region Server 职责

5.3 zookeeper集群所起作用

5.4 HBASE读写数据流程

5.4.1 写数据流程

5.4.2 读数据流程

5.4.3 数据flush过程

5.4.4 数据合并过程

5.5 hbase:meta表

5.6 Region Server内部机制

六.HBASE优化

6.1 高可用

6.2 预分区

6.3 RowKey设计

6.4 内存优化

6.5 基础优化


一 什么是HBASE

HBASE是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBASE技术可在廉价PC Server上搭建起大规模结构化存储集群。

HBASE的目标是存储并处理大型的数据,更具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。

HBASE是Google Bigtable的开源实现,但是也有很多不同之处。比如:Google Bigtable利用GFS作为其文件存储系统,HBASE利用Hadoop HDFS作为其文件存储系统;Google运行MAPREDUCE来处理Bigtable中的海量数据,HBASE同样利用Hadoop MapReduce来处理HBASE中的海量数据;Google Bigtable利用Chubby作为协同服务,HBASE利用Zookeeper作为对应。

HBASE与mysql、oralce、db2、sqlserver等关系型数据库不同,它是一个NoSQL数据库(非关系型数据库)

  1. Hbase的表模型与关系型数据库的表模型不同:
  2. Hbase的表没有固定的字段定义;
  3. Hbase的表中每行存储的都是一些key-value对
  4. Hbase的表中有列族的划分,用户可以指定将哪些kv插入哪个列族
  5. Hbase的表在物理存储上,是按照列族来分割的,不同列族的数据一定存储在不同的文件中
  6. Hbase的表中的每一行都固定有一个行键,而且每一行的行键在表中不能重复
  7. Hbase中的数据,包含行键,包含key,包含value,都是byte[ ]类型,hbase不负责为用户维护数据类型
  8. HBASE对事务的支持很差

HBASE相比于其他nosql数据库(mongodb、redis、cassendra、hazelcast)的特点:

Hbase的表数据存储在HDFS文件系统中

从而,hbase具备如下特性:存储容量可以线性扩展; 数据存储的安全性可靠性极高!

二 安装HBASE

HBASE是一个分布式系统

其中有一个管理角色:  HMaster(一般2台,一台active,一台backup)

其他的数据节点角色:  HRegionServer(很多台,看数据容量)

2.1 安装准备

需要先有一个java环境

首先,要有一个HDFS集群,并正常运行; regionserver应该跟hdfs中的datanode在一起

其次,还需要一个zookeeper集群,并正常运行

然后,安装HBASE

角色分配如下:

Hdp01:  namenode  datanode  regionserver  hmaster  zookeeper

Hdp02:  datanode   regionserver  zookeeper

Hdp03:  datanode   regionserver  zookeeper

2.2 安装步骤

解压hbase安装包

修改hbase-env.sh

export JAVA_HOME=/root/apps/jdk1.7.0_67

export HBASE_MANAGES_ZK=false

修改hbase-site.xml

        

                hbase.rootdir

                hdfs://hdp01:9000/hbase

        

        

                hbase.cluster.distributed

                true

        

        

                hbase.zookeeper.quorum

                hdp01:2181,hdp02:2181,hdp03:2181

        

修改 regionservers

hdp01

hdp02

hdp03

2.3 启动hbase集群

bin/start-hbase.sh

启动完后,还可以在集群中找任意一台机器启动一个备用的master

bin/hbase-daemon.sh start master

新启的这个master会处于backup状态

三 hbase初体验

3.1 启动hbase命令行客户端

bin/hbase shell

Hbase> list     // 查看表

Hbase> status   // 查看集群状态

Hbase> version  // 查看集群版本

3.2 hbase表模型的特点

%title插图%num

  1. 一个表,有表名
  2. 一个表可以分为多个列族(不同列族的数据会存储在不同文件中)
  3. 表中的每一行有一个“行键rowkey”,而且行键在表中不能重复
  4. 表中的每一对kv数据称作一个cell
  5. hbase可以对数据存储多个历史版本(历史版本数量可配置)
  6. 整张表由于数据量过大,会被横向切分成若干个region(用rowkey范围标识),不同region的数据也存储在不同文件中%title插图%num
  7. hbase会对插入的数据按顺序存储:

     要点一:首先会按行键排序

     要点二:同一行里面的kv会按列族排序,再按k排序

3.3 hbase的表中能存储什么数据类型

hbase中只支持byte[]

此处的byte[] 包括了: rowkey,key,value,列族名,表名

3.4 hbase命令行客户端操作

名称

命令表达式

创建表

create '表名', '列族名1','列族名2','列族名N'

查看所有表

list

描述表

describe  ‘表名’

判断表存在

exists  '表名'

判断是否禁用启用表

is_enabled '表名'

is_disabled ‘表名’

添加记录      

put  ‘表名’, ‘rowKey’, ‘列族 : 列‘  ,  '值'

查看记录rowkey下的所有数据

get  '表名' , 'rowKey'

查看表中的记录总数

count  '表名'

获取某个列族

get '表名','rowkey','列族'

获取某个列族的某个列

get '表名','rowkey','列族:列’

删除记录

delete  ‘表名’ ,‘行名’ , ‘列族:列'

删除整行

deleteall '表名','rowkey'

删除一张表

先要屏蔽该表,才能对该表进行删除

第一步 disable ‘表名’ ,第二步  drop '表名'

清空表

truncate '表名'

查看所有记录

scan "表名"  

查看某个表某个列中所有数据

scan "表名" , {COLUMNS=>'列族名:列名'}

更新记录

就是重写一遍,进行覆盖,hbase没有修改,都是追加

3.4.1 建表

create 't_user_info','base_info','extra_info'

                      表名      列族名   列族名

3.4.2 插入数据

hbase(main):011:0> put 't_user_info','001','base_info:username','zhangsan'

0 row(s) in 0.2420 seconds

 

hbase(main):012:0> put 't_user_info','001','base_info:age','18'

0 row(s) in 0.0140 seconds

 

hbase(main):013:0> put 't_user_info','001','base_info:sex','female'

0 row(s) in 0.0070 seconds

 

hbase(main):014:0> put 't_user_info','001','extra_info:career','it'

0 row(s) in 0.0090 seconds

 

hbase(main):015:0> put 't_user_info','002','extra_info:career','actoress'

0 row(s) in 0.0090 seconds

 

hbase(main):016:0> put 't_user_info','002','base_info:username','liuyifei'

0 row(s) in 0.0060 seconds

3.4.3 查询方式一 scan扫描

hbase(main):017:0> scan 't_user_info'

ROW                               COLUMN+CELL                                                                                     

 001                              column=base_info:age, timestamp=1496567924507, value=18                                         

 001                              column=base_info:sex, timestamp=1496567934669, value=female                                     

 001                              column=base_info:username, timestamp=1496567889554, value=zhangsan                              

 001                              column=extra_info:career, timestamp=1496567963992, value=it                                     

 002                              column=base_info:username, timestamp=1496568034187, value=liuyifei                              

 002                              column=extra_info:career, timestamp=1496568008631, value=actoress    

3.4.4 查询方式二 get单行数据

hbase(main):020:0> get 't_user_info','001'

COLUMN                            CELL                                                                                            

 base_info:age                    timestamp=1496568160192, value=19                                                               

 base_info:sex                    timestamp=1496567934669, value=female                                                           

 base_info:username               timestamp=1496567889554, value=zhangsan                                                         

 extra_info:career                timestamp=1496567963992, value=it                                                               

4 row(s) in 0.0770 seconds

3.4.5 删除一个kv数据

hbase(main):021:0> delete 't_user_info','001','base_info:sex'

0 row(s) in 0.0390 seconds

删除整行数据

hbase(main):024:0> deleteall 't_user_info','001'

0 row(s) in 0.0090 seconds

hbase(main):025:0> get 't_user_info','001'

COLUMN                            CELL                                                                                            

0 row(s) in 0.0110 seconds

3.4.6 删除整个表

hbase(main):028:0> disable 't_user_info'

0 row(s) in 2.3640 seconds

hbase(main):029:0> drop 't_user_info'

0 row(s) in 1.2950 seconds

hbase(main):030:0> list

TABLE                                                                                                                             

0 row(s) in 0.0130 seconds

=> []

3.5 Hbase重要特性-排序特性(行键)

与nosql数据库们一样,row key是用来检索记录的主键。访问HBASE table中的行,只有三种方式:

1.通过单个row key访问

2.通过row key的range(正则)

3.全表扫描

Row key行键 (Row key)可以是任意字符串(最大长度 是 64KB,实际应用中长度一般为 10-100bytes),在HBASE内部,row key保存为字节数组。存储时,数据按照Row key的字典序(byte order)排序存储。设计key时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性)

插入到hbase中去的数据,hbase会自动排序存储:

排序规则:  首先看行键,然后看列族名,然后看列(key)名; 按字典顺序

Hbase的这个特性跟查询效率有极大的关系

比如:一张用来存储用户信息的表,有名字,户籍,年龄,职业....等信息

然后,在业务系统中经常需要:

查询某个省的所有用户

经常需要查询某个省的指定姓的所有用户

思路:如果能将相同省的用户在hbase的存储文件中连续存储,并且能将相同省中相同姓的用户连续存储,那么,上述两个查询需求的效率就会提高!!!

做法:将查询条件拼到rowkey内

四 HBASE客户端API操作

4.1 简洁版

HbaseClientDDL 

package cn.hbase.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.junit.Before;
import org.junit.Test;


/**
 *  
 *  1、构建连接
 *  2、从连接中取到一个表DDL操作工具admin
 *  3、admin.createTable(表描述对象);
 *  4、admin.disableTable(表名);
	5、admin.deleteTable(表名);
	6、admin.modifyTable(表名,表描述对象);	
 *  
 *
 */
public class HbaseClientDDL {
	Connection conn = null;
	
	@Before
	public void getConn() throws Exception{
		// 构建一个连接对象
		Configuration conf = HBaseConfiguration.create(); // 会自动加载hbase-site.xml
		conf.set("hbase.zookeeper.quorum", "hdp-01:2181,hdp-02:2181,hdp-03:2181");
		
		conn = ConnectionFactory.createConnection(conf);
	}
	
	
	
	/**
	 * DDL
	 * @throws Exception 
	 */
	@Test
	public void testCreateTable() throws Exception{

		// 从连接中构造一个DDL操作器
		Admin admin = conn.getAdmin();
		
		// 创建一个表定义描述对象
		HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("user_info"));
		
		// 创建列族定义描述对象
		HColumnDescriptor hColumnDescriptor_1 = new HColumnDescriptor("base_info");
		hColumnDescriptor_1.setMaxVersions(3); // 设置该列族中存储数据的最大版本数,默认是1
		
		HColumnDescriptor hColumnDescriptor_2 = new HColumnDescriptor("extra_info");
		
		// 将列族定义信息对象放入表定义对象中
		hTableDescriptor.addFamily(hColumnDescriptor_1);
		hTableDescriptor.addFamily(hColumnDescriptor_2);
		
		
		// 用ddl操作器对象:admin 来建表
		admin.createTable(hTableDescriptor);
		
		// 关闭连接
		admin.close();
		conn.close();
		
	}
	
	
	/**
	 * 删除表
	 * @throws Exception 
	 */
	@Test
	public void testDropTable() throws Exception{
		
		Admin admin = conn.getAdmin();
		
		// 停用表
		admin.disableTable(TableName.valueOf("user_info"));
		// 删除表
		admin.deleteTable(TableName.valueOf("user_info"));
		
		
		admin.close();
		conn.close();
	}
	
	// 修改表定义--添加一个列族
	@Test
	public void testAlterTable() throws Exception{
		
		Admin admin = conn.getAdmin();
		
		// 取出旧的表定义信息
		HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf("user_info"));
		
		
		// 新构造一个列族定义
		HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("other_info");
		hColumnDescriptor.setBloomFilterType(BloomType.ROWCOL); // 设置该列族的布隆过滤器类型
		
		// 将列族定义添加到表定义对象中
		tableDescriptor.addFamily(hColumnDescriptor);
		
		
		// 将修改过的表定义交给admin去提交
		admin.modifyTable(TableName.valueOf("user_info"), tableDescriptor);
		
		
		admin.close();
		conn.close();
		
	}
	
	
	/**
	 * DML -- 数据的增删改查
	 */
	
	

}

HbaseClientDML

package cn.hbase.demo;


import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;

public class HbaseClientDML {
	Connection conn = null;
	
	@Before
	public void getConn() throws Exception{
		// 构建一个连接对象
		Configuration conf = HBaseConfiguration.create(); // 会自动加载hbase-site.xml
		conf.set("hbase.zookeeper.quorum", "hdp-01:2181,hdp-02:2181,hdp-03:2181");
		
		conn = ConnectionFactory.createConnection(conf);
	}
	
	
	/**
	 * 增
	 * 改:put来覆盖
	 * @throws Exception 
	 */
	@Test
	public void testPut() throws Exception{
		
		// 获取一个操作指定表的table对象,进行DML操作
		Table table = conn.getTable(TableName.valueOf("user_info"));
		
		// 构造要插入的数据为一个Put类型(一个put对象只能对应一个rowkey)的对象
		Put put = new Put(Bytes.toBytes("001"));
		put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("张三"));
		put.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes("18"));
		put.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"), Bytes.toBytes("北京"));
		
		
		Put put2 = new Put(Bytes.toBytes("002"));
		put2.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("李四"));
		put2.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes("28"));
		put2.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("addr"), Bytes.toBytes("上海"));
	
		
		ArrayList puts = new ArrayList();
		puts.add(put);
		puts.add(put2);
		
		
		// 插进去
		table.put(puts);
		
		table.close();
		conn.close();
		
	}
	
	
	/**
	 * 循环插入大量数据
	 * @throws Exception 
	 */
	@Test
	public void testManyPuts() throws Exception{
		
		Table table = conn.getTable(TableName.valueOf("user_info"));
		ArrayList puts = new ArrayList();
		
		for(int i=0;i dels = new ArrayList();
		dels.add(delete1);
		dels.add(delete2);
		
		table.delete(dels);
		
		
		table.close();
		conn.close();
	}
	
	/**
	 * 查
	 * @throws Exception 
	 */
	@Test
	public void testGet() throws Exception{
		
		Table table = conn.getTable(TableName.valueOf("user_info"));
		
		Get get = new Get("002".getBytes());
		
		Result result = table.get(get);
		
		// 从结果中取用户指定的某个key的value
		byte[] value = result.getValue("base_info".getBytes(), "age".getBytes());
		System.out.println(new String(value));
		
		System.out.println("-------------------------");
		
		// 遍历整行结果中的所有kv单元格
		CellScanner cellScanner = result.cellScanner();
		while(cellScanner.advance()){
			Cell cell = cellScanner.current();
			
			byte[] rowArray = cell.getRowArray();  //本kv所属的行键的字节数组
			byte[] familyArray = cell.getFamilyArray();  //列族名的字节数组
			byte[] qualifierArray = cell.getQualifierArray();  //列名的字节数据
			byte[] valueArray = cell.getValueArray(); // value的字节数组
			
			System.out.println("行键: "+new String(rowArray,cell.getRowOffset(),cell.getRowLength()));
			System.out.println("列族名: "+new String(familyArray,cell.getFamilyOffset(),cell.getFamilyLength()));
			System.out.println("列名: "+new String(qualifierArray,cell.getQualifierOffset(),cell.getQualifierLength()));
			System.out.println("value: "+new String(valueArray,cell.getValueOffset(),cell.getValueLength()));
			
		}
		
		table.close();
		conn.close();
		
	}
	
	
	/**
	 * 按行键范围查询数据
	 * @throws Exception 
	 */
	@Test
	public void testScan() throws Exception{
		
		Table table = conn.getTable(TableName.valueOf("user_info"));
		
		// 包含起始行键,不包含结束行键,但是如果真的想查询出末尾的那个行键,那么,可以在末尾行键上拼接一个不可见的字节(00)
		Scan scan = new Scan("10".getBytes(), "1000001".getBytes());
		
		ResultScanner scanner = table.getScanner(scan);
		
		Iterator iterator = scanner.iterator();
		
		while(iterator.hasNext()){
			
			Result result = iterator.next();
			// 遍历整行结果中的所有kv单元格
			CellScanner cellScanner = result.cellScanner();
			while(cellScanner.advance()){
				Cell cell = cellScanner.current();
				
				byte[] rowArray = cell.getRowArray();  //本kv所属的行键的字节数组
				byte[] familyArray = cell.getFamilyArray();  //列族名的字节数组
				byte[] qualifierArray = cell.getQualifierArray();  //列名的字节数据
				byte[] valueArray = cell.getValueArray(); // value的字节数组
				
				System.out.println("行键: "+new String(rowArray,cell.getRowOffset(),cell.getRowLength()));
				System.out.println("列族名: "+new String(familyArray,cell.getFamilyOffset(),cell.getFamilyLength()));
				System.out.println("列名: "+new String(qualifierArray,cell.getQualifierOffset(),cell.getQualifierLength()));
				System.out.println("value: "+new String(valueArray,cell.getValueOffset(),cell.getValueLength()));
			}
			System.out.println("----------------------");
		}
	}
	
	@Test
	public void test(){
		String a = "000";
		String b = "000";
		
		System.out.println(a);
		System.out.println(b);
		
		
		byte[] bytes = a.getBytes();
		byte[] bytes2 = b.getBytes();
		
		System.out.println("");
		
	}
	
	

}

4.2 完整版

package com.zgcbank.hbase;

import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class HbaseTest {

	/**
	 * 配置ss
	 */
	static Configuration config = null;
	private Connection connection = null;
	private Table table = null;

	@Before
	public void init() throws Exception {
		config = HBaseConfiguration.create();// 配置
		config.set("hbase.zookeeper.quorum", "master,work1,work2");// zookeeper地址
		config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
		connection = ConnectionFactory.createConnection(config);
		table = connection.getTable(TableName.valueOf("user"));
	}

	/**
	 * 创建一个表
	 * 
	 * @throws Exception
	 */
	@Test
	public void createTable() throws Exception {
		// 创建表管理类
		HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
		// 创建表描述类
		TableName tableName = TableName.valueOf("test3"); // 表名称
		HTableDescriptor desc = new HTableDescriptor(tableName);
		// 创建列族的描述类
		HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
		// 将列族添加到表中
		desc.addFamily(family);
		HColumnDescriptor family2 = new HColumnDescriptor("info2"); // 列族
		// 将列族添加到表中
		desc.addFamily(family2);
		// 创建表
		admin.createTable(desc); // 创建表
	}

	@Test
	@SuppressWarnings("deprecation")
	public void deleteTable() throws MasterNotRunningException,
			ZooKeeperConnectionException, Exception {
		HBaseAdmin admin = new HBaseAdmin(config);
		admin.disableTable("test3");
		admin.deleteTable("test3");
		admin.close();
	}

	/**
	 * 向hbase中增加数据
	 * 
	 * @throws Exception
	 */
	@SuppressWarnings({ "deprecation", "resource" })
	@Test
	public void insertData() throws Exception {
		table.setAutoFlushTo(false);
		table.setWriteBufferSize(534534534);
		ArrayList arrayList = new ArrayList();
		for (int i = 21; i 

4.3 MapReduce操作Hbase

4.3.1 实现方法

Hbase对MapReduce提供支持,它实现了TableMapper类和TableReducer类,我们只需要继承这两个类即可。

1、写个mapper继承TableMapper

参数:Text:mapper的输出key类型; IntWritable:mapper的输出value类型。

      其中的map方法如下:

map(ImmutableBytesWritable key, Result value,Context context)

 参数:key:rowKey;value: Result ,一行数据; context上下文

2、写个reduce继承TableReducer

参数:Text:reducer的输入key; IntWritable:reduce的输入value;

 ImmutableBytesWritable:reduce输出到hbase中的rowKey类型。

      其中的reduce方法如下:

reduce(Text key, Iterable values,Context context)

参数: key:reduce的输入key;values:reduce的输入value;

4.3.2 准备表

1、建立数据来源表‘word’,包含一个列族‘content’

向表中添加数据,在列族中放入列‘info’,并将短文数据放入该列中,如此插入多行,行键为不同的数据即可

2、建立输出表‘stat’,包含一个列族‘content’

3、通过Mr操作Hbase的‘word’表,对‘content:info’中的短文做词频统计,并将统计结果写入‘stat’表的‘content:info中’,行键为单词

4.3.3 实现

package com.zgcbank.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
/**
 * mapreduce操作hbase
 *
 */
public class HBaseMr {
	/**
	 * 创建hbase配置
	 */
	static Configuration config = null;
	static {
		config = HBaseConfiguration.create();
		config.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
		config.set("hbase.zookeeper.property.clientPort", "2181");
	}
	/**
	 * 表信息
	 */
	public static final String tableName = "word";//表名1
	public static final String colf = "content";//列族
	public static final String col = "info";//列
	public static final String tableName2 = "stat";//表名2
	/**
	 * 初始化表结构,及其数据
	 */
	public static void initTB() {
		HTable table=null;
		HBaseAdmin admin=null;
		try {
			admin = new HBaseAdmin(config);//创建表管理
			/*删除表*/
			if (admin.tableExists(tableName)||admin.tableExists(tableName2)) {
				System.out.println("table is already exists!");
				admin.disableTable(tableName);
				admin.deleteTable(tableName);
				admin.disableTable(tableName2);
				admin.deleteTable(tableName2);
			}
			/*创建表*/
				HTableDescriptor desc = new HTableDescriptor(tableName);
				HColumnDescriptor family = new HColumnDescriptor(colf);
				desc.addFamily(family);
				admin.createTable(desc);
				HTableDescriptor desc2 = new HTableDescriptor(tableName2);
				HColumnDescriptor family2 = new HColumnDescriptor(colf);
				desc2.addFamily(family2);
				admin.createTable(desc2);
			/*插入数据*/
				table = new HTable(config,tableName);
				table.setAutoFlush(false);
				table.setWriteBufferSize(500);
				List lp = new ArrayList();
				Put p1 = new Put(Bytes.toBytes("1"));
				p1.add(colf.getBytes(), col.getBytes(),	("The Apache Hadoop software library is a framework").getBytes());
				lp.add(p1);
				Put p2 = new Put(Bytes.toBytes("2"));p2.add(colf.getBytes(),col.getBytes(),("The common utilities that support the other Hadoop modules").getBytes());
				lp.add(p2);
				Put p3 = new Put(Bytes.toBytes("3"));
				p3.add(colf.getBytes(), col.getBytes(),("Hadoop by reading the documentation").getBytes());
				lp.add(p3);
				Put p4 = new Put(Bytes.toBytes("4"));
				p4.add(colf.getBytes(), col.getBytes(),("Hadoop from the release page").getBytes());
				lp.add(p4);
				Put p5 = new Put(Bytes.toBytes("5"));
				p5.add(colf.getBytes(), col.getBytes(),("Hadoop on the mailing list").getBytes());
				lp.add(p5);
				table.put(lp);
				table.flushCommits();
				lp.clear();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if(table!=null){
					table.close();
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	/**
	 * MyMapper 继承 TableMapper
	 * TableMapper 
	 * Text:输出的key类型,
	 * IntWritable:输出的value类型
	 */
	public static class MyMapper extends TableMapper {
		private static IntWritable one = new IntWritable(1);
		private static Text word = new Text();
		@Override
		//输入的类型为:key:rowKey; value:一行数据的结果集Result
		protected void map(ImmutableBytesWritable key, Result value,
				Context context) throws IOException, InterruptedException {
			//获取一行数据中的colf:col
			String words = Bytes.toString(value.getValue(Bytes.toBytes(colf), Bytes.toBytes(col)));// 表里面只有一个列族,所以我就直接获取每一行的值
			//按空格分割
			String itr[] = words.toString().split(" ");
			//循环输出word和1
			for (int i = 0; i  
	 * Text:输入的key类型,
	 * IntWritable:输入的value类型,
	 * ImmutableBytesWritable:输出类型,表示rowkey的类型
	 */
	public static class MyReducer extends
			TableReducer {
		@Override
		protected void reduce(Text key, Iterable values,
				Context context) throws IOException, InterruptedException {
			//对mapper的数据求和
			int sum = 0;
			for (IntWritable val : values) {//叠加
				sum += val.get();
			}
			// 创建put,设置rowkey为单词
			Put put = new Put(Bytes.toBytes(key.toString()));
			// 封装数据
			put.add(Bytes.toBytes(colf), Bytes.toBytes(col),Bytes.toBytes(String.valueOf(sum)));
			//写到hbase,需要指定rowkey、put
			context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put);
		}
	}
	
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		config.set("df.default.name", "hdfs://master:9000/");//设置hdfs的默认路径
		config.set("hadoop.job.ugi", "hadoop,hadoop");//用户名,组
		config.set("mapred.job.tracker", "master:9001");//设置jobtracker在哪
		//初始化表
		initTB();//初始化表
		//创建job
		Job job = new Job(config, "HBaseMr");//job
		job.setJarByClass(HBaseMr.class);//主类
		//创建scan
		Scan scan = new Scan();
		//可以指定查询某一列
		scan.addColumn(Bytes.toBytes(colf), Bytes.toBytes(col));
		//创建查询hbase的mapper,设置表名、scan、mapper类、mapper的输出key、mapper的输出value
		TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,Text.class, IntWritable.class, job);
		//创建写入hbase的reducer,指定表名、reducer类、job
		TableMapReduceUtil.initTableReducerJob(tableName2, MyReducer.class, job);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

五 HBASE运行原理

5.1 master职责

1.管理监控HRegionServer,实现其负载均衡。

2.处理region的分配或转移,比如在HRegion split时分配新的HRegion;在HRegionServer退出时迁移其负责的HRegion到其他        HRegionServer上。

3.处理元数据的变更

4.管理namespace和table的元数据(实际存储在HDFS上)。

5.权限控制(ACL)。

6.监控集群中所有HRegionServer的状态(通过Heartbeat和监听ZooKeeper中的状态)。

5.2 Region Server 职责

  1. 管理自己所负责的region数据的读写。
  2. 读写HDFS,管理Table中的数据。
  3. Client直接通过HRegionServer读写数据(从HMaster中获取元数据,找到RowKey所在的HRegion/HRegionServer后)。
  4. 刷新缓存到HDFS
  5. 维护Hlog
  6. 执行压缩
  7. 负责处理Region分片

5.3 zookeeper集群所起作用

  1. 存放整个HBase集群的元数据以及集群的状态信息。
  2. 实现HMaster主从节点的failover。

注: HMaster通过监听ZooKeeper中的Ephemeral节点(默认:/hbase/rs/*)来监控HRegionServer的加入和宕机。

在第一个HMaster连接到ZooKeeper时会创建Ephemeral节点(默认:/hbasae/master)来表示Active的HMaster,其后加进来的HMaster则监听该Ephemeral节点

如果当前Active的HMaster宕机,则该节点消失,因而其他HMaster得到通知,而将自身转换成Active的HMaster,在变为Active的HMaster之前,它会在/hbase/masters/下创建自己的Ephemeral节点。

5.4 HBASE读写数据流程

5.4.1 写数据流程

客户端现在要插入一条数据,rowkey=r000001, 这条数据应该写入到table表中的那个region中呢?

1/ 客户端要连接zookeeper, 从zk的/hbase节点找到hbase:meta表所在的regionserver(host:port);

2/ regionserver扫描hbase:meta中的每个region的起始行健,对比r000001这条数据在那个region的范围内;

3/ 从对应的 info:server key中存储了region是有哪个regionserver(host:port)在负责的;

4/ 客户端直接请求对应的regionserver;

5/ regionserver接收到客户端发来的请求之后,就会将数据写入到region中

%title插图%num

5.4.2 读数据流程

客户端现在要查询rowkey=r000001这条数据,那么这个流程是什么样子的呢?

%title插图%num

1/ 首先Client连接zookeeper, 找到hbase:meta表所在的regionserver;

2/ 请求对应的regionserver,扫描hbase:meta表,根据namespace、表名和rowkey在meta表中找到r00001所在的region是由那个regionserver负责的;

3/找到这个region对应的regionserver

4/ regionserver收到了请求之后,扫描对应的region返回数据到Client

(先从MemStore找数据,如果没有,再到BlockCache里面读;BlockCache还没有,再到StoreFile上读(为了读取的效率);

如果是从StoreFile里面读取的数据,不是直接返回给客户端,而是先写入BlockCache,再返回给客户端。)

注:客户会缓存这些位置信息,然而第二步它只是缓存当前RowKey对应的HRegion的位置,因而如果下一个要查的RowKey不在同一个HRegion中,则需要继续查询hbase:meta所在的HRegion,然而随着时间的推移,客户端缓存的位置信息越来越多,以至于不需要再次查找hbase:meta Table的信息,除非某个HRegion因为宕机或Split被移动,此时需要重新查询并且更新缓存。

5.4.3 数据flush过程

1)当MemStore数据达到阈值(默认是128M,老版本是64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog中的历史数据;

2)并将数据存储到HDFS中;

3)在HLog中做标记点。

5.4.4 数据合并过程

1)当数据块达到3块,Hmaster触发合并操作,Region将数据块加载到本地,进行合并;

2)当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理;

3)当HregionServer宕机后,将HregionServer上的hlog拆分,然后分配给不同的HregionServer加载,修改.META.;

4)注意:HLog会同步到HDFS。

5.5 hbase:meta表

hbase:meta表存储了所有用户HRegion的位置信息:

Rowkey:tableName,regionStartKey,regionId,replicaId等;

info列族:这个列族包含三个列,他们分别是:

info:regioninfo列:

regionId,tableName,startKey,endKey,offline,split,replicaId;

info:server列:HRegionServer对应的server:port;

info:serverstartcode列:HRegionServer的启动时间戳。

%title插图%num

5.6 Region Server内部机制

%title插图%num

  • WAL即Write Ahead Log,在早期版本中称为HLog,它是HDFS上的一个文件,如其名字所表示的,所有写操作都会先保证将数据写入这个Log文件后,才会真正更新MemStore,最后写入HFile中。WAL文件存储在/hbase/WALs/${HRegionServer_Name}的目录中
  • BlockCache是一个读缓存,即“引用局部性”原理(也应用于CPU,分空间局部性和时间局部性,空间局部性是指CPU在某一时刻需要某个数据,那么有很大的概率在一下时刻它需要的数据在其附近;时间局部性是指某个数据在被访问过一次后,它有很大的概率在不久的将来会被再次的访问),将数据预读取到内存中,以提升读的性能。
  • HRegion是一个Table中的一个Region在一个HRegionServer中的表达。一个Table可以有一个或多个Region,他们可以在一个相同的HRegionServer上,也可以分布在不同的HRegionServer上,一个HRegionServer可以有多个HRegion,他们分别属于不同的Table。HRegion由多个Store(HStore)构成,每个HStore对应了一个Table在这个HRegion中的一个Column Family,即每个Column Family就是一个集中的存储单元,因而最好将具有相近IO特性的Column存储在一个Column Family,以实现高效读取(数据局部性原理,可以提高缓存的命中率)。HStore是HBase中存储的核心,它实现了读写HDFS功能,一个HStore由一个MemStore 和0个或多个StoreFile组成。
  • MemStore是一个写缓存(In Memory Sorted Buffer),所有数据的写在完成WAL日志写后,会 写入MemStore中,由MemStore根据一定的算法将数据Flush到地层HDFS文件中(HFile),通常每个HRegion中的每个 Column Family有一个自己的MemStore。
  • HFile(StoreFile) 用于存储HBase的数据(Cell/KeyValue)。在HFile中的数据是按RowKey、Column Family、Column排序,对相同的Cell(即这三个值都一样),则按timestamp倒序排列。
  • FLUSH详述
  1. 每一次Put/Delete请求都是先写入到MemStore中,当MemStore满后会Flush成一个新的StoreFile(底层实现是HFile),即一个HStore(Column Family)可以有0个或多个StoreFile(HFile)。
  2. 当一个HRegion中的所有MemStore的大小总和超过了hbase.hregion.memstore.flush.size的大小,默认128MB。此时当前的HRegion中所有的MemStore会Flush到HDFS中。
  3. 当全局MemStore的大小超过了hbase.regionserver.global.memstore.upperLimit的大小,默认40%的内存使用量。此时当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush顺序是MemStore大小的倒序(一个HRegion中所有MemStore总和作为该HRegion的MemStore的大小还是选取最大的MemStore作为参考?有待考证),直到总体的MemStore使用量低于hbase.regionserver.global.memstore.lowerLimit,默认38%的内存使用量。
  4. 当前HRegionServer中WAL的大小超过了hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs的数量,当前HRegionServer中所有HRegion中的MemStore都会Flush到HDFS中,Flush使用时间顺序,最早的MemStore先Flush直到WAL的数量少于hbase.regionserver.hlog.blocksize * hbase.regionserver.max.logs这里说这两个相乘的默认大小是2GB,查代码,hbase.regionserver.max.logs默认值是32,而hbase.regionserver.hlog.blocksize默认是32MB。但不管怎么样,因为这个大小超过限制引起的Flush不是一件好事,可能引起长时间的延迟

六.HBASE优化

6.1 高可用

在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以HBase支持对Hmaster的高可用配置。

1.关闭HBase集群(如果没有开启则跳过此步)

[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh

2.在conf目录下创建backup-masters文件

[atguigu@hadoop102 hbase]$ touch conf/backup-masters

3.在backup-masters文件中配置高可用HMaster节点

[atguigu@hadoop102 hbase]$ echo hadoop103 > conf/backup-masters

4.将整个conf目录scp到其他节点

[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop103:/opt/module/hbase/

[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop104:/opt/module/hbase/

6.2 预分区

每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。

1.手动设定预分区

hbase> create 'staff1','info','partition1',SPLITS => ['1000','2000','3000','4000']

2.生成16进制序列预分区

create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}

3.按照文件中设置的规则预分区

创建splits.txt文件内容如下:

aaaa

bbbb

cccc

dddd

然后执行:

create 'staff3','partition3',SPLITS_FILE => 'splits.txt'

4.使用JavaAPI创建预分区

//自定义算法,产生一系列Hash散列值存储在二维数组中

byte[][] splitKeys = 某个散列值函数

//创建HBaseAdmin实例

HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create());

//创建HTableDescriptor实例

HTableDescriptor tableDesc = new HTableDescriptor(tableName);

//通过HTableDescriptor实例和散列值二维数组创建带有预分区的HBase表

hAdmin.createTable(tableDesc, splitKeys);

6.3 RowKey设计

一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜。接下来我们就谈一谈rowkey常用的设计方案。

1.生成随机数、hash、散列值

比如:

原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7

原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd

原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913

在做此操作之前,一般我们会选择从数据集中抽取样本,来决定什么样的rowKey来Hash后作为每个分区的临界值。

2.字符串反转

20170524000001转成10000042507102

20170524000002转成20000042507102

3.字符串拼接

20170524000001_a12e

20170524000001_93i7

6.4 内存优化

HBase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给HBase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。

6.5 基础优化

1.允许在HDFS的文件中追加内容

hdfs-site.xml、hbase-site.xml

属性:dfs.support.append

解释:开启HDFS追加同步,可以优秀的配合HBase的数据同步和持久化。默认值为true。

2.优化DataNode允许的最大文件打开数

 

 

hdfs-site.xml

属性:dfs.datanode.max.transfer.threads

解释:HBase一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为4096或者更高。默认值:4096

3.优化延迟高的数据操作的等待时间

hdfs-site.xml

属性:dfs.image.transfer.timeout

解释:如果对于某一次数据操作来讲,延迟非常高,socket需要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保socket不会被timeout掉。

4.优化数据的写入效率

mapred-site.xml

属性:

mapreduce.map.output.compress

mapreduce.map.output.compress.codec

解释:开启这两个数据可以大大提高文件的写入效率,减少写入时间。第一个属性值修改为true,第二个属性值修改为:org.apache.hadoop.io.compress.GzipCodec或者其他压缩方式。

5.设置RPC监听数量

hbase-site.xml

属性:hbase.regionserver.handler.count

解释:默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。

6.优化HStore文件大小

hbase-site.xml

属性:hbase.hregion.max.filesize

解释:默认值10737418240(10GB),如果需要运行HBase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。

7.优化hbase客户端缓存

hbase-site.xml

属性:hbase.client.write.buffer

解释:用于指定HBase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。

8.指定scan.next扫描HBase所获取的行数

hbase-site.xml

属性:hbase.client.scanner.caching

解释:用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。

9.flush、compact、split机制

当MemStore达到阈值,将Memstore中的数据Flush进Storefile;compact机制则是把flush出来的小文件合并成大的Storefile文件。split则是当Region达到阈值,会把过大的Region一分为二。

涉及属性:

128M就是Memstore的默认阈值

hbase.hregion.memstore.flush.size:134217728

即:这个参数的作用是当单个HRegion内所有的Memstore大小总和超过指定值时,flush该HRegion的所有memstore。RegionServer的flush是通过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会导致内存陡增,最坏的情况是触发OOM。

hbase.regionserver.global.memstore.upperLimit:0.4

hbase.regionserver.global.memstore.lowerLimit:0.38

即:当MemStore使用内存总量达到hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个MemStores flush到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到MemStore使用内存略小于lowerLimit

================================================================================

以后博客的内容都是通过微信公众号链接的形式发布,之后迁移到公众号的文章都会重新修正,也更加详细,对于以前博客内容里面的错误或者理解不当的地方都会在公众号里面修正。

欢迎关注我的微信公众号,以后我会发布更多工作中总结的技术内容。

%title插图%num

文章来源于互联网:Hbase从入门到入坑

harry

这个人很懒,什么都没留下

文章评论