| 1.Flume概述 2.Flume安装部署 3.案例1 4.案例2 5.案例3 |
1.1 Flume定义Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单。
|
1.2 Flume基础架构Flume组成架构如下图所示。
1.2.1 AgentAgent是一个JVM进程,它以事件的形式将数据从源头送至目的。 Agent主要有3个部分组成,Source、Channel、Sink。 1.2.2 SourceSource是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。 1.2.3 SinkSink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。 Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。 1.2.4 ChannelChannel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。 Flume自带两种Channel:Memory Channel和File Channel。 Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。 File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。 1.2.5 Event传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。Event由Header和Body两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组。
|
| 在Flume中,"event"是指数据流中的一条记录或事件。它可以是一个文本行、一个JSON对象、一个日志条目或任何其他形式的数据。每个event都包含一个payload(有效负载)和一组可选的header(头部)。
自定义拦截器 实现功能(控制条输入hello 到hdfs hi到kafka 其他控制台输出)
导入依赖
|
| Java代码
|
| 代码解释 在该示例中,拦截器名为InterceptorDemo,实现了Interceptor接口,其中包含以下方法: initialize(): 初始化方法,在这里创建了一个用于存储处理后事件的数据结构,即ArrayList intercept(Event event): 该方法用于处理单个事件,根据事件内容进行判断和处理,将对应的类型信息添加到事件的header中,然后返回该事件。 intercept(List close(): 关闭方法,清空opList并将其置为空。 在该示例中,还包含一个静态内部类Builder,该类实现了Interceptor.Builder接口,用于构建和配置拦截器。在当前示例中,Builder类的方法没有实际操作,因此configure(Context context)留空,build()方法返回InterceptorDemo的一个新实例。 通过使用这个拦截器示例,可以在Flume的配置中指定使用该拦截器来进行事件的预处理和转换操作。 |
| 打包jar
放到lib目录下
|
| 新建文件
|
| 编辑
myinterceptor.sources=s1 myinterceptor.channels=helloChannel hiChannel otherChannel myinterceptor.sinks=helloSink hiSink otherSink myinterceptor.sources.s1.type=netcat myinterceptor.sources.s1.bind=localhost myinterceptor.sources.s1.port=7777 myinterceptor.sources.s1.interceptors=myinterceptors #java类的路径 myinterceptor.sources.s1.interceptors.myinterceptors.type=org.example.InterceptorDemo$Builder myinterceptor.sources.s1.selector.type=multiplexing myinterceptor.sources.s1.selector.mapping.hello=helloChannel myinterceptor.sources.s1.selector.mapping.hi=hiChannel myinterceptor.sources.s1.selector.mapping.other=otherChannel myinterceptor.sources.s1.selector.header=type myinterceptor.channels.helloChannel.type=memory myinterceptor.channels.hiChannel.type=memory myinterceptor.channels.otherChannel.type=memory myinterceptor.sinks.helloSink.type=hdfs myinterceptor.sinks.helloSink.hdfs.fileType=DataStream myinterceptor.sinks.helloSink.hdfs.filePrefix=helloContent myinterceptor.sinks.helloSink.hdfs.fileSuffix=.txt myinterceptor.sinks.helloSink.hdfs.path=hdfs://192.168.3.129:9000/kb23hello/ myinterceptor.sinks.hiSink.type=org.apache.flume.sink.kafka.KafkaSink myinterceptor.sinks.hiSink.topic=hitopic #myinterceptor.sinks.hiSink.batchSize=640 myinterceptor.sinks.hiSink.brokerList=192.168.3.129:9092 myinterceptor.sinks.otherSink.type=logger myinterceptor.sources.s1.channels=helloChannel hiChannel otherChannel myinterceptor.sinks.helloSink.channel=helloChannel myinterceptor.sinks.hiSink.channel=hiChannel myinterceptor.sinks.otherSink.channel=otherChannel 含义 以下是每个配置属性的含义: myinterceptor.sources:指定拦截器的数据源的名称,这里为"s1"。 myinterceptor.channels:指定拦截器的通道的名称,这里分别为"helloChannel"、"hiChannel"和"otherChannel"。 myinterceptor.sinks:指定拦截器的接收器的名称,这里分别为"helloSink"、"hiSink"和"otherSink"。 以下是每个组件的详细配置: myinterceptor.sources.s1:指定名为"s1"的数据源,使用netcat类型接收网络数据。 myinterceptor.channels.helloChannel、myinterceptor.channels.hiChannel和myinterceptor.channels.otherChannel:分别指定以内存为类型的通道用于数据传输。 myinterceptor.sinks.helloSink:指定名为"helloSink"的接收器,使用hdfs类型将数据输出到HDFS中。 myinterceptor.sinks.hiSink:指定名为"hiSink"的接收器,使用kafka类型将数据发送到Kafka主题。 myinterceptor.sinks.otherSink:指定名为"otherSink"的接收器,将数据输出到日志。 其他配置属性根据组件的类型和需求进行了相应的设置,如绑定地址、端口号、选择器类型、路径、主题名等。 总之,这个Flume配置文件在接收数据源的同时,使用内存通道将数据传递给不同类型的接收器,包括将数据输出到HDFS和Kafka主题,以及输出到日志。具体的业务逻辑和处理过程可能需要根据实际需求进行定制和配置。 |
| 新建kafka hitopic (启动hadoop 集群;zookeeper和kafka) kafka-topics.sh --create --zookeeper 192.168.3.129:2181 --topic hitopic --partitions 3 --replication-factor 1 kafka-topics.sh --zookeeper 192.168.3.129:2181 --list
|
| Flume190路径下执行命令 ./bin/flume-ng agent --name myinterceptor --conf ./conf/ --conf-file ./conf/myconf2/netcat-myinterceptor.conf -Dflume.root.logger=INFO,console |
| 启动端口 nc -lk 7777
|
| 开启客户端 telnet localhost 7777
|
| 检测结果(其他) 7777已经在控制台输出 |
| 新建文件,放置数据原 目录为
新建文件,放置checkpointDir和dataDirs 后面会解释
新建
|
| events.sources=eventsSource events.channels=eventsChannel events.sinks=eventsSink # Describe/configure the source events.sources.eventsSource.type=spooldir events.sources.eventsSource.spoolDir=/opt/kb23/flumelogfile/events #反序列化类型为LINE events.sources.eventsSource.deserializer=LINE #反序列化 每行的最大长度 events.sources.eventsSource.deserializer.maxLineLength=32000 events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv events.sources.eventsSource.interceptors=head_filter events.sources.eventsSource.interceptors.head_filter.type=regex_filter events.sources.eventsSource.interceptors.head_filter.regex=^event_id* events.sources.eventsSource.interceptors.head_filter.excludeEvents=true # Use a channel which buffers events in memory #events.channels.eventsChannel.type=memory events.channels.eventsChannel.type=file events.channels.eventsChannel.checkpointDir=/opt/kb23/checkpoint/events events.channels.eventsChannel.dataDirs=/opt/kb23/data/events # Describe the sink events.sinks.eventsSink.type=logger # Bind the source and sink to the channel events.sources.eventsSource.channels=eventsChannel events.sinks.eventsSink.channel=eventsChannel |
| 这段代码是一个Flume配置文件,用于描述一个简单的数据流程。它包含了以下组件配置: events.sources.eventsSource:指定了一个名为"eventsSource"的数据源,类型为spooldir,表示从指定的文件夹中读取数据。 events.sources.eventsSource.spoolDir:指定了待读取文件的目录路径。 events.sources.eventsSource.deserializer:指定了数据源的反序列化类型为LINE,表示每行是一个事件。 events.sources.eventsSource.includePattern:使用正则表达式来匹配待读取文件的名称模式。 events.sources.eventsSource.interceptors.head_filter:指定了一个名为"head_filter"的拦截器,用于根据事件的头部信息过滤事件。 events.sources.eventsSource.interceptors.head_filter.type:指定了"head_filter"拦截器的类型为regex_filter,表示使用正则表达式过滤器。 events.sources.eventsSource.interceptors.head_filter.regex:指定了正则表达式,以匹配以"event_id"开头的事件。 events.sources.eventsSource.interceptors.head_filter.excludeEvents:指定了是否排除匹配的事件。 events.channels.eventsChannel:指定了一个名为"eventsChannel"的通道,类型为file,表示使用文件系统进行缓存。 events.channels.eventsChannel.checkpointDir:指定了通道的检查点目录。 events.channels.eventsChannel.dataDirs:指定了通道的数据目录。 events.sinks.eventsSink:指定了一个名为"eventsSink"的接收器,类型为logger,表示将事件输出到日志。 events.sources.eventsSource.channels=eventsChannel:将数据源连接到通道。 events.sinks.eventsSink.channel=eventsChannel:将接收器连接到通道。 总之,这个Flume配置文件描述了一个数据流程,从指定文件夹中读取数据,并使用拦截器对事件进行过滤处理,然后将事件通过文件系统通道缓存,在接收器中将事件输出到日志中。具体的业务逻辑和处理过程可能需要根据实际需求进行定制和配置。 |
| 执行命令 flume190目录下 ./bin/flume-ng agent --name myinterceptor --conf ./conf/ --conf-file ./conf/myconf2/events-flume-logger.conf -Dflume.root.logger=INFO,console
|
| 控制台输出结果
|
| 完成后
检查对应目录
|
| 输出对应文件
|
| 上产源数据
|
| 新建
|
|
events.sources=eventsSource events.channels=eventsChannel events.sinks=eventsSink # Describe/configure the source events.sources.eventsSource.type=spooldir events.sources.eventsSource.spoolDir=/opt/kb23/flumelogfile/events # #反序列化类型为LINE events.sources.eventsSource.deserializer=LINE # #反序列化 每行的最大长度 events.sources.eventsSource.deserializer.maxLineLength=32000 events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv # #正则过滤器 events.sources.eventsSource.interceptors=head_filter events.sources.eventsSource.interceptors.head_filter.type=regex_filter events.sources.eventsSource.interceptors.head_filter.regex=^event_id* events.sources.eventsSource.interceptors.head_filter.excludeEvents=true # # Use a channel which buffers events in memory #events.channels.eventsChannel.type=memory events.channels.eventsChannel.type=file events.channels.eventsChannel.checkpointDir=/opt/kb23/checkpoint/events events.channels.eventsChannel.dataDirs=/opt/kb23/data/events # # # Describe the sink events.sinks.eventsSink.type=logger events.sinks.eventsSink.type=org.apache.flume.sink.kafka.KafkaSink events.sinks.eventsSink.topic=events events.sinks.eventsSink.brokerList=192.168.3.129:9092 events.sinks.eventsSink.batchSize=640 # # # # Bind the source and sink to the channel events.sources.eventsSource.channels=eventsChannel events.sinks.eventsSink.channel=eventsChannel ~ |
| 创建kafka topic kafka-topics.sh --create --zookeeper 192.168.3.129:2181 --topic events --partitions 3 --replication-factor 1 执行命令 flume190目录下
|
| 执行结果 kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.3.129:9092 --topic events 过滤结果
Kafa 消息队列接收消息
|
| 检查
检查对应目录
|
| 输出对应文件
|