• 四、业务数据解析


    一、项目背景

    1、背景需求

    项目描述:公司智能工厂产品中一个智慧运输项目,还有设备全生命管理、能源分析、智能楼宇,

    开发前期是一个工厂园区的定制项目,为根据国家政策,补助实现工业数字化转型, 他们全国有好几个工业园,每个园区都比较大,园区除他们自己的工厂外,还有其他同行业其他小厂。他们准备自己维护设备管理、物业管理、车辆管理等。

    这个项目的需求:
    主要是开发一个的用户付费运输货物,自己维护园区中货物转送管理与收益,相当于外面货拉拉业务有自己专门的小程序。

    我们主要负责技术实现:
    1)用户叫车下订单运输后,后台可实时监控车辆位置,同时会进行相关用户/订单/车辆的统计分析,类似货拉拉。

    2)为实现用户车辆订单的分析做好基础,把业务库中四个表同步到hbase表。 不在业务库上分析,不能影响正常业务,我们解耦数据,

    3)利用sparkSQL对hbase中几个业务表进行统计分析。

    实现流程:

    • 1)将业务库中四张表车辆表、用户表,订单表以及司管方表, 通过maxwell实时的解析mysql的binlog数据,将数据发送到kafka,
    • 2)sparkSteaing消费kafka,foreachPartion实时存储数据到hbase。
    • 3)保存数据到Hbase ,考虑我rowkey设计,设置压缩,实现数据的均匀的分区, 避免Hbase表的数据倾斜与热点的问题,前创建hbase表时,

    2、数据源描述

    1)hbase建表预分区

    1)rowkey加密然后截取12位,前面拼上四位数字。预分区的时候使用到了 | ,|是字典码值最大的一个,0000|、0001|、0002|、…、0008|
    2)使用phoenix来映射hbase的数据,实现数据的查询操作,不用rowkey去查询hbase里面的数据,

    1)hbase建表
    数据库四张表对应hbase的四张表,通过rowkey设计,实现数据的均匀的分区,避免Hbase数据的倾斜,(加盐,反转,hash)

    rowkey:使用id:timestamp =>使用md5进行加密,加密后截取12位==rowkey的长度就是12位。0001+12位字符串 ==> 刚好16位
    预分区的规则:0000|、0001|、0002|、…、0008|,|是字典码值最大的一个
    2)使用phoenix来映射hbase的数据,实现数据的查询操作,不用rowkey去查询hbase里面的数据,

    2)司机表driver_info具体字段

    乘客表renter_info具体字段
    订单表order_info_201907具体字段
    司管方表opt_alliance_business具体字段

    司机表driver_info:司机id ,电话,车辆id,、车辆类型、注册时间、

    乘客表renter_info:乘客id, 乘客姓名,电话,注册时间

    订单表order_info:订单id, 车辆id、乘客id、订单类型(0实时订单1预约订单)、订单状态cancel:(0未取消,1用户取消,2司机取消,3超时未接单)、订单里程、支付费用、行驶花费时间、订单创建时间、完成时间

    二、maxwell监控binlog

    maxwell的基本介绍

    maxwell是一款专业解析mysql的binlog的数据同步的工具,功能与canal类似(canal是阿里开源的一款数据实时同步工具),我们可以通过maxwell或者canal来实现binlog的实时解析,实现数据的实时同步。官方网址:http://maxwells-daemon.io/

    maxwell与canal的比较
    主要区别:

    • 1、虽然Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续上次点儿读取数据。
    • 2、Canal是服务端,数据过来了并不能直接写出去,需要一个客户端:syncClient去获取数据;Maxwell即是服务端也是客户端
    • 3、Maxwell支持Bootstrap,即刷全量的数据,而Canal不支持。
    • 4、Maxwell只支持Json,而Canel数据格式自由

    个人选择Maxwell:
    a、服务端和客户端是一体的
    b、Maxwell是轻量级的,出错风险低,Canal经常出错
    c、虽然部署的是单台,但是具有断点还原能力,出错好解决
    d、Maxwell代码质量非常好,且社区更加的活跃

    maxwell解析

    通过maxwell来解析mysql的binlog日志, 实时捕获mysql数据库当中的数据,发送到kafka当中,然后我们就可以通过sparkStreaming程序来实现解析kafka当中的json格式的数据,保存到Hbase当中去

    maxwell来解析mysql的binlog日志:

    插入数据:
    {“database”:“test”,“table”:“myuser”,“type”:“insert”,“ts”:1583758029,“xid”:287,“xoffset”:0,“data”:{“id”:1,“name”:“zhangsan”,“age”:null}}
    更新数据:
    {“database”:“test”,“table”:“myuser”,“type”:“update”,“ts”:1583758071,“xid”:349,“commit”:true,“data”:{“id”:1,“name”:“xiaolaodi”,“age”:null},“old”:{“name”:“zhangsan”}}
    删除数据的json:
    {“database”:“test”,“table”:“myuser”,“type”:“delete”,“ts”:1583758092,“xid”:378,“commit”:true,“data”:{“id”:1,“name”:“xiaolaodi”,“age”:null}}

    node03执行以下命令开发maxwell配置文件

    cd /kkb/install/maxwell-1.22.1
    vim travel.properties
    
    • 1
    • 2

    log_level=INFO
    producer=kafka
    kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092
    host=node03.kaikeba.com
    user=maxwell
    password=123456
    producer_ack_timeout = 600000
    port=3306
    ######### output format stuff ###############
    output_binlog_position=ture
    output_server_id=true
    output_thread_id=ture
    output_commit_info=true
    output_row_query=true
    output_ddl=false
    output_nulls=true
    output_xoffset=true
    output_schema_id=true
    ######### output format stuff ###############
    kafka_topic= veche
    kafka_partition_hash=murmur3
    kafka_key_format=hash
    kafka.compression.type=snappy
    kafka.retries=5
    kafka.acks=all
    producer_partition_by=primary_key
    ############ kafka stuff #############
    ############## misc stuff ###########
    bootstrapper=async
    ############## misc stuff ##########
    ############## filter ###############
    filter=exclude:., include: travel.order_info_201904,include: travel.order_info_201905,include: travel.order_info_201906,include: travel.order_info_201907,include: travel.order_info_201908,include: travel.order_info_201906,include: travel.order_info_201910,include: travel.order_info_201911,include: travel.order_info_201912,include: travel.renter_info,include: travel.driver_info ,include: travel.opt_alliance_business

    node01执行以下命令创建kafka的topic

    cd /kkb/install/kafka_2.11-1.0.1/ bin/kafka-topics.sh --create --topic veche --partitions 3 --replication-factor 1 --zookeeper node01:2181,node02:2181,node03:2181

    三、sparkSreaming 存储数据

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    四、项目经验

    1、maxwell会造成数据倾斜?

    创建kafka的topic的时候,有三个分区,所有的数据全部都取到了1号分区,就会造成数据倾斜的问题???

    maxwell默认的分区规则使用的是dbname.hashCode % partitioner个数不同的数据库里面的数据,放到不同的分区里面去,相同的数据库里面的数据,
    一定会将数据放到相同的分区里面。

    默认,按表名分区,一个表的数据全部放在一个分区
    去修改成 按主键分区

    filter没懂。。
    filter=exclude:., include: travel.order_info_201904,include: travel.opt_alliance_business
    ############## filter ###############
    过滤我们不需要所有的数据库以及所有的数据库表,需要travel下面的四张表

    2、HBASE一个数据多次写入

    如果HBASE设计预分区散列之后这样如果同一个数据多次写入,
    会出现重复数据吧 如果用唯一值且是主键可以保证数据不重复吧

    不会出现重复数据 如果是同一个数据,且时间戳是一样的,不会出现重复数据,会进行数据的覆盖。

    flink消费kafka的时候保证仅一次语义是不是flink的方法内部已经帮我们实现了

    对的,可以将kafka的消费状态给保存起来,
    flink天生支持state的保存
    如果实时的将数据写入es,那么kibana可以实时的看到报表嘛
    可以的,没问题的

  • 相关阅读:
    ggcor替代包:linkET,相关图,mantel test可视化
    数据分析:智能企业七步曲(一)
    一般处理程序ashx接入微信服务器配置
    DBA常用命令
    条码在WMS仓储管理系统中的应用,体现在哪些方面
    C++实战-Linux多线程(入门到精通)
    基于jmuxer的音频播放组件封装
    字符串|459.重复的子字符串
    利用Freemaker导出word时的常用语法示例
    【Web安全靶场】sqli-labs-master 21-37 Advanced-Injection
  • 原文地址:https://blog.csdn.net/TU_JCN/article/details/126207654