• Flume Study Notes (3): Custom Flume Components


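    Prerequisites:

    Flume Study Notes (1): Getting Started with Flume (CSDN blog)

    Flume Study Notes (2): Advanced Flume (CSDN blog)

    Custom Flume Components

    Custom Interceptor

    Requirements: Flume collects local log files on a server, and logs of different types must be sent to different analysis systems.

    This calls for the Multiplexing topology. A multiplexing channel selector sends each event to a Channel chosen by the value of a given key in the event's Header. We therefore need a custom Interceptor that sets that Header key to a different value for each type of event.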
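    The tag-then-route idea can be sketched in plain Java. This is a stand-alone illustration that does not use the Flume API: the class name `MultiplexingSketch`, the helpers `typeFor` and `route`, and the fallback channel are all hypothetical. The header key `type`, the values `first`/`second`, and the channel names `c1`/`c2` match the ones used later in these notes.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Stand-alone illustration; it does not use the Flume API.
class MultiplexingSketch {

    // Interceptor side: derive the header value from the event body.
    static String typeFor(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        return text.contains("why") ? "first" : "second";
    }

    // Selector side: look up the channel mapped to the header value,
    // falling back to a default channel when there is no mapping.
    static String route(Map<String, String> headers,
                        Map<String, String> mapping,
                        String defaultChannel) {
        String value = headers.get("type");
        return mapping.getOrDefault(value, defaultChannel);
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("first", "c1");
        mapping.put("second", "c2");

        for (String line : new String[] {"why hello", "other log"}) {
            Map<String, String> headers = new HashMap<>();
            headers.put("type", typeFor(line.getBytes(StandardCharsets.UTF_8)));
            System.out.println(line + " -> " + route(headers, mapping, "c2"));
        }
    }
}
```

    The interceptor only writes the header; the selector only reads it. Keeping the two steps separate is what lets the routing table live in the configuration file rather than in code.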

    Implementation flow:

    Code

    Add the dependency:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>

    The custom interceptor code:

    package com.why.interceptor;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class TypeInterceptor implements Interceptor {

        // Holds the events of the current batch after their headers are set
        private List<Event> addHeaderEvents;

        @Override
        public void initialize() {
            // Initialize the list
            addHeaderEvents = new ArrayList<>();
        }

        // Intercept a single event
        @Override
        public Event intercept(Event event) {
            // Get the headers
            Map<String, String> headers = event.getHeaders();
            // Get the body
            String body = new String(event.getBody());
            // Group by whether the data contains "why"
            if (body.contains("why")) {
                headers.put("type", "first");
            } else {
                headers.put("type", "second");
            }
            return event;
        }

        // Intercept a batch of events
        @Override
        public List<Event> intercept(List<Event> events) {
            // Clear the list
            addHeaderEvents.clear();
            // Iterate over the events
            for (Event event : events) {
                // Add the header to each event
                addHeaderEvents.add(intercept(event));
            }
            return addHeaderEvents;
        }

        @Override
        public void close() {
        }

        // Builder that Flume uses to create the interceptor
        public static class TypeBuilder implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new TypeInterceptor();
            }

            @Override
            public void configure(Context context) {
            }
        }
    }

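    Package the code into a jar and put it in the lib folder under the Flume installation path.

    Configuration files

    Create a group4 directory under the job folder and add the configuration files.

    On hadoop102, configure Flume1 with one netcat source, two memory channels, and two avro sinks (one per channel), plus the matching ChannelSelector and interceptor: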

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1 k2
    a1.channels = c1 c2
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.why.interceptor.TypeInterceptor$TypeBuilder
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = type
    a1.sources.r1.selector.mapping.first = c1
    a1.sources.r1.selector.mapping.second = c2
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 4141
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 4242
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Use a channel which buffers events in memory
    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2

    hadoop103: configure one avro source and one logger sink

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop103
    a1.sources.r1.port = 4141
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.sinks.k1.channel = c1
    a1.sources.r1.channels = c1

    hadoop104: configure one avro source and one logger sink

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop104
    a1.sources.r1.port = 4242
    a1.sinks.k1.type = logger
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    a1.sinks.k1.channel = c1
    a1.sources.r1.channels = c1

    Run the commands

    hadoop102: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume-interceptor-flume.conf

    hadoop103: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1-flume-logger.conf -Dflume.root.logger=INFO,console

    hadoop104: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2-flume-logger.conf -Dflume.root.logger=INFO,console

    Then, on hadoop102, connect to port 44444 with nc and send some data:

    hadoop103 and hadoop104 each receive their share of the events. With the mapping above, events whose body contains "why" arrive at hadoop103 and all others arrive at hadoop104:

    Custom Source

    Official documentation: Flume 1.11.0 Developer Guide (Apache Flume)

    The sample code it gives is as follows:

    public class MySource extends AbstractSource implements Configurable, PollableSource {
        private String myProp;

        @Override
        public void configure(Context context) {
            String myProp = context.getString("myProp", "defaultValue");
            // Process the myProp value (e.g. validation, convert to another type, ...)
            // Store myProp for later retrieval by process() method
            this.myProp = myProp;
        }

        @Override
        public void start() {
            // Initialize the connection to the external client
        }

        @Override
        public void stop() {
            // Disconnect from external client and do any additional cleanup
            // (e.g. releasing resources or nulling-out field values) ..
        }

        @Override
        public Status process() throws EventDeliveryException {
            Status status = null;
            try {
                // This try clause includes whatever Channel/Event operations you want to do
                // Receive new data (getSomeData() is a placeholder for your own logic)
                Event e = getSomeData();
                // Store the Event into this Source's associated Channel(s)
                getChannelProcessor().processEvent(e);
                status = Status.READY;
            } catch (Throwable t) {
                // Log exception, handle individual exceptions as needed
                status = Status.BACKOFF;
                // re-throw all Errors
                if (t instanceof Error) {
                    throw (Error) t;
                }
            }
            // No transaction is declared in this method, so there is nothing to close here
            return status;
        }
    }

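    A custom source extends AbstractSource and implements Configurable and PollableSource.

    Hands-on requirements

    Use Flume to receive data, add a prefix to each record, and print it to the console. The prefix can be set in the Flume configuration file.

    Code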

    package com.why.source;

    import org.apache.flume.Context;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;

    import java.util.HashMap;

    public class MySource extends AbstractSource implements PollableSource, Configurable {

        // Fields read from the configuration file
        private Long delay;
        private String field;

        // Fetch data, wrap it in events, and write them to the channel; this method is called in a loop
        @Override
        public Status process() throws EventDeliveryException {
            try {
                // Build and send five events
                for (int i = 0; i < 5; i++) {
                    // Create a fresh event each time so events already in the channel are not overwritten
                    SimpleEvent event = new SimpleEvent();
                    // Set the headers
                    event.setHeaders(new HashMap<String, String>());
                    // Set the body
                    event.setBody((field + i).getBytes());
                    // Write the event to the channel
                    getChannelProcessor().processEvent(event);
                    Thread.sleep(delay);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return Status.READY;
        }

        // Backoff step
        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }

        // Maximum backoff time
        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }

        // Read the settings from the context (the configuration file)
        @Override
        public void configure(Context context) {
            // The 1000 ms default avoids a NullPointerException in Thread.sleep when "delay" is not configured
            delay = context.getLong("delay", 1000L);
            field = context.getString("field", "Hello");
        }
    }
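    The two backoff getters matter only when `process()` returns `Status.BACKOFF`. The sketch below shows one plausible way a polling runner could turn them into a sleep time: multiply the number of consecutive BACKOFF results by the increment and cap the product at the maximum. This formula is an assumption about the runner's behavior, not something these notes confirm, and the class name `BackoffSketch` and the values 1000 and 3000 are made up for illustration. It does not use the Flume API.

```java
// Stand-alone illustration; it does not use the Flume API.
class BackoffSketch {

    // Sleep time after `consecutiveBackoffs` BACKOFF results in a row:
    // grows by `increment` each time and is capped at `maxInterval`.
    static long sleepMillis(int consecutiveBackoffs, long increment, long maxInterval) {
        return Math.min(consecutiveBackoffs * increment, maxInterval);
    }

    public static void main(String[] args) {
        // Hypothetical settings: 1000 ms step, 3000 ms cap.
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " -> " + sleepMillis(n, 1000L, 3000L) + " ms");
        }
        // With both getters returning 0, the computed sleep is always 0.
        System.out.println(sleepMillis(4, 0L, 0L) + " ms");
    }
}
```

    Under this assumption, returning 0 from both getters means the source is polled again immediately after a BACKOFF, which is harmless here only because this `process()` sleeps on its own.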

    Package it and put the jar in the lib folder under the Flume installation path.

    Configuration file

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = com.why.source.MySource
    a1.sources.r1.delay = 1000
    a1.sources.r1.field = why
    # Describe the sink
    a1.sinks.k1.type = logger
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
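    The source's `configure` method reads `delay` and `field` without the `a1.sources.r1.` prefix, which suggests that each component sees only its own keys with the prefix removed. A minimal sketch of that scoping, using only `java.util.Properties`, might look like this. The class `SourceConfigSketch` and its helpers `parse` and `subProperties` are hypothetical, and this is not how Flume itself is implemented.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Stand-alone illustration; it does not use the Flume API.
class SourceConfigSketch {

    // Collect the keys under one component prefix, with the prefix removed.
    static Map<String, String> subProperties(Properties props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), props.getProperty(name));
            }
        }
        return result;
    }

    // Parse properties-file text held in a String.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected when reading from a String
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "a1.sources.r1.delay = 1000\n"
              + "a1.sources.r1.field = why\n");
        Map<String, String> ctx = subProperties(props, "a1.sources.r1.");
        // Same defaults idea as context.getString("field", "Hello")
        String field = ctx.getOrDefault("field", "Hello");
        long delay = Long.parseLong(ctx.getOrDefault("delay", "1000"));
        System.out.println(field + " " + delay);
    }
}
```

    With the configuration above, the five events produced per `process()` call would carry the bodies `why0` through `why4`, since the source builds each body as `field + i`.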

    Run the command

    On hadoop102: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group5/mysource.conf -Dflume.root.logger=INFO,console

    The result is as follows:

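    Custom Sink

    A Sink continuously polls the Channel for events, removes them in batches, and either writes those batches to a storage or indexing system or sends them to another Flume Agent.

    A Sink is fully transactional. Before removing a batch of events from the Channel, each Sink starts a transaction with the Channel. Once the batch has been written successfully to the storage system or the next Flume Agent, the Sink commits the transaction with the Channel. After the commit, the Channel deletes the events from its internal buffer.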
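    The take, commit, and rollback behavior described above can be modelled with a small in-memory queue. This is a stand-alone sketch under simplifying assumptions (one transaction at a time, no capacity limits), not Flume's channel implementation; the class `TxnChannelSketch` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Stand-alone illustration; it does not use the Flume API.
class TxnChannelSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    // Events taken in the current transaction but not yet committed.
    private final List<String> taken = new ArrayList<>();

    void put(String event) {
        queue.addLast(event);
    }

    // Take one event inside the current transaction; null when empty.
    String take() {
        String event = queue.pollFirst();
        if (event != null) {
            taken.add(event);
        }
        return event;
    }

    // Commit: the taken events are removed for good.
    void commit() {
        taken.clear();
    }

    // Rollback: put the taken events back at the head, in original order.
    void rollback() {
        for (int i = taken.size() - 1; i >= 0; i--) {
            queue.addFirst(taken.get(i));
        }
        taken.clear();
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        TxnChannelSketch ch = new TxnChannelSketch();
        ch.put("e1");
        ch.put("e2");
        ch.take();
        ch.take();
        ch.rollback();                  // the write failed, so both events return
        System.out.println(ch.size());
        ch.take();
        ch.commit();                    // the write succeeded, so e1 is gone
        System.out.println(ch.size());
    }
}
```

    The point of the pattern is that an event leaves the channel only after the sink confirms the write, so a failed write loses nothing.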

    Official documentation: Flume 1.11.0 Developer Guide (Apache Flume)

    Interface example:

    public class MySink extends AbstractSink implements Configurable {
        private String myProp;

        @Override
        public void configure(Context context) {
            String myProp = context.getString("myProp", "defaultValue");
            // Process the myProp value (e.g. validation)
            // Store myProp for later retrieval by process() method
            this.myProp = myProp;
        }

        @Override
        public void start() {
            // Initialize the connection to the external repository (e.g. HDFS) that
            // this Sink will forward Events to ..
        }

        @Override
        public void stop() {
            // Disconnect from the external repository and do any
            // additional cleanup (e.g. releasing resources or nulling-out
            // field values) ..
        }

        @Override
        public Status process() throws EventDeliveryException {
            Status status = null;
            // Start transaction
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try {
                // This try clause includes whatever Channel operations you want to do
                Event event = ch.take();
                // Send the Event to the external repository.
                // storeSomeData(event);
                txn.commit();
                status = Status.READY;
            } catch (Throwable t) {
                txn.rollback();
                // Log exception, handle individual exceptions as needed
                status = Status.BACKOFF;
                // re-throw all Errors
                if (t instanceof Error) {
                    throw (Error) t;
                }
            } finally {
                // Always close the transaction, whether it was committed or rolled back
                txn.close();
            }
            return status;
        }
    }

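    A custom MySink extends the AbstractSink class and implements the Configurable interface.

    Hands-on requirements

    Use Flume to receive data, add a prefix and a suffix to each record on the Sink side, and print it to the console. The prefix and suffix can be set in the Flume job configuration file.

    Code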

    package com.why.sink;

    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MySink extends AbstractSink implements Configurable {

        // Create the Logger
        private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

        // Prefix and suffix
        private String prefix;
        private String suffix;

        @Override
        public Status process() throws EventDeliveryException {
            // Status to return
            Status status;
            // Get the Channel bound to this Sink
            Channel ch = getChannel();
            // Get a transaction
            Transaction txn = ch.getTransaction();
            // Begin the transaction
            txn.begin();
            try {
                // Take one event; take() returns null when the Channel is empty
                Event event = ch.take();
                if (event != null) {
                    // Handle the event (print it)
                    LOG.info(prefix + new String(event.getBody()) + suffix);
                    status = Status.READY;
                } else {
                    // Nothing to read: back off instead of spinning inside an open transaction
                    status = Status.BACKOFF;
                }
                // Commit the transaction
                txn.commit();
            } catch (Exception e) {
                // On error, roll back the transaction
                txn.rollback();
                status = Status.BACKOFF;
            } finally {
                // Close the transaction
                txn.close();
            }
            return status;
        }

        @Override
        public void configure(Context context) {
            prefix = context.getString("prefix", "hello");
            suffix = context.getString("suffix");
        }
    }
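    One detail worth checking in the sink above: `suffix` is read with no default, and the log line is built by string concatenation. To my knowledge `context.getString("suffix")` yields `null` when the key is missing, and Java renders a `null` operand as the text "null" when concatenating. The stand-alone sketch below (hypothetical class `SinkFormatSketch`, no Flume API) shows that effect next to a variant that treats a missing value as empty.

```java
import java.nio.charset.StandardCharsets;

// Stand-alone illustration; it does not use the Flume API.
class SinkFormatSketch {

    // Same concatenation as the sink's log line.
    static String format(String prefix, byte[] body, String suffix) {
        return prefix + new String(body, StandardCharsets.UTF_8) + suffix;
    }

    // Variant that treats a missing prefix or suffix as an empty string.
    static String formatSafe(String prefix, byte[] body, String suffix) {
        String p = (prefix == null) ? "" : prefix;
        String s = (suffix == null) ? "" : suffix;
        return p + new String(body, StandardCharsets.UTF_8) + s;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(format("why:", body, ":why"));   // both values configured
        System.out.println(format("why:", body, null));     // suffix missing
        System.out.println(formatSafe("why:", body, null)); // suffix missing, handled
    }
}
```

    Giving `suffix` an empty-string default in `configure` would have the same effect as `formatSafe` without touching `process()`.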

    Package it and put the jar in the lib folder under the Flume installation path.

    Configuration file

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # Describe the sink
    a1.sinks.k1.type = com.why.sink.MySink
    a1.sinks.k1.prefix = why:
    a1.sinks.k1.suffix = :why
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    Run the command

    On hadoop102: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group6/mysink.conf -Dflume.root.logger=INFO,console

    The result is as follows:

  • Original article: https://blog.csdn.net/qq_51235856/article/details/134475721