• Flink实时数仓之用户埋点系统(一)


    需求分析

    数据采集

    用户行为采集

    1. 行为数据:页面浏览、点击、在线日志等数据
    2. 活跃数据:用户注册、卸载安装、活跃等数据
    3. App性能日志:卡顿、异常等数据

    业务数据采集

    1. 业务数据:支付等
    2. 维度表:渠道、商品等

    行为日志分析

    用户行为日志

    日志结构大致可分为两类,一是页面日志,二是启动日志和在线日志。

    页面日志

    页面日志,以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录和多个用户在该页面所做的动作记录,以及若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

    {
       "common": {                     -- 环境信息
          "imei":"xxx",
          "device_id":"12323",         --客户端唯一识别id,安卓传安卓唯一id,ios传idfa (可以为空,上报空值率)
          "acc_id": "aad3",             -- APP注册ID,如:aad32c13aa6d008c_1D19l (可以为空,上报空值率)
          "app_type_id":"DFTT",        --App软件唯一识别符,如:步多多为100001 (不可为空,非DFTT/ZYSRF/SXG/6位数字则报错,空值则报错)
          "qid": "kuaishou1",           -- 渠道
          "group_qid":"kuaishou",  -- 渠道分组
          "asc_qid":"xiaoh",                -- 归因渠道号 (可以为空,上报空值率)
          "app_ver":"v2.1.134",        --App版本号 ,如:1.1.1 最大99.99.99(不可为空,非标准格式及空值 报错)
          "os":"IOS",                  --操作系统,如:Android、iOS (不可为空,非:Android/iOS及空值 报错 )
          "os_version":"11.0",         --操作系统版本号,如:7.0 (不可为空,空值 报错)
          "device":"xiao mi 6",        --公共参数)客户端手机型号,如:xiaomi6 (不可为空,空值 报错 )
          "device_brand":"xiaomi",     --机型品牌,如:HUAWEI (不可为空,空值 报错)
          "pixel":"1080*1920",         --屏幕分辨率,如:1080*1920 (不可为空 ,空值 报错)
          "network":"5g",              --网络环境,如:wifi、4g、3g、2g、other (不可为空 ,非:wifi、4g、3g、2g、other及空值 报错)
          "is_tourist":1 ,             --是否是游客,如:1表示游客、0表示非游客、2表示未登录
          "obatch_id":"ddadccae",      --本次启动唯一ID:每次启动时生成一个唯一批次号,直到下次启动才变更 (可以为空 ,上报空值率)  
          "ip":"127.0.0.1",            --ip
          "is_new": 1,                 -- 是否为新用户 0老用户,1新用户(安装后启动的第一天用户都为新用户,第一天之后都为老用户) 
          "code": "xxx",              -- 平台标识 
          "lab_code": "实验A",         -- 实验code 
          "lab_group_code": "note"     -- 实验分组code  
       },
       "actions": [{                   -- 页面动作信息
          "page_url": "/good_detail",  -- 页面url(取相对路径)
          "action_type": "show",       -- 动作类型:展现传“show”、点击传“click”、关闭传“close” (不可为空,非show、click、close报错 空值报错)
          "event": "Vip",              -- 事件类型
          "sub_event": "Me"            -- 事件子类型
        }],
       "pages": [{                        -- 页面信息
          "during_time": 7648,            -- 持续时间毫秒
          "page_url": "/good_detail",     -- 页面url(取相对路径)
          "last_page_url":"",             -- 上一个页面url(取相对路径,首次访问为空)
          "event": "Vip",                 -- 事件类型
          "sub_event": "Me",              -- 页面名称
          "last_sub_event": "login"       -- 上页的名称  
       }] 
        "ts": 1585744374423             --日志上报时间戳
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    启动日志

    启动日志以启动为单位,一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

    {
      "common": {
          "imei":"idfv",
          "device_id":"12323",         --客户端唯一识别id,安卓传安卓唯一id,ios传idfa (可以为空,上报空值率)
          "acc_id": "aad3",            -- APP注册ID,如:aad32c13aa6d008c_1D19l (可以为空,上报空值率)
          "app_type_id":"DFTT",        --App软件唯一识别符,如:步多多为100001 (不可为空,非DFTT/ZYSRF/SXG/6位数字则报错,空值则报错)
          "qid": "xxx",                -- 渠道
          "group_qid":"xxxx",    -- 渠道分组
          "asc_qid":"",                --归因渠道号 (可以为空,上报空值率)
          "app_ver":"v2.1.134",        --App版本号 ,如:1.1.1 最大99.99.99(不可为空,非标准格式及空值 报错)
          "os":"IOS",                  --操作系统,如:Android、iOS (不可为空,非:Android/iOS及空值 报错 )
          "os_version":"11.0",         --操作系统版本号,如:7.0 (不可为空,空值 报错)
          "device":"xiao mi 6",        --公共参数)客户端手机型号,如:xiaomi6 (不可为空,空值 报错 )
          "device_brand":"xiaomi",     --机型品牌,如:HUAWEI (不可为空,空值 报错)
          "pixel":"1080*1920",         --屏幕分辨率,如:1080*1920 (不可为空 ,空值 报错)
          "network":"5g",              --网络环境,如:wifi、4g、3g、2g、other (不可为空 ,非:wifi、4g、3g、2g、other及空值 报错)
          "is_tourist":1 ,             --是否是游客,如:1表示游客、0表示非游客、2表示未登录
          "obatch_id":"ddadccae",      --本次启动唯一ID:每次启动时生成一个唯一批次号,直到下次启动才变更 (可以为空 ,上报空值率)  
          "ip":"127.0.0.1",            --ip
          "is_new": 1,                 -- 是否为新用户 0老用户,1新用户
          "code": "xxx",               -- 平台标识 
          "lab_code": "实验A",         -- 实验code 
          "lab_group_code": "note"     -- 实验分组code  
      },
      "start": {   
        "start_way": 0,          --启动方式。 0:热启动  1:代表首次安装首次启动  2:冷启动
        "entry": "icon",          --启动途径。icon:手机图标  notice:通知   install:安装后启动
        "loading_time": 18803    --启动加载时间
      },
      "ts": 1585744304000        --日志上报时间戳
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    APP在线日志

    App在线日志以启动-关闭为单位,一次启动-关闭行为,生成一条启动-关闭日志。

    {
      "common": {
          "imei":"idfv",
          "device_id":"12323",         --客户端唯一识别id,安卓传安卓唯一id,ios传idfa (可以为空,上报空值率)
          "acc_id": "aad3",            -- APP注册ID,如:aad32c13aa6d008c_1D19l (可以为空,上报空值率)
          "app_type_id":"DFTT",        --App软件唯一识别符,如:步多多为100001 (不可为空,非DFTT/ZYSRF/SXG/6位数字则报错,空值则报错)
          "qid": "xxx",           -- 渠道
          "group_qid":"xxxx",  -- 渠道分组
          "asc_qid":"",                -- 归因渠道号 (可以为空,上报空值率)
          "app_ver":"v2.1.134",        --App版本号 ,如:1.1.1 最大99.99.99(不可为空,非标准格式及空值 报错)
          "os":"IOS",                  --操作系统,如:Android、iOS (不可为空,非:Android/iOS及空值 报错 )
          "os_version":"11.0",         --操作系统版本号,如:7.0 (不可为空,空值 报错)
          "device":"xiao mi 6",        --公共参数)客户端手机型号,如:xiaomi6 (不可为空,空值 报错 )
          "device_brand":"xiaomi",     --机型品牌,如:HUAWEI (不可为空,空值 报错)
          "pixel":"1080*1920",         --屏幕分辨率,如:1080*1920 (不可为空 ,空值 报错)
          "network":"5g",              --网络环境,如:wifi、4g、3g、2g、other (不可为空 ,非:wifi、4g、3g、2g、other及空值 报错)
          "is_tourist":1 ,             --是否是游客,如:1表示游客、0表示非游客、2表示未登录
          "obatch_id":"ddadccae",      --本次启动唯一ID:每次启动时生成一个唯一批次号,直到下次启动才变更 (可以为空 ,上报空值率)  
          "ip":"127.0.0.1",            --ip
          "is_new": 1,                 -- 是否为新用户 0老用户,1新用户 
          "code": "xxx",              -- 平台标识 
          "lab_code": "实验A",         -- 实验code 
          "lab_group_code": "note"     -- 实验分组code  
      },
      "online": {   
        "start_way": 0,            --启动方式。 0:热启动  1:代表首次安装首次启动  2:冷启动
        "start_time":  18803111 ,  --开始时间(毫秒)
        "end_time":  188033 ,      --退出时间(毫秒)
        "online_time": 18803      --在线时长(毫秒)
      },
      "ts": 1585744304000        --日志上报时间戳
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    新老用户的判断规则
    APP 端:用户安装 App 后,第一次打开 App 的当天,Android/iOS SDK 会在手机本地缓存内,创建一个首日为 true 的标记,并且设置第一天 24 点之前,该标记均为 true。
    即:第一天触发的 APP 端所有事件中,is_new = 1。即第一天之后触发的 APP 端所有事件中,is_new = 0。
    对于此类日志,如果首日之后用户清除了手机本地缓存中的标记,再次启动 APP 会重新设置一个首日为 true 的标记,导致本应为 0 的 is_new 字段被置为1
    前端处理规则
    is_new(1:新用户,0:老用户)用户安装 App 后,第一次打开 App 的当天,即第一天触发的 APP 端所有事件中,is_new = 1,第一天之后,该标记则为 false,即第一天之后触发的 APP 端所有事件中,is_new = 0。首日之后用户清除了手机本地缓存中的标记,is_new = 1此时由后端处理

    业务数据分析

    1)用户订单、支付、退款等业务的新增、修改、删除操作都会生成一个binlog日志,通过MaxWell采集这些日志到Kafka消息队列中

    用户Insert数据

    类型:“type”: “insert”

    {
        "database":"databaseA",
        "table":"t_pay_order",
        "type":"insert",
        "ts":1686540443,
        "xid":16179,
        "commit":true,
        "data":{
            "uid":"1660557015483727879",
            "order_no":"P202305221603541978152962",
            "pay_order_id":"",
            "way_code":"APPLE_APP",
            "amount":1800,
            "currency":"cny",
            "state":1,
            "product_id": 1,
            "product_name":"商品1",
            "product_num":1,
            "body":"xxxxx",
            "user_id":"1660533343607898114",
            "refund_state":0,
            "refund_times":0,
            "refund_amount":0,
            "subscribed":1,
            "expired_time":"2023-06-21 16:03:39",
            "success_time":null,
            "create_time":"2023-05-22 08:03:38.805000",
            "update_time":"2023-06-12 02:01:26.996053",
            "err_code":"21011",
            "err_msg":"订单已退款或已订阅过期"
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    用户Update数据

    {
        "database":"note_data",
        "table":"t_pay_order",
        "type":"update",
        "ts":1686535286,
        "xid":4853,
        "commit":true,
        "data":{
            "uid":"1660557015483727876",
            "order_no":"P202305221603541978152961",
            "pay_order_id":"",
            "way_code":"APPLE_APP",
            "amount":1800,
            "currency":"cny",
            "state":3,
            "product_id":"VIP_Moth_18",
            "product_name":"月度VIP",
            "product_num":1,
            "body":"月度会员",
            "user_id":"1660533343607898114",
            "refund_state":0,
            "refund_times":0,
            "refund_amount":0,
            "subscribed":1,
            "expired_time":"2023-06-21 16:03:39",
            "success_time":null,
            "create_time":"2023-05-22 08:03:38.805000",
            "update_time":"2023-06-12 02:01:26.996053",
            "err_code":"21011",
            "err_msg":"订单已退款或已订阅过期"
        },
        "old":{
            "pay_order_id":"rfsddfx",
            "update_time":"2023-06-09 10:32:59.769593"
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    技术选型

    1. 数据采集与传输:Nginx、Flume、Kafka、MaxWell
    2. 数据存储:HDFS、HBASE、Redis
    3. 计算引擎:Flink
    4. 数据存储:ClickHouse
    5. 任务调度:Flink On Yarn

    Nginx配置

    作用

    • 收集用户埋点日志:生成log_file文件。
    • 收集post请求中的request_body,在/data/logs/nginx/user_data/文件夹下生成log日志

    配置

    http {
        include       mime.types;
        default_type  application/octet-stream;
        log_format  main  '$remote_addr - $remote_user [$time_local] "$request" ';
        log_format data_json escape=json ' $request_body ';
        access_log  logs/access.log  main;
        sendfile        on;
        #tcp_nopush     on;
    
        #keepalive_timeout  0;
        keepalive_timeout  65;
    
        map $time_iso8601 $logdate {
            '~^(?<ymd>\d{4}-\d{2}-\d{2})' $ymd;
            default    'date-not-found';
        }
    
       server {
            listen      8090;
            server_name 127.0.0.1;
    
            access_log  /data/logs/nginx/user_data/user_big_data-$logdate.log  data_json;
            error_log /data/logs/nginx/user_data/user_big_data_error-$logdate.log  error;
            
            location / {
                proxy_pass  http://127.0.0.1:8090/api/log/;
            }
    
            location /api/log/ {
                return 200;
            }
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    Flume配置

    作用

    • 采集文件到kafka队列中,这里的source(数据源)是文件,channel(通道),sink(输出源)是kafka

    关键配置

    #定义组件
    a1.sources = r1
    a1.channels = c1
    
    #配置source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /data/logs/nginx/user_data/user_data/.*log
    a1.sources.r1.positionFile =  /opt/apache-flume-1.9.0-bin/opt/taildir_position.json
    a1.sources.r1.interceptors =  i1
    a1.sources.r1.interceptors.i1.type = com.sinozo.data.flume.interceptor.ETLInterceptor$Builder
    
    #配置channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers =127.0.0.1:9092
    a1.channels.c1.kafka.topic = topic_log
    a1.channels.c1.parseAsFlumeEvent = false
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    
    #组装 
    a1.sources.r1.channels = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    MaxWell

    作用
    实时收集mysql中的binlog数据,输出到kafka队列中

    关键配置

    #Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
    producer=kafka
    #目标Kafka集群地址
    kafka.bootstrap.servers=localhost:9092
    #目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
    kafka_topic=topic_db
    
    #配置只监听note_data库下t_pay_order表
    exclude_dbs=*
    include_dbs=note_data
    include_tables=t_pay_order
    
    #MySQL相关配置
    host=localhosts
    user=maxwell
    password=maxwell
    jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    启动命令

    #!/bin/bash
    MAXWELL_HOME=/opt/maxwell-1.29.2
    
    status_maxwell(){
        result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
        return $result
    }
    
    start_maxwell(){
        status_maxwell
        if [[ $? -lt 1 ]]; then
            echo "启动Maxwell"
            $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --filter="exclude: *.*, include: db.*, exclude: *.*, include: *.t_pay_order"  --daemon
        else
            echo "Maxwell正在运行"
        fi
    }
    
    stop_maxwell(){
        status_maxwell
        if [[ $? -gt 0 ]]; then
            echo "停止Maxwell"
            ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
        else
            echo "Maxwell未在运行"
        fi
    }
    
    case $1 in
        start )
            start_maxwell
        ;;
        stop )
            stop_maxwell
        ;;
        restart )
           stop_maxwell
           start_maxwell
        ;;
    esac
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    Hadoop

    作用
    HDFS作为存储的基础组件,防止flink计算过程中的checkPoint检查点数据以及状态数据
    Yarn作为调度组件,对flink的jobManager、taskManager内存等资源进行动态分配、并对taskManager进行监控

    Flink

    作用
    作为实时计算引擎,对业务数据、用户埋点数据进行分组、统计等计算

    架构图

    Flink On Yarn架构图

  • 相关阅读:
    探索 GO 项目依赖包管理与Go Module常规操作
    阅读报告Vision-and-Language Navigation综述(2022ACL)
    主机基本安全加固
    谷歌对低代码/无代码的新押注这次会有回报吗?
    【公网远程手机Android服务器】安卓Termux搭建Web服务器
    前端培训丁鹿学堂:vue3组合式api之toRaw和customRef
    Dart 语法总结
    pg故障修复记录
    n皇后问题,不用递归
    国家信息安全水平考试NISP证书(一级、二级、三级)
  • 原文地址:https://blog.csdn.net/qq_33661044/article/details/136535303