目录
(1)需求
用Flume采集服务器本地日志,按照日志类型的不同,将不同种类的日志发往不同的分析系统。
(2)需求分析
一台服务器产生的日志类型有很多种,不同类型的日志需要发送到不同的分析系统。此时会用到Flume拓扑结构中的Multiplexing结构。
Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中。我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。
在该案例中,我们以端口数据模拟日志,以是否包含”jeffry”模拟不同类型的日志,我们需要自定义interceptor区分数据中是否包含”jeffry”,将其分别发往不同的分析系统(Channel)。

注:Multiplexing(多路复用);Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。
(3)实现
1)创建maven项目
2)加入依赖
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>1.9.0</version>
- </dependency>
3)定义TypeInterceptor类并实现interceptor接口
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.interceptor.Interceptor;
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
-
- public class TypeInterceptor implements Interceptor {
- //声明一个存放事件的集合
- private List<Event> addHerderEvents;
-
- @Override
- public void initialize() {
- //初始化存放事件的集合
- addHerderEvents = new ArrayList<>();
-
- }
-
- //单个事件拦截
- @Override
- public Event intercept(Event event) {
- //1.获取事件中的头部信息
- Map<java.lang.String, java.lang.String> headers = event.getHeaders();
- //2.获取事件中的body信息
- String body = new String(event.getBody());
- //3.根据body中是否有“jeffry”来决定添加怎么样的头部信息
- if (body.contains("jeffry")) {
- //4.添加头部信息
- headers.put("type","first");
- } else {
- //5.添加头部信息
- headers.put("type","second");
- }
- return event;
- }
-
- //批量事件拦截
- @Override
- public List<Event> intercept(List<Event> events) {
- //1.清空事件
- addHerderEvents.clear();
- //2.遍历events
- for (Event event:events) {
- //给每一个事件添加头部信息
- addHerderEvents.add(intercept(event));
- }
- //返回结果
- return addHerderEvents;
- }
-
- @Override
- public void close() {
-
- }
-
- public static class Builder implements Interceptor.Builder {
-
- @Override
- public Interceptor build() {
- return new TypeInterceptor();
- }
-
- @Override
- public void configure(Context context) {
-
- }
- }
- }
4)编辑flume配置文件
为hadoop01上的Flume1配置1个netcat source,1个 sink group(2个avro sink),并配置相应的 ChannelSelector和interceptor。
- # agent
- a1.sources = r1
- a1.sinks = k1 k2
- a1.channels = c1 c2
-
- # source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = hadoop01
- a1.sources.r1.port = 44444
-
- a1.sources.r1.interceptors = i1
- a1.sources.r1.interceptors.i1.type =
- com.zj.flume.interceptor.TypeInterceptor$Builder
-
- a1.sources.r1.selector.type = multiplexing
- a1.sources.r1.selector.header = type
- a1.sources.r1.selector.mapping.first = c1
- a1.sources.r1.selector.mapping.second = c2
-
- # sink
- a1.sinks.k1.type = avro
- a1.sinks.k1.hostname = hadoop02
- a1.sinks.k1.port = 4141
- a1.sinks.k2.type=avro
- a1.sinks.k2.hostname = hadoop03
- a1.sinks.k2.port = 4242
-
- # channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # channel
- a1.channels.c2.type = memory
- a1.channels.c2.capacity = 1000
- a1.channels.c2.transactionCapacity = 100
-
- # source and sink to the channel
- a1.sources.r1.channels = c1 c2
- a1.sinks.k1.channel = c1
- a1.sinks.k2.channel = c2
为hadoop02上的Flume2配置一个avro source和一个logger sink。
- # agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- # source
- a1.sources.r1.type = avro
- a1.sources.r1.bind = hadoop02
- a1.sources.r1.port = 4141
-
- # sink
- a1.sinks.k1.type = logger
-
- # channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # source and sink to channel
- a1.sinks.k1.channel = c1
- a1.sources.r1.channels = c1
为hadoop03上的Flume3配置一个avro source和一个logger sink。
- # agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- # source
- a1.sources.r1.type = avro
- a1.sources.r1.bind = hadoop03
- a1.sources.r1.port = 4242
-
- # sink
- a1.sinks.k1.type = logger
-
- # channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # source and sink to channel
- a1.sinks.k1.channel = c1
- a1.sources.r1.channels = c1
(5)分别在hadoop01,hadoop02,hadoop03上启动flume进程,注意先后顺序。
(6)在 hadoop01使用netcat向hadoop01:44444 发送字母和数字。
(7)观察hadoop02和hadoop03打印的日志。
介绍:
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence、generator、syslog、http、legacy。官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些source。
官方也提供了自定义source的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义MySource需要继承AbstractSource类并实现Configurable和PollableSource接口。
实现相应方法:
getBackOffSleepIncrement() //backoff 步长
getMaxBackOffSleepInterval()//backoff最长时间
configure(Context context)//初始化context(读取配置文件内容)
process()//获取数据封装成event并写入channel,这个方法将被循环调用。
使用场景:读取MySQL数据或者其他文件系统。
(1)需求
使用flume接收数据,并给每条数据添加前缀,输出到控制台。前缀可从flume配置文件中配置。
(2)需求分析
configure(Context context):读取配置文件(XX.conf)中的配置信息;
process():接收数据,将数据封装成一个个的Event,写入Channel,使用for循环模拟数据生成。for(int i=0;i<5;i++);
getBackOffSleepIncrement():暂不用;
getMaxBackOffSleepInterval():暂不用;
(3)实现
1)导入依赖
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
- <version>1.9.0</version>
- </dependency>
2)编码
- import org.apache.flume.Context;
- import org.apache.flume.EventDeliveryException;
- import org.apache.flume.PollableSource;
- import org.apache.flume.conf.Configurable;
- import org.apache.flume.event.SimpleEvent;
- import org.apache.flume.source.AbstractSource;
-
- import java.util.HashMap;
-
- public class MySource extends AbstractSource implements Configurable, PollableSource {
- //定义配置文件将要读取的字段
- private Long delay;
- private String field;
-
- //初始化配置信息
- @Override
- public void configure(Context context) {
- delay = context.getLong("delay");
- field = context.getString("field","hello!");
- }
-
- @Override
- public Status process() throws EventDeliveryException {
- try {
- //创建事件头部信息
- HashMap
hearderMap = new HashMap<>(); - //创建事件
- SimpleEvent event = new SimpleEvent();
- //循环封装事件
- for (int i = 0;i < 5;i++) {
- //给事件设置头信息
- event.setHeaders(hearderMap);
- //给事件设置内存
- event.setBody((field + i).getBytes());
- //将事件写入channel
- getChannelProcessor().processEvent(event);
- Thread.sleep(delay);
- }
- } catch (Exception e) {
- e.printStackTrace();
- return Status.BACKOFF;
- }
- return Status.READY;
- }
-
- @Override
- public long getBackOffSleepIncrement() {
- return 0;
- }
-
- @Override
- public long getMaxBackOffSleepInterval() {
- return 0;
- }
- }
(4)测试
1)打包
将写好的代码打包,并放到flume的lib目录下。
2)配置文件
- # agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- # source
- a1.sources.r1.type = com.zj.source.MySource
- a1.sources.r1.delay = 1000
- #a1.sources.r1.field = jeffry
-
- # sink
- a1.sinks.k1.type = logger
-
- # channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
3)开始任务
- bin/flume-ng agent -c conf/ -f conf/mysource.conf
- -n a1 -Dflume.root.logger=INFO,console
结果
介绍:
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。
Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。
官方提供的Sink类型有时候不能满足实际开发中的需求,此时我们就需要根据实际需求自定义某些Sink。
官方提供了自定义sink的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义MySink需要继承AbstractSink类并实现Configurable接口。
实现相应方法:
configure(Context context)//初始化context(读取配置文件内容)
process()//从Channel读取获取数据(event),这个方法将被循环调用。
使用场景:读取Channel数据写入MySQL或者其他文件系统。
(1)需求
用flume接收数据,并在Sink端给每条数据添加前缀和后缀,输出到控制台。前后缀可在flume任务配置文件中配置。
(2)需求分析
configure():读取任务配置文件中的配置信息。
process():从Channel中取数据,添加前后缀,写入日志。
数据流:ABC->ABC->hello:ABC:hello
(3)实现
- import org.apache.flume.*;
- import org.apache.flume.conf.Configurable;
- import org.apache.flume.sink.AbstractSink;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- public class MySink extends AbstractSink implements Configurable {
- //创建logger对象
- private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
- private String prefix;
- private String suffix;
-
- @Override
- public Status process() throws EventDeliveryException {
- //声明返回值状态信息
- Status status;
- //获取当前sink绑定的channel
- Channel ch = getChannel();
- //获取事务
- Transaction txn = ch.getTransaction();
- //声明事件
- Event event;
- //开启事件
- txn.begin();
- //读取channel中的事件,直到读取到事件结束循环
- while (true) {
- event = ch.take();
- if (event != null) {
- break;
- }
- }
- try {
- //处理事件(打印)
- LOG.info(prefix + new String(event.getBody()) + suffix);
- //事务提交
- txn.commit();
- status = Status.READY;
- } catch (Exception e) {
- //遇到异常,事务回滚
- txn.rollback();
- status = Status.BACKOFF;
- } finally {
- //关闭事务
- txn.close();
- }
- return status;
- }
-
- @Override
- public void configure(Context context) {
- //读取配置文件内容,有默认值
- prefix = context.getString("prefix","hello:");
- //读取配置文件内容,无默认值
- suffix = context.getString("suffix");
-
- }
- }
(4)测试
1)打包
将写好的代码打包,并放到flume的lib目录下。
2)配置文件
- # agent
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
-
- # source
- a1.sources.r1.type = netcat
- a1.sources.r1.bind = hadoop01
- a1.sources.r1.port = 44444
-
- # sink
- a1.sinks.k1.type = com.zj.sink.MySink
- #a1.sinks.k1.prefix = jeffry:
- a1.sinks.k1.suffix = :jeffry
-
- # channel
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000
- a1.channels.c1.transactionCapacity = 100
-
- # source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
3)开启任务
- bin/flume-ng agent -c conf/ -f conf/mysink.conf
- -n a1 -Dflume.root.logger=INFO,console
$ nc hadoop01 44444
结果
本文为学习笔记!!!