Flume is a distributed, reliable, and highly available service for efficiently collecting, aggregating, and moving large volumes of log data. It has a simple, flexible architecture built around streaming data flows, with tunable reliability and many failover and recovery mechanisms that make it robust and fault tolerant. This architecture can be used to feed real-time, online analysis of log streams. Flume supports custom data senders in a logging system for collecting data, and it can perform simple processing on the data and write it to a variety of (customizable) data receivers. Flume currently has two lines of releases: the 0.9.x versions are collectively called Flume OG, and the 1.x versions are called Flume NG. Flume NG was heavily refactored and differs significantly from Flume OG, so take care to distinguish them. This course uses apache-flume-1.9.0-bin.tar.gz.
[root@CentOS ~]# tar -zxf apache-flume-1.9.0-bin.tar.gz -C /usr/
[root@CentOS ~]# cd /usr/apache-flume-1.9.0-bin/
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng version
Flume 1.9.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: d4fcab4f501d41597bc616921329a4339f73585e
Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
From source with checksum 35db629a3bda49d23e9b3690c80737f9
# Declare the component names
<agent>.sources = <source>
<agent>.sinks = <sink>
<agent>.channels = <channel1> <channel2>
# Configure each component
<agent>.sources.<source>.<property> = <value>
You must know this template structure; mastering it makes later lookup and configuration much easier. <agent>, <source>, <sink>, and <channel> stand for the names of the components; to see which components are available, consult the documentation.
Reference: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
①helloword.properties
Configuration for a single agent. Place this file in the conf directory under the Flume installation directory.
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sink: print received data to the console log
a1.sinks.sk1.type = logger
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
1. Install ncat (yum -y install nmap-ncat) to make the later tests easier.
2. Also install telnet (yum -y install telnet) for testing.
② Start the a1 agent
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/helloword.properties -Dflume.root.logger=INFO,console
Startup command options, for reference:
Usage: ./bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
  --no-reload-conf          do not reload config file if changed
  --help,-h                 display help text

avro-client options:
  --rpcProps,-P <file>      RPC client properties file with server connection params
  --host,-H <host>          hostname to which events will be sent
  --port,-p <port>          port of the avro source
  --dirname <dir>           directory to stream to avro source
  --filename,-F <file>      text file to stream to avro source (default: std input)
  --headerFile,-R <file>    File containing event headers as key/value pairs on each new line
  --help,-h                 display help text

Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.
③ Test a1
[root@CentOS apache-flume-1.9.0-bin]# telnet CentOS 44444
Trying 192.168.52.134...
Connected to CentOS.
Escape character is '^]'.
hello world
2020-02-05 11:44:43,546 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D hello world. }
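The same test can be done with nc from the nmap-ncat package installed earlier; every line typed should show up as an Event in the agent's console log (the OK lines are the NetCat source's per-event acknowledgement):
[root@CentOS ~]# nc CentOS 44444
hello flume
OK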
The Avro Source is typically used to collect data remotely (as an RPC service): it starts an Avro server internally, receives requests from Avro clients, and stores the received data in the Channel.
| Property | Default | Description |
| --- | --- | --- |
| channels | – | Channel(s) to attach to |
| type | – | Component type; must be avro |
| bind | – | Hostname or IP address to bind to |
| port | – | Port to listen on |
# Declare the component
a1.sources = s1
# Configure the component
a1.sources.s1.type = avro
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Attach the channel
a1.sources.s1.channels = c1
<agent>.sources = <source>
# Component configuration
<agent>.sources.<source>.<property> = <value>
# Declare the basic components: Source, Channel, Sink (example2.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: receive Avro RPC data
a1.sources.s1.type = avro
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sink: print received data to the console log
a1.sinks.sk1.type = logger
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example2.properties -Dflume.root.logger=INFO,console
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng avro-client --host CentOS --port 44444 --filename /root/t_employee
The Exec Source collects the console output of a command. The Flume agent usually has to be deployed on the same host as the service whose output it collects.
| Property | Default | Description |
| --- | --- | --- |
| channels | – | Channel(s) to attach to |
| type | – | Must be exec |
| command | – | The command to execute |
# Declare the basic components: Source, Channel, Sink (example3.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: collect the output of a command
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /root/t_user
# Configure the Sink: print received data to the console log
a1.sinks.sk1.type = logger
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example3.properties -Dflume.root.logger=INFO,console
[root@CentOS ~]# tail -f t_user
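The exec source picks up lines appended to /root/t_user, so appending a couple of test lines is the easiest check; a minimal sketch (the file path comes from the config above, the content is arbitrary):
[root@CentOS ~]# echo "user001 tom" >> /root/t_user
[root@CentOS ~]# echo "user002 jerry" >> /root/t_user
Each appended line should then appear as an Event in the logger sink output.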
The Spooling Directory Source collects new text files added to a static directory. After a file has been fully collected its name gets a suffix appended, but the source file is not deleted by default; if you only want each file kept around until it is collected once, you can change this default behavior (deletePolicy). The Flume agent usually has to be deployed on the same host as the data it collects.
| Property | Default | Description |
| --- | --- | --- |
| channels | – | Channel(s) to attach to |
| type | – | Must be spooldir |
| spoolDir | – | The directory to collect files from |
| fileSuffix | .COMPLETED | Suffix appended to a file name once it has been fully collected |
| deletePolicy | never | When to delete completed files: never or immediate |
| includePattern | ^.*$ | Regex for files to include (the default matches all files) |
| ignorePattern | ^$ | Regex for files to ignore |
# Declare the basic components: Source, Channel, Sink (example4.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: collect new files from a spooling directory
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /root/spooldir
a1.sources.s1.fileHeader = true
a1.sources.s1.deletePolicy = immediate
# Configure the Sink: print received data to the console log
a1.sinks.sk1.type = logger
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example4.properties -Dflume.root.logger=INFO,console
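To exercise the spooldir source, create the directory before starting the agent and then drop a finished file into it; a minimal sketch with an arbitrary sample file:
[root@CentOS ~]# mkdir -p /root/spooldir
[root@CentOS ~]# echo -e "line1\nline2" > /root/a.txt
[root@CentOS ~]# cp /root/a.txt /root/spooldir/
Because deletePolicy is set to immediate above, the file disappears from /root/spooldir once it has been collected.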
The Taildir Source monitors files for appended lines in real time and records the read offset of every file it collects, so collection resumes incrementally on the next run. The Flume agent usually has to be deployed on the same host as the files it collects.
| Property | Default | Description |
| --- | --- | --- |
| channels | – | Channel(s) to attach to |
| type | – | Must be TAILDIR |
| filegroups | – | Space-separated list of file group names |
| filegroups.<filegroupName> | – | Absolute path of the file group; regular expressions (not file-system globs) may be used in the file name only |
| positionFile | ~/.flume/taildir_position.json | File that records the read offset of each collected file, enabling incremental collection |
# Declare the basic components: Source, Channel, Sink (example5.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: tail files matching the configured file groups
a1.sources.s1.type = TAILDIR
a1.sources.s1.filegroups = g1 g2
a1.sources.s1.filegroups.g1 = /root/taildir/.*\.log$
a1.sources.s1.filegroups.g2 = /root/taildir/.*\.java$
a1.sources.s1.headers.g1.type = log
a1.sources.s1.headers.g2.type = java
# Configure the Sink: print received data to the console log
a1.sinks.sk1.type = logger
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example5.properties -Dflume.root.logger=INFO,console
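A quick way to verify the TAILDIR source and the per-group headers is to append to a .log file and a .java file under /root/taildir (sample file names chosen here; create the directory before starting the agent):
[root@CentOS ~]# mkdir -p /root/taildir
[root@CentOS ~]# echo "INFO first log line" >> /root/taildir/app.log
[root@CentOS ~]# echo "public class Demo {}" >> /root/taildir/Demo.java
Events from app.log should carry the header type=log and events from Demo.java the header type=java; read offsets are persisted in ~/.flume/taildir_position.json, so a restarted agent resumes where it left off.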
| Property | Default | Description |
| --- | --- | --- |
| channels | – | Channel(s) to attach to |
| type | – | Must be org.apache.flume.source.kafka.KafkaSource |
| kafka.topics | – | Comma-separated list of topics the Kafka consumer reads from |
| kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the source |
| kafka.topics.regex | – | Regex defining the set of topics to subscribe to; takes precedence over kafka.topics and overrides it if both are set |
| batchSize | 1000 | Maximum number of messages written to the channel in one batch |
# Declare the basic components: Source, Channel, Sink (example9.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: consume messages from a Kafka topic
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.batchSize = 100
a1.sources.s1.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.servers = CentOS:9092
a1.sources.s1.kafka.topics = topic01
a1.sources.s1.kafka.consumer.group.id = g1
# Configure the Sink: print received data to the console log
a1.sinks.sk1.type = logger
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example9.properties -Dflume.root.logger=INFO,console
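With the agent running, any message published to topic01 should be printed by the logger sink; a minimal sketch using the Kafka console producer (the Kafka installation directory matches the one used later in this document):
[root@CentOS kafka_2.11-2.2.0]# ./bin/kafka-console-producer.sh --broker-list CentOS:9092 --topic topic01
this is a message from kafka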
Logger Sink: typically used for testing/debugging purposes.
File Roll Sink: writes the collected data to local files.
# Declare the basic components: Source, Channel, Sink (example6.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sink: write received data to local files
a1.sinks.sk1.type = file_roll
a1.sinks.sk1.sink.directory = /root/file_roll
a1.sinks.sk1.sink.rollInterval = 0
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example6.properties
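Create the target directory before starting the agent, send a line through the NetCat source, and then inspect the output files; a minimal sketch:
[root@CentOS ~]# mkdir -p /root/file_roll
[root@CentOS ~]# nc CentOS 44444
hello file_roll
[root@CentOS ~]# cat /root/file_roll/*
With sink.rollInterval = 0, time-based rolling is disabled and the sink keeps appending to the same file.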
HDFS Sink: writes data to the HDFS file system.
# Declare the basic components: Source, Channel, Sink (example7.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sink: write received data to HDFS
a1.sinks.sk1.type = hdfs
a1.sinks.sk1.hdfs.path = /flume-hdfs/%y-%m-%d
a1.sinks.sk1.hdfs.rollInterval = 0
a1.sinks.sk1.hdfs.rollSize = 0
a1.sinks.sk1.hdfs.rollCount = 0
a1.sinks.sk1.hdfs.useLocalTimeStamp = true
a1.sinks.sk1.hdfs.fileType = DataStream
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
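example7 is started the same way as the earlier examples; a sketch, assuming the Hadoop client configuration is visible to Flume so the HDFS sink can resolve the target file system:
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example7.properties -Dflume.root.logger=INFO,console
[root@CentOS ~]# nc CentOS 44444
hello hdfs
[root@CentOS ~]# hdfs dfs -cat /flume-hdfs/*/*
The hdfs.path pattern /flume-hdfs/%y-%m-%d puts the files under a date-stamped directory, and fileType = DataStream writes the event bodies as plain text.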
Kafka Sink: writes data into a Kafka topic.
# Declare the basic components: Source, Channel, Sink (example8.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sink: write received data to a Kafka topic
a1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sk1.kafka.bootstrap.servers = CentOS:9092
a1.sinks.sk1.kafka.topic = topic01
a1.sinks.sk1.kafka.flumeBatchSize = 20
a1.sinks.sk1.kafka.producer.acks = 1
a1.sinks.sk1.kafka.producer.linger.ms = 1
a1.sinks.sk1.kafka.producer.compression.type = snappy
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
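example8 can be started the same way and verified with the Kafka console consumer; a sketch (the topic and broker address come from the configuration above):
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example8.properties -Dflume.root.logger=INFO,console
[root@CentOS kafka_2.11-2.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOS:9092 --topic topic01 --from-beginning
[root@CentOS ~]# nc CentOS 44444
hello kafka sink
Each line sent through the NetCat source should appear in the console consumer.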
# Declare the basic components: Source, Channel, Sink (example10.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: consume messages from a Kafka topic
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.batchSize = 100
a1.sources.s1.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.servers = CentOS:9092
a1.sources.s1.kafka.topics = topic01
a1.sources.s1.kafka.consumer.group.id = g1
# Configure the Sink: forward events to a downstream Avro source
a1.sinks.sk1.type = avro
a1.sinks.sk1.hostname = CentOS
a1.sinks.sk1.port = 44444
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
# Declare the basic components: Source, Channel, Sink (example10.properties, agent a2)
a2.sources = s1
a2.sinks = sk1
a2.channels = c1
# Configure the Source: receive Avro RPC data
a2.sources.s1.type = avro
a2.sources.s1.bind = CentOS
a2.sources.s1.port = 44444
# Configure the Sink: write received data to local files
a2.sinks.sk1.type = file_roll
a2.sinks.sk1.sink.directory = /root/file_roll
a2.sinks.sk1.sink.rollInterval = 0
# Configure the Channel, which buffers the data
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the components together
a2.sources.s1.channels = c1
a2.sinks.sk1.channel = c1
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --conf-file conf/example10.properties --name a2
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --conf-file conf/example10.properties --name a1
[root@CentOS kafka_2.11-2.2.0]# ./bin/kafka-console-producer.sh --broker-list CentOS:9092 --topic topic01
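Messages typed into the console producer travel topic01 → a1 (Kafka source → Avro sink) → a2 (Avro source → file_roll sink), so they should end up in files under /root/file_roll; a quick check, assuming the directory existed before a2 was started:
[root@CentOS ~]# cat /root/file_roll/*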
The Memory Channel is fast: Source data is written directly into memory. It is not durable, however, and data may be lost if the agent fails.
| Property | Default | Description |
| --- | --- | --- |
| type | – | Must be memory |
| capacity | 100 | Maximum number of events stored in the channel |
| transactionCapacity | 100 | Maximum number of events the channel accepts from a Source or gives to a Sink per transaction |

Note: transactionCapacity <= capacity.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
| Property | Default | Description |
| --- | --- | --- |
| type | – | Component type name; must be jdbc |
| db.type | DERBY | Database vendor; must be DERBY |
Events are stored in a persistent store backed by a database. The JDBC Channel currently supports embedded Derby only. It is a durable channel, well suited to flows where recoverability matters: use the JDBC channel when the buffered data is too important to lose.
a1.channels.c1.type = jdbc
1. If HIVE_HOME is configured, remove the Derby jar from either Hive's lib directory or Flume's lib directory (removing it from one side is enough) to avoid a conflict.
2. By default, Flume uses the replicating (broadcast) channel selector.
| Property | Default | Description |
| --- | --- | --- |
| type | – | Component type name; must be org.apache.flume.channel.kafka.KafkaChannel |
| kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the channel |
| kafka.topic | flume-channel | Kafka topic the channel will use |
| kafka.consumer.group.id | flume | Consumer group ID the channel uses to register with Kafka |
Writes the data collected by the Source into a Kafka cluster, i.e. an external system, for buffering.
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = CentOS:9092
a1.channels.c1.kafka.topic = topic_channel
a1.channels.c1.kafka.consumer.group.id = g1
# Declare the basic components: Source, Channel, Sink (example10.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sink: print received data to the console log
a1.sinks.sk1.type = logger
# Configure the Channel: buffer the data in a Kafka topic
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = CentOS:9092
a1.channels.c1.kafka.topic = topic_channel
a1.channels.c1.kafka.consumer.group.id = g1
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
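This agent is started like the previous ones; a sketch, assuming the configuration above is saved as, say, conf/kafka-channel.properties (a placeholder name, since example10.properties is already used by the two-agent example earlier). Because the channel itself is a Kafka topic, the buffered events can also be watched directly with a console consumer, although by default they are stored as Avro-serialized Flume events rather than plain text:
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/kafka-channel.properties -Dflume.root.logger=INFO,console
[root@CentOS kafka_2.11-2.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOS:9092 --topic topic_channel --from-beginning
[root@CentOS ~]# nc CentOS 44444
hello kafka channel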
| Property | Default | Description |
| --- | --- | --- |
| type | – | Component type name; must be file |
| checkpointDir | ~/.flume/file-channel/checkpoint | Directory where checkpoint files are stored |
| dataDirs | ~/.flume/file-channel/data | Comma-separated list of directories for storing log files |
Uses the file system as the channel implementation, so the buffered data is persisted.
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/flume/checkpoint
a1.channels.c1.dataDirs = /root/flume/data
Interceptors operate on the Source component: they intercept or decorate the Event data the Source produces. Flume ships with many built-in interceptors.
Testing the decorating interceptors:
# Declare the basic components: Source, Channel, Sink (example11.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Add interceptors
a1.sources.s1.interceptors = i1 i2 i3 i4 i5 i6
a1.sources.s1.interceptors.i1.type = timestamp
a1.sources.s1.interceptors.i2.type = host
a1.sources.s1.interceptors.i3.type = static
a1.sources.s1.interceptors.i3.key = from
a1.sources.s1.interceptors.i3.value = baizhi
a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.s1.interceptors.i4.headerName = uuid
a1.sources.s1.interceptors.i5.type = remove_header
a1.sources.s1.interceptors.i5.withName = from
a1.sources.s1.interceptors.i6.type = search_replace
a1.sources.s1.interceptors.i6.searchPattern = ^jiangzz
a1.sources.s1.interceptors.i6.replaceString = baizhi
# Configure the Sink: print received data to the console log
a1.sinks.sk1.type = logger
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/example11.properties -Dflume.root.logger=INFO,console
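Sending a line that starts with jiangzz makes the effect of the whole interceptor chain visible at once; a quick probe:
[root@CentOS ~]# nc CentOS 44444
jiangzz hello
The logger sink should show the body rewritten to start with baizhi (search_replace), and headers containing a timestamp, the host, and a uuid; the static from header is added by i3 but stripped again by the remove_header interceptor.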
Testing the filtering and extracting interceptors:
# Declare the basic components: Source, Channel, Sink (example12.properties)
a1.sources = s1
a1.sinks = sk1
a1.channels = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Add interceptors
a1.sources.s1.interceptors = i1 i2
a1.sources.s1.interceptors.i1.type = regex_extractor
a1.sources.s1.interceptors.i1.regex = ^(INFO|ERROR)
a1.sources.s1.interceptors.i1.serializers = s1
a1.sources.s1.interceptors.i1.serializers.s1.name = loglevel
a1.sources.s1.interceptors.i2.type = regex_filter
a1.sources.s1.interceptors.i2.regex = .*baizhi.*
a1.sources.s1.interceptors.i2.excludeEvents = false
# Configure the Sink: print received data to the console log
a1.sinks.sk1.type = logger
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
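example12 is started the same way; lines beginning with INFO or ERROR get a loglevel header from regex_extractor, while regex_filter (excludeEvents = false) only lets events whose body matches .*baizhi.* through. A sketch:
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example12.properties -Dflume.root.logger=INFO,console
[root@CentOS ~]# nc CentOS 44444
INFO baizhi login ok
ERROR something else
Only the first line should be logged, with loglevel=INFO in its headers.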
When one Source feeds multiple Channels, the channel selector decides how the Source's data is routed to those Channels. If no channel selector is specified, the Source data is broadcast to all Channels by default (the replicating mode).
# Declare the basic components: Source, Channel, Sink (example13.properties)
a1.sources = s1
a1.sinks = sk1 sk2
a1.channels = c1 c2
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sinks: write received data to local files
a1.sinks.sk1.type = file_roll
a1.sinks.sk1.sink.directory = /root/file_roll_1
a1.sinks.sk1.sink.rollInterval = 0
a1.sinks.sk2.type = file_roll
a1.sinks.sk2.sink.directory = /root/file_roll_2
a1.sinks.sk2.sink.rollInterval = 0
# Configure the Channels, which buffer the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = jdbc
# Bind the components together
a1.sources.s1.channels = c1 c2
a1.sinks.sk1.channel = c1
a1.sinks.sk2.channel = c2
An equivalent configuration:
# Declare the basic components: Source, Channel, Sink (example14.properties)
a1.sources = s1
a1.sinks = sk1 sk2
a1.channels = c1 c2
# Channel selector: replicating mode
a1.sources.s1.selector.type = replicating
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sinks: write received data to local files
a1.sinks.sk1.type = file_roll
a1.sinks.sk1.sink.directory = /root/file_roll_1
a1.sinks.sk1.sink.rollInterval = 0
a1.sinks.sk2.type = file_roll
a1.sinks.sk2.sink.directory = /root/file_roll_2
a1.sinks.sk2.sink.rollInterval = 0
# Configure the Channels, which buffer the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = jdbc
# Bind the components together
a1.sources.s1.channels = c1 c2
a1.sinks.sk1.channel = c1
a1.sinks.sk2.channel = c2
# Declare the basic components: Source, Channel, Sink (example15.properties)
a1.sources = s1
a1.sinks = sk1 sk2
a1.channels = c1 c2
# Channel selector: multiplexing mode
a1.sources.s1.selector.type = multiplexing
a1.sources.s1.selector.header = level
a1.sources.s1.selector.mapping.INFO = c1
a1.sources.s1.selector.mapping.ERROR = c2
a1.sources.s1.selector.default = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = regex_extractor
a1.sources.s1.interceptors.i1.regex = ^(INFO|ERROR)
a1.sources.s1.interceptors.i1.serializers = s1
a1.sources.s1.interceptors.i1.serializers.s1.name = level
# Configure the Sinks: write received data to local files
a1.sinks.sk1.type = file_roll
a1.sinks.sk1.sink.directory = /root/file_roll_1
a1.sinks.sk1.sink.rollInterval = 0
a1.sinks.sk2.type = file_roll
a1.sinks.sk2.sink.directory = /root/file_roll_2
a1.sinks.sk2.sink.rollInterval = 0
# Configure the Channels, which buffer the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = jdbc
# Bind the components together
a1.sources.s1.channels = c1 c2
a1.sinks.sk1.channel = c1
a1.sinks.sk2.channel = c2
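With example15 running, the level header extracted by the interceptor drives the multiplexing selector: INFO events go to c1 (and end up in /root/file_roll_1), ERROR events go to c2 (the JDBC channel feeding /root/file_roll_2), and everything else falls back to the default c1. A quick probe, assuming both output directories exist:
[root@CentOS ~]# nc CentOS 44444
INFO user login
ERROR disk full
[root@CentOS ~]# cat /root/file_roll_1/* /root/file_roll_2/*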
Note: the Derby driver jar under the Hive installation directory must be removed here (the example uses a JDBC channel)!
Flume uses a Sink Group to wrap multiple Sink instances into one logical Sink; internally, a Sink Processor provides failover or load balancing across the group.
# Declare the basic components: Source, Channel, Sink (example16.properties)
a1.sources = s1
a1.sinks = sk1 sk2
a1.channels = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sinks: write received data to local files
a1.sinks.sk1.type = file_roll
a1.sinks.sk1.sink.directory = /root/file_roll_1
a1.sinks.sk1.sink.rollInterval = 0
a1.sinks.sk1.sink.batchSize = 1
a1.sinks.sk2.type = file_roll
a1.sinks.sk2.sink.directory = /root/file_roll_2
a1.sinks.sk2.sink.rollInterval = 0
a1.sinks.sk2.sink.batchSize = 1
# Configure the Sink Processor (load balancing)
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = sk1 sk2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
a1.sinks.sk2.channel = c1
To observe the load-balancing effect, sink.batchSize and transactionCapacity must both be set to 1.
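A simple way to see the round-robin distribution is to send several lines and compare the two output directories; a sketch, assuming the example16 agent is running and both directories exist:
[root@CentOS ~]# nc CentOS 44444
message 1
message 2
message 3
message 4
[root@CentOS ~]# wc -l /root/file_roll_1/* /root/file_roll_2/*
The events should be spread across both directories rather than all landing in one file.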
# Declare the basic components: Source, Channel, Sink (example17.properties)
a1.sources = s1
a1.sinks = sk1 sk2
a1.channels = c1
# Configure the Source: receive text data over a socket (netcat)
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Configure the Sinks: write received data to local files
a1.sinks.sk1.type = file_roll
a1.sinks.sk1.sink.directory = /root/file_roll_1
a1.sinks.sk1.sink.rollInterval = 0
a1.sinks.sk1.sink.batchSize = 1
a1.sinks.sk2.type = file_roll
a1.sinks.sk2.sink.directory = /root/file_roll_2
a1.sinks.sk2.sink.rollInterval = 0
a1.sinks.sk2.sink.batchSize = 1
# Configure the Sink Processor (failover)
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = sk1 sk2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.sk1 = 20
a1.sinkgroups.g1.processor.priority.sk2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Configure the Channel, which buffers the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1
# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
a1.sinks.sk2.channel = c1
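example17 is started like example16; while sk1 (the higher-priority sink) is healthy, every event goes to /root/file_roll_1, and sk2 only takes over if sk1 fails. A sketch:
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example17.properties -Dflume.root.logger=INFO,console
[root@CentOS ~]# nc CentOS 44444
failover test
[root@CentOS ~]# cat /root/file_roll_1/*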
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>
Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
import java.util.HashMap;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class RpcClientTests {
    private RpcClient client;

    @Before
    public void before() {
        // Connect to an agent whose Avro source listens on CentOS:44444
        client = RpcClientFactory.getDefaultInstance("CentOS", 44444);
    }

    @Test
    public void testSend() throws EventDeliveryException {
        // Build an event with a body and a custom header, then send it synchronously
        Event event = EventBuilder.withBody("this is body".getBytes());
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("from", "baizhi");
        event.setHeaders(header);
        client.append(event);
    }

    @After
    public void after() {
        client.close();
    }
}
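The test needs an agent with an Avro source listening on CentOS:44444; the example2.properties agent defined earlier fits. One way to run it, assuming the class sits in a Maven project with the dependencies above:
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example2.properties -Dflume.root.logger=INFO,console
mvn test -Dtest=RpcClientTests
The sent event (body "this is body", header from=baizhi) should then show up in the agent's console log.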
① Failover
// Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
import java.util.HashMap;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class RpcClientTests02_FailoverClient {
    private RpcClient client;

    @Before
    public void before() {
        Properties props = new Properties();
        props.put("client.type", "default_failover");
        // List of hosts (space-separated list of user-chosen host aliases)
        props.put("hosts", "h1 h2 h3");
        // host/port pair for each host alias
        props.put("hosts.h1", "CentOSA:44444");
        props.put("hosts.h2", "CentOSB:44444");
        props.put("hosts.h3", "CentOSC:44444");
        client = RpcClientFactory.getInstance(props);
    }

    @Test
    public void testSend() throws EventDeliveryException {
        Event event = EventBuilder.withBody("this is body".getBytes());
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("from", "zhangsan");
        event.setHeaders(header);
        client.append(event);
    }

    @After
    public void after() {
        client.close();
    }
}
② Load balancing
// Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
import java.util.HashMap;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class RpcClientTests02_LoadBalancing {
    private RpcClient client;

    @Before
    public void before() {
        Properties props = new Properties();
        props.put("client.type", "default_loadbalance");
        // List of hosts (space-separated list of user-chosen host aliases)
        props.put("hosts", "h1 h2 h3");
        // host/port pair for each host alias
        props.put("hosts.h1", "CentOSA:44444");
        props.put("hosts.h2", "CentOSB:44444");
        props.put("hosts.h3", "CentOSC:44444");
        props.put("host-selector", "random");   // or "round_robin" for round-robin host selection
        props.put("backoff", "true");           // disabled by default
        props.put("maxBackoff", "10000");       // defaults to 0, which effectively becomes 30000 ms
        client = RpcClientFactory.getInstance(props);
    }

    @Test
    public void testSend() throws EventDeliveryException {
        Event event = EventBuilder.withBody("this is body".getBytes());
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("from", "lisi");
        event.setHeaders(header);
        client.append(event);
    }

    @After
    public void after() {
        client.close();
    }
}
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-sdk</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>
log4j.appender.flume= org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.flume.Hosts = CentOSA:44444 CentOSB:44444 CentOSC:44444
log4j.appender.flume.Selector = RANDOM
log4j.logger.com.baizhi = DEBUG,flume
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%p %d %c %m %n
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class TestLog {
    private static Log log = LogFactory.getLog(TestLog.class);

    public static void main(String[] args) {
        log.debug("你好!_debug");
        log.info("你好!_info");
        log.warn("你好!_warn");
        log.error("你好!_error");
    }
}
Reference: https://github.com/gilt/logback-flume-appender
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.5.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%p %c#%M %d{yyyy-MM-dd HH:mm:ss} %m%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>
    <appender name="flume" class="com.gilt.logback.flume.FlumeLogstashV1Appender">
        <flumeAgents>
            CentOS:44444,
            CentOS:44444,
            CentOS:44444
        </flumeAgents>
        <flumeProperties>
            connect-timeout=4000;
            request-timeout=8000
        </flumeProperties>
        <batchSize>1</batchSize>
        <reportingWindow>1</reportingWindow>
        <additionalAvroHeaders>
            myHeader=myValue
        </additionalAvroHeaders>
        <application>smapleapp</application>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%p %c#%M %d{yyyy-MM-dd HH:mm:ss} %m%n</pattern>
        </layout>
    </appender>
    <root level="ERROR">
        <appender-ref ref="STDOUT" />
    </root>
    <logger name="com.baizhi.service" level="DEBUG" additivity="false">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="flume" />
    </logger>
</configuration>
import com.baizhi.service.IUserSerivice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class UserService implements IUserSerivice {
    private static final Logger LOG = LoggerFactory.getLogger(UserService.class);

    @Override
    public String sayHello(String name) {
        LOG.info("hello " + name);
        return "hello " + name;
    }
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlumeAplication {
    public static void main(String[] args) {
        SpringApplication.run(FlumeAplication.class, args);
    }
}
@SpringBootTest(classes = {KafkaSpringBootApplication.class})
@RunWith(SpringRunner.class)
public class KafkaTempolateTests {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Autowired
    private IOrderService orderService;

    @Test
    public void testOrderService() {
        orderService.saveOrder("002", "baizhi xxxxx ");
    }

    @Test
    public void testKafkaTemplate() {
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
            @Override
            public Object doInOperations(KafkaOperations kafkaOperations) {
                return kafkaOperations.send(new ProducerRecord("topic01", "002", "this is a demo"));
            }
        });
    }
}