• Flume Configuration in Production


    A three-node Flume setup on a five-node cluster

    Most Flume material online is heavy on theory and light on concrete production configurations, so this post shares the configuration our cluster actually runs, for reference.

    1. Basic information

    Flume runs in a three-node cluster mode.
    Flume JVM resources: export JAVA_OPTS="-Xms8192m -Xmx8192m -Dcom.sun.management.jmxremote"
    
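    These JVM options belong in conf/flume-env.sh, which bin/flume-ng sources at startup. A minimal sketch of applying them and launching one of the agents below (the conf file name is an assumption):

    # conf/flume-env.sh -- sourced by bin/flume-ng
    export JAVA_OPTS="-Xms8192m -Xmx8192m -Dcom.sun.management.jmxremote"
    
    # Start agent a1 with one of the conf files from this post
    bin/flume-ng agent --name a1 --conf conf/ \
        --conf-file conf/a1.conf -Dflume.root.logger=INFO,console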

    2. A must-have Flume test conf

    Note that this file lists alternatives side by side (two sources, two interceptor setups, two channels, three sinks); keep one of each in a real conf, since a later duplicate key simply overrides an earlier one.

    ## Name the three components
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    ## Source options for testing
    ## Option 1: a netcat source listening on local port 9999 (great for quick tests)
    ## Install netcat: yum -y install nc
    ## Send data from a local client: nc localhost 9999
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 9999
    
    ## Option 2: pull data from Kafka
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 2000
    a1.sources.r1.batchDurationMillis = 5000
    a1.sources.r1.kafka.bootstrap.servers = *:9092,*:9092,*:9092
    a1.sources.r1.kafka.topics = nats_robotindex
    a1.sources.r1.kafka.consumer.group.id = robotindex_interceptor
    
    ## Interceptor setups I commonly use in testing; pick what you need
    ## Custom interceptor (our own TimeStampInterceptor class)
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.yogo.flume.TimeStampInterceptor$Builder
    
    ## Regex interceptor (only one interceptors line takes effect; the last one wins)
    a1.sources.r1.interceptors = regex
    a1.sources.r1.interceptors.regex.type = REGEX_FILTER
    ## Configured this way, the interceptor only passes Events whose body
    ## does not contain rm or kill
    a1.sources.r1.interceptors.regex.regex = (rm)|(kill)
    a1.sources.r1.interceptors.regex.excludeEvents = true
    
    ## Channels: choose one of the two
    ## Option 1: memory channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 100000
    a1.channels.c1.transactionCapacity = 100000
    
    ## Option 2: file channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /cluster/vdb/flume/checkpoint/robot_index
    a1.channels.c1.dataDirs = /cluster/vdb/flume/data/robot_index/
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 15
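    ## The file channel persists events under dataDirs and its queue state under
    ## checkpointDir; keeping both on a dedicated disk (here /cluster/vdb) avoids
    ## competing with other I/O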
    
    ## Sink options: choose one
    ## Option 1: log to the console (recommended for tests; shows Event headers)
    a1.sinks.k1.type = logger
    
    ## Option 2: write to Kafka
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = *:9092,*:9092,*:9092
    a1.sinks.k1.kafka.topic = first
    a1.sinks.k1.kafka.producer.acks = 1
    
    ## Option 3: write to HDFS
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.filePrefix = 105-
    a1.sinks.k1.hdfs.path = /origin_data/robot/db/t_robot_index/%Y-%m-%d
    a1.sinks.k1.hdfs.round = false
    a1.sinks.k1.hdfs.rollInterval = 3600
    ## Adjust rollSize to your needs; we use LZO compression and want large
    ## files on disk (393216000 bytes = 375 MB)
    a1.sinks.k1.hdfs.rollSize = 393216000
    a1.sinks.k1.hdfs.batchSize = 5000
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.minBlockReplicas = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
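    ## rollCount = 0 disables count-based rolls, so files roll hourly or at
    ## rollSize (375 MB); minBlockReplicas = 1 keeps HDFS block-replication
    ## events from forcing premature rolls into many small files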
    
    ## Output file type: CompressedStream + lzop writes LZO-compressed files
    ## (our cluster uses LZO; for a bare test you can omit these two lines)
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = lzop
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
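    For the lzop codec to work, hadoop-lzo must be deployed on the Flume hosts and registered in Hadoop's core-site.xml (io.compression.codecs), and the rolled .lzo files only become splittable for downstream MapReduce/Hive jobs once indexed. A sketch of indexing one day's partition (the jar path is an assumption; adjust to your install):

    # Index today's .lzo files so downstream jobs can split them
    # (jar path is an assumption)
    hadoop jar /opt/hadoop/lib/hadoop-lzo.jar \
        com.hadoop.compression.lzo.DistributedLzoIndexer \
        /origin_data/robot/db/t_robot_index/$(date +%F)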
    
    

    3. Flume config for reading from production Kafka

    ## Components (a KafkaChannel replaces the source, so none is declared)
    a1.channels = c1
    a1.sinks = k1
    
    ## channel1: the KafkaChannel consumes from Kafka directly, saving a source
    ## Our production Kafka takes in about 200 million log records a day; with
    ## this configuration we have seen no message backlog
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.parseAsFlumeEvent = false
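    ## parseAsFlumeEvent = false treats the Kafka payload as raw bytes, which is
    ## what you want when producers are plain Kafka clients rather than Flume agents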
    a1.channels.c1.kafka.bootstrap.servers = *:9092,*:9092,*:9092
    a1.channels.c1.kafka.topic = topic
    a1.channels.c1.kafka.consumer.group.id = topic_group
    ## Note: maxFileSize, capacity and keep-alive are file-channel settings that
    ## the KafkaChannel ignores, so they are dropped here
    
    ## sink1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.filePrefix = 105-
    a1.sinks.k1.hdfs.path = /origin_data/robot/db/robot_behavior/%Y-%m-%d
    a1.sinks.k1.hdfs.round = false
    
    ## HDFS roll policy
    a1.sinks.k1.hdfs.rollInterval = 3600
    a1.sinks.k1.hdfs.rollSize = 393216000
    a1.sinks.k1.hdfs.batchSize = 5000
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.minBlockReplicas = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    ## Output LZO-compressed files
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = lzop
    
    ## Wire it up (no source to bind; the KafkaChannel feeds the sink directly)
    a1.sinks.k1.channel = c1
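    A quick health check is to confirm that files are rolling into the current day's partition (path taken from the sink config above):

    # List today's partition; files still being written carry a .tmp suffix
    hdfs dfs -ls /origin_data/robot/db/robot_behavior/$(date +%F)
    # A .tmp file lingering well past rollInterval usually means a stuck sink
    hdfs dfs -ls /origin_data/robot/db/robot_behavior/$(date +%F) | grep '\.tmp$'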
    

    4. Flume config for business-critical production data (no data loss allowed)

    This pipeline is at-least-once: the Kafka source commits offsets only after a batch is committed to the durable file channel, and the file channel removes events only after the HDFS sink's transaction succeeds.

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 20000
    a1.sources.r1.batchDurationMillis = 1000
    a1.sources.r1.kafka.bootstrap.servers = *:9092,*:9092,*:9092
    a1.sources.r1.kafka.topics = topic
    a1.sources.r1.kafka.consumer.group.id = topic_group
    
    
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /cluster/vdb/flume/checkpoint/robot
    a1.channels.c1.dataDirs = /cluster/vdb/flume/data/robot/
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 20000
    a1.channels.c1.checkpointInterval = 60000
    a1.channels.c1.keep-alive = 15
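    ## transactionCapacity (20000) must be at least the source batchSize above,
    ## or channel puts will fail; checkpointInterval = 60s bounds how much the
    ## channel has to replay from its data files after a crash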
    
    
    ## HDFS sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/robot/db/roboterror_behavior/%Y-%m-%d
    a1.sinks.k1.hdfs.round = false
    
    ## HDFS roll policy
    a1.sinks.k1.hdfs.rollInterval = 3600
    a1.sinks.k1.hdfs.rollSize = 393216000
    a1.sinks.k1.hdfs.batchSize = 5000
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.minBlockReplicas = 1
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    
    ## Output LZO-compressed files
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = lzop
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
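    For a pipeline that must not lose data, it is worth watching the channel fill level. One option is Flume's built-in HTTP metrics; a sketch (the port and conf file name are arbitrary choices):

    # Expose counters over HTTP when starting the agent
    bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/robot.conf \
        -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
    
    # In CHANNEL.c1, a ChannelFillPercentage climbing toward 100 means the
    # HDFS sink is falling behind the Kafka source
    curl -s http://localhost:34545/metrics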
    

    The configurations above can be taken as-is for testing and production use; if you would like more detail on Flume configuration in production, leave a comment.

  • Original post: https://blog.csdn.net/qq_37698495/article/details/127850969