• 112. Development and Deployment of an HBase Endpoint-Type Coprocessor


    112.1 Preparing the Environment

    • Download the Protobuf 2.5.0 release:
    https://github.com/google/protobuf/releases/download/v2.5.0/protobuf-2.5.0.tar.gz
    
    • Pick a server and install Protobuf on it
    [root@ip-186-31-5-38 ~]# wget https://github.com/google/protobuf/releases/download/v2.5.0/protobuf-2.5.0.tar.gz
    
    • Install the dependencies required to build Protobuf
    yum install -y autoconf automake libtool curl make gcc-c++ unzip
    
    • Extract protobuf-2.5.0.tar.gz and build it
    [root@ip-186-31-5-38 ~]# tar -zxvf protobuf-2.5.0.tar.gz
    [root@ip-186-31-5-38 ~]# cd protobuf-2.5.0
    [root@ip-186-31-5-38 protobuf-2.5.0]# ./configure --prefix=/usr/local/protobuf
    [root@ip-186-31-5-38 protobuf-2.5.0]# make && make install
    
    • Configure the Protobuf environment variables (e.g. in /etc/profile)
    export PROTOBUF_HOME=/usr/local/protobuf
    export PATH=$PROTOBUF_HOME/bin:$PATH
    
    • Apply the environment variables
    [root@ip-186-31-5-38 protobuf-2.5.0]# source /etc/profile
    
    • Create the HBase test table
    create 'fayson_coprocessor', {NAME => 'info'}
    put 'fayson_coprocessor','001','info:sales',12.3
    put 'fayson_coprocessor','002','info:sales',24.5
    put 'fayson_coprocessor','003','info:sales',10.5
    put 'fayson_coprocessor','004','info:sales',11.5
    put 'fayson_coprocessor','005','info:sales',10.5
    put 'fayson_coprocessor','001','info:age',22
    put 'fayson_coprocessor','002','info:age',33
    put 'fayson_coprocessor','003','info:age',26
    put 'fayson_coprocessor','004','info:age',28
    put 'fayson_coprocessor','005','info:age',56
    

    112.2 Generating the Serialization Classes

    • The MyFirstCoprocessor.proto file
    [root@ip-186-31-5-171 hbase-coprocessor]# vim MyFirstCoprocessor.proto 
    syntax = "proto2";
    option java_package = "com.cloudera.hbase.coprocessor.server";
    option java_outer_classname = "MyFirstCoprocessor";
    option java_generic_services = true;
    option java_generate_equals_and_hash = true;
    option optimize_for = SPEED;
    message MyCoprocessRequest {
        required string family = 1;
        required string columns = 2;
    }
    message MyCoprocessResponse {
        required int64 count = 1;
        required double maxnum = 3;
        required double minnum = 4;
        required double sumnum = 5;
    }
    service AggregationService {
      rpc getAggregation(MyCoprocessRequest)
        returns (MyCoprocessResponse);
    }
    
    • Generate the Java classes
    [root@ip-186-31-5-38 hbase-coprocessor]# protoc --java_out=./ MyFirstCoprocessor.proto 
    [root@ip-186-31-5-38 hbase-coprocessor]# ll
    total 4
    drwxr-xr-x 3 root root  22 May 14 16:34 com
    -rw-r--r-- 1 root root 609 May 14 16:33 MyFirstCoprocessor.proto
    [root@ip-186-31-5-38 hbase-coprocessor]# 
    

    112.3 Server-Side Implementation

    • Key dependencies in the pom.xml file
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.11.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.11.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0-cdh5.11.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-examples</artifactId>
        <version>1.2.0-cdh5.11.2</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.5.0</version>
    </dependency>
    
    • Copy the Java classes generated by Protobuf into a package directory that matches the java_package declared in MyFirstCoprocessor.proto
    • The MyFirstCoprocessorEndPoint implementation class
    package com.cloudera.hbase.coprocessor.server;
    import com.google.protobuf.RpcCallback;
    import com.google.protobuf.RpcController;
    import com.google.protobuf.Service;
    import org.apache.commons.collections.map.HashedMap;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.Coprocessor;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.protobuf.ResponseConverter;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.util.Bytes;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    public class MyFirstCoprocessorEndPoint extends MyFirstCoprocessor.AggregationService implements Coprocessor, CoprocessorService {
        protected static final Log log = LogFactory.getLog(MyFirstCoprocessorEndPoint.class);
        private RegionCoprocessorEnvironment env;
        @Override
        public void getAggregation(RpcController controller, MyFirstCoprocessor.MyCoprocessRequest request, RpcCallback<MyFirstCoprocessor.MyCoprocessResponse> done) {
            Scan scan = new Scan();
            scan.addFamily(Bytes.toBytes(request.getFamily()));
            //Columns arrive as "sales:MAX,sales:MIN,sales:AVG,sales:SUM,sales:COUNT"
            String columns = request.getColumns();
            //Map each requested column to its aggregation types
            Map<String, List<String>> columnMaps = new HashedMap();
            for (String columnAndType : columns.split(",")) {
                String column = columnAndType.split(":")[0];
                String type = columnAndType.split(":")[1];
                List<String> typeList = null;
                if (columnMaps.containsKey(column)) {
                    typeList = columnMaps.get(column);
                } else {
                    typeList = new ArrayList<>();
                    //Add the column to the Scan
                    scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(column));
                }
                typeList.add(type);
                columnMaps.put(column, typeList);
            }
            InternalScanner scanner = null;
            MyFirstCoprocessor.MyCoprocessResponse response = null;
            Double max = null;
            Double min = null;
            Double sumVal = null;
            long counter = 0L;
            try {
                scanner = this.env.getRegion().getScanner(scan);
                List<Cell> results = new ArrayList<>();
                boolean hasMore = false;
                do {
                    hasMore = scanner.next(results);
                    if (results.size() > 0) {
                        ++counter;
                    }
                    log.info("counter:" + counter);
                    log.info("results size:" + results.size());
                    for (Cell cell : results) {
                        String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                        log.info("Column Name: " + column);
                        log.info("Cell Value:" + new String(CellUtil.cloneValue(cell)));
                        Double temp = Double.parseDouble(new String(CellUtil.cloneValue(cell)));
                        if (columnMaps.containsKey(column)) {
                            List<String> types = columnMaps.get(column);
                            for (String type : types) {
                                switch (type.toUpperCase()) {
                                    case "MIN":
                                        min = min != null && (temp == null || compare(temp, min) >= 0) ? min : temp;
                                        log.info("MIN Value: " + min.doubleValue());
                                        break;
                                    case "MAX":
                                        max = max != null && (temp == null || compare(temp, max) <= 0) ? max : temp;
                                        break;
                                    case "SUM":
                                        if (temp != null) {
                                            sumVal = add(sumVal, temp);
                                        }
                                        break;
                                    default:
                                        break;
                                }
                            }
                        }
                    }
                    results.clear();
                } while (hasMore);
                response = MyFirstCoprocessor.MyCoprocessResponse.newBuilder()
                        .setMaxnum(max != null ? max.doubleValue() : -Double.MAX_VALUE)
                        .setMinnum(min != null ? min.doubleValue() : Double.MAX_VALUE)
                        .setCount(counter)
                        .setSumnum(sumVal != null ? sumVal.doubleValue() : 0.0D).build();
            } catch (IOException e) {
                e.printStackTrace();
                ResponseConverter.setControllerException(controller, e);
            } finally {
                if (scanner != null) {
                    try {
                        scanner.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            done.run(response);
        }
        public static int compare(Double l1, Double l2) {
            if (l1 == null ^ l2 == null) {
                return l1 == null ? -1 : 1; // either of one is null.
            } else if (l1 == null)
                return 0; // both are null
            return l1.compareTo(l2); // natural ordering.
        }
        public double divideForAvg(Double d1, Long l2) {
            return l2 != null && d1 != null?d1.doubleValue() / l2.doubleValue():0.0D / 0.0;
        }
        public Double add(Double d1, Double d2) {
            return d1 != null && d2 != null ? Double.valueOf(d1.doubleValue() + d2.doubleValue()) : (d1 == null ? d2 : d1);
        }
        @Override
        public void start(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
            if (coprocessorEnvironment instanceof RegionCoprocessorEnvironment) {
                this.env = (RegionCoprocessorEnvironment) coprocessorEnvironment;
            } else {
                throw new CoprocessorException("Must be loaded on a table region!");
            }
        }
        @Override
        public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
        }
        @Override
        public Service getService() {
            return this;
        }
    }
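    The null-safe compare and add helpers above carry the aggregation's null semantics (null means "no value yet"), which is easy to get wrong. A minimal HBase-free sketch of the same logic, runnable in isolation:

    ```java
    // Standalone sketch of the null-safe helpers from MyFirstCoprocessorEndPoint,
    // so their semantics can be checked without an HBase cluster.
    public class NullSafeAggHelpers {
        // Nulls sort first; two nulls compare equal; otherwise natural ordering.
        public static int compare(Double l1, Double l2) {
            if (l1 == null ^ l2 == null) {
                return l1 == null ? -1 : 1; // exactly one is null
            } else if (l1 == null) {
                return 0;                   // both are null
            }
            return l1.compareTo(l2);
        }

        // Null-tolerant addition: a null operand means "no value accumulated yet".
        public static Double add(Double d1, Double d2) {
            return d1 != null && d2 != null
                    ? Double.valueOf(d1.doubleValue() + d2.doubleValue())
                    : (d1 == null ? d2 : d1);
        }

        public static void main(String[] args) {
            System.out.println(compare(null, 1.0)); // -1: null sorts first
            System.out.println(add(null, 2.5));     // 2.5
            System.out.println(add(1.5, 2.5));      // 4.0
        }
    }
    ```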
    

    112.4 Client-Side Implementation

    • The MyFirstCoprocessExample.java class
    package com.cloudera.hbase.coprocessor.client;
    import com.cloudera.hbase.coprocessor.server.MyFirstCoprocessor;
    import com.cloudera.hbase.coprocessor.server.MyFirstCoprocessorEndPoint;
    import com.google.common.util.concurrent.AtomicDouble;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.Coprocessor;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.client.coprocessor.DoubleColumnInterpreter;
    import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class MyFirstCoprocessExample {
        public static void main(String[] args) {
            String table_name = "fayson_coprocessor";
            //Initialize the HBase configuration
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            configuration.setStrings("hbase.zookeeper.quorum", "ip-186-31-5-38.ap-southeast-1.compute.internal,ip-186-31-8-230.ap-southeast-1.compute.internal,ip-186-31-5-171.ap-southeast-1.compute.internal");
            try {
                //Create an HBase Connection
                Connection connection = ConnectionFactory.createConnection(configuration);
                TableName tableName = TableName.valueOf(table_name);
                if(!connection.getAdmin().tableExists(tableName)) {
                    System.out.println(table_name + " does not exist....");
                    System.exit(0);
                }
                Table table = connection.getTable(tableName);
                //Remove any previously registered coprocessor from the table
                deleteCoprocessor(connection, table, MyFirstCoprocessorEndPoint.class);
                //Register the coprocessor on the table
                String hdfspath = "hdfs://nameservice3/hbase/coprocessor/hbase-demo-1.0-SNAPSHOT.jar";
                setupToExistTable(connection, table, hdfspath, MyFirstCoprocessorEndPoint.class);
                //Invoke the region-side coprocessor from the client
                execFastEndpointCoprocessor(table, "info", "sales:MAX,sales:MIN,sales:AVG,sales:SUM,sales:COUNT");
                //Close the connection
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        /**
         * Remove the given coprocessor classes from an HBase table
         * @param connection
         * @param table
         * @param cls
         */
        public static void deleteCoprocessor(Connection connection, Table table, Class... cls) {
            System.out.println("begin delete " + table.getName().toString() + " Coprocessor......");
            try {
                HTableDescriptor hTableDescriptor = table.getTableDescriptor();
                for(Class cass : cls) {
                    hTableDescriptor.removeCoprocessor(cass.getCanonicalName());
                }
                connection.getAdmin().modifyTable(table.getName(), hTableDescriptor);
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("end delete " + table.getName().toString() + " Coprocessor......");
        }
        /**
         * Add the given coprocessor classes to an existing table
         * @param connection
         * @param table
         * @param jarPath
         * @param cls
         */
        public static void setupToExistTable(Connection connection, Table table, String jarPath, Class... cls) {
            try {
                if(jarPath != null && !jarPath.isEmpty()) {
                    Path path = new Path(jarPath);
                    HTableDescriptor hTableDescriptor = table.getTableDescriptor();
                    for(Class cass : cls) {
                        hTableDescriptor.addCoprocessor(cass.getCanonicalName(), path, Coprocessor.PRIORITY_USER, null);
                    }
                    connection.getAdmin().modifyTable(table.getName(), hTableDescriptor);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        /**
         * The most efficient approach: invoke the coprocessor on every region through
         * Table.coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback)
         * and merge the per-region partial results on the client
         * @param table the HBase table
         * @param family the column family to aggregate over
         * @param columns comma-separated "column:TYPE" pairs, e.g. "sales:MAX,sales:SUM"
         * @return the total row count of the table
         */
        public static long execFastEndpointCoprocessor(Table table, String family, String columns) {
            long start_t = System.currentTimeMillis();
            //Accumulators for the results merged across all regions
            AtomicLong totalRowCount = new AtomicLong();
            AtomicDouble maxValue = new AtomicDouble(-Double.MAX_VALUE);
            AtomicDouble minValue = new AtomicDouble(Double.MAX_VALUE);
            AtomicDouble sumValue = new AtomicDouble();
            try {
                Batch.Callback<MyFirstCoprocessor.MyCoprocessResponse> callback = new Batch.Callback<MyFirstCoprocessor.MyCoprocessResponse>() {
                    @Override
                    public void update(byte[] region, byte[] row, MyFirstCoprocessor.MyCoprocessResponse myCoprocessResponse) {
                        //Add this region's row count to the total
                        totalRowCount.getAndAdd(myCoprocessResponse.getCount());
                        //Update the maximum
                        if(myCoprocessResponse.getMaxnum() > maxValue.doubleValue()) {
                            maxValue.compareAndSet(maxValue.doubleValue(), myCoprocessResponse.getMaxnum());
                        }
                        //Update the minimum
                        if(myCoprocessResponse.getMinnum() < minValue.doubleValue()) {
                            minValue.compareAndSet(minValue.doubleValue(), myCoprocessResponse.getMinnum());
                        }
                        //Add this region's sum to the total
                        sumValue.getAndAdd(myCoprocessResponse.getSumnum());
                    }
                };
                table.coprocessorService(MyFirstCoprocessor.AggregationService.class, null, null, new Batch.Call<MyFirstCoprocessor.AggregationService, MyFirstCoprocessor.MyCoprocessResponse>() {
                    @Override
                    public MyFirstCoprocessor.MyCoprocessResponse call(MyFirstCoprocessor.AggregationService aggregationService) throws IOException {
                        MyFirstCoprocessor.MyCoprocessRequest request = MyFirstCoprocessor.MyCoprocessRequest.newBuilder().setFamily(family).setColumns(columns).build();
                        BlockingRpcCallback<MyFirstCoprocessor.MyCoprocessResponse> rpcCallback = new BlockingRpcCallback<>();
                        aggregationService.getAggregation(null, request, rpcCallback);
                        return rpcCallback.get();
                    }
                }, callback);
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            System.out.println("Elapsed time (ms): " + (System.currentTimeMillis() - start_t));
            System.out.println("totalRowCount:" + totalRowCount.longValue());
            System.out.println("maxValue:" + maxValue.doubleValue());
            System.out.println("minValue:" + minValue.doubleValue());
            System.out.println("sumValue:" + sumValue.doubleValue());
            System.out.println("avg:" + new DoubleColumnInterpreter().divideForAvg(sumValue.doubleValue(), totalRowCount.longValue()));
            return totalRowCount.longValue();
        }
    }
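    Each region returns one partial result, and Batch.Callback#update merges them into the global answer. The merge step can be sketched without HBase; the PartialAgg class below is a hypothetical stand-in for MyCoprocessResponse:

    ```java
    // HBase-free sketch of the client-side merge performed by Batch.Callback#update:
    // each region contributes one partial aggregate, combined into a global result.
    public class RegionMergeSketch {
        static class PartialAgg { // hypothetical stand-in for MyCoprocessResponse
            final long count; final double max; final double min; final double sum;
            PartialAgg(long count, double max, double min, double sum) {
                this.count = count; this.max = max; this.min = min; this.sum = sum;
            }
        }

        // Same merge rule as the Batch.Callback: sum the counts and sums,
        // keep the largest max and the smallest min.
        static PartialAgg merge(PartialAgg... parts) {
            long count = 0; double max = -Double.MAX_VALUE;
            double min = Double.MAX_VALUE; double sum = 0;
            for (PartialAgg p : parts) {
                count += p.count;
                if (p.max > max) max = p.max;
                if (p.min < min) min = p.min;
                sum += p.sum;
            }
            return new PartialAgg(count, max, min, sum);
        }

        public static void main(String[] args) {
            // Pretend these partials came back from two regions.
            PartialAgg m = merge(new PartialAgg(2, 24.5, 12.3, 36.8),
                                 new PartialAgg(3, 11.5, 10.5, 32.5));
            System.out.println("count=" + m.count + " max=" + m.max
                    + " min=" + m.min + " avg=" + (m.sum / m.count));
        }
    }
    ```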
    

    112.5 Deployment and Invocation

    • Build with Maven
    mvn clean package
    
    • Upload the jar to the /hbase/coprocessor directory in HDFS
    [root@ip-186-31-5-38 ~]# export HADOOP_USER_NAME=hbase         
    [root@ip-186-31-5-38 ~]# hadoop fs -mkdir -p /hbase/coprocessor
    [root@ip-186-31-5-38 ~]# hadoop fs -put hbase-demo-1.0-SNAPSHOT.jar /hbase/coprocessor
    [root@ip-186-31-5-38 ~]# hadoop fs -ls /hbase/coprocessor
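    As an alternative to registering the coprocessor from Java code (section 112.4), the same table attribute can be set directly from the HBase shell, and `describe` shows whether it took effect. This is a configuration sketch: the jar path, class name, and priority below are examples and must match your deployment:

    ```
    disable 'fayson_coprocessor'
    alter 'fayson_coprocessor', METHOD => 'table_att', 'coprocessor' => 'hdfs://nameservice3/hbase/coprocessor/hbase-demo-1.0-SNAPSHOT.jar|com.cloudera.hbase.coprocessor.server.MyFirstCoprocessorEndPoint|1001|'
    enable 'fayson_coprocessor'
    describe 'fayson_coprocessor'
    ```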
    

  • Original article: https://blog.csdn.net/m0_47454596/article/details/126164333