官方地址
访问HBase的方式是使用scan或get获取数据,在获取到的数据上进行业务运算。但是在数据量非常大的时候,比如一个有上亿行及十万个列的数据集,再按常用的方式移动获取数据就会遇到性能问题。客户端也需要有强大的计算能力以及足够的内存来处理这么多的数据。
此时就可以考虑使用Coprocessor(协处理器)。将业务运算代码封装到Coprocessor中并在RegionServer上运行,即在数据实际存储位置执行,最后将运算结果返回到客户端。利用协处理器,用户可以编写运行在 HBase Server 端的代码。
Hbase Coprocessor类似以下概念
触发器和存储过程:一个Observer Coprocessor有些类似于关系型数据库中的触发器,通过它我们可以在一些事件(如Get或是Scan)发生前后执行特定的代码。Endpoint Coprocessor则类似于关系型数据库中的存储过程,因为它允许我们在RegionServer上直接对它存储的数据进行运算,而非是在客户端完成运算。
MapReduce:MapReduce的原则就是将运算移动到数据所处的节点。Coprocessor也是按照相同的原则去工作的。
AOP:如果熟悉AOP的概念的话,可以将Coprocessor的执行过程视为在传递请求的过程中对请求进行了拦截,并执行了一些自定义代码。
Observer
协处理器与触发器(trigger)类似:在一些特定事件发生时回调函数(也被称作钩子函数,hook)被执行。这些事件包括一些用户产生的事件,也包括服务器端内部自动产生的事件。
协处理器框架提供的接口如下
Endpoint
这类协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器在Regionserver中执行一段代码,并将 RegionServer 端执行结果返回给客户端进一步处理。
Endpoint常见用途
聚合操作
需求:通过协处理器Observer实现Hbase当中t1表插入数据,指定的另一张表t2也需要插入相对应的数据。
create 't1','info'
create 't2','info'
实现思路:通过Observer协处理器捕捉到t1插入数据时,将数据复制一份并保存到t2表中
开发步骤:
(1)编写Observer协处理器,添加依赖
- <dependency>
- <groupId>org.apache.hbasegroupId>
- <artifactId>hbase-serverartifactId>
- <version>1.3.1version>
- dependency>
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.Durability;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.client.Put;
- import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
- import org.apache.hadoop.hbase.coprocessor.ObserverContext;
- import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
- import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
- import org.apache.hadoop.hbase.util.Bytes;
-
- import java.io.IOException;
- import java.util.List;
-
-
- //重写prePut方法,监听到向t1表插入数据时,执行向t2表插入数据的代码
- public class MyProcessor extends BaseRegionObserver {
- @Override
- public void prePut(ObserverContext
e, Put put, WALEdit edit, Durability durability) throws IOException { - //把自己需要执行的逻辑定义在此处,向t2表插入数据,数据具体是什么内容与Put一样
- //获取t2表table对象
- final HTable t2 = (HTable) e.getEnvironment().getTable(TableName.valueOf("t2"));
- //解析t1表的插入对象put
- final Cell cell = put.get(Bytes.toBytes("info"), Bytes.toBytes("name")).get(0);
- //table对象.put
- final Put put1 = new Put(put.getRow());
- put1.add(cell);
- t2.put(put1); //执行向t2表插入数据
- t2.close();
- }
- }
(2)打成Jar包,上传HDFS
- cd /opt/lagou/softwares
- mv original-hbaseStudy-1.0-SNAPSHOT.jar processor.jar
- hdfs dfs -mkdir -p /processor
- hdfs dfs -put processor.jar /processor
(3)挂载协处理器
- hbase(main):056:0> describe 't1'
- hbase(main):055:0> alter 't1',METHOD => 'table_att','Coprocessor'=>'hdfs://linux121:9000/processor/processor.jar|com.lagou.hbase.processor.MyProcessor|1001|'
-
- #再次查看't1'表,
- hbase(main):043:0> describe 't1'
(4)验证协处理器
# 向t1表中插入数据(shell方式验证)
put 't1','rk1','info:name','lisi'
(5)卸载协处理器
- disable 't1'
- alter 't1',METHOD=>'table_att_unset',NAME=>'coprocessor$1' enable 't2'