• Exporting HBase data with Java


    1. HBase client implementation

    1.1 Dependencies

     <!-- HBase dependency coordinates -->
     <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-client</artifactId>
         <version>1.2.6</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase-server</artifactId>
         <version>1.2.6</version>
         <exclusions><!-- Exclude transitive dependencies: without this block the build reports errors -->
             <exclusion>
                 <groupId>*</groupId>
                 <artifactId>*</artifactId>
             </exclusion>
         </exclusions>
     </dependency>
    
    

    1.2 Configuration and code

    1.2.1 Get-based export
    public class HBaseService {
        private static final Logger logger = LoggerFactory.getLogger(HBaseService.class);
    
        /**
         * Configuration read from the classpath configuration files
         */
        static Configuration configuration = HBaseConfiguration.create();
    
        /**
         * Shared HBase connection
         */
        private static Connection conn = null;
    
        static {
            try {
                conn = ConnectionFactory.createConnection(configuration);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        /**
         * Query data and write it to a file (rows are fetched with Get requests and written to a text file)
         * @param rowKey rowKey to fetch
         * @param tableName table name
         * @param dirName output directory
         * @param fileExist flag indicating whether the output file already exists (unused in this snippet)
         */
        public static void addInfoToFile(String rowKey, String tableName, String dirName, boolean fileExist){
            Table table = null;
            try {
                // Reuse the shared static connection instead of opening a new one per call
                table = conn.getTable(TableName.valueOf(tableName));
                List<Get> gets = new ArrayList<>();
                Get get = new Get(Bytes.toBytes(rowKey));
                gets.add(get);
                // Results for the batch of Gets
                Result[] resultArr = table.get(gets);
                Map<String, Map<String,String>> dataMap = new HashMap<>();
                for (Result r : resultArr) {
                    String rowKey1 = Bytes.toString(r.getRow());
                    Map<String, String> columnDataMap;
                    if (dataMap.containsKey(rowKey1)){
                        columnDataMap = dataMap.get(rowKey1);
                    }else {
                        columnDataMap = new HashMap<>();
                    }
                    for (Cell kv : r.rawCells()) {
                        String qualifier = Bytes.toString(CellUtil.cloneQualifier(kv));
                        String value = Base64Encoder.encode(CellUtil.cloneValue(kv));
                        columnDataMap.put(qualifier, value);
                        dataMap.put(rowKey1, columnDataMap);
                    }
                }
    
                 if (MapUtil.isNotEmpty(dataMap)){
                     for (String r : dataMap.keySet()) {
                         Map<String, String> columnMap = dataMap.get(r);
                         StrBuilder lineStr = new StrBuilder();
                         lineStr.append(r + "||");
                         for (String s : columnMap.keySet()) {
                             lineStr.append(s + ":" + columnMap.get(s) + "\t");
                         }
                         String fileName = dirName + File.separator + "data.txt";
                         File f = new File(fileName);
                         if (!f.exists()){
                             try {
                                 f.createNewFile();
                             }catch (IOException e){
                                 logger.error("创建文件失败,异常信息:{}", e.getMessage());
                             }
                         }
                         BufferedWriter writer = new BufferedWriter(
                                 new FileWriter(fileName, true));
    
                         writer.write(lineStr.toString()  + "\n");
                         logger.info("写入rowkey:{}的波形数据到:{}", r, fileName);
                         writer.close();
                     }
                }
            } catch (Exception e) {
                logger.error("Failed to write waveform data for rowkey {} to {}, error: {}", rowKey, dirName, e.getMessage());
            } finally {
                if (table != null) {
                    try {
                        table.close();
                    } catch (IOException e) {
                        logger.error("Failed to close table, error: {}", e.getMessage());
                    }
                }
            }
        }
    }
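
    A minimal usage sketch for the get-based export above. The rowkey and output directory below are illustrative values only; the table name reuses Vibration_WaveData from the scan example later in this post.

    public class HBaseServiceDemo {
        public static void main(String[] args) {
            // Hypothetical arguments: export one row by rowkey to a local directory
            String rowKey = "device001_20231018120000";   // illustrative rowkey
            String tableName = "Vibration_WaveData";      // table used elsewhere in this post
            String dirName = "/tmp/hbase-export";         // illustrative output directory
            HBaseService.addInfoToFile(rowKey, tableName, dirName, false);
        }
    }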
    
    1.2.2 Scan-based export
        /**
         * Fetch data with a scan
         * @param rowKey rowkey (unused in this snippet)
         * @param startKey start rowkey
         * @param stopKey stop rowkey
         * @param regexStr regular expression used to match rowkeys
         */
        public static void findRowKey(String rowKey, String startKey, String stopKey, String regexStr){
            Table table = null;
            try {
                table = conn.getTable(TableName.valueOf("Vibration_WaveData"));
                Scan scan = new Scan();
                // Filter rows by matching the rowkey against a regular expression
                RegexStringComparator regexComparator = new RegexStringComparator(regexStr);
                RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, regexComparator);
                // Setting start and stop rowkeys narrows the scan and improves efficiency
                scan.setStartRow(startKey.getBytes());
                scan.setStopRow(stopKey.getBytes());
                scan.setFilter(rowFilter);
                // Number of rows fetched from the server per RPC
                scan.setCaching(100000);
                ResultScanner result1 = table.getScanner(scan);
                for (Result r : result1) {
                    for (Cell kv : r.rawCells()) {
                        System.out.println(String.format("row:%s, family:%s, qualifier:%s, value:%s, timestamp:%s.",
                                Bytes.toString(CellUtil.cloneRow(kv)),
                                Bytes.toString(CellUtil.cloneFamily(kv)),
                                Bytes.toString(CellUtil.cloneQualifier(kv)),
                                Bytes.toString(CellUtil.cloneValue(kv)),
                                kv.getTimestamp()));
                    }
                }
                result1.close();
                // Close the table, not the shared connection, so other calls can keep using it
                table.close();
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
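
    A minimal call sketch, assuming findRowKey lives in the HBaseService class above; the start/stop rowkeys and regex are illustrative values only.

    public static void main(String[] args) {
        // Scan rowkeys in the illustrative range, keeping only those that match the regex
        HBaseService.findRowKey(null, "device001_20231001", "device001_20231031", "^device001_.*");
    }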
    

    2. MapReduce implementation

    2.1 Dependencies

     <!-- Hadoop dependency coordinates -->
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>2.7.6</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
         <version>2.7.6</version>
     </dependency>
     <dependency>
         <groupId>commons-cli</groupId>
         <artifactId>commons-cli</artifactId>
         <version>1.2</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-client</artifactId>
         <version>2.7.6</version>
     </dependency>
    

    2.2 Configuration file

    hbase-site.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <property>
            <!-- Run HBase in distributed mode -->
            <name>hbase.cluster.distributed</name>
            <value>true</value>
        </property>
        <property>
            <!-- ZooKeeper quorum addresses; separate multiple hosts with "," -->
            <name>hbase.zookeeper.quorum</name>
            <value>192.168.1.100:2181,192.168.1.102:2181</value>
        </property>
    
        <!-- Enable uber mode (disabled by default) -->
        <property>
            <name>mapreduce.job.ubertask.enable</name>
            <value>true</value>
        </property>
        <!-- Maximum number of map tasks allowed in uber mode; may only be lowered -->
        <property>
            <name>mapreduce.job.ubertask.maxmaps</name>
            <value>9</value>
        </property>
        <!-- Maximum number of reduce tasks allowed in uber mode; may only be lowered -->
        <property>
            <name>mapreduce.job.ubertask.maxreduces</name>
            <value>1</value>
        </property>
        <!-- Maximum input size for uber mode; defaults to the dfs.blocksize value; may only be lowered -->
        <property>
            <name>mapreduce.job.ubertask.maxbytes</name>
            <value></value>
        </property>
    </configuration>
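
    Assuming hbase-site.xml sits on the application classpath (for example src/main/resources in a Maven project), HBaseConfiguration.create() in the export code below picks up these settings automatically. A quick sanity-check sketch:

    public class ConfigCheck {
        public static void main(String[] args) {
            // HBaseConfiguration.create() loads hbase-default.xml and hbase-site.xml from the classpath
            org.apache.hadoop.conf.Configuration conf = org.apache.hadoop.hbase.HBaseConfiguration.create();
            System.out.println(conf.get("hbase.zookeeper.quorum"));
            // expected output: 192.168.1.100:2181,192.168.1.102:2181
        }
    }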
    

    2.3 Export code

    public class ReadHbaseDataByMRToHDFS {
        private static final Logger logger = LoggerFactory.getLogger(ReadHbaseDataByMRToHDFS.class);
        static Configuration configuration = HBaseConfiguration.create();
        /**
         * Export data from HBase
         * @param tableName table name
         * @param dirName   output directory
         * @param startRow  start rowkey
         * @param stopRow   stop rowkey
         * @param regexStr  regular expression used to match rowkeys
         */
        public void exportHbaseData(String tableName, String dirName, String startRow, String stopRow, String regexStr) {
            logger.info("Starting HBase export, tableName:{}, dirName:{}, startRow:{}, stopRow:{}, regexStr:{}", tableName, dirName, startRow, stopRow, regexStr);
            System.setProperty("HADOOP_USER_NAME", "root");
            // Timeout for a single RPC request; if it is exceeded, the client actively closes the socket
            configuration.set("hbase.rpc.timeout", "600000");
            // Total timeout between the client issuing a scan RPC and receiving its response
            configuration.set("hbase.client.scanner.timeout.period", "600000");
            configuration.set("mapreduce.job.ubertask.maxmaps", "10");
            configuration.set("mapreduce.job.ubertask.maxreduces", "1");
            configuration.set("mapreduce.task.io.sort.mb", "1024");
            configuration.set("mapred.map.tasks", "10");
            try {
                Job job = Job.getInstance(configuration);
                job.setJarByClass(ReadHbaseDataByMRToHDFS.class);
                // Map-only job: no reducers
                job.setNumReduceTasks(0);
                // Configure the scan that feeds the mapper
                Scan scan = new Scan();
                // Setting start/stop rowkeys plus a regex filter improves scan efficiency
                RegexStringComparator regexComparator = new RegexStringComparator(regexStr);
                scan.setStartRow(startRow.getBytes()).setStopRow(stopRow.getBytes());
                RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, regexComparator);
                scan.setFilter(rowFilter);
                // Number of rows fetched from the server per RPC
                scan.setCaching(900000);
                // Last argument false: do not add dependency jars to the job classpath
                TableMapReduceUtil.initTableMapperJob(tableName,
                        scan,
                        ReadHBaseDataByMRToHDFSMapper.class,
                        Text.class,
                        NullWritable.class,
                        job,
                        false);
    
                // Output directory
                FileOutputFormat.setOutputPath(job, new Path(dirName));
                // Submit the job and wait for completion
                boolean isDone = job.waitForCompletion(true);
                if (isDone){
                    Thread.sleep(3000);
                    logger.info("进行HBase数据导出成功,tableName:{}, dirName:{}, startRow:{}, stopRow:{}, regexStr:{},状态:{}", tableName, dirName, startRow, stopRow, regexStr, isDone);
                }
            } catch (Exception e) {
                logger.error("HBase export failed, tableName:{}, dirName:{}, startRow:{}, stopRow:{}, regexStr:{}, error:{}",
                        tableName, dirName, startRow, stopRow, regexStr, e.getMessage());
            }
        }
    
        /**
         * Mapper input:
         * ImmutableBytesWritable: the rowkey
         * Result: each record read from HBase is one Result, i.e. one Result per rowkey
         * <p>
         * keyOut: Text (one formatted line per row)
         * valueOut: NullWritable
         */
        static class ReadHBaseDataByMRToHDFSMapper extends TableMapper<Text, NullWritable> {
            Text outKey = new Text();
            @Override
            protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
                List<Cell> cells = value.listCells();
                Map<String, Map<String, String>> cellMap = new HashMap<>();
                // Each cell holds the value of a single column
                for (Cell cell : cells) {
                    String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                    Map<String, String> columnMap = new HashMap<>();
                    if (cellMap.containsKey(rowkey)){
                        columnMap = cellMap.get(rowkey);
                    }
                    // String family = Bytes.toString(CellUtil.cloneFamily(cell));
                    String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String columnValue = Base64Encoder.encode(CellUtil.cloneValue(cell));
                    columnMap.put(column, columnValue);
                    cellMap.put(rowkey, columnMap);
                    // long timeStamp = cell.getTimestamp();
                   // outKey.set(rowkey + "\t\t" + column + "\t\t" + columnValue + "\n");
                }
                if (CollUtil.isNotEmpty(cellMap)){
                    // Emit one output line per rowkey: rowkey||col1:val1<TAB>col2:val2<TAB>...
                    for (String s : cellMap.keySet()) {
                        Map<String, String> columnMap = cellMap.get(s);
                        StringBuilder lineStr = new StringBuilder(s).append("||");
                        for (String c : columnMap.keySet()) {
                            lineStr.append(c).append(":").append(columnMap.get(c)).append("\t");
                        }
                        outKey.set(lineStr.toString());
                        context.write(outKey, NullWritable.get());
                    }
                }
            }
        }
    }
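
    A minimal driver sketch for kicking off the MapReduce export. All arguments below are illustrative values, not part of the original post; note that the output directory must not already exist on HDFS.

    public class ExportDriver {
        public static void main(String[] args) {
            ReadHbaseDataByMRToHDFS exporter = new ReadHbaseDataByMRToHDFS();
            // Hypothetical arguments for a single export run
            exporter.exportHbaseData(
                    "Vibration_WaveData",                // table scanned in section 1
                    "/tmp/hbase-export/wave-data",       // illustrative HDFS output directory
                    "device001_20231001",                // illustrative start rowkey
                    "device001_20231031",                // illustrative stop rowkey
                    "^device001_.*");                    // illustrative rowkey regex
        }
    }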
    
  • Original article: https://blog.csdn.net/github_38924695/article/details/134003247