HBase 是 BigTable 的开源 Java 版本。是建立在 HDFS 之上,提供高可靠性、高性能、列存储、可伸缩、实时读写 NoSql 的数据库系统。
它介于 NoSql 和 RDBMS 之间,仅能通过主键(row key)和主键的 range 来检索数据,仅支持单行事务(可通过 hive 支持来实现多表 join 等复杂操作)。
主要用来存储结构化和半结构化的松散数据。
Hbase 查询数据功能很简单,不支持 join 等复杂操作,不支持复杂的事务(行级的事务) Hbase 中支持的数据类型:byte[] 与 hadoop 一样,Hbase 目标主要依靠横向扩展,通过不断增加廉价的商用服务器,来增加计算和存储能力。
HBase 中的表一般有这样的特点:
HBase 的发展历程
HBase 的原型是 Google 的 BigTable 论文,受到了该论文思想的启发,目前作为 Hadoop 的子项目来开发维护,用于支持结构化的数据存储。
官方网站:http://hbase.apache.org
| 关系型数据库(结构) | HBase(结构) | 关系型数据库(功能) | HBase(功能) |
|---|---|---|---|
| 数据库以表的形式存在 | 数据库以 region 的形式存在 | 支持向上扩展 | 支持向外扩展 |
| 支持 FAT、NTFS、EXT、文件系统 | 支持 HDFS 文件系统 | 使用 SQL 查询 | 使用 API 和 MapReduce 来访问 HBase 表数据 |
| 使用 Commit log 存储日志 | 使用 WAL(Write-Ahead Logs)存储日志 | 面向行,即每一行都是一个连续单元 | 面向列,即每一列都是一个连续的单元 |
| 参考系统是坐标系统 | 参考系统是 Zookeeper | 数据总量依赖于服务器配置 | 数据总量不依赖具体某台机器,而取决于机器数量 |
| 使用主键(PK) | 使用行键(row key) | 具有 ACID 支持 | HBase 不支持 ACID(Atomicity、Consistency、Isolation、Durability) |
| 支持分区 | 支持分片 | 适合结构化数据 | 适合结构化数据和非结构化数据 |
| 使用行、列、单元格 | 使用行、列、列族和单元格 | 传统关系型数据库一般都是中心化的 | 一般都是分布式的 |
| 支持事务 | HBase 不支持事务 | ||
| 支持 Join | 不支持 Join |
Hbase 适合存储 PB 级别的海量数据,在 PB 级别的数据以及采用廉价 PC 存储的情况下,能在几十到百毫秒内返回数据。这与 Hbase 的极易扩展性息息相关。正式因为 Hbase 良好的扩展性,才为海量数据的存储提供了便利。
这里的列式存储其实说的是列族存储,Hbase 是根据列族来存储数据的。列族下面可以有非常多的列,列族在创建表的时候就必须指定。
Hbase 的扩展性主要体现在两个方面,一个是基于上层处理能力(RegionServer)的扩展,一个是基于存储的扩展(HDFS)。通过横向添加 RegionSever 的机器,进行水平扩展,提升 Hbase 上层的处理能力,提升 Hbsae 服务更多 Region 的能力。备注:RegionServer 的作用是管理 region、承接业务的访问,这个后面会详细的介绍通过横向添加 Datanode 的机器,进行存储层扩容,提升 Hbase 的数据存储能力和提升后端存储的读写能力。
由于目前大部分使用 Hbase 的架构,都是采用的廉价 PC,因此单个 IO 的延迟其实并不小,一般在几十到上百 ms 之间。这里说的高并发,主要是在并发的情况下,Hbase 的单个 IO 延迟下降并不多。能获得高并发、低延迟的服务。
稀疏主要是针对 Hbase 列的灵活性,在列族中,你可以指定任意多的列,在列数据为空的情况下,是不会占用存储空间的。
Hbase和Hive在大数据架构中处在不同位置,Hbase主要解决实时数据查询问题,Hive主要解决海量数据处理和计算问题,一般是配合使用。
Hbase:Hadoop database 的简称,也就是基于Hadoop数据库,是一种NoSQL数据库,主要适用于海量明细数据(十亿、百亿)的随机实时查询,如日志明细、交易清单、轨迹行为等。
Hive:Hive是Hadoop数据仓库,严格来说,不是数据库,主要是让开发人员能够通过SQL来计算和处理HDFS上的结构化数据,适用于离线的批量数据计算。
Hbase主要解决实时数据查询问题,Hive主要解决数据处理和计算问题,一般是配合使用。
在大数据架构中,Hive和HBase是协作关系,数据流一般如下图:


Zookeeper: Master 的高可用、RegionServer 的监控、元数据的入口以及集群配置的维护等
HDFS: 提供底层数据支撑
功能:
功能:
HBase 的修改记录,当对 HBase 读写数据的时候,数据不是直接写进磁盘,它会在内存中保留一段时间(时间以及数据量阈值可以设定)。但把数据保存在内存中可能有更高的概率引起数据丢失,为了解决这个问题,数据会先写在一个叫做 Write-Ahead logfile 的文件中,然后再写入内存中。所以在系统出现故障的时候,数据可以通过这个日志文件重建。
这是在磁盘上保存原始数据的实际的物理文件,是实际的存储文件。
HFile 存储在 Store 中,一个 Store 对应 HBase 表中的一个列族。
顾名思义,就是内存存储,位于内存中,用来保存当前的数据操作,所以当数据保存在 WAL 中之后,RegsionServer 会在内存中存储键值对。
Hbase 表的分片,HBase 表会根据 RowKey 值被切分成不同的 region 存储在 RegionServer 中,在一个 RegionServer 中可以有多个不同的 region。
$ bin/hbase shell
hbase(main):001:0> help
hbase(main):002:0> list
创建 user 表,包含 info、data 两个列族
hbase(main):010:0> create 'user', 'info', 'data'
或者
hbase(main):010:0> create 'user', {NAME => 'info', VERSIONS => '3'},{NAME => 'data'}
向 user 表中插入信息,row key 为 rk0001,列族 info 中添加 name 列标示符,值为 zhangsan
hbase(main):011:0> put 'user', 'rk0001', 'info:name', 'zhangsan'
向 user 表中插入信息,row key 为 rk0001,列族 info 中添加 gender 列标示符,值为 female
hbase(main):012:0> put 'user', 'rk0001', 'info:gender', 'female'
向 user 表中插入信息,row key 为 rk0001,列族 info 中添加 age 列标示符,值为 20
hbase(main):013:0> put 'user', 'rk0001', 'info:age', 20
向 user 表中插入信息,row key 为 rk0001,列族 data 中添加 pic 列标示符,值为 picture
hbase(main):014:0> put 'user', 'rk0001', 'data:pic', 'picture'
获取 user 表中 row key 为 rk0001 的所有信息
hbase(main):015:0> get 'user', 'rk0001'
获取 user 表中 row key 为 rk0001,info 列族的所有信息
hbase(main):016:0> get 'user', 'rk0001', 'info'
获取 user 表中 row key 为 rk0001,info 列族的 name、age 列标示符的信息
hbase(main):017:0> get 'user', 'rk0001', 'info:name', 'info:age'
获取 user 表中 row key 为 rk0001,info、data 列族的信息
hbase(main):018:0> get 'user', 'rk0001', 'info', 'data'
或者这样写
hbase(main):019:0> get 'user', 'rk0001', {COLUMN => ['info', 'data']}
或者这样写
hbase(main):020:0> get 'user', 'rk0001', {COLUMN => ['info:name', 'data:pic']}
获取 user 表中 row key 为 rk0001,cell 的值为 zhangsan 的信息
hbase(main):030:0> get 'user', 'rk0001', {FILTER => "ValueFilter(=, 'binary:zhangsan')"}
获取 user 表中 row key 为 rk0001,列标示符中含有 a 的信息
hbase(main):031:0> get 'user', 'rk0001', {FILTER => "(QualifierFilter(=,'substring:a'))"}
继续插入一批数据
hbase(main):032:0> put 'user', 'rk0002', 'info:name', 'fanbingbing'
hbase(main):033:0> put 'user', 'rk0002', 'info:gender', 'female'
hbase(main):034:0> put 'user', 'rk0002', 'info:nationality', '中国'
hbase(main):035:0> get 'user', 'rk0002', {FILTER => "ValueFilter(=, 'binary:中国')"}
查询 user 表中的所有信息
scan 'user'
查询 user 表中列族为 info 的信息
scan 'user', {COLUMNS => 'info'}
scan 'user', {COLUMNS => 'info', RAW => true, VERSIONS => 5}
scan 'user', {COLUMNS => 'info', RAW => true, VERSIONS => 3}
查询 user 表中列族为 info 和 data 的信息
scan 'user', {COLUMNS => ['info', 'data']}
scan 'user', {COLUMNS => ['info:name', 'data:pic']}
查询 user 表中列族为 info、列标示符为 name 的信息
scan 'user', {COLUMNS => 'info:name'}
查询 user 表中列族为 info、列标示符为 name 的信息,并且版本最新的 5 个
scan 'user', {COLUMNS => 'info:name', VERSIONS => 5}
查询 user 表中列族为 info 和 data 且列标示符中含有 a 字符的信息
scan 'user', {COLUMNS => ['info', 'data'], FILTER => "(QualifierFilter(=,'substring:a'))"}
查询 user 表中列族为 info,rk 范围是(rk0001, rk0003)的数据
scan 'user', {COLUMNS => 'info', STARTROW => 'rk0001', ENDROW => 'rk0003'}
查询 user 表中 row key 以 rk 字符开头的
scan 'user',{FILTER=>"PrefixFilter('rk')"}
查询 user 表中指定范围的数据
scan 'user', {TIMERANGE => [1392368783980, 1392380169184]}
count 'user'
更新操作同插入操作一模一样,只不过有数据就更新,没数据就添加。
将 user 表的 f1 列族版本号改为 5
hbase(main):050:0> alter 'user', NAME => 'info', VERSIONS => 5
删除 user 表 row key 为 rk0001,列标示符为 info:name 的数据
hbase(main):045:0> delete 'user', 'rk0001', 'info:name'
删除 user 表 row key 为 rk0001,列标示符为 info:name,timestamp 为 1392383705316 的数据
delete 'user', 'rk0001', 'info:name', 1392383705316
alter 'user', NAME => 'info', METHOD => 'delete'
或者
alter 'user', NAME => 'info', METHOD => 'delete'
hbase(main):017:0> truncate 'user'
首先需要先让该表为 disable 状态,使用命令:
hbase(main):049:0> disable 'user
然后才能 drop 这个表,使用命令:
hbase(main):050:0> drop 'user'
注意:如果直接 drop 表,会报错:Drop the named table. Table must first be disabled
例如:显示服务器状态
hbase(main):058:0> status 'node01'
显示 HBase 当前用户,例如:
hbase> whoami
显示当前所有的表
hbase> list
统计指定表的记录数,例如:
hbase> count 'user'
展示表结构信息
hbase> describe 'user'
检查表是否存在,适用于表量特别多的情况
hbase> exists 'user'
检查表是否启用或禁用
hbase> is_enabled 'user'
该命令可以改变表和列族的模式,例如:
为当前表增加列族:
hbase> alter 'user', NAME => 'CF2', VERSIONS => 2
为当前表删除列族:
hbase(main):002:0> alter 'user', 'delete' => 'CF2'
禁用一张表/启用一张表
删除一张表,记得在删除表之前必须先禁用
清空表
为了提高 HBase存储的利用率,很多 HBase使用者会对 HBase 表中的数据进行压缩。目前 HBase 可以支持的压缩方式有 GZ(GZIP)、LZO、LZ4 以及 Snappy。它们之间的区别如下:
各种压缩各有不同的特点,我们需要根据业务需求(解压和压缩速率、压缩率等)选择不同的压缩格式。多数情况下,选择Snppy或LZ0是比较好的选择,因为它们的压缩开销底,能节省空间。这里介绍一下 HBase 中使用 Snappy 的方法,其他的压缩设置方法和这个类似。
在创建 HBase 表的时候我们可以指定数据的压缩格式,如下;
hbase(main):010:0> create 'iteblog',{NAME=>'f1'}, {NAME=>'f2',COMPRESSION=>'Snappy'}
Created table iteblog
Took 1.2539 seconds
=> Hbase::Table - iteblog
hbase(main):011:0> describe 'iteblog'
Table iteblog is ENABLED
iteblog
COLUMN FAMILIES DESCRIPTION
{NAME => 'f1', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY =>'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
{NAME => 'f2', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY =>'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
2 row(s)
Took 0.0522 seconds
上面例子的表 iteblog 有两个列族,我们选择对 f2 列族进行 Snappy 压缩, f1 列族数据不压缩。
当然,如果我们表已经创建了,同样也可以对其进行压缩,方式如下:
hbase(main):001:0> alter 'iteblog', NAME => 'f', COMPRESSION => 'snappy'
Updating all regions with the new schema...
27/27 regions updated.
Done.
Took 9.5146 seconds
hbase(main):003:0> describe 'iteblog'
Table iteblog is ENABLED
iteblog
COLUMN FAMILIES DESCRIPTION
{NAME => 'f', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}
1 row(s)
Took 0.0782 seconds
设置完之后,其实数据并没有被压缩,我们需要对当前表执行 major_compact 命令手动进行压缩:
hbase(main):002:0> major_compact 'iteblog'
Took 1.2255 seconds
这样,iteblog 表的数据就可以被压缩了。
@Test
public void createTable() throws IOException {
//创建配置文件对象,并指定zookeeper的连接地址
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
//集群配置↓
//configuration.set("hbase.zookeeper.quorum", "101.236.39.141,101.236.46.114,101.236.46.113");
configuration.set("hbase.master", "node01:60000");
Connection connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
//通过HTableDescriptor来实现我们表的参数设置,包括表名,列族等等
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("myuser"));
//添加列族
hTableDescriptor.addFamily(new HColumnDescriptor("f1"));
//添加列族
hTableDescriptor.addFamily(new HColumnDescriptor("f2"));
//创建表
boolean myuser = admin.tableExists(TableName.valueOf("myuser"));
if(!myuser){
admin.createTable(hTableDescriptor);
}
//关闭客户端连接
admin.close();
}
@Test
public void addDatas() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
//获取表
Table myuser = connection.getTable(TableName.valueOf("myuser"));
//创建put对象,并指定rowkey
Put put = new Put("0001".getBytes());
put.addColumn("f1".getBytes(),"id".getBytes(), Bytes.toBytes(1));
put.addColumn("f1".getBytes(),"name".getBytes(), Bytes.toBytes("张三"));
put.addColumn("f1".getBytes(),"age".getBytes(), Bytes.toBytes(18));
put.addColumn("f2".getBytes(),"address".getBytes(), Bytes.toBytes("地球人"));
put.addColumn("f2".getBytes(),"phone".getBytes(), Bytes.toBytes("15874102589"));
//插入数据
myuser.put(put);
//关闭表
myuser.close();
}
初始化一批数据到 HBase 当中用于查询
@Test
public void insertBatchData() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
//获取表
Table myuser = connection.getTable(TableName.valueOf("myuser"));
//创建put对象,并指定rowkey
Put put = new Put("0002".getBytes());
put.addColumn("f1".getBytes(),"id".getBytes(),Bytes.toBytes(1));
put.addColumn("f1".getBytes(),"name".getBytes(),Bytes.toBytes("曹操"));
put.addColumn("f1".getBytes(),"age".getBytes(),Bytes.toBytes(30));
put.addColumn("f2".getBytes(),"sex".getBytes(),Bytes.toBytes("1"));
put.addColumn("f2".getBytes(),"address".getBytes(),Bytes.toBytes("沛国谯县"));
put.addColumn("f2".getBytes(),"phone".getBytes(),Bytes.toBytes("16888888888"));
put.addColumn("f2".getBytes(),"say".getBytes(),Bytes.toBytes("helloworld"));
Put put2 = new Put("0003".getBytes());
put2.addColumn("f1".getBytes(),"id".getBytes(),Bytes.toBytes(2));
put2.addColumn("f1".getBytes(),"name".getBytes(),Bytes.toBytes("刘备"));
put2.addColumn("f1".getBytes(),"age".getBytes(),Bytes.toBytes(32));
put2.addColumn("f2".getBytes(),"sex".getBytes(),Bytes.toBytes("1"));
put2.addColumn("f2".getBytes(),"address".getBytes(),Bytes.toBytes("幽州涿郡涿县"));
put2.addColumn("f2".getBytes(),"phone".getBytes(),Bytes.toBytes("17888888888"));
put2.addColumn("f2".getBytes(),"say".getBytes(),Bytes.toBytes("talk is cheap , show me the code"));
Put put3 = new Put("0004".getBytes());
put3.addColumn("f1".getBytes(),"id".getBytes(),Bytes.toBytes(3));
put3.addColumn("f1".getBytes(),"name".getBytes(),Bytes.toBytes("孙权"));
put3.addColumn("f1".getBytes(),"age".getBytes(),Bytes.toBytes(35));
put3.addColumn("f2".getBytes(),"sex".getBytes(),Bytes.toBytes("1"));
put3.addColumn("f2".getBytes(),"address".getBytes(),Bytes.toBytes("下邳"));
put3.addColumn("f2".getBytes(),"phone".getBytes(),Bytes.toBytes("12888888888"));
put3.addColumn("f2".getBytes(),"say".getBytes(),Bytes.toBytes("what are you 弄啥嘞!"));
Put put4 = new Put("0005".getBytes());
put4.addColumn("f1".getBytes(),"id".getBytes(),Bytes.toBytes(4));
put4.addColumn("f1".getBytes(),"name".getBytes(),Bytes.toBytes("诸葛亮"));
put4.addColumn("f1".getBytes(),"age".getBytes(),Bytes.toBytes(28));
put4.addColumn("f2".getBytes(),"sex".getBytes(),Bytes.toBytes("1"));
put4.addColumn("f2".getBytes(),"address".getBytes(),Bytes.toBytes("四川隆中"));
put4.addColumn("f2".getBytes(),"phone".getBytes(),Bytes.toBytes("14888888888"));
put4.addColumn("f2".getBytes(),"say".getBytes(),Bytes.toBytes("出师表你背了嘛"));
Put put5 = new Put("0005".getBytes());
put5.addColumn("f1".getBytes(),"id".getBytes(),Bytes.toBytes(5));
put5.addColumn("f1".getBytes(),"name".getBytes(),Bytes.toBytes("司马懿"));
put5.addColumn("f1".getBytes(),"age".getBytes(),Bytes.toBytes(27));
put5.addColumn("f2".getBytes(),"sex".getBytes(),Bytes.toBytes("1"));
put5.addColumn("f2".getBytes(),"address".getBytes(),Bytes.toBytes("哪里人有待考究"));
put5.addColumn("f2".getBytes(),"phone".getBytes(),Bytes.toBytes("15888888888"));
put5.addColumn("f2".getBytes(),"say".getBytes(),Bytes.toBytes("跟诸葛亮死掐"));
Put put6 = new Put("0006".getBytes());
put6.addColumn("f1".getBytes(),"id".getBytes(),Bytes.toBytes(5));
put6.addColumn("f1".getBytes(),"name".getBytes(),Bytes.toBytes("xiaobubu—吕布"));
put6.addColumn("f1".getBytes(),"age".getBytes(),Bytes.toBytes(28));
put6.addColumn("f2".getBytes(),"sex".getBytes(),Bytes.toBytes("1"));
put6.addColumn("f2".getBytes(),"address".getBytes(),Bytes.toBytes("内蒙人"));
put6.addColumn("f2".getBytes(),"phone".getBytes(),Bytes.toBytes("15788888888"));
put6.addColumn("f2".getBytes(),"say".getBytes(),Bytes.toBytes("貂蝉去哪了"));
List<Put> listPut = new ArrayList<Put>();
listPut.add(put);
listPut.add(put2);
listPut.add(put3);
listPut.add(put4);
listPut.add(put5);
listPut.add(put6);
myuser.put(listPut);
myuser.close();
}
按照 rowkey 进行查询获取所有列的所有值
查询主键 rowkey 为 0003 的人:
@Test
public void searchData() throws IOException {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Get get = new Get(Bytes.toBytes("0003"));
Result result = myuser.get(get);
Cell[] cells = result.rawCells();
//获取所有的列名称以及列的值
for (Cell cell : cells) {
//注意,如果列属性是int类型,那么这里就不会显示
System.out.println(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()));
System.out.println(Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()));
}
myuser.close();
}
按照 rowkey 查询指定列族下面的指定列的值:
@Test
public void searchData2() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
//通过rowKey进行查询
Get get = new Get("0003".getBytes());
get.addColumn("f1".getBytes(),"id".getBytes());
Result result = myuser.get(get);
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "age".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
myuser.close();
}
通过 startRowKey 和 endRowKey 进行扫描:
@Test
public void scanRowKey() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Scan scan = new Scan();
scan.setStartRow("0004".getBytes());
scan.setStopRow("0006".getBytes());
ResultScanner resultScanner = myuser.getScanner(scan);
for (Result result : resultScanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//遍历获取得到所有的列族以及所有的列的名称
KeyValue[] raw = result.raw();
for (KeyValue keyValue : raw) {
//获取所属列族
System.out.println(Bytes.toString(keyValue.getFamilyArray(),keyValue.getFamilyOffset(),keyValue.getFamilyLength()));
System.out.println(Bytes.toString(keyValue.getQualifierArray(),keyValue.getQualifierOffset(),keyValue.getQualifierLength()));
}
//指定列族以及列打印列当中的数据出来
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "age".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
}
myuser.close();
}
通过 scan 进行全表扫描:
@Test
public void scanAllData() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Scan scan = new Scan();
ResultScanner resultScanner = myuser.getScanner(scan);
for (Result result : resultScanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//指定列族以及列打印列当中的数据出来
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "age".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
}
myuser.close();
}
过滤器的类型很多,但是可以分为两大类——比较过滤器,专用过滤器。
过滤器的作用是在服务端判断数据是否满足条件,然后只将满足条件的数据返回给客户端;
hbase 过滤器的比较运算符:
LESS <
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
NO_OP 排除所有
Hbase 过滤器的比较器(指定比较机制):
BinaryComparator 按字节索引顺序比较指定字节数组,采用Bytes.compareTo(byte[])
BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同
NullComparator 判断给定的是否为空
BitComparator 按位比较
RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和非EQUAL
SubstringComparator 判断提供的子串是否出现在value中。
通过 RowFilter 过滤比 rowKey 0003 小的所有值出来
@Test
public void rowKeyFilter() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Scan scan = new Scan();
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("0003")));
scan.setFilter(rowFilter);
ResultScanner resultScanner = myuser.getScanner(scan);
for (Result result : resultScanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//指定列族以及列打印列当中的数据出来
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "age".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
}
myuser.close();
}
查询比 f2 列族小的所有的列族内的数据
@Test
public void familyFilter() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Scan scan = new Scan();
FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.LESS, new SubstringComparator("f2"));
scan.setFilter(familyFilter);
ResultScanner resultScanner = myuser.getScanner(scan);
for (Result result : resultScanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//指定列族以及列打印列当中的数据出来
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "age".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
}
myuser.close();
}
只查询 name 列的值
@Test
public void qualifierFilter() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Scan scan = new Scan();
QualifierFilter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("name"));
scan.setFilter(qualifierFilter);
ResultScanner resultScanner = myuser.getScanner(scan);
for (Result result : resultScanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//指定列族以及列打印列当中的数据出来
// System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
}
myuser.close();
}
查询所有列当中包含 8 的数据
@Test
public void valueFilter() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Scan scan = new Scan();
ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("8"));
scan.setFilter(valueFilter);
ResultScanner resultScanner = myuser.getScanner(scan);
for (Result result : resultScanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//指定列族以及列打印列当中的数据出来
// System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes())));
}
myuser.close();
}
SingleColumnValueFilter 会返回满足条件的整列值的所有字段
@Test
public void singleColumnFilter() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Scan scan = new Scan();
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("f1".getBytes(), "name".getBytes(), CompareFilter.CompareOp.EQUAL, "刘备".getBytes());
scan.setFilter(singleColumnValueFilter);
ResultScanner resultScanner = myuser.getScanner(scan);
for (Result result : resultScanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//指定列族以及列打印列当中的数据出来
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes())));
}
myuser.close();
}
与 SingleColumnValueFilter 相反,会排除掉指定的列,其他的列全部返回
查询以 00 开头的所有前缀的 rowkey
@Test
public void preFilter() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Scan scan = new Scan();
PrefixFilter prefixFilter = new PrefixFilter("00".getBytes());
scan.setFilter(prefixFilter);
ResultScanner resultScanner = myuser.getScanner(scan);
for (Result result : resultScanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//指定列族以及列打印列当中的数据出来
System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes())));
}
myuser.close();
}
分页过滤器 PageFilter
@Test
public void pageFilter2() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
int pageNum = 3;
int pageSize = 2;
Scan scan = new Scan();
if (pageNum == 1) {
PageFilter filter = new PageFilter(pageSize);
scan.setStartRow(Bytes.toBytes(""));
scan.setFilter(filter);
scan.setMaxResultSize(pageSize);
ResultScanner scanner = myuser.getScanner(scan);
for (Result result : scanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//指定列族以及列打印列当中的数据出来
// System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
//System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes())));
}
}else{
String startRowKey ="";
PageFilter filter = new PageFilter((pageNum - 1) * pageSize + 1 );
scan.setStartRow(startRowKey.getBytes());
scan.setMaxResultSize((pageNum - 1) * pageSize + 1);
scan.setFilter(filter);
ResultScanner scanner = myuser.getScanner(scan);
for (Result result : scanner) {
byte[] row = result.getRow();
startRowKey = new String(row);
}
Scan scan2 = new Scan();
scan2.setStartRow(startRowKey.getBytes());
scan2.setMaxResultSize(Long.valueOf(pageSize));
PageFilter filter2 = new PageFilter(pageSize);
scan2.setFilter(filter2);
ResultScanner scanner1 = myuser.getScanner(scan2);
for (Result result : scanner1) {
byte[] row = result.getRow();
System.out.println(new String(row));
}
}
myuser.close();
}
需求:使用 SingleColumnValueFilter 查询 f1 列族,name 为刘备的数据,并且同时满足 rowkey 的前缀以 00 开头的数据(PrefixFilter)
@Test
public void manyFilter() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Scan scan = new Scan();
FilterList filterList = new FilterList();
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("f1".getBytes(), "name".getBytes(), CompareFilter.CompareOp.EQUAL, "刘备".getBytes());
PrefixFilter prefixFilter = new PrefixFilter("00".getBytes());
filterList.addFilter(singleColumnValueFilter);
filterList.addFilter(prefixFilter);
scan.setFilter(filterList);
ResultScanner scanner = myuser.getScanner(scan);
for (Result result : scanner) {
//获取rowkey
System.out.println(Bytes.toString(result.getRow()));
//指定列族以及列打印列当中的数据出来
// System.out.println(Bytes.toInt(result.getValue("f1".getBytes(), "id".getBytes())));
System.out.println(Bytes.toString(result.getValue("f1".getBytes(), "name".getBytes())));
//System.out.println(Bytes.toString(result.getValue("f2".getBytes(), "phone".getBytes())));
}
myuser.close();
}
@Test
public void deleteByRowKey() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Table myuser = connection.getTable(TableName.valueOf("myuser"));
Delete delete = new Delete("0001".getBytes());
myuser.delete(delete);
myuser.close();
}
@Test
public void deleteTable() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
admin.disableTable(TableName.valueOf("myuser"));
admin.deleteTable(TableName.valueOf("myuser"));
admin.close();
}
HBase可以使用内置的Zookeeper,也可以使用外置的,在实际生产环境,为了保持统一性,一般使用外置Zookeeper。
Zookeeper在HBase中的作用:

HMaster仅仅维护者table和HRegion的元数据信息,负载很低。



首先HFile文件是不定长的,长度固定的只有其中的两块:Trailer和FileInfo。Trailer中又指针指向其他数据块的起始点,FileInfo记录了文件的一些meta信息。
数据模型图:

物理模型图:

与nosql数据库一样,row key是用来检索记录的主键。访问hbase table中的行,只有三种方式:
Row Key 行键可以是任意字符串(最大长度是 64KB,实际应用中长度一般为 10-100bytes),在hbase内部,row key保存为字节数组。
Hbase会对表中的数据按照rowkey排序(字典顺序)
存储时,数据按照Row key的字典序(byte order)排序存储。设计key时,要充分排序存储这个特性,将经常一起读取的行存储放到一起。(位置相关性)。
注意:字典序对int排序的结果是 1,10,100,11,12,13,14,15,16,17,18,19,2,20,21 … 。要保持整形的自然序,行键必须用0作左填充。
行的一次读写是原子操作 (不论一次读写多少列)。这个设计决策能够使用户很容易的理解程序在对同一个行进行并发更新操作时的行为。
HBase表中的每个列,都归属于某个列族。列族是表的schema的一部分(而列不是),必须在使用表之前定义。
列名都以列族作为前缀。例如 courses:history , courses:math 都属于 courses 这个列族。
访问控制、磁盘和内存的使用统计都是在列族层面进行的。列族越多,在取一行数据时所要参与IO、搜寻的文件就越多,所以,如果没有必要,不要设置太多的列族。
列族下面的具体列,属于某一个ColumnFamily,类似于在mysql当中创建的具体的列。
HBase中通过row和columns确定的为一个存贮单元称为cell。每个 cell都保存着同一份数据的多个版本。版本通过时间戳来索引。时间戳的类型是 64位整型。时间戳可以由hbase(在数据写入时自动 )赋值,此时时间戳是精确到毫秒的当前系统时间。时间戳也可以由客户显式赋值。如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。每个 cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。
为了避免数据存在过多版本造成的的管理 (包括存贮和索引)负担,hbase提供了两种数据版本回收方式:
用户可以针对每个列族进行设置。
由{row key, column( =
数据的版本号,每条数据可以有多个版本号,默认值为系统时间戳,类型为Long。
HBase 整体结构
StoreFile以HFile格式保存在HDFS上。
首先HFile文件是不定长的,长度固定的只有其中的两块:Trailer和FileInfo。Trailer中有指针指向其他数据块的起始点。
File Info中记录了文件的一些Meta信息,例如:AVG_KEY_LEN, AVG_VALUE_LEN, LAST_KEY, COMPARATOR, MAX_SEQ_ID_KEY等。
Data Index和Meta Index块记录了每个Data块和Meta块的起始点。
Data Block是HBase I/O的基本单元,为了提高效率,HRegionServer中有基于LRU的Block Cache机制。每个Data块的大小可以在创建一个Table的时候通过参数指定,大号的Block有利于顺序Scan,小号Block利于随机查询。每个Data块除了开头的Magic以外就是一个个KeyValue对拼接而成, Magic内容就是一些随机数字,目的是防止数据损坏。
HFile里面的每个KeyValue对就是一个简单的byte数组。但是这个byte数组里面包含了很多项,并且有固定的结构。
开始是两个固定长度的数值,分别表示Key的长度和Value的长度。紧接着是Key,开始是固定长度的数值,表示RowKey的长度,紧接着是 RowKey,然后是固定长度的数值,表示Family的长度,然后是Family,接着是Qualifier,然后是两个固定长度的数值,表示Time Stamp和Key Type(Put/Delete)。Value部分没有这么复杂的结构,就是纯粹的二进制数据了。
HFile分为六个部分:
HFile的Data Block,Meta Block通常采用压缩方式存储,压缩之后可以大大减少网络IO和磁盘IO,随之而来的开销当然是需要花费cpu进行压缩和解压缩。目前HFile的压缩支持两种方式:Gzip,Lzo。
一个 HRegion 由多个 Store 组成,每个 Store 包含一个列族的所有数据 Store 包括位于内存的 Memstore 和位于硬盘的 StoreFile。
写操作先写入 Memstore,当 Memstore 中的数据量达到某个阈值,HRegionServer 启动 FlashCache 进程写入 StoreFile,每次写入形成单独一个 StoreFile
当 StoreFile 大小超过一定阈值后,会把当前的 HRegion 分割成两个,并由 HMaster 分配给相应的 HRegion 服务器,实现负载均衡
客户端检索数据时,先在memstore找,找不到再找storefile。
WAL 意为Write ahead log,类似 mysql 中的 binlog,用来做灾难恢复时用,Hlog记录数据的所有变更,一旦数据修改,就可以从log中进行恢复。
每个Region Server维护一个Hlog,而不是每个Region一个。这样不同region(来自不同table)的日志会混在一起,这样做的目的是不断追加单个文件相对于同时写多个文件而言,可以减少磁盘寻址次数,因此可以提高对table的写性能。带来的麻烦是,如果一台region server下线,为了恢复其上的region,需要将region server上的log进行拆分,然后分发到其它region server上进行恢复。
HLog文件就是一个普通的Hadoop Sequence File:
HRegionServer保存着meta表以及表数据,要访问表数据,首先Client先去访问zookeeper,从zookeeper里面获取meta表所在的位置信息,即找到这个meta表在哪个HRegionServer上保存着。
接着Client通过刚才获取到的HRegionServer的IP来访问Meta表所在的HRegionServer,从而读取到Meta,进而获取到Meta表中存放的元数据。
Client通过元数据中存储的信息,访问对应的HRegionServer,然后扫描所在HRegionServer的Memstore和Storefile来查询数据。
最后HRegionServer把查询到的数据响应给Client。
查看meta表信息
hbase(main):011:0> scan 'hbase:meta'
HFile 是存储在 HDFS 上面每一个 store 文件夹下实际存储数据的文件。里面存储多种内容。包括数据本身(keyValue 键值对)、元数据记录、文件信息、数据索引、元数据索引和一个固定长度的尾部信息(记录文件的修改情况)。
键值对按照块大小(默认 64K)保存在文件中,数据索引按照块创建,块越多,索引越大。每一个 HFile 还会维护一个布隆过滤器(就像是一个很大的地图,文件中每有一种 key,就在对应的位置标记,读取时可以大致判断要 get 的 key 是否存在 HFile 中)。KeyValue 内容如下:
由于 HFile 存储经过序列化,所以无法直接查看。可以通过 HBase 提供的命令来查看存储在 HDFS 上面的 HFile 元数据内容。

合并读取数据优化,每次读取数据都需要读取三个位置,最后进行版本的合并。效率会非常低,所有系统需要对此优化。
StoreFile Compaction也即是文件合并,由于 memstore 每次刷写都会生成一个新的 HFile,文件过多读取不方便,所以会进行文件的合并,清理掉过期和删除的数据,会进行 StoreFile Compaction。
Compaction 分为两种,分别是 Minor Compaction 和 Major Compaction。MinorCompaction会将临近的若干个较小的 HFile 合并成一个较大的 HFile,并清理掉部分过期和删除的数据,有系统使用一组参数自动控制,Major Compaction 会将一个 Store 下的所有的 HFile 合并成一个大 HFile,并且会清理掉所有过期和删除的数据,由参数 hbase.hregion.majorcompaction控制,默认 7 天。

Region Split也即是分区,Region 切分分为两种,创建表格时候的预分区即自定义分区,同时系统默认还会启动一个切分规则,避免单个 Region 中的数据量太大。

细节描述:
HBase使用MemStore和StoreFile存储对表的更新。数据在更新时首先写入Log(WAL log)和内存(MemStore)中,MemStore中的数据是排序的,当MemStore累计到一定阈值时,就会创建一个新的MemStore,并且将老的MemStore添加到flush队列,由单独的线程flush到磁盘上,成为一个StoreFile。于此同时,系统会在zookeeper中记录一个redo point,表示这个时刻之前的变更已经持久化了。当系统出现意外时,可能导致内存(MemStore)中的数据丢失,此时使用Log(WAL log)来恢复checkpoint之后的数据。
StoreFile是只读的,一旦创建后就不可以再修改。因此HBase的更新其实是不断追加的操作。当一个Store中的StoreFile达到一定的阈值后,就会进行一次合并(minor_compact, major_compact),将对同一个key的修改合并到一起,形成一个大的StoreFile,当StoreFile的大小达到一定阈值后,又会对 StoreFile进行split,等分为两个StoreFile。
由于对表的更新是不断追加的,compact时,需要访问Store中全部的 StoreFile和MemStore,将他们按row key进行合并,由于StoreFile和MemStore都是经过排序的,并且StoreFile带有内存中索引,合并的过程还是比较快。
MemStore Flush也即是写缓存刷写,MemStore 刷写由多个线程控制,条件互相独立:主要的刷写规则是控制刷写文件的大小,在每一个刷写线程中都会进行监控
任何时刻,一个HRegion只能分配给一个HRegion Server。HMaster记录了当前有哪些可用的HRegion Server。以及当前哪些HRegion分配给了哪些HRegion Server,哪些HRegion还没有分配。当需要分配的新的HRegion,并且有一个HRegion Server上有可用空间时,HMaster就给这个HRegion Server发送一个装载请求,把HRegion分配给这个HRegion Server。HRegion Server得到请求后,就开始对此HRegion提供服务。
HMaster使用zookeeper来跟踪HRegion Server状态。当某个HRegion Server启动时,会首先在zookeeper上的server目录下建立代表自己的znode。由于HMaster订阅了server目录上的变更消息,当server目录下的文件出现新增或删除操作时,HMaster可以得到来自zookeeper的实时通知。因此一旦HRegion Server上线,HMaster能马上得到消息。
当HRegion Server下线时,它和zookeeper的会话断开,zookeeper而自动释放代表这台server的文件上的独占锁。HMaster就可以确定:
无论哪种情况,HRegion Server都无法继续为它的HRegion提供服务了,此时HMaster会删除server目录下代表这台HRegion Server的znode数据,并将这台HRegion Server的HRegion分配给其它还活着的节点。
master启动进行以下步骤:
由于HMaster只维护表和region的元数据,而不参与表数据IO的过程,HMaster下线仅导致所有元数据的修改被冻结(无法创建删除表,无法修改表的schema,无法进行HRegion的负载均衡,无法处理HRegion 上下线,无法进行HRegion的合并,唯一例外的是HRegion的split可以正常进行,因为只有HRegion Server参与),表的数据读写还可以正常进行。因此HMaster下线短时间内对整个HBase集群没有影响。
从上线过程可以看到,HMaster保存的信息全是可以冗余信息(都可以从系统其它地方收集到或者计算出来)
因此,一般HBase集群中总是有一个HMaster在提供服务,还有一个以上的‘HMaster’在等待时机抢占它的位置。
1.(hbase.regionserver.global.memstore.size)默认;堆大小的40% regionServer的全局memstore的大小,超过该大小会触发flush到磁盘的操作,默认是堆大小的40%,而且regionserver级别的flush会阻塞客户端读写
2.(hbase.hregion.memstore.flush.size)默认:128M 单个region里memstore的缓存大小,超过那么整个HRegion就会flush,
3.(hbase.regionserver.optionalcacheflushinterval)默认:1h 内存中的文件在自动刷新之前能够存活的最长时间
4.(hbase.regionserver.global.memstore.size.lower.limit)默认:堆大小 * 0.4 * 0.95 有时候集群的“写负载”非常高,写入量一直超过flush的量,这时,我们就希望memstore不要超过一定的安全设置。在这种情况下,写操作就要被阻塞一直到memstore恢复到一个“可管理”的大小, 这个大小就是默认值是堆大小 * 0.4 * 0.95,也就是当regionserver级别的flush操作发送后,会阻塞客户端写,一直阻塞到整个regionserver级别的memstore的大小为 堆大小 * 0.4 *0.95为止
5.(hbase.hregion.preclose.flush.size)默认为:5M 当一个 region 中的 memstore 的大小大于这个值的时候,我们又触发了region的 close时,会先运行“pre-flush”操作,清理这个需要关闭的memstore,然后 将这个 region 下线。当一个 region 下线了,我们无法再进行任何写操作。如果一个 memstore 很大的时候,flush 操作会消耗很多时间。“pre-flush” 操作意味着在 region 下线之前,会先把 memstore 清空。这样在最终执行 close 操作的时候,flush 操作会很快。
6.(hbase.hstore.compactionThreshold)默认:超过3个 一个store里面允许存的hfile的个数,超过这个个数会被写到新的一个hfile里面 也即是每个region的每个列族对应的memstore在flush为hfile的时候,默认情况下当超过3个hfile的时候就会对这些文件进行合并重写为一个新文件,设置个数越大可以减少触发合并的时间,但是每次合并的时间就会越长
把小的storeFile文件合并成大的HFile文件。清理过期的数据,包括删除的数据 将数据的版本号保存为1个。
当HRegion达到阈值,会把过大的HRegion一分为二。默认一个HFile达到10Gb的时候就会进行切分。
HBase 当中的数据最终都是存储在 HDFS 上面的,HBase 天生的支持 MR 的操作,我们可以通过 MR 直接处理 HBase 当中的数据,并且 MR 可以将处理后的结果直接存储到 HBase 当中去。
需求:读取 HBase 当中一张表的数据,然后将数据写入到 HBase 当中的另外一张表当中去。
注意:我们可以使用 TableMapper 与 TableReducer 来实现从 HBase 当中读取与写入数据。
这里我们将 myuser 这张表当中 f1 列族的 name 和 age 字段写入到 myuser2 这张表的 f1 列族当中去。
需求一:读取 myuser 这张表当中的数据写入到 HBase 的另外一张表当中去:
第一步:创建 myuser2 这张表
注意:列族的名字要与 myuser 表的列族名字相同
hbase(main):010:0> create 'myuser2','f1'
第二步:开发 MR 的程序
public class HBaseMR extends Configured implements Tool{
public static class HBaseMapper extends TableMapper<Text,Put>{
/**
*
* @param key 我们的主键rowkey
* @param value 我们一行数据所有列的值都封装在value里面了
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
byte[] bytes = key.get();
String rowKey = Bytes.toString(bytes);
Put put = new Put(key.get());
Cell[] cells = value.rawCells();
for (Cell cell : cells) {
if("f1".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
put.add(cell);
}
if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
put.add(cell);
}
}
}
if(!put.isEmpty()){
context.write(new Text(rowKey),put);
}
}
}
public static class HBaseReducer extends TableReducer<Text,Put,ImmutableBytesWritable>{
@Override
protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
for (Put value : values) {
context.write(null,value);
}
}
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "hbaseMr");
job.setJarByClass(this.getClass());
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
//使用TableMapReduceUtil 工具类来初始化我们的mapper
TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"),scan,HBaseMapper.class,Text.class,Put.class,job);
//使用TableMapReduceUtil 工具类来初始化我们的reducer
TableMapReduceUtil.initTableReducerJob("myuser2",HBaseReducer.class,job);
job.setNumReduceTasks(1);
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
//创建HBaseConfiguration配置
Configuration configuration = HBaseConfiguration.create();
int run = ToolRunner.run(configuration, new HBaseMR(), args);
System.exit(run);
}
}
第三步:打包运行
将我们打好的 jar 包放到服务器上执行:
yarn jar hbaseStudy-1.0-SNAPSHOT.jar cn.yuan_more.hbasemr.HBaseMR
需求二:读取 HDFS 文件,写入到 HBase 表当中去
第一步:准备数据文件
准备数据文件,并将数据文件上传到 HDFS 上面去。
第二步:开发 MR 程序
public class Hdfs2Hbase extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "hdfs2Hbase");
job.setJarByClass(Hdfs2Hbase.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/hbase/input"));
job.setMapperClass(HdfsMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
TableMapReduceUtil.initTableReducerJob("myuser2",HBaseReducer.class,job);
job.setNumReduceTasks(1);
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();
int run = ToolRunner.run(configuration, new Hdfs2Hbase(), args);
System.exit(run);
}
public static class HdfsMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
public static class HBaseReducer extends TableReducer<Text,NullWritable,ImmutableBytesWritable>{
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
String[] split = key.toString().split("\t");
Put put = new Put(Bytes.toBytes(split[0]));
put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
put.addColumn("f1".getBytes(),"age".getBytes(),Bytes.toBytes(Integer.parseInt(split[2])));
context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);
}
}
}
需求四:通过 bulkload 的方式批量加载数据到 HBase 当中去
加载数据到 HBase 当中去的方式多种多样,我们可以使用 HBase 的 javaAPI 或者使用 sqoop 将我们的数据写入或者导入到 HBase 当中去,但是这些方式不是慢就是在导入的过程的占用 Region 资料导致效率低下,我们也可以通过 MR 的程序,将我们的数据直接转换成 HBase 的最终存储格式 HFile,然后直接 load 数据到 HBase 当中去即可。
HBase 中每张 Table 在根目录(/HBase)下用一个文件夹存储,Table 名为文件夹名,在 Table 文件夹下每个 Region 同样用一个文件夹存储,每个 Region 文件夹下的每个列族也用文件夹存储,而每个列族下存储的就是一些 HFile 文件,HFile 就是 HBase 数据在 HFDS 下存储格式,所以 HBase 存储文件最终在 hdfs 上面的表现形式就是 HFile,如果我们可以直接将数据转换为 HFile 的格式,那么我们的 HBase 就可以直接读取加载 HFile 格式的文件,就可以直接读取了。
优点:
第一步:定义 mapper 类
public class LoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put>{
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
Put put = new Put(Bytes.toBytes(split[0]));
put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
put.addColumn("f1".getBytes(),"age".getBytes(),Bytes.toBytes(Integer.parseInt(split[2])));
context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);
}
}
第二步:开发 main 程序入口类
public class HBaseLoad extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
final String INPUT_PATH= "hdfs://node01:8020/hbase/input";
final String OUTPUT_PATH= "hdfs://node01:8020/hbase/output_hfile";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf("myuser2"));
Job job= Job.getInstance(conf);
job.setJarByClass(HBaseLoad.class);
job.setMapperClass(LoadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("myuser2")));
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();
int run = ToolRunner.run(configuration, new HBaseLoad(), args);
System.exit(run);
}
}
第三步:将代码打成 jar 包然后运行
yarn jar original-hbaseStudy-1.0-SNAPSHOT.jar cn.yuan_more.hbasemr.HBaseLoad
第四步:开发代码,加载数据
将输出路径下面的 HFile 文件,加载到 hbase 表当中去
public class LoadData {
public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
Connection connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
Table table = connection.getTable(TableName.valueOf("myuser2"));
LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
load.doBulkLoad(new Path("hdfs://node01:8020/hbase/output_hfile"), admin,table,connection.getRegionLocator(TableName.valueOf("myuser2")));
}
}
或者我们也可以通过命令行来进行加载数据。
先将 hbase 的 jar 包添加到 hadoop 的 classpath 路径下
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
然后执行以下命令,将 hbase 的 HFile 直接导入到表 myuser2 当中来
yarn jar /servers/hbase/lib/hbase-server-1.2.0.jar completebulkload /hbase/output_hfile myuser2
Hive与HBase整合的实现是利用两者本身对外的API接口互相通信来完成的,这种相互通信是通过$HIVE_HOME/lib/hive-hbase-handler-{hive.version}.jar工具类实现的。通过HBaseStorageHandler,Hive可以获取到Hive表所对应的HBase表名,列簇和列,InputFormat、OutputFormat类,创建和删除HBase表等。Hive访问HBase中表数据,实质上是通过MapReduce读取HBase表数据,其实现是在MR中,使用HiveHBaseTableInputFormat完成对HBase表的切分,获取RecordReader对象来读取数据。对HBase表的切分原则是一个Region切分成一个Split,即表中有多少个Regions,MR中就有多少个Map;读取HBase表数据都是通过构建Scanner,对表进行全表扫描,如果有过滤条件,则转化为Filter。当过滤条件为rowkey时,则转化为对rowkey的过滤;Scanner通过RPC调用RegionServer的next()来获取数据;
hive关联hbase实际是底层是MR,速度较慢,此时可以使用spark读取hive表,进行查询操作,从而访问hbase数据。
每一个 region 维护着 startRow 与 endRowKey,如果加入的数据符合某个 region 维护的 rowKey 范围,则该数据交给这个 region 维护。
hbase(main):001:0> create 'staff','info','partition1',SPLITS => ['1000','2000','3000','4000']
完成后如图:

hbase(main):003:0> create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
创建 splits.txt 文件内容如下:
vim splits.txt
aaaa
bbbb
cccc
dddd
然后执行:
hbase(main):004:0> create 'staff3','partition2',SPLITS_FILE => '/export/servers/splits.txt'
完成后如图:

代码如下:
@Test
public void hbaseSplit() throws IOException {
//获取连接
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
Connection connection = ConnectionFactory.createConnection(configuration);
Admin admin = connection.getAdmin();
//自定义算法,产生一系列Hash散列值存储在二维数组中
byte[][] splitKeys = {{1,2,3,4,5},{'a','b','c','d','e'}};
//通过HTableDescriptor来实现我们表的参数设置,包括表名,列族等等
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("stuff4"));
//添加列族
hTableDescriptor.addFamily(new HColumnDescriptor("f1"));
//添加列族
hTableDescriptor.addFamily(new HColumnDescriptor("f2"));
admin.createTable(hTableDescriptor,splitKeys);
admin.close();
}
单独考虑预分区没有任何意义,需要结合下一小节RowKey的设计综合考虑。
一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜。
rowkey设计 + 预分区 的原则: 唯一性 散列性 长度。
HBase 是三维有序存储的,通过 rowkey(行键),column key(column family 和 qualifier)和 TimeStamp(时间戳)这个三个维度可以对 HBase 中的数据进行快速定位。
HBase 中 rowkey 可以唯一标识一行记录,在 HBase 查询的时候,有以下几种方式:
HBase中的行是按照Rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。
然而糟糕的Rowkey设计是热点的源头。 热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。
大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求,这样就造成 数据热点现象。 (这一点其实和数据倾斜类似)
所以我们在向HBase中插入数据的时候,应优化RowKey的设计,使数据被写入集群的多个region,而不是一个。尽量均衡地把记录分散到不同的Region中去,平衡每个Region的压力。
默认情况下,在创建HBase表的时候会自动创建一个region分区。这个region的rowkey是没有边界的,即没有startkey和endkey,在数据写入时,所有数据都会写入这个默认的region,随着数据量的不断增加,此region已经不能承受不断增长的数据量,会进行split,分成2个region。
在此过程中,会产生两个问题:
一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入HBase时,会按照region分区情况,在集群内做数据的负载均衡。
示例:
# create table with specific split points
hbase>create 'table1','f1',SPLITS => ['\x10\x00', '\x20\x00', '\x30\x00','\x40\x00']
# create table with four regions based on random bytes keys
hbase>create 'table2','f1', { NUMREGIONS => 8 , SPLITALGO => 'UniformSplit' }
# create table with five regions based on hex keys
hbase>create 'table3','f1', { NUMREGIONS => 10, SPLITALGO => 'HexStringSplit' }
rowkey=MD5(username).subString(0,10)+时间戳
电信公司:
移动-----------> 136xxxx9301 ----->1039xxxx631
136xxxx1234
136xxxx2341
电信
联通
user表
rowkey name age sex address
lisi1 21 m beijing
lisi2 22 m beijing
lisi3 25 m beijing
lisi4 30 m beijing
lisi5 40 f shanghai
lisi6 50 f tianjin
需求:后期想经常按照居住地和年龄进行查询?
rowkey= address+age+随机数
beijing21+随机数
beijing22+随机数
beijing25+随机数
beijing30+随机数
rowkey= address+age+随机数
一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如 [key] [reverse_timestamp] , [key] 的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。比如需要保存一个用户的操作记录,按照操作时间倒序排序,在设计rowkey的时候,可以这样设计。
[userId反转] [Long.Max_Value - timestamp],在查询用户的所有操作记录数据的时候,直接指定反转后的userId,startRow是[userId反转] [000000000000],stopRow是[userId反转] [Long.Max_Value -timestamp]
如果需要查询某段时间的操作记录,startRow是[user反转] [Long.Max_Value - 起始时间],stopRow是[userId反转] [Long.Max_Value - 结束时间] [Long.Max_Value - 结束时间]。
电话号码,身份证,纯数字的类型,都可以使用这个方式。
HBase中RowKey可以唯一标识一行记录,在HBase查询的时候有以下几种方式:
1.通过get方式,指定RowKey获取唯一一条记录
2.通过scan方式,设置startRow和stopRow参数进行范围匹配
3.全表扫描,即直接扫描整张表中所有行记录
从字面意思来看,RowKey就是行键的意思,在曾删改查的过程中充当了主键的作用。它可以是 任意字符串,在HBase内部RowKey保存为字节数组。
HBase中的数据是按照Rowkey的ASCII字典顺序进行全局排序的,有伙伴可能对ASCII字典序印象不够深刻,下面举例说明:
假如有5个Rowkey:“012”, “0”, “123”, “234”, “3”,按ASCII字典排序后的结果为:“0”, “012”,“123”, “234”, “3”。
因此我们设计RowKey时,需要充分利用排序存储这个特性,将经常一起读取的行存储放到一起,要避免做全表扫描,因为效率特别低。
HBase中RowKey可以唯一标识一行记录,在HBase中检索数据有以下三种方式:
通过 get 方式,指定 RowKey 获取唯一一条记录
通过 scan 方式,设置 startRow 和 stopRow 参数进行范围匹配
全表扫描,即直接扫描整张表中所有行记录
下面根据一个例子分别介绍下根据RowKey进行查询的时候支持的情况。
如果我们RowKey设计为uid+phone+name,那么这种设计可以很好的支持一下的场景:
uid= 873969725 AND phone=18900000000 AND name=zhangsan
uid= 873969725 AND phone=18900000000
uid= 873969725 AND phone=189?
uid= 873969725
难以支持的场景:
phone=18900000000 AND name = zhangsan
phone=18900000000
name=zhangsan
从上面的例子中可以看出,在进行查询的时候,根据RowKey从前向后匹配,所以我们在设计RowKey的时候选择好字段之后,还应该结合我们的实际的高频的查询场景来组合选择的字段,越高频的查询字段排列越靠左。
rowkey是一个二进制码流,可以是任意字符串,最大长度64kb,实际应用中一般为10-100bytes,以byte[]形式保存,一般设计成定长,建议越短越好,不要超过16个字节。
建议尽可能短;但是也不能太短,否则rowkey前缀重复的概率增大
设计过长会降低memstore内存的利用率和HFile存储数据的效率。
在HBase的底层存储HFile中,RowKey是KeyValue结构中的一个域。假设RowKey长度100B,那么1000万条数据中,光RowKey就占用掉 100*1000w=10亿个字节 将近1G空间,这样会极大影响HFile的存储效率。
我们目前使用的服务器操作系统都是64位系统,内存是按照8B对齐的,因此设计RowKey时一般做成8B的整数倍,如16B或者24B,可以提高寻址效率。
同样地,列族、列名的命名在保证可读的情况下也应尽量短。value永远和它的key一起传输的。当具体的值在系统间传输时,它的RowKey,列名,时间戳也会一起传输(因此实际上列族命名几乎都用一个字母,比如‘c’或‘f’)。如果你的RowKey和列名和值相比较很大,那么你将会遇到一些有趣的问题。Hfile中的索引最终占据了HBase分配的大量内存。
比如设计RowKey的时候,当Rowkey 是按时间戳的方式递增,就不要将时间放在二进制码的前面,可以将 Rowkey 的高位作为散列字段,由程序循环生成,可以在低位放时间字段,这样就可以提高数据均衡分布在每个Regionserver实现负载均衡的几率。
结合前面分析的热点现象的起因思考: 如果没有散列字段,首字段只有时间信息,那就会出现所有新数据都在一个RegionServer上堆积的热点现象,这样在做数据检索的时候负载将会集中在个别RegionServer上,降低查询效率。
需要注意:由于HBase中数据存储的格式是Key-Value对格式,所以如果向HBase中同一张表插入相同RowKey的数据,则原先存在的数据会被新的数据给覆盖掉(和HashMap效果相同)。
RowKey是按照字典顺序排序存储的,因此,设计RowKey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。
一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为RowKey的一部分对这个问题十分有用,可以用 Long.Max_Value-timestamp追加到key的末尾。
例如 [key] [reverse_timestamp] , [key]的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中RowKey是有序的,第一条记录是最后录入的数据。
追求的原则是:在合理范围内能尽量少的减少列簇就尽量减少列簇。
最优设计是:将所有相关性很强的key-value都放在同一个列簇下,这样既能做到查询效率最高,也能保持尽可能少的访问不同的磁盘文件
以用户信息为例,可以将必须的基本信息存放在一个列族,而一些附加的额外信息可以放在另一列族hbase的列簇越少越好!尽量就是1个
问题一:Rowkey是唯一的吗?
相同的Rowkey在HBase中认为是同一条数据的多个版本,查询时默认返回最新版本的数据,所以通常Rowkey都需要保证唯一,除非用到多版本特性。
最佳设计示例:Rowkey相当于数据库的主键。Rowkey表示一条记录。Rowkey可以是一个字段也可以是多个字段接起来。Rowkey为[userid]表示每个用户只有一条记录, Rowkey为[userid] [orderid]表示每个用户有多条记录。
问题二:满足哪种查询场景?
Rowkey的设计限制了数据的查询方式,HBase有两种查询方式。
根据完整的Rowkey查询(get方式),例如
SELECT * FROM table WHERE Rowkey = ‘abcde’
说明 get方式需要知道完整的Rowkey,即组成Rowkey所有字段的值都是确定的。
根据Rowkey的范围查询(scan方式),例如
SELECT * FROM table WHERE ‘abc’ < Rowkey <’abcx’
说明 scan方式需要知道Rowkey左边的值,例如您使用英文字典查询pre开头的所有单词,也可以查询prefi开头的所有单词,不能查询中间或结尾为prefi的单词。
最佳设计示例:在有限的查询方式下如何实现复杂查询?以下方法可以帮您实现。
再新建一张表作为索引表。
使用Filter在服务端过滤不需要的数据。
使用二级索引。
使用反向scan方法实现倒序(将新数据排在前面),
scan.setReverse(true)
说明 反向scan的性能比正常scan性能差,如果大部分是倒序场景可以体现在Rowkey设计上,例如[hostname][log-event][timestamp] => [hostname][log-event][Long.MAX_VALUE – timestamp]。
问题三:数据足够分散,会存在堆积的热点现象吗?
散列的目的是将数据分散到不同的分区,不至于产生热点使某一台服务器终止,其他服务器空闲,充分发挥分布式和并发的优势。
最佳设计示例:
[userId][orderid] => [md5(userid).subStr(0,4)][userId][orderid] 。[userId][orderid] => [reverse(userid)][orderid]。[timestamp][hostname][log-event] => [bucket][timestamp][hostname][log-event]; long bucket = timestamp % numBuckets。[userId][orderid] => [userId][orderid][random(100)]。问题四:Rowkey可以再短点吗?
短的Rowkey可以减少数据量,提高数据查询和数据写入效率。
最佳设计示例:
'2015122410' => Long(2015122410) 。’淘宝‘ => tb。问题五:使用scan方式会查询出不需要的数据吗?
会的。场景举例:table1的Rowkey为column1+ column2+ column3,如果您需要查询column1= host1的所有数据,使用scan 'table1',{startkey=> 'host1',endkey=> 'host2'}语句。如果有一条记录为column1=host12,那么此记录也会查询出来。
最佳设计示例:
[column1][column2] => [rpad(column1,'x',20)][column2]。[column1][column2] => [column1][_][column2]。[hostname][log-event][timestamp]。timestamp = Long.MAX_VALUE – timestamp; [hostname][log-event][timestamp]。long bucket = timestamp % numBuckets; [bucket][timestamp][hostname][log-event]。[seller id][timestmap][order number]。[buyer id][timestmap][order number]。[order number]。[buyer id][timestmap][order number]。一张卖家维度表Rowkey设计为[seller id][timestmap][order number]。一张订单索引表Rowkey设计为[order number]。http://hbase.apache.org/book.html#cp
Hbase 作为列族数据库最经常被人诟病的特性包括:无法轻易建立“二级索引”,难以执行求和、计数、排序等操作。
比如,在旧版本的(<0.92)Hbase 中,统计数据表的总行数,需要使用 Counter 方法,执行一次 MapReduce Job 才能得到。
虽然 HBase 在数据存储层中集成了 MapReduce,能够有效用于数据表的分布式计算。然而在很多情况下,做一些简单的相加或者聚合计算的时候, 如果直接将计算过程放置在 server 端,能够减少通讯开销,从而获得很好的性能提升。于是, HBase 在 0.92 之后引入了协处理器(coprocessors),实现一些激动人心的新特性:能够轻易建立二次索引、复杂过滤器(谓词下推)以及访问控制等。
Observer 类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。
Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子,在固定的事件发生时被调用。
比如:put 操作之前有钩子函数 prePut,该函数在 put 操作执行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数。
以 HBase0.92 版本为例,它提供了三种观察者接口:
到 0.96 版本又新增一个 RegionServerObserver
下图是以 RegionObserver 为例子讲解 Observer 这种协处理器的原理:

Endpoint 协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常见的用法就是进行聚集操作。
如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行,势必效率低下。
利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内 执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体的执行效率就会提高很多。
下图是 EndPoint 的工作原理:

协处理器的加载方式有两种,我们称之为静态加载方式( Static Load)和动态加载方式( Dynamic Load)。
静态加载的协处理器称之为 System Coprocessor
动态加载的协处理器称之为 Table Coprocessor。
通过修改 hbase-site.xml 这个文件来实现, 启动全局 aggregation,能过操纵所有的表上的数据。只需要添加如下代码:
<property>
<name>hbase.coprocessor.user.region.classesname>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementationvalue>
property>
启用表 aggregation,只对特定的表生效。通过 HBase Shell 来实现。
disable 指定表
hbase> disable 'mytable'
添加 aggregation
hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>
'|org.apache.Hadoop.hbase.coprocessor.AggregateImplementation||'
重启指定表
hbase> enable 'mytable'
协处理器卸载
disable 'mytable'
alter 'mytable', METHOD => 'table_att_unset',NAME=>'coprocessor$1'
enable 'test'
由于HBase的查询比较弱,如果需要实现类似于 select name,salary,count(1),max(salary) from user group by name,salary order by salary 等这样的复杂性的统计需求,基本上不可能,或者说比较困难,所以我们在使用HBase的时候,一般都会借助二级索引的方案来进行实现。
HBase的一级索引就是rowkey,我们只能通过rowkey进行检索。如果我们相对hbase里面列族的列列进行一些组合查询,就需要采用HBase的二级索引方案来进行多条件的查询。
常见的二级索引我们一般可以借助各种其他的方式来实现,例如Phoenix或者solr或者ES等。
$ sudo blockdev --setra 32768 /dev/sda
提示:ra是readahead的缩写
$ sudo sysctl -w vm.swappiness=0
$ ulimit -n 查看允许最大进程数
$ ulimit -u 查看允许打开最大文件数
修改:
$ sudo vi /etc/security/limits.conf 修改打开文件数限制
末尾添加:
* soft nofile 1024000
* hard nofile 1024000
Hive - nofile 1024000
hive - nproc 1024000
$ sudo vi /etc/security/limits.d/20-nproc.conf 修改用户打开进程数限制
修改为:
#* soft nproc 4096
#root soft nproc unlimited
* soft nproc 40960
root soft nproc unlimited
属性:dfs.namenode.handler.count
解释:该属性是NameNode服务默认线程数,的默认值是10,根据机器的可用内存可以调整为50~100
属性:dfs.datanode.handler.count
解释:该属性默认值为10,是DataNode的处理线程数,如果HDFS客户端程序读写请求比较多,可以调高到1520,设置的值越大,内存消耗越多,不要调整的过高,一般业务中,510即可。
属性:dfs.replication
解释:如果数据量巨大,且不是非常之重要,可以调整为23,如果数据非常之重要,可以调整为35。
属性:dfs.blocksize
解释:块大小定义,该属性应该根据存储的大量的单个文件大小来设置,如果大量的单个文件都小于100M,建议设置成64M块大小,对于大于100M或者达到GB的这种情况,建议设置成256M,一般设置范围波动在64M~256M之间。
mapreduce.jobtracker.handler.count
该属性是Job任务线程数,默认值是10,根据机器的可用内存可以调整为50~100
属性:mapreduce.tasktracker.http.threads
解释:定义HTTP服务器工作线程数,默认值为40,对于大集群可以调整到80~100
属性:mapreduce.task.io.sort.factor
解释:文件排序时同时合并的数据流的数量,这也定义了同时打开文件的个数,默认值为10,如果调高该参数,可以明显减少磁盘IO,即减少文件读取的次数。
属性:mapreduce.map.speculative
解释:该属性可以设置任务是否可以并发执行,如果任务多而小,该属性设置为true可以明显加快任务执行效率,但是对于延迟非常高的任务,建议改为false,这就类似于迅雷下载。
属性:mapreduce.map.output.compress、mapreduce.output.fileoutputformat.compress
解释:对于大集群而言,建议设置Map-Reduce的输出为压缩的数据,而对于小集群,则不需要。
属性:
mapreduce.tasktracker.map.tasks.maximum
mapreduce.tasktracker.reduce.tasks.maximum
解释:以上两个属性分别为一个单独的Job任务可以同时运行的Map和Reduce的数量。
设置上面两个参数时,需要考虑CPU核数、磁盘和内存容量。假设一个8核的CPU,业务内容非常消耗CPU,那么可以设置map数量为4,如果该业务不是特别消耗CPU类型的,那么可以设置map数量为40,reduce数量为20。这些参数的值修改完成之后,一定要观察是否有较长等待的任务,如果有的话,可以减少数量以加快任务执行,如果设置一个很大的值,会引起大量的上下文切换,以及内存与磁盘之间的数据交换,这里没有标准的配置数值,需要根据业务和硬件配置以及经验来做出选择。
在同一时刻,不要同时运行太多的MapReduce,这样会消耗过多的内存,任务会执行的非常缓慢,我们需要根据CPU核数,内存容量设置一个MR任务并发的最大值,使固定数据量的任务完全加载到内存中,避免频繁的内存和磁盘数据交换,从而降低磁盘IO,提高性能。
大概估算公式:
map = 2 + 2/3cpu_core
reduce = 2 + 1/3cpu_core
HDFS 不是不允许追加内容么?
属性:dfs.support.append
文件:hdfs-site.xml、hbase-site.xml
解释:开启HDFS追加同步,可以优秀的配合HBase的数据同步和持久化。默认值为true。
属性:dfs.datanode.max.transfer.threads
文件:hdfs-site.xml
解释:HBase一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为4096或者更高。默认值:4096
属性:dfs.image.transfer.timeout
文件:hdfs-site.xml
解释:如果对于某一次数据操作来讲,延迟非常高,socket需要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保socket不会被timeout掉。
属性:
mapreduce.map.output.compress
mapreduce.map.output.compress.codec
文件:mapred-site.xml
解释:开启这两个数据可以大大提高文件的写入效率,减少写入时间。第一个属性值修改为true,第二个属性值修改为:org.apache.hadoop.io.compress.GzipCodec
属性:dfs.datanode.failed.volumes.tolerated
文件:hdfs-site.xml
解释:默认为0,意思是当DataNode中有一个磁盘出现故障,则会认为该DataNode shutdown了。如果修改为1,则一个磁盘出现故障时,数据会被复制到其他正常的DataNode上,当前的DataNode继续工作。
属性:hbase.regionserver.handler.count
文件:hbase-site.xml
解释:默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。
属性:hbase.hregion.max.filesize
文件:hbase-site.xml
解释:默认值10737418240(10GB),如果需要运行HBase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。
属性:hbase.client.write.buffer
文件:hbase-site.xml
解释:用于指定HBase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。
属性:hbase.client.scanner.caching
文件:hbase-site.xml
解释:用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。
HBase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给HBase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。
涉及文件:hbase-env.sh
参数:·-XX:+UseParallelGC·
解释:开启并行GC
参数:-XX:ParallelGCThreads=cpu_core – 1
解释:该属性设置了同时处理垃圾回收的线程数。
参数:-XX:DisableExplicitGC
解释:防止开发人员手动调用GC
参数:zookeeper.session.timeout
文件:hbase-site.xml
解释:In hbase-site.xml, set zookeeper.session.timeout to 30 seconds or less to bound failure detection (20-30 seconds is a good start)。
该值会直接关系到master发现服务器宕机的最大周期,默认值为30秒,如果该值过小,会在HBase在写入大量数据发生而GC时,导致RegionServer短暂的不可用,从而没有向ZK发送心跳包,最终导致认为从节点shutdown。一般20台左右的集群需要配置5台zookeeper。
hbase-site.xml
(1)Zookeeper会话超时时间
属性:zookeeper.session.timeout
解释:默认值为90000毫秒(90s)。当某个RegionServer挂掉,90s之后Master才能察觉到。可适当减小此值,以加快Master响应,可调整至60000毫秒。
(2)设置RPC监听数量
属性:hbase.regionserver.handler.count
解释:默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。
(3)手动控制Major Compaction
属性:hbase.hregion.majorcompaction
解释:默认值:604800000秒(7天), Major Compaction的周期,若关闭自动Major Compaction,可将其设为0
(4)优化HStore文件大小
属性:hbase.hregion.max.filesize
解释:默认值10737418240(10GB),如果需要运行HBase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。
(5)优化HBase客户端缓存
属性:hbase.client.write.buffer
解释:默认值2097152bytes(2M)用于指定HBase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之会消耗更小的内存。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。
(6)指定scan.next扫描HBase所获取的行数
属性:hbase.client.scanner.caching
解释:用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。
(7)BlockCache占用RegionServer堆内存的比例
属性:hfile.block.cache.size
解释:默认0.4,读请求比较多的情况下,可适当调大
(8)MemStore占用RegionServer堆内存的比例
属性:hbase.regionserver.global.memstore.size
解释:默认0.4,写请求较多的情况下,可适当调大
Client写入 -> 存入MemStore,一直到MemStore满 -> Flush成一个StoreFile,直至增长到一定阈值 -> 触发Compact合并操作 -> 多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除 -> 当StoreFiles Compact后,逐步形成越来越大的StoreFile -> 单个StoreFile大小超过一定阈值后(默认10G),触发Split操作,把当前Region Split成2个Region,Region会下线,新Split出的2个孩子Region会被HMaster分配到相应的HRegionServer 上,使得原先1个Region的压力得以分流到2个Region上
由此过程可知,HBase只是增加数据,没有更新和删除操作,用户的更新和删除都是逻辑层面的,在物理层面,更新只是追加操作,删除只是标记操作。
用户写操作只需要进入到内存即可立即返回,从而保证I/O高性能。
首先一点需要明白:Hbase是基于HDFS来存储的。
HDFS:
HBase:
Hbase 中的每张表都通过行键(rowkey)按照一定的范围被分割成多个子表(HRegion),默认一个HRegion 超过256M 就要被分割成两个,由HRegionServer管理,管理哪些 HRegion 由 Hmaster 分配。HRegion 存取一个子表时,会创建一个 HRegion 对象,然后对表的每个列族(Column Family)创建一个 store 实例, 每个 store 都会有 0 个或多个 StoreFile 与之对应,每个 StoreFile 都会对应一个HFile,HFile 就是实际的存储文件,一个 HRegion 还拥有一个 MemStore实例。
热点现象:
某个小的时段内,对HBase的读写请求集中到极少数的Region上,导致这些region所在的RegionServer处理请求量骤增,负载量明显偏大,而其他的RgionServer明显空闲。
热点现象出现的原因:
HBase中的行是按照rowkey的字典顺序排序的,这种设计优化了scan操作,可以将相关的行以及会被一起读取的行存取在临近位置,便于scan。然而糟糕的rowkey设计是热点的源头。
热点发生在大量的client直接访问集群的一个或极少数个节点(访问可能是读,写或者其他操作)。大量访问会使热点region所在的单个机器超出自身承受能力,引起性能下降甚至region不可用,这也会影响同一个RegionServer上的其他region,由于主机无法服务其他region的请求。
热点现象解决办法:
为了避免写热点,设计rowkey使得不同行在同一个region,但是在更多数据情况下,数据应该被写入集群的多个region,而不是一个。常见的方法有以下这些:
加盐:在rowkey的前面增加随机数,使得它和之前的rowkey的开头不同。分配的前缀种类数量应该和你想使用数据分散到不同的region的数量一致。加盐之后的rowkey就会根据随机生成的前缀分散到各个region上,以避免热点。
哈希:哈希可以使负载分散到整个集群,但是读却是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据
反转:第三种防止热点的方法时反转固定长度或者数字格式的rowkey。这样可以使得rowkey中经常改变的部分(最没有意义的部分)放在前面。这样可以有效的随机rowkey,但是牺牲了rowkey的有序性。反转rowkey的例子以手机号为rowkey,可以将手机号反转后的字符串作为rowkey,这样的就避免了以手机号那样比较固定开头导致热点问题
时间戳反转
:一个常见的数据处理问题是快速获取数据的最近版本,使用反转的时间戳作为rowkey的一部分对这个问题十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如[key] [reverse_timestamp],[key]的最新值可以通过scan [key]获得[key]的第一条记录,因为HBase中rowkey是有序的,第一条记录是最后录入的数据。
HBase建表预分区:创建HBase表时,就预先根据可能的RowKey划分出多个region而不是默认的一个,从而可以将后续的读写操作负载均衡到不同的region上,避免热点现象。
长度原则:100字节以内,8的倍数最好,可能的情况下越短越好。因为HFile是按照 keyvalue 存储的,过长的rowkey会影响存储效率;其次,过长的rowkey在memstore中较大,影响缓冲效果,降低检索效率。最后,操作系统大多为64位,8的倍数,充分利用操作系统的最佳性能。
散列原则:高位散列,低位时间字段。避免热点问题。
唯一原则:分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问 的数据放到一块。
原则:在合理范围内能尽量少的减少列簇就尽量减少列簇,因为列簇是共享region的,每个列簇数据相差太大导致查询效率低下。
最优:将所有相关性很强的 key-value 都放在同一个列簇下,这样既能做到查询效率最高,也能保持尽可能少的访问不同的磁盘文件。以用户信息为例,可以将必须的基本信息存放在一个列族,而一些附加的额外信息可以放在另一列族。
在 hbase 中每当有 memstore 数据 flush 到磁盘之后,就形成一个 storefile,当 storeFile的数量达到一定程度后,就需要将 storefile 文件来进行 compaction 操作。
Compact 的作用: