• Flume配置4——自定义Source+Sink


    自定义Source

    1.说明

    • 官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Source
    • Source 的目的是从外部客户端接收数据并将其存储到配置的 Channels 中

    2.自定义步骤(参考官方文档)

    • class MySource extends AbstractSource implements Configurable,PollableSource
    • 方法
    getBackOffSleepIncrement()//暂不用
    getMaxBackOffSleepInterval()//暂不用
    configure(Context context)//初始化context,即读取配置文件中的信息,每个信息对应一个配置项
    process()//获取数据,封装成Event并写入Channel,这个方法被循环调用
    stop()//关闭相关的资源
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.案例需求

    • flume收集数据,并给每条数据添加前缀与后缀,若不添加则使用默认,最后输出到控制台

    4.原理图

    在这里插入图片描述

    5.实现

    • 添加依赖
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.8.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • MySource代码
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @program: Flume-MyISS
     * @description:
     * @author: 作者
     * @create: 2022-06-26 21:21
     */
    public class MySource extends AbstractSource implements Configurable, PollableSource {
        //前缀,后缀
        private String prefix;
        private String subfix;
        private Long delay;
    
    
        @Override//初始化配置信息,获取.conf中的配置
        public void configure(Context context) {
            prefix = context.getString("pre", "pre-");//第二个参数是给出的默认值
            subfix = context.getString("sub");//未给出则没有默认值,即配置文件中未配置时默认没有
            delay = context.getLong("delay", 2000L);//延迟时间
        }
    
        @Override
        public Status process() throws EventDeliveryException {
            Event event = new SimpleEvent();
            Map<String, String> headers = new HashMap<>();
    
            try {//ctrl+alt+t
                //循环创建Event,输出给Channel
                for (int i = 0; i < 5; i++) {
                    event.setHeaders(headers);
                    event.setBody((prefix + "hello" + subfix).getBytes());
                    getChannelProcessor().processEvent(event);
                    /**
                     * source提交event会先经过拦截器--->读源码processEvent
                     * event = this.interceptorChain.intercept(event);//经过拦截器拦截
                     * if (event != null) {//如果拦截器返回的不是null才会进入if,继续执行
                     *     List<Channel> requiredChannels = this.selector.getRequiredChannels(event);//selector是一个channel选择器,返回list是因为可能需要传给多个channel
                     *     Iterator var3 = requiredChannels.iterator();//需要遍历channel
                     *     while(var3.hasNext()) {
                     *          Channel reqChannel = (Channel)var3.next();
                     *          Transaction tx = reqChannel.getTransaction();//获取事务
                     *          Preconditions.checkNotNull(tx, "Transaction object must not be null");
                     *          try {
                     *              tx.begin();//开启事务
                     *              reqChannel.put(event);
                     *              tx.commit();//提交事务
                     *          } catch (Throwable var17) {
                     *              tx.rollback();//发生异常则会回滚
                     *              ......
                     *         }....
                     */
                }
                Thread.sleep(delay);//一旦得到返回值是READY,又立马调用process函数
                return Status.READY;
            } catch (Exception e) {
                e.printStackTrace();
                return Status.BACKOFF;
            }
        }
    
        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }
    
        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 项目jar放入flume的lib目录下
      在这里插入图片描述
      在这里插入图片描述
    • /jobs/t7下编写配置文件mysource-flume-logger.conf
    vim mysource-flume-logger.conf
    ---------------------------------
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = com.yc.source.MySource
    a1.sources.r1.delay = 4000
    #a1.sources.r1.pre = pre-pre
    #a1.sources.r1.sub = sub
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 启动顺序flume
    bin/flume-ng agent --conf conf --conf-file jobs/t7/mysource-flume-logger.conf --name a1 -Dflume.root.logger==INFO,console
    
    • 1
    • 未设置前缀与后缀
      在这里插入图片描述
    • 设置前缀与后缀
      在这里插入图片描述

    自定义Sink

    1.说明

    • Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 FlumeAgent
    • Sink 是完全事务性的
    在从 Channel 批量删除数据之前,每个 SinkChannel 启动一个事务
    批量事件一旦成功写出到存储系统或下一个 Flume AgentSink 就利用 Channel 提交事务
    事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件
    
    • 1
    • 2
    • 3
    • 官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink

    2.自定义步骤(参考官方文档)

    • extends AbstractSink implements Configurable
    • 方法
    configure(Context context)//初始化context,即读取配置文件中的信息,每个信息对应一个配置项
    process()//获取数据,封装成Event并写入Channel,这个方法被循环调用
    
    • 1
    • 2

    3.案例需求

    • flume 接收数据,并给每条数据添加前缀与后缀(若不添加则使用默认)输出到控制台

    4.原理图

    在这里插入图片描述

    5.实现

    • 添加依赖
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.8.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • MySink代码
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @program: Flume-MyISS
     * @description:
     * @author: 作者
     * @create: 2022-06-26 22:39
     */
    public class MySink extends AbstractSink implements Configurable {
        //创建 Logger 对象
        private static final Logger LOG = LoggerFactory.getLogger(MySink.class);
        private String prefix;
        private String subfix;
    
        @Override
        public void configure(Context context) {
            //读取配置文件内容,有默认值
            prefix = context.getString("pre", "pre-hello:");
            //读取配置文件内容,无默认值
            subfix = context.getString("sub");
        }
    
        @Override
        public Status process() throws EventDeliveryException {
            //1.获取channel,开启事务
            Channel channel = getChannel();
            Transaction transaction = channel.getTransaction();
            transaction.begin();
    
            //2.从channel中抓去数据,打印到控制台
            try {
                //2.1抓取数据
                Event event;
                while (true) {//抓取不到数据就一直循环的抓取
                    event = channel.take();
                    if (event != null) {
                        break;
                    }
                }
                //2.2处理数据
                LOG.info(new String(prefix + event.getBody() + subfix));
    
                //2.3提交事务
                transaction.commit();
                return Status.READY;
            } catch (Exception e) {
                //回滚
                transaction.rollback();
                return Status.BACKOFF;
            } finally {
                transaction.close();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 项目jar放入flume的lib目录下
      在这里插入图片描述
      在这里插入图片描述
    • /jobs/t8下编写配置文件netcat-flume-mysink.conf
    vim netcat-flume-mysink.conf
    ------------------------------
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = com.yc.sink.MySink
    #a1.sinks.k1.pre = pre-pre
    #a1.sinks.k1.sub = sub
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 启动顺序flume
    bin/flume-ng agent --conf conf --conf-file jobs/t8/netcat-flume-mysink.conf --name a1 -Dflume.root.logger==INFO,console
    
    • 1
    • 端口发送数据
    telnet localhost 44444
    
    • 1
    • 未设置前缀与后缀
      在这里插入图片描述
    • 设置前缀与后缀
      在这里插入图片描述
  • 相关阅读:
    十四)Stable Diffusion使用教程:一个线稿渲3D例子
    SSE 服务端消息推送
    物联网开发学习笔记——目录索引
    Redis和Memcached网络模型详解
    httpserver 下载服务器demo 以及libevent版本的 httpserver
    HBase导出建表语句
    C++学习笔记
    JAVA反射(原理+使用)
    easyphoto 妙鸭相机
    .NET 发布,部署和运行应用程序
  • 原文地址:https://blog.csdn.net/weixin_51699336/article/details/125509606