• flume笔记(三):自定义-自定义interceptor/自定义source/自定义sink


    目录

    自定义interceptor

    自定义source

    自定义sink


    自定义interceptor

    (1)需求

    用Flume采集服务器本地日志,按照日志类型的不同,将不同种类的日志发往不同的分析系统。

    (2)需求分析

    一台服务器产生的日志类型有很多种,不同类型的日志需要发送到不同的分析系统。此时会用到Flume拓扑结构中的Multiplexing结构。

    Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中。我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。

    在该案例中,我们以端口数据模拟日志,以是否包含”jeffry”模拟不同类型的日志,我们需要自定义interceptor区分数据中是否包含”jeffry”,将其分别发往不同的分析系统(Channel)。

     注:Multiplexing(多路复用);Multiplexing会根据相应的原则,将不同的Event发往不同的Channel。

    (3)实现

    1)创建maven项目

    2)加入依赖

    1. <dependency>
    2. <groupId>org.apache.flume</groupId>
    3. <artifactId>flume-ng-core</artifactId>
    4. <version>1.9.0</version>
    5. </dependency>

    3)定义TypeInterceptor类并实现interceptor接口

    1. import org.apache.flume.Context;
    2. import org.apache.flume.Event;
    3. import org.apache.flume.interceptor.Interceptor;
    4. import java.util.ArrayList;
    5. import java.util.List;
    6. import java.util.Map;
    7. public class TypeInterceptor implements Interceptor {
    8. //声明一个存放事件的集合
    9. private List<Event> addHerderEvents;
    10. @Override
    11. public void initialize() {
    12. //初始化存放事件的集合
    13. addHerderEvents = new ArrayList<>();
    14. }
    15. //单个事件拦截
    16. @Override
    17. public Event intercept(Event event) {
    18. //1.获取事件中的头部信息
    19. Map<java.lang.String, java.lang.String> headers = event.getHeaders();
    20. //2.获取事件中的body信息
    21. String body = new String(event.getBody());
    22. //3.根据body中是否有“jeffry”来决定添加怎么样的头部信息
    23. if (body.contains("jeffry")) {
    24. //4.添加头部信息
    25. headers.put("type","first");
    26. } else {
    27. //5.添加头部信息
    28. headers.put("type","second");
    29. }
    30. return event;
    31. }
    32. //批量事件拦截
    33. @Override
    34. public List<Event> intercept(List<Event> events) {
    35. //1.清空事件
    36. addHerderEvents.clear();
    37. //2.遍历events
    38. for (Event event:events) {
    39. //给每一个事件添加头部信息
    40. addHerderEvents.add(intercept(event));
    41. }
    42. //返回结果
    43. return addHerderEvents;
    44. }
    45. @Override
    46. public void close() {
    47. }
    48. public static class Builder implements Interceptor.Builder {
    49. @Override
    50. public Interceptor build() {
    51. return new TypeInterceptor();
    52. }
    53. @Override
    54. public void configure(Context context) {
    55. }
    56. }
    57. }

    4)编辑flume配置文件

    为hadoop01上的Flume1配置1个netcat source,1个 sink group(2个avro sink),并配置相应的 ChannelSelector和interceptor。

    1. # agent
    2. a1.sources = r1
    3. a1.sinks = k1 k2
    4. a1.channels = c1 c2
    5. # source
    6. a1.sources.r1.type = netcat
    7. a1.sources.r1.bind = hadoop01
    8. a1.sources.r1.port = 44444
    9. a1.sources.r1.interceptors = i1
    10. a1.sources.r1.interceptors.i1.type =
    11. com.zj.flume.interceptor.TypeInterceptor$Builder
    12. a1.sources.r1.selector.type = multiplexing
    13. a1.sources.r1.selector.header = type
    14. a1.sources.r1.selector.mapping.first = c1
    15. a1.sources.r1.selector.mapping.second = c2
    16. # sink
    17. a1.sinks.k1.type = avro
    18. a1.sinks.k1.hostname = hadoop02
    19. a1.sinks.k1.port = 4141
    20. a1.sinks.k2.type=avro
    21. a1.sinks.k2.hostname = hadoop03
    22. a1.sinks.k2.port = 4242
    23. # channel
    24. a1.channels.c1.type = memory
    25. a1.channels.c1.capacity = 1000
    26. a1.channels.c1.transactionCapacity = 100
    27. # channel
    28. a1.channels.c2.type = memory
    29. a1.channels.c2.capacity = 1000
    30. a1.channels.c2.transactionCapacity = 100
    31. # source and sink to the channel
    32. a1.sources.r1.channels = c1 c2
    33. a1.sinks.k1.channel = c1
    34. a1.sinks.k2.channel = c2

    为hadoop02上的Flume2配置一个avro source和一个logger sink。

    1. # agent
    2. a1.sources = r1
    3. a1.sinks = k1
    4. a1.channels = c1
    5. # source
    6. a1.sources.r1.type = avro
    7. a1.sources.r1.bind = hadoop02
    8. a1.sources.r1.port = 4141
    9. # sink
    10. a1.sinks.k1.type = logger
    11. # channel
    12. a1.channels.c1.type = memory
    13. a1.channels.c1.capacity = 1000
    14. a1.channels.c1.transactionCapacity = 100
    15. # source and sink to channel
    16. a1.sinks.k1.channel = c1
    17. a1.sources.r1.channels = c1

    为hadoop03上的Flume3配置一个avro source和一个logger sink。

    1. # agent
    2. a1.sources = r1
    3. a1.sinks = k1
    4. a1.channels = c1
    5. # source
    6. a1.sources.r1.type = avro
    7. a1.sources.r1.bind = hadoop03
    8. a1.sources.r1.port = 4242
    9. # sink
    10. a1.sinks.k1.type = logger
    11. # channel
    12. a1.channels.c1.type = memory
    13. a1.channels.c1.capacity = 1000
    14. a1.channels.c1.transactionCapacity = 100
    15. # source and sink to channel
    16. a1.sinks.k1.channel = c1
    17. a1.sources.r1.channels = c1

    (5)分别在hadoop01,hadoop02,hadoop03上启动flume进程,注意先后顺序。

    (6)在 hadoop01使用netcat向hadoop01:44444 发送字母和数字。

    (7)观察hadoop02和hadoop03打印的日志。

    自定义source

    介绍:

    Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence、generator、syslog、http、legacy。官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些source。

    官方也提供了自定义source的接口:

    https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义MySource需要继承AbstractSource类并实现Configurable和PollableSource接口。

    实现相应方法:

    getBackOffSleepIncrement() //backoff 步长

    getMaxBackOffSleepInterval()//backoff最长时间

    configure(Context context)//初始化context(读取配置文件内容)

    process()//获取数据封装成event并写入channel,这个方法将被循环调用。

    使用场景:读取MySQL数据或者其他文件系统。

    (1)需求

    使用flume接收数据,并给每条数据添加前缀,输出到控制台。前缀可从flume配置文件中配置。

    (2)需求分析

    configure(Context context):读取配置文件(XX.conf)中的配置信息;

    process():接收数据,将数据封装成一个个的Event,写入Channel,使用for循环模拟数据生成。for(int i=0;i<5;i++);

    getBackOffSleepIncrement():暂不用;

    getMaxBackOffSleepInterval():暂不用;

    (3)实现

    1)导入依赖

    1. <dependency>
    2. <groupId>org.apache.flume</groupId>
    3. <artifactId>flume-ng-core</artifactId>
    4. <version>1.9.0</version>
    5. </dependency>

    2)编码

    1. import org.apache.flume.Context;
    2. import org.apache.flume.EventDeliveryException;
    3. import org.apache.flume.PollableSource;
    4. import org.apache.flume.conf.Configurable;
    5. import org.apache.flume.event.SimpleEvent;
    6. import org.apache.flume.source.AbstractSource;
    7. import java.util.HashMap;
    8. public class MySource extends AbstractSource implements Configurable, PollableSource {
    9. //定义配置文件将要读取的字段
    10. private Long delay;
    11. private String field;
    12. //初始化配置信息
    13. @Override
    14. public void configure(Context context) {
    15. delay = context.getLong("delay");
    16. field = context.getString("field","hello!");
    17. }
    18. @Override
    19. public Status process() throws EventDeliveryException {
    20. try {
    21. //创建事件头部信息
    22. HashMap hearderMap = new HashMap<>();
    23. //创建事件
    24. SimpleEvent event = new SimpleEvent();
    25. //循环封装事件
    26. for (int i = 0;i < 5;i++) {
    27. //给事件设置头信息
    28. event.setHeaders(hearderMap);
    29. //给事件设置内存
    30. event.setBody((field + i).getBytes());
    31. //将事件写入channel
    32. getChannelProcessor().processEvent(event);
    33. Thread.sleep(delay);
    34. }
    35. } catch (Exception e) {
    36. e.printStackTrace();
    37. return Status.BACKOFF;
    38. }
    39. return Status.READY;
    40. }
    41. @Override
    42. public long getBackOffSleepIncrement() {
    43. return 0;
    44. }
    45. @Override
    46. public long getMaxBackOffSleepInterval() {
    47. return 0;
    48. }
    49. }

    (4)测试

    1)打包

    将写好的代码打包,并放到flume的lib目录下。

    2)配置文件

    1. # agent
    2. a1.sources = r1
    3. a1.sinks = k1
    4. a1.channels = c1
    5. # source
    6. a1.sources.r1.type = com.zj.source.MySource
    7. a1.sources.r1.delay = 1000
    8. #a1.sources.r1.field = jeffry
    9. # sink
    10. a1.sinks.k1.type = logger
    11. # channel
    12. a1.channels.c1.type = memory
    13. a1.channels.c1.capacity = 1000
    14. a1.channels.c1.transactionCapacity = 100
    15. # source and sink to the channel
    16. a1.sources.r1.channels = c1
    17. a1.sinks.k1.channel = c1

    3)开始任务

    1. bin/flume-ng agent -c conf/ -f conf/mysource.conf
    2. -n a1 -Dflume.root.logger=INFO,console

    结果

    自定义sink

    介绍:

    Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。

    Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。

    Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。

    官方提供的Sink类型有时候不能满足实际开发中的需求,此时我们就需要根据实际需求自定义某些Sink。

    官方提供了自定义sink的接口:

    https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义MySink需要继承AbstractSink类并实现Configurable接口。

    实现相应方法:

    configure(Context context)//初始化context(读取配置文件内容)

    process()//从Channel读取获取数据(event),这个方法将被循环调用。

    使用场景:读取Channel数据写入MySQL或者其他文件系统。

    (1)需求

    用flume接收数据,并在Sink端给每条数据添加前缀和后缀,输出到控制台。前后缀可在flume任务配置文件中配置。

    (2)需求分析

    configure():读取任务配置文件中的配置信息。

    process():Channel中取数据,添加前后缀,写入日志。

    数据流:ABC->ABC->hello:ABC:hello

    (3)实现

    1. import org.apache.flume.*;
    2. import org.apache.flume.conf.Configurable;
    3. import org.apache.flume.sink.AbstractSink;
    4. import org.slf4j.Logger;
    5. import org.slf4j.LoggerFactory;
    6. public class MySink extends AbstractSink implements Configurable {
    7. //创建logger对象
    8. private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
    9. private String prefix;
    10. private String suffix;
    11. @Override
    12. public Status process() throws EventDeliveryException {
    13. //声明返回值状态信息
    14. Status status;
    15. //获取当前sink绑定的channel
    16. Channel ch = getChannel();
    17. //获取事务
    18. Transaction txn = ch.getTransaction();
    19. //声明事件
    20. Event event;
    21. //开启事件
    22. txn.begin();
    23. //读取channel中的事件,直到读取到事件结束循环
    24. while (true) {
    25. event = ch.take();
    26. if (event != null) {
    27. break;
    28. }
    29. }
    30. try {
    31. //处理事件(打印)
    32. LOG.info(prefix + new String(event.getBody()) + suffix);
    33. //事务提交
    34. txn.commit();
    35. status = Status.READY;
    36. } catch (Exception e) {
    37. //遇到异常,事务回滚
    38. txn.rollback();
    39. status = Status.BACKOFF;
    40. } finally {
    41. //关闭事务
    42. txn.close();
    43. }
    44. return status;
    45. }
    46. @Override
    47. public void configure(Context context) {
    48. //读取配置文件内容,有默认值
    49. prefix = context.getString("prefix","hello:");
    50. //读取配置文件内容,无默认值
    51. suffix = context.getString("suffix");
    52. }
    53. }

    (4)测试

    1)打包

    将写好的代码打包,并放到flume的lib目录下。

    2)配置文件

    1. # agent
    2. a1.sources = r1
    3. a1.sinks = k1
    4. a1.channels = c1
    5. # source
    6. a1.sources.r1.type = netcat
    7. a1.sources.r1.bind = hadoop01
    8. a1.sources.r1.port = 44444
    9. # sink
    10. a1.sinks.k1.type = com.zj.sink.MySink
    11. #a1.sinks.k1.prefix = jeffry:
    12. a1.sinks.k1.suffix = :jeffry
    13. # channel
    14. a1.channels.c1.type = memory
    15. a1.channels.c1.capacity = 1000
    16. a1.channels.c1.transactionCapacity = 100
    17. # source and sink to the channel
    18. a1.sources.r1.channels = c1
    19. a1.sinks.k1.channel = c1

    3)开启任务

    1. bin/flume-ng agent -c conf/ -f conf/mysink.conf
    2. -n a1 -Dflume.root.logger=INFO,console
    $ nc hadoop01 44444

    结果

    本文为学习笔记!!!

  • 相关阅读:
    redis(基础 && redis缓存)
    段码屏学习
    6-JS的Fetch 跨域问题
    【枚举】AcWing 1236. 递增三元组
    老生常谈React的diff算法原理-面试版
    项目中升级mysql遇到的若干问题
    22个Vue 源码中的工具函数
    Web应用开发介绍
    生活中常见的嵌入式产品都有哪些?
    PCL B样条曲线拟合(2d/3d)
  • 原文地址:https://blog.csdn.net/qq_55906442/article/details/127027937