• 数仓学习笔记(1)——用户行为数据采集


    目录

    一、数据仓库概述

    二、项目需求及架构设计

    1、需求分析

    2、项目框架

    2.1 技术选型

    三、数据生成模块

    1、目标数据

    四、数据采集模块

    1、Kafka

    1.1 Kafka常用命令

    1.2 项目经验之Kafka机器数量计算 

    2、Flume

    2.1 组件选型

    2.2 日志采集Flume配置

    2、消费Kafka数据Flume

    2.1消费者Flume配置

    2.2 Flume时间戳拦截器

    3、常见问题及解决方案

    3.1 2NN不能显示完整信息


    一、数据仓库概述

    二、项目需求及架构设计

    1、需求分析

    2、项目框架

    2.1 技术选型

    三、数据生成模块

    1、目标数据

    我们要收集和分析的数据主要包括页面数据事件数据、曝光数据、启动数据和错误数据。

    四、数据采集模块

    各类安装及编写脚本这里就不进行过多说明了

    1、Kafka

    1.1 Kafka常用命令

    Kafka Topic列表在Flume安装后再进行创建

    1.2 项目经验之Kafka机器数量计算 

    2、Flume

    2.1 组件选型

    2.2 日志采集Flume配置

    开始的部分安装已经完成,安装的具体内容可以去查看文档,这里不做具体说明,但是要注意要每一个集群的启动顺序,不然貌似无法启动完成

    2、消费Kafka数据Flume

    2.1消费者Flume配置

    文件配置如下:

    1. ## 组件
    2. a1.sources=r1
    3. a1.channels=c1
    4. a1.sinks=k1
    5. ## source1
    6. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    7. a1.sources.r1.batchSize = 5000
    8. a1.sources.r1.batchDurationMillis = 2000
    9. a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    10. a1.sources.r1.kafka.topics=topic_log
    11. a1.sources.r1.interceptors = i1
    12. a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.TimeStampInterceptor$Builder
    13. ## channel1
    14. a1.channels.c1.type = file
    15. a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    16. a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
    17. ## sink1
    18. a1.sinks.k1.type = hdfs
    19. a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    20. a1.sinks.k1.hdfs.filePrefix = log-
    21. a1.sinks.k1.hdfs.round = false
    22. #控制生成的小文件
    23. a1.sinks.k1.hdfs.rollInterval = 10
    24. a1.sinks.k1.hdfs.rollSize = 134217728
    25. a1.sinks.k1.hdfs.rollCount = 0
    26. ## 控制输出文件是原生文件。
    27. a1.sinks.k1.hdfs.fileType = CompressedStream
    28. a1.sinks.k1.hdfs.codeC = lzop
    29. ## 拼装
    30. a1.sources.r1.channels = c1
    31. a1.sinks.k1.channel= c1

    2.2 Flume时间戳拦截器

    由于Flume默认会用Linux系统时间,作为输出到HDFS路径的时间。如果数据是23:59分产生的。Flume消费Kafka里面的数据时,有可能已经是第二天了,那么这部门数据会被发往第二天的HDFS路径。我们希望的是根据日志里面的实际时间,发往HDFS的路径,所以下面拦截器作用是获取日志中的实际时间。

    解决的思路:拦截json日志,通过fastjson框架解析json,获取实际时间ts。将获取的ts时间写入拦截器header头,header的key必须是timestamp,因为Flume框架会根据这个key的值识别为时间,写入到HDFS。

    1. package com.atguigu.flume.interceptor;
    2. import com.alibaba.fastjson.JSONObject;
    3. import org.apache.flume.Context;
    4. import org.apache.flume.Event;
    5. import org.apache.flume.interceptor.Interceptor;
    6. import java.nio.charset.StandardCharsets;
    7. import java.util.ArrayList;
    8. import java.util.List;
    9. import java.util.Map;
    10. public class TimeStampInterceptor implements Interceptor {
    11. private ArrayList events = new ArrayList<>();
    12. @Override
    13. public void initialize() {
    14. }
    15. @Override
    16. public Event intercept(Event event) {
    17. Map headers = event.getHeaders();
    18. String log = new String(event.getBody(), StandardCharsets.UTF_8);
    19. JSONObject jsonObject = JSONObject.parseObject(log);
    20. String ts = jsonObject.getString("ts");
    21. headers.put("timestamp", ts);
    22. return event;
    23. }
    24. @Override
    25. public List intercept(List list) {
    26. events.clear();
    27. for (Event event : list) {
    28. events.add(intercept(event));
    29. }
    30. return events;
    31. }
    32. @Override
    33. public void close() {
    34. }
    35. public static class Builder implements Interceptor.Builder {
    36. @Override
    37. public Interceptor build() {
    38. return new TimeStampInterceptor();
    39. }
    40. @Override
    41. public void configure(Context context) {
    42. }
    43. }
    44. }

    3、常见问题及解决方案

    3.1 2NN不能显示完整信息

    1)问题描述

    访问2NN页面http://hadoop104:9868,看不到详细信息

    2)解决办法

    (1)在浏览器上按F12,查看问题原因。定位bug在61行

    (2)找到要修改的文件

    1. [atguigu@hadoop102 static]$ pwd
    2. /opt/module/hadoop-3.1.3/share/hadoop/hdfs/webapps/static
    3. [atguigu@hadoop102 static]$ vim dfs-dust.js
    4. :set nu
    5. 修改61行
    6. return new Date(Number(v)).toLocaleString();

    (3)分发dfs-dust.js

    [atguigu@hadoop102 static]$ xsync dfs-dust.js

    (4)在http://hadoop104:9868/status.html 页面强制刷新

  • 相关阅读:
    【前端基础小案例】HTML+CSS打造精美选项卡菜单效果
    python实现灰色关联法(GRA)
    CF Round 479 (Div. 3)--E. Cyclic Components(DFS求无向图中独立环的个数)
    这几个高效软件简直是打工人的宝藏软件
    项目管理PRINCE2核心知识点整理
    46_StringBuilder类
    FA_03.添加自定义属性控制fridaserver启动和停止
    阿里云/腾讯云国际站代理:阿里云服务器介绍
    python中的if ... is None else ...含义与用法
    简易版 vue实现
  • 原文地址:https://blog.csdn.net/qq_64557330/article/details/126231089