Flume Study Notes (2): Advanced Flume


    Advanced Flume

    Flume Transactions

    The transaction flow is as follows:

    Put

    • doPut: first writes the batch of data into the temporary buffer putList
    • doCommit: checks whether the channel's in-memory queue has enough space; if so, the batch is merged into the queue
    • doRollback: if the channel's in-memory queue does not have enough space, the data is rolled back

    Take

    • doTake: pulls data into the temporary buffer takeList and sends it to HDFS
    • doCommit: if all of the data was sent successfully, clears the temporary buffer takeList
    • doRollback: if an exception occurs while sending, the rollback returns the data in takeList to the channel's in-memory queue (a minimal channel configuration showing these buffer limits is sketched below)
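
    For the default memory channel, the sizes of these buffers are bounded by the channel configuration: transactionCapacity caps how many events a single Put or Take transaction (i.e. putList/takeList) may hold, while capacity caps the in-memory queue itself. A minimal sketch (agent and channel names a1/c1 are placeholders):

    # Memory channel sketch: capacity bounds the in-memory queue,
    # transactionCapacity bounds the per-transaction putList/takeList
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100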

    Flume Agent Internals

    ChannelSelector

    A ChannelSelector decides which Channel(s) an Event will be sent to.

    There are two types: Replicating and Multiplexing.

    • A ReplicatingSelector sends the same Event to every Channel
    • A MultiplexingSelector routes different Events to different Channels according to configured rules (see the configuration sketch after this list)
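
    A minimal configuration sketch of the two selector types (the header name state and the mapping values are illustrative, following the pattern in the Flume User Guide):

    # Replicating (the default): every event goes to all of the source's channels
    a1.sources.r1.selector.type = replicating
    a1.sources.r1.channels = c1 c2

    # Multiplexing (alternative): route each event by the value of a header, here assumed to be "state"
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.CZ = c1
    a1.sources.r1.selector.mapping.US = c2
    a1.sources.r1.selector.default = c3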

    SinkProcessor

    There are three types of SinkProcessor: DefaultSinkProcessor, LoadBalancingSinkProcessor, and FailoverSinkProcessor.

    • DefaultSinkProcessor handles a single Sink
    • LoadBalancingSinkProcessor provides load balancing across a sink group (a sketch follows this list)
    • FailoverSinkProcessor provides failover (error recovery)
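
    The failover processor is demonstrated in the practical case further below; as a counterpart, here is a minimal sketch of a load-balancing sink group (component names are illustrative):

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = load_balance
    # Temporarily back off from sinks that fail
    a1.sinkgroups.g1.processor.backoff = true
    # Selection strategy: round_robin (default) or random
    a1.sinkgroups.g1.processor.selector = round_robin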

    Flume Topologies

    Simple Chain

    Multiple Flume agents are connected in sequence, from the initial source all the way to the destination storage system that the final sink delivers to.

    It is not advisable to chain too many Flume agents: too many hops reduces throughput, and if any agent along the path goes down, the whole transport pipeline is affected.
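
    A minimal sketch of how two chained agents are wired together: the upstream agent's avro sink points at the host and port that the downstream agent's avro source listens on (hadoop102 and port 4545 here are illustrative):

    # Upstream agent a1: avro sink sends events to the next hop
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 4545

    # Downstream agent a2: avro source listens on the same host/port
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4545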

    Replicating and Multiplexing

    (single source, multiple channels and sinks)

    Flume supports sending an event flow to one or more destinations.

    In this mode the same data can be replicated into multiple channels, or different data can be dispatched to different channels, and each sink can then deliver to a different destination.

    Load Balancing and Failover

    Flume supports grouping multiple sinks into a logical sink group; combined with different SinkProcessors, a sink group can provide load balancing or failover.

    Here agent1 has three sinks connected to agent2, agent3, and agent4 respectively; even if some of the sinks fail, the data can still be synchronized to HDFS.

    Aggregation

    This is commonly used in production, for example for log collection:

    Web applications are usually spread across hundreds of servers, sometimes even thousands or tens of thousands, which makes the logs they produce cumbersome to handle.

    With the aggregation pattern, each server runs a Flume agent that collects its logs and forwards them to a central log-collecting Flume agent, which then uploads the data to HDFS, Hive, HBase, etc. for analysis.

    Flume Practical Cases

    Replicating and Multiplexing

    Requirement: use Flume-1 to monitor changes to a file.

    1. Flume-1 passes the changes to Flume-2, which stores them on HDFS
    2. Flume-1 passes the changes to Flume-3, which writes them to the local file system

    Implementation steps:
    1. Create a group1 folder under the job directory and create the configuration file flume-file-flume.conf inside it.

    The configuration needs 1 source, 2 channels, and 2 sinks.

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    # Replicate the data flow to all channels
    a1.sources.r1.selector.type = replicating
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/apache-hive-3.1.2-bin/logs/hive.log
    a1.sources.r1.shell = /bin/bash -c
    # Describe the sink
    # On the sink side, avro acts as a data sender
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 4142
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2

    This configuration sends the data to two different sinks, which in turn forward it to other agents for further processing.

    2. Create the configuration file flume-flume-hdfs.conf.

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    # On the source side, avro acts as a data receiving service
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4141
    # Describe the sink
    a2.sinks.k1.type = hdfs
    a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
    # Prefix for uploaded files
    a2.sinks.k1.hdfs.filePrefix = flume2-
    # Whether to roll folders based on time
    a2.sinks.k1.hdfs.round = true
    # How many time units before creating a new folder
    a2.sinks.k1.hdfs.roundValue = 1
    # Redefine the time unit
    a2.sinks.k1.hdfs.roundUnit = hour
    # Whether to use the local timestamp
    a2.sinks.k1.hdfs.useLocalTimeStamp = true
    # How many events to accumulate before flushing to HDFS
    a2.sinks.k1.hdfs.batchSize = 100
    # File type; compression is supported
    a2.sinks.k1.hdfs.fileType = DataStream
    # How often to roll to a new file (seconds)
    a2.sinks.k1.hdfs.rollInterval = 30
    # Roll the file when it reaches roughly 128 MB
    a2.sinks.k1.hdfs.rollSize = 134217700
    # File rolling is independent of the number of events
    a2.sinks.k1.hdfs.rollCount = 0
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1

    The source binds to the address used by the previous agent's sink k1 (port 4141) and then uploads the data to HDFS.

    3. Create the configuration file flume-flume-dir.conf.

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 4142
    # Describe the sink
    a3.sinks.k1.type = file_roll
    a3.sinks.k1.sink.directory = /home/why/data/flumeDemo/test1
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2

    Parameter notes:

    The sink type is file_roll (see the Flume 1.11.0 User Guide — Apache Flume).

    It saves events to the local file system.

    • directory: the path on the local file system where data is saved (note: this path must already exist; see the command below)
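
    Since the directory must already exist when the agent starts, create it first; a minimal example matching the path used above:

    mkdir -p /home/why/data/flumeDemo/test1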

    4. Start the corresponding Flume agents (start the downstream agents a3 and a2 before a1):

    nohup bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf &

    nohup bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf &

    nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf &
    5. The corresponding content can then be seen both on HDFS and in the local directory; a verification sketch follows below.

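    One way to verify the pipeline (a sketch; the exact HDFS date/hour subdirectories depend on when the agents run): generate some data by appending a line to the monitored hive.log, then list the HDFS path and the local directory.

    echo "hello flume" >> /opt/module/apache-hive-3.1.2-bin/logs/hive.log
    hadoop fs -ls -R /flume2
    ls -l /home/why/data/flumeDemo/test1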

    Load Balancing and Failover

    Requirement: Flume-1 monitors a port; the sinks in its sink group feed Flume-2 and Flume-3 respectively; use a FailoverSinkProcessor to implement failover.

    Implementation steps:

    1. Create a group2 folder under the /opt/module/flume/job directory and create the configuration file flume-netcat-flume.conf.

    Configure 1 netcat source, 1 channel, and 1 sink group (with 2 sinks) whose sinks feed flume-flume-console1 and flume-flume-console2 respectively.

    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1 k2
    a1.sinkgroups = g1
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 5
    a1.sinkgroups.g1.processor.priority.k2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop102
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop102
    a1.sinks.k2.port = 4142
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1

    Parameter notes: see the Flume 1.11.0 User Guide — Apache Flume.

    Sink groups allow multiple sinks to be defined within a single agent, with a sink processor configured for the group (see the sink processors section of the Flume 1.11.0 User Guide — Apache Flume).

    2. Create flume-flume-console1.conf.

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    a2.sources.r1.type = avro
    a2.sources.r1.bind = hadoop102
    a2.sources.r1.port = 4141
    # Describe the sink
    a2.sinks.k1.type = logger
    # Describe the channel
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1

    The sink outputs to the local console.

    3. Create flume-flume-console2.conf.

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c2
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop102
    a3.sources.r1.port = 4142
    # Describe the sink
    a3.sinks.k1.type = logger
    # Describe the channel
    a3.channels.c2.type = memory
    a3.channels.c2.capacity = 1000
    a3.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c2
    a3.sinks.k1.channel = c2

    The sink outputs to the local console.

    4. Run the commands:

    bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

    bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

    bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

    5. Send data with nc localhost 44444.

    Because console2 (sink k2, priority 10) has a higher priority than console1 (sink k1, priority 5), the data is received by console2.

    Next, kill the console2 process; the data is then received by console1 instead. A possible sequence is sketched below:
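
    A possible sequence for this test (the grep pattern and signal are illustrative):

    nc localhost 44444                                   # type a few lines; they appear in the a3 (console2) terminal
    ps -ef | grep flume-flume-console2 | grep -v grep    # note the PID of the a3 agent
    kill -9 <PID>                                        # replace <PID> with the number found above
    # type more lines in the nc session; they are now printed by the a2 (console1) terminal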

    Aggregation

    Requirement:

    Flume-1 on hadoop102 monitors the file /home/why/data/flumeDemo/test3/test3.log.

    Flume-2 on hadoop103 monitors the data stream on a port.

    Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.

    Implementation steps:

    1. First create a group3 directory under the job folder on all three servers.

    2. On hadoop102, create the configuration file flume1-logger-flume.conf; the source monitors the log file and the sink sends the data to the next-hop Flume.

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/why/data/flumeDemo/test3/test3.log
    a1.sources.r1.shell = /bin/bash -c
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop104
    a1.sinks.k1.port = 4141
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    3. On hadoop103, create the configuration file flume2-netcat-flume.conf; the source monitors the data stream on port 44444 and the sink transfers the data to the next-hop Flume.

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    # Describe/configure the source
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = localhost
    a2.sources.r1.port = 44444
    # Describe the sink
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop104
    a2.sinks.k1.port = 4141
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1
    a2.sinks.k1.channel = c1

    Note that the sinks of both agents point to the same server, hadoop104, so their hostname and port are identical.

    4. On hadoop104, create the configuration file flume3-flume-logger.conf; the source receives the data streams sent by Flume-1 and Flume-2, and the sink prints the data to the console.

    # Name the components on this agent
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    # Describe/configure the source
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop104
    a3.sources.r1.port = 4141
    # Describe the sink
    a3.sinks.k1.type = logger
    # Describe the channel
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a3.sources.r1.channels = c1
    a3.sinks.k1.channel = c1

    5. Run the commands on the three servers (start hadoop104 first so the avro sinks have something to connect to):

    hadoop104: bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

    hadoop102: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf

    hadoop103: bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf

    6. On hadoop102, append content to the log file:

    echo "hello" > /home/why/data/flumeDemo/test3/test3.log

    On hadoop103, send data to port 44444 with nc localhost 44444 (the netcat source in flume2-netcat-flume.conf is bound to localhost).

    The data then appears in the console output on hadoop104.

  • Original post: https://blog.csdn.net/qq_51235856/article/details/134465832