• 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析


    08:离线分析:Hbase表设计及构建

    • 目标掌握Hbase表的设计及创建表的实现

    • 路径

      • step1:基础设计
      • step2:Rowkey设计
      • step3:分区设计
      • step4:建表
    • 实施

      • 基础设计

        • Namespace:MOMO_CHAT

        • Table:MOMO_MSG

        • Family:C1

        • Qualifier:与数据中字段名保持一致

          image-20210905200550740

      • Rowkey设计

        • 查询需求:根据发件人id + 收件人id + 消息日期 查询聊天记录

          • 发件人账号
          • 收件人账号
          • 时间
        • 设计规则:业务、唯一、长度、散列、组合

        • 设计实现

          • 加盐方案:CRC、Hash、MD5、MUR
          • => 8位、16位、32位
          MD5Hash【发件人账号_收件人账号_消息时间 =》 8位】_发件人账号_收件人账号_消息时间
          
          • 1
      • 分区设计

        • Rowkey前缀:MD5编码,由字母和数字构成
        • 数据并发量:高
        • 分区设计:使用HexSplit16进制划分多个分区
      • 建表

        • 启动Hbase:start-hbase.sh
        • 进入客户端:hbase shell
        #创建NS
        create_namespace 'MOMO_CHAT'
        #建表
        create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}
        
        • 1
        • 2
        • 3
        • 4

        image-20210905192807020

    • 小结

      • 掌握Hbase表的设计及创建表的实现

    09:离线分析:Kafka消费者构建

    • 目标实现离线消费者的开发

    • 路径

      • 整体实现的路径

        //入口:调用实现消费Kafka,将数据写入Hbase
        public void main(){
            //step1:消费Kafka
            consumerKafka();
            
        }
        
        //用于消费Kafka数据
        public void consumerKafka(){
            prop = new Properties()
        	KafkaConsumer consumer = new KafkaConsumer(prop)
            consumer.subscribe("MOMO_MSG")
            ConsumerRecords  records = consumer.poll
            //基于每个分区来消费和处理
                recordTopicPartitionOffsetKeyValue
            	//step2:写入Hbase
                writeToHbase(value)
            //提交这个分区的offset
             commitSycn(offset+1)
        }
        
        
        //用于将value的数据写入Hbase方法
        public void writeToHbase(){
            //step1:构建连接
            //step2:构建Table对象
            //step3:构建Put对象
            //获取rowkey
           rowkey = getRowkey(value)
            Put put = new Put(rowkey)
            put.添加每一列
            table.put()
        }
        
        public String getRowkey(){
            value.getSender
            value.getReceiver
            value.getTime
                rowkey = MD5+sender+receiverId +time
                return rowkey
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
    • 实施

          /**
           * 用于消费Kafka的数据,将合法数据写入Hbase
           */
          private static void consumerKafkaToHbase() throws Exception {
              //构建配置对象
              Properties props = new Properties();
              //指定服务端地址
              props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
              //指定消费者组的id
              props.setProperty("group.id", "momo");
              //关闭自动提交
              props.setProperty("enable.auto.commit", "false");
              //指定K和V反序列化的类型
              props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              //构建消费者的连接
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
              //指定订阅哪些Topic
              consumer.subscribe(Arrays.asList("MOMO_MSG"));
              //持续拉取数据
              while (true) {
                  //向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间
                  //拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                  //todo:3-处理拉取到的数据:打印
                  //取出每个分区的数据进行处理
                  Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区
                  //对每个分区的数据做处理
                  for (TopicPartition partition : partitions) {
                      List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据
                      //处理这个分区的数据
                      long offset = 0;
                      for (ConsumerRecord<String, String> record : partRecords) {
                          //获取Topic
                          String topic = record.topic();
                          //获取分区
                          int part = record.partition();
                          //获取offset
                          offset = record.offset();
                          //获取Key
                          String key = record.key();
                          //获取Value
                          String value = record.value();
                          System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                          //将Value数据写入Hbase
                          if(value != null && !"".equals(value) && value.split("\001").length == 20 ){
                              writeToHbase(value);
                          }
                      }
                      //手动提交分区的commit offset
                      Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));
                      consumer.commitSync(offsets);
                  }
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
    • 小结

      • 实现离线消费者的开发

    10:离线分析:Hbase连接构建

    • 目标实现Hbase连接的构建

    • 实施

          private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      	private static Connection conn;
          private static Table table;
          private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名
          private static byte[] family = Bytes.toBytes("C1");//列族
      
          // 静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能
          static{
              try {
                  //构建配置对象
                  Configuration conf = HBaseConfiguration.create();
                  conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
                  //构建连接
                  conn = ConnectionFactory.createConnection(conf);
                  //获取表对象
                  table = conn.getTable(tableName);
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
    • 小结

      • 实现Hbase连接的构建

    11:离线分析:Rowkey的构建

    • 目标实现Rowkey的构建

    • 实施

      private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
              //转换时间戳
              long time = format.parse(stime).getTime();
              String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;
              //构建MD5
              String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);
              //合并返回
              return prefix+"_"+suffix;
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
    • 小结

      • 实现Rowkey的构建

    12:离线分析:Put数据列构建

    • 目标实现Put数据列的构建

    • 实施

      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
      put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
    • 小结

      • 实现Put数据列的构建

    13:离线分析:存储运行测试

    • 目标测试运行消费Kafka数据动态写入Hbase

    • 实施

      • 启动消费者程序

      • 启动Flume程序

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
        
        • 1
        • 2
      • 启动模拟数据

        java -jar /export/data/momo_init/MoMo_DataGen.jar \
        /export/data/momo_init/MoMo_Data.xlsx \
        /export/data/momo_data/ \
        10
        
        • 1
        • 2
        • 3
        • 4
      • 观察Hbase结果

        image-20210905213457245

    • 小结

      • 测试运行消费Kafka数据动态写入Hbase

    14:离线分析:Hive关联测试

    • 目标使用Hive关联Hbase实现离线分析

    • 路径

      • step1:关联
      • step2:查询
    • 实施

      • 启动Hive和yarn

        start-yarn.sh
        hive-daemon.sh metastore
        hive-daemon.sh hiveserver2
        start-beeline.sh
        
        • 1
        • 2
        • 3
        • 4
      • 关联

        create database MOMO_CHAT;
        use MOMO_CHAT;
        create external table if not exists MOMO_CHAT.MOMO_MSG (
          id string,
          msg_time string ,
          sender_nickyname string , 
          sender_account string , 
          sender_sex string , 
          sender_ip string ,
          sender_os string , 
          sender_phone_type string ,
          sender_network string , 
          sender_gps string , 
          receiver_nickyname string ,
          receiver_ip string ,
          receiver_account string ,
          receiver_os string ,
          receiver_phone_type string ,
          receiver_network string ,
          receiver_gps string ,
          receiver_sex string ,
          msg_type string ,
          distance string ,
          message string 
        ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
        with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname, 
        C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,
        C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,
        C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,
        C1:msg_type,C1:distance,C1:message ') tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
      • 分析查询

        --基础查询
        select 
          msg_time,sender_nickyname,receiver_nickyname,distance 
        from momo_msg limit 10;
        
        --查询聊天记录:发送人id + 接收人id + 日期:1f300e5d_13280256412_15260978785_1632888342000
        select 
          * 
        from momo_msg 
        where sender_account='13280256412' 
        and receiver_account='15260978785' 
        and substr(msg_time,0,10) = '2021-09-29';
        
        --统计每个小时的消息数
        select
          substr(msg_time,0,13) as hour,
          count(*) as cnt
        from momo_msg
        group by substr(msg_time,0,13);
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
    • 小结

      • 使用Hive关联Hbase实现离线分析

    15:离线分析:Phoenix关联测试

    • 目标使用Phoenix关联Hbase实现即时查询

    • 路径

      • step1:关联
      • step2:查询
    • 实施

      • 启动

        cd /export/server/phoenix-5.0.0-HBase-2.0-bin/
        bin/sqlline.py node1:2181
        
        • 1
        • 2
      • 关联

        create view if not exists MOMO_CHAT.MOMO_MSG (
          "id" varchar primary key,
          C1."msg_time" varchar ,
          C1."sender_nickyname" varchar , 
          C1."sender_account" varchar , 
          C1."sender_sex" varchar , 
          C1."sender_ip" varchar ,
          C1."sender_os" varchar , 
          C1."sender_phone_type" varchar ,
          C1."sender_network" varchar , 
          C1."sender_gps" varchar , 
          C1."receiver_nickyname" varchar ,
          C1."receiver_ip" varchar ,
          C1."receiver_account" varchar ,
          C1."receiver_os" varchar ,
          C1."receiver_phone_type" varchar ,
          C1."receiver_network" varchar ,
          C1."receiver_gps" varchar ,
          C1."receiver_sex" varchar ,
          C1."msg_type" varchar ,
          C1."distance" varchar ,
          C1."message" varchar
        );
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
      • 即时查询

        --基础查询
        select 
          "id",c1."sender_account",c1."receiver_account" 
        from momo_chat.momo_msg 
        limit 10;
        
        --查询每个发送人发送的消息数
        select 
          c1."sender_account" ,
          count(*) as cnt 
        from momo_chat.momo_msg 
        group by c1."sender_account";
        
        --查询每个发送人聊天的人数
        select 
          c1."sender_account" ,
          count(distinct c1."receiver_account") as cnt 
        from momo_chat.momo_msg 
        group by c1."sender_account" 
        order by cnt desc;
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
    • 小结

      • 使用Phoenix关联Hbase实现即时查询
  • 相关阅读:
    【spring cloud】(三)服务降级——Hystrix
    一、什么是 HarmonyOS ?
    jwt+redis实现登录认证
    HDFS、Yarn、Hive…MRS中使用Ranger实现权限管理全栈式实践
    世界上最便宜好用的服务器低至 $9.99 / 年
    js实现一行半文本的截取
    汽车电气架构
    OpenSSL在i.MX8MP开发板上的应用
    error: cannot jump from this goto statement to its label
    Vue3.0 如何写自定义指令
  • 原文地址:https://blog.csdn.net/xianyu120/article/details/133811744