• Flume Distributed Log Collection


    Apache Flume

    Overview

    Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple, flexible architecture built on streaming data flows, and it is robust and fault tolerant thanks to tunable reliability mechanisms and many failover and recovery mechanisms. This architecture enables real-time, online analysis of streaming log data. Flume lets you plug in custom data senders in a logging system to collect data, and it can also perform simple processing on the data before writing it to various (customizable) data receivers. There are currently two major lines of Flume: the 0.9.x releases are collectively called Flume-og, and the 1.x releases are called Flume-ng. Flume-ng went through a major refactoring and differs significantly from Flume-og, so be careful to distinguish between them. This course uses apache-flume-1.9.0-bin.tar.gz.

    Architecture

    [Figure: Flume agent architecture (assets/20201021093100281.png)]

    Installation

    • Install JDK 1.8+ and configure the JAVA_HOME environment variable (omitted here)
    • Install Flume. Download: http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
    [root@CentOS ~]# tar -zxf apache-flume-1.9.0-bin.tar.gz -C /usr/
    [root@CentOS ~]# cd /usr/apache-flume-1.9.0-bin/
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng version
    Flume 1.9.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: d4fcab4f501d41597bc616921329a4339f73585e
    Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018
    From source with checksum 35db629a3bda49d23e9b3690c80737f9
    

    Agent configuration template

    # Declare the components
    <agent>.sources = <source1> <source2> ...
    <agent>.sinks = <sink1> <sink2> ...
    <agent>.channels = <channel1> <channel2> ...
    
    # Configure the components
    <agent>.sources.<source>.<property> = <value>
    <agent>.channels.<channel>.<property> = <value>
    <agent>.sinks.<sink>.<property> = <value>
    
    # Wire the components together
    <agent>.sources.<source>.channels = <channel1> <channel2> ...
    <agent>.sinks.<sink>.channel = <channel1>
    

    You must be familiar with this template structure; knowing it makes later lookup and configuration much easier.

    The angle-bracketed placeholders stand for component names and types; which component types are available has to be looked up in the documentation.

    Reference: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

    Quick start

    ① helloword.properties: configuration for a single agent. Place this file in the conf directory under the Flume installation directory.

    # Declare the basic components: Source, Channel, Sink
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink component: print received data to the log console
    a1.sinks.sk1.type = logger
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    

    1. Install nmap-ncat (yum -y install nmap-ncat) to make the later tests easier.
    2. Install telnet (yum -y install telnet) as well, for testing.

    ② Start the a1 agent

    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/helloword.properties -Dflume.root.logger=INFO,console
    

    Appendix: startup command options

    Usage: ./bin/flume-ng <command> [options]...
    
    commands:
      help                      display this help text
      agent                     run a Flume agent
      avro-client               run an avro Flume client
      version                   show Flume version info
    
    global options:
      --conf,-c <conf>          use configs in <conf> directory
      --classpath,-C <cp>       append to the classpath
      --dryrun,-d               do not actually start Flume, just print the command
      --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                                plugins.d section in the user guide for more details.
                                Default: $FLUME_HOME/plugins.d
      -Dproperty=value          sets a Java system property value
      -Xproperty=value          sets a Java -X option
    
    agent options:
      --name,-n <name>          the name of this agent (required)
      --conf-file,-f <file>     specify a config file (required if -z missing)
      --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
      --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
      --no-reload-conf          do not reload config file if changed
      --help,-h                 display help text
    
    avro-client options:
      --rpcProps,-P <file>   RPC client properties file with server connection params
      --host,-H <host>       hostname to which events will be sent
      --port,-p <port>       port of the avro source
      --dirname <dir>        directory to stream to avro source
      --filename,-F <file>   text file to stream to avro source (default: std input)
      --headerFile,-R <file> File containing event headers as key/value pairs on each new line
      --help,-h              display help text
    
      Either --rpcProps or both --host and --port must be specified.
    
    Note that if <conf> directory is specified, then it is always included first
    in the classpath.
    
    

    ③ Test a1

    [root@CentOS apache-flume-1.9.0-bin]# telnet CentOS 44444
    Trying 192.168.52.134...
    Connected to CentOS.
    Escape character is '^]'.
    hello world
    
    2020-02-05 11:44:43,546 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D             hello world. }
    
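    Since nmap-ncat was installed above, nc can also be used instead of telnet for the same quick test (a sketch; it assumes the same netcat source is still listening on CentOS:44444, which acknowledges each accepted line with OK):

    [root@CentOS ~]# nc CentOS 44444
    hello flume
    OK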

    Basic components overview

    Source - inputs

    √Avro Source

    Typically used for collecting data remotely (as an RPC service). Internally it starts an Avro server that accepts requests from Avro clients and stores the received data into the Channel.

    | Property | Default | Description |
    | --- | --- | --- |
    | channels | – | Channel(s) to connect to |
    | type | – | component type, must be avro |
    | bind | – | hostname or IP address to bind to |
    | port | – | port to listen on |

    # Declare the component
    a1.sources = s1
    
    # Configure the component
    a1.sources.s1.type = avro
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Connect to the channel
    a1.sources.s1.channels = c1
    
    <agent>.sources = <source>
    # Component configuration
    <agent>.sources.<source>.<property> = <value>
    
    # Declare the basic components Source Channel Sink  example2.properties
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: receive data over Avro RPC
    a1.sources.s1.type = avro
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink component: print received data to the log console
    a1.sinks.sk1.type = logger
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example2.properties -Dflume.root.logger=INFO,console
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng avro-client --host CentOS --port 44444  --filename /root/t_employee
    

    Exec Source

    Collects the console (standard output) of a command, for example tail -F of a log file. The Flume agent usually has to be deployed on the same machine as the service being collected.

    | Property | Default | Description |
    | --- | --- | --- |
    | channels | – | Channel(s) to connect to |
    | type | – | must be exec |
    | command | – | the command to execute |
    # Declare the basic components Source Channel Sink  example3.properties
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: collect the output of a command (tail -F)
    a1.sources.s1.type = exec
    a1.sources.s1.command = tail -F /root/t_user
    
    # Configure the Sink component: print received data to the log console
    a1.sinks.sk1.type = logger
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example3.properties -Dflume.root.logger=INFO,console
    
    [root@CentOS ~]# tail -f t_user
    

    Spooling Directory Source

    Collects newly added text files from a static directory. After a file has been fully collected, its filename gets a suffix appended, but the source file is not deleted; if you want files collected only once and then removed, change this default behavior (deletePolicy). The Flume agent usually has to be deployed on the same machine as the service whose files are collected.

    | Property | Default | Description |
    | --- | --- | --- |
    | channels | – | Channel(s) to connect to |
    | type | – | must be spooldir |
    | spoolDir | – | the directory to collect from |
    | fileSuffix | .COMPLETED | suffix appended to completely ingested files |
    | deletePolicy | never | never or immediate |
    | includePattern | ^.*$ | regex of files to include (matches all files by default) |
    | ignorePattern | ^$ | regex of files to ignore |
    # Declare the basic components Source Channel Sink  example4.properties
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: collect new files dropped into a spooling directory
    a1.sources.s1.type = spooldir
    a1.sources.s1.spoolDir = /root/spooldir
    a1.sources.s1.fileHeader = true
    a1.sources.s1.deletePolicy = immediate
    # Configure the Sink component: print received data to the log console
    a1.sinks.sk1.type = logger
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example4.properties -Dflume.root.logger=INFO,console
    

    Taildir Source

    Watches files for newly appended lines in real time and records the read offset of each collected file, so that collection can resume incrementally the next time the agent runs. The Flume agent usually has to be deployed on the same machine as the service being collected.

    | Property | Default | Description |
    | --- | --- | --- |
    | channels | – | Channel(s) to connect to |
    | type | – | must be TAILDIR |
    | filegroups | – | space-separated list of file groups |
    | filegroups.<filegroupName> | – | absolute path of the file group; regular expressions (not file system patterns) can be used for the filename only |
    | positionFile | ~/.flume/taildir_position.json | records the read position of each file, enabling incremental collection |
    # Declare the basic components Source Channel Sink  example5.properties
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: tail files matching the file groups below
    a1.sources.s1.type = TAILDIR
    a1.sources.s1.filegroups = g1 g2
    a1.sources.s1.filegroups.g1 = /root/taildir/.*\.log$
    a1.sources.s1.filegroups.g2 = /root/taildir/.*\.java$
    a1.sources.s1.headers.g1.type = log
    a1.sources.s1.headers.g2.type = java
    
    # Configure the Sink component: print received data to the log console
    a1.sinks.sk1.type = logger
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example5.properties -Dflume.root.logger=INFO,console
    

    Kafka Source

    | Property | Default | Description |
    | --- | --- | --- |
    | channels | – | Channel(s) to connect to |
    | type | – | must be org.apache.flume.source.kafka.KafkaSource |
    | kafka.topics | – | comma-separated list of topics the Kafka consumer reads from |
    | kafka.bootstrap.servers | – | list of brokers in the Kafka cluster used by the source |
    | kafka.topics.regex | – | regex defining the set of topics to subscribe to; has higher priority than kafka.topics and overrides it if present |
    | batchSize | 1000 | maximum number of messages written to the Channel in one batch |
    # Declare the basic components Source Channel Sink example9.properties
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: consume data from Kafka
    a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.s1.batchSize = 100
    a1.sources.s1.batchDurationMillis = 2000
    a1.sources.s1.kafka.bootstrap.servers = CentOS:9092
    a1.sources.s1.kafka.topics = topic01
    a1.sources.s1.kafka.consumer.group.id = g1
    
    # Configure the Sink component: print received data to the log console
    a1.sinks.sk1.type = logger
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example9.properties -Dflume.root.logger=INFO,console
    

    Sink - outputs

    Logger Sink

    Typically used for testing and debugging purposes.
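    A minimal sketch of its configuration, in the same form already used above; maxBytesToLog is an optional property that limits how many body bytes are logged:

    # logger sink: write each event to the agent's log / console at INFO level
    a1.sinks.sk1.type = logger
    # optional: maximum number of event body bytes to log (default 16)
    a1.sinks.sk1.maxBytesToLog = 16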

    File Roll Sink

    Writes the collected data to files on the local file system.

    # Declare the basic components Source Channel Sink example6.properties
    
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink component: roll the received data into local files
    a1.sinks.sk1.type = file_roll
    a1.sinks.sk1.sink.directory = /root/file_roll
    a1.sinks.sk1.sink.rollInterval = 0
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example6.properties
    

    √HDFS Sink

    Writes data to the HDFS file system.

    # Declare the basic components Source Channel Sink example7.properties
    
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink component: write the received data to HDFS, in date-based directories
    a1.sinks.sk1.type = hdfs
    a1.sinks.sk1.hdfs.path = /flume-hdfs/%y-%m-%d
    a1.sinks.sk1.hdfs.rollInterval = 0
    a1.sinks.sk1.hdfs.rollSize = 0
    a1.sinks.sk1.hdfs.rollCount = 0
    a1.sinks.sk1.hdfs.useLocalTimeStamp = true
    a1.sinks.sk1.hdfs.fileType = DataStream
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
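    The agent for example7.properties can be started the same way as the earlier examples; the listing command below is only a sketch for checking the result (it assumes HDFS is running and the hdfs client is on the PATH; the date-named directory comes from hdfs.path = /flume-hdfs/%y-%m-%d):

    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example7.properties -Dflume.root.logger=INFO,console
    [root@CentOS ~]# hdfs dfs -ls -R /flume-hdfs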

    √Kafka Sink

    Writes data to a Kafka topic.

    # Declare the basic components Source Channel Sink example8.properties
    
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink component: write the received data to a Kafka topic
    a1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.sk1.kafka.bootstrap.servers = CentOS:9092
    a1.sinks.sk1.kafka.topic = topic01
    a1.sinks.sk1.kafka.flumeBatchSize = 20
    a1.sinks.sk1.kafka.producer.acks = 1
    a1.sinks.sk1.kafka.producer.linger.ms = 1
    a1.sinks.sk1.kafka.producer.compression.type = snappy
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
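    A possible way to test example8.properties (a sketch; it assumes a Kafka broker on CentOS:9092 and the Kafka 2.2 console tools used elsewhere in this course): start the agent, send lines through the netcat source with telnet or nc, and consume topic01:

    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example8.properties
    [root@CentOS kafka_2.11-2.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOS:9092 --topic topic01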

    Avro Sink: sends data to an Avro Source (used for chaining agents).

    [Figure: an Avro Sink on one agent feeding the Avro Source of a downstream agent (assets/image-20201021114843362.png)]

    # Declare the basic components Source Channel Sink example10.properties (agent a1)
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: consume data from Kafka
    a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.s1.batchSize = 100
    a1.sources.s1.batchDurationMillis = 2000
    a1.sources.s1.kafka.bootstrap.servers = CentOS:9092
    a1.sources.s1.kafka.topics = topic01
    a1.sources.s1.kafka.consumer.group.id = g1
    
    # Configure the Sink component: forward data to the Avro Source of the next agent
    a1.sinks.sk1.type = avro
    a1.sinks.sk1.hostname = CentOS
    a1.sinks.sk1.port = 44444
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
    # Declare the basic components for the second agent (a2), in the same example10.properties
    a2.sources = s1
    a2.sinks = sk1
    a2.channels = c1
    
    # Configure the Source component: receive data over Avro RPC from agent a1
    a2.sources.s1.type = avro
    a2.sources.s1.bind = CentOS
    a2.sources.s1.port = 44444
    
    
    # Configure the Sink component: roll the received data into local files
    a2.sinks.sk1.type = file_roll
    a2.sinks.sk1.sink.directory = /root/file_roll
    a2.sinks.sk1.sink.rollInterval = 0
    
    # Configure the Channel, which buffers the data
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a2.sources.s1.channels = c1
    a2.sinks.sk1.channel = c1
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --conf-file conf/example10.properties --name a2
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --conf-file conf/example10.properties --name a1
    
    [root@CentOS kafka_2.11-2.2.0]# ./bin/kafka-console-producer.sh --broker-list CentOS:9092 --topic topic01
    

    Channel - buffers

    Memory Channel

    Buffers Source data directly in memory. This is not safe: if the agent dies, the buffered events are lost.

    | Property | Default | Description |
    | --- | --- | --- |
    | type | – | must be memory |
    | capacity | 100 | maximum number of events stored in the channel |
    | transactionCapacity | 100 | maximum number of events the channel takes from a Source or gives to a Sink per transaction (the batch size) |

    Note: transactionCapacity <= capacity.

    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000 
    a1.channels.c1.transactionCapacity = 100
    

    JDBC Channel

    | Property | Default | Description |
    | --- | --- | --- |
    | type | – | component type name, must be jdbc |
    | db.type | DERBY | database vendor, must be DERBY |

    Events are stored in a persistent store backed by a database. The JDBC channel currently supports embedded Derby. It is a durable channel, ideal for flows where recoverability is important; use the JDBC channel when the buffered data matters too much to lose.

    a1.channels.c1.type = jdbc
    

    1. If the HIVE_HOME environment is configured, remove the derby jar from either Hive's lib directory or Flume's lib directory (removing it from one side is enough) to avoid a Derby version conflict.

    2. By default, Flume uses the replicating (broadcast) channel selector.

    Kafka Channel

    | Property | Default | Description |
    | --- | --- | --- |
    | type | – | component type name, must be org.apache.flume.channel.kafka.KafkaChannel |
    | kafka.bootstrap.servers | – | list of brokers in the Kafka cluster used by the channel |
    | kafka.topic | flume-channel | Kafka topic the channel will use |
    | kafka.consumer.group.id | flume | consumer group ID used to register with Kafka |

    Writes the data collected by the Source into an external Kafka cluster, which acts as the channel's buffer.

    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = CentOS:9092
    a1.channels.c1.kafka.topic = topic_channel
    a1.channels.c1.kafka.consumer.group.id = g1
    
    # Declare the basic components Source Channel Sink  example10.properties
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink component: print received data to the log console
    a1.sinks.sk1.type = logger
    
    # Configure the Channel: buffer the data in a Kafka topic
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = CentOS:9092
    a1.channels.c1.kafka.topic = topic_channel
    a1.channels.c1.kafka.consumer.group.id = g1
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
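    To see the Kafka channel buffering data, a sketch (assuming the config above and a broker on CentOS:9092): start the agent, type lines into the netcat source, and watch the channel's backing topic with a console consumer:

    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example10.properties -Dflume.root.logger=INFO,console
    [root@CentOS kafka_2.11-2.2.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOS:9092 --topic topic_channel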

    √File Channel

    | Property | Default | Description |
    | --- | --- | --- |
    | type | – | component type name, must be file |
    | checkpointDir | ~/.flume/file-channel/checkpoint | directory where checkpoint files are stored |
    | dataDirs | ~/.flume/file-channel/data | comma-separated list of directories for storing log files |

    Uses the file system as the channel implementation, so the buffered data is persisted to disk.

    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /root/flume/checkpoint
    a1.channels.c1.dataDirs = /root/flume/data
    

    Advanced components

    Interceptors

    Interceptors act on the Source component: they intercept or decorate the Events the Source produces. Flume ships with many built-in interceptors, for example timestamp, host, static, UUID, remove_header, search_replace, regex_filter and regex_extractor; the cases below exercise most of them.

    Case 1

    Testing the decorating interceptors

    # Declare the basic components Source Channel Sink  example11.properties
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Add interceptors
    a1.sources.s1.interceptors = i1 i2 i3 i4 i5 i6
    a1.sources.s1.interceptors.i1.type = timestamp
    a1.sources.s1.interceptors.i2.type = host
    a1.sources.s1.interceptors.i3.type = static
    a1.sources.s1.interceptors.i3.key = from
    a1.sources.s1.interceptors.i3.value = baizhi
    a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
    a1.sources.s1.interceptors.i4.headerName = uuid
    a1.sources.s1.interceptors.i5.type = remove_header
    a1.sources.s1.interceptors.i5.withName = from
    a1.sources.s1.interceptors.i6.type = search_replace
    a1.sources.s1.interceptors.i6.searchPattern = ^jiangzz
    a1.sources.s1.interceptors.i6.replaceString = baizhi
    # Configure the Sink component: print received data to the log console
    a1.sinks.sk1.type = logger
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/example11.properties -Dflume.root.logger=INFO,console
    

    Case 2

    Testing the filter and extractor interceptors

    # Declare the basic components Source Channel Sink  example12.properties
    a1.sources = s1
    a1.sinks = sk1
    a1.channels = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Add interceptors
    a1.sources.s1.interceptors = i1 i2
    a1.sources.s1.interceptors.i1.type = regex_extractor
    a1.sources.s1.interceptors.i1.regex = ^(INFO|ERROR)
    a1.sources.s1.interceptors.i1.serializers = s1
    a1.sources.s1.interceptors.i1.serializers.s1.name = loglevel
    
    a1.sources.s1.interceptors.i2.type = regex_filter
    a1.sources.s1.interceptors.i2.regex = .*baizhi.*
    a1.sources.s1.interceptors.i2.excludeEvents = false
    
    # Configure the Sink component: print received data to the log console
    a1.sinks.sk1.type = logger
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    
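    A sketch of how to try it out (assuming the config above): start the agent, connect with telnet, and send lines such as INFO baizhi is ok and ERROR nothing here. Only lines containing baizhi survive the regex_filter, and lines starting with INFO or ERROR get a loglevel header added by the regex_extractor:

    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example12.properties -Dflume.root.logger=INFO,console
    [root@CentOS ~]# telnet CentOS 44444
    INFO baizhi is ok
    ERROR nothing here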

    Channel selectors

    When one Source is connected to multiple Channels, the channel selector decides how the Source's data is routed to those Channels. If no selector is specified, the Source's data is broadcast to all Channels (the default replicating mode).

    replicating

    [Figure: replicating channel selector]

    # Declare the basic components Source Channel Sink  example13.properties
    a1.sources = s1
    a1.sinks = sk1 sk2
    a1.channels = c1 c2
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink components: roll the received data into local files
    a1.sinks.sk1.type = file_roll
    a1.sinks.sk1.sink.directory = /root/file_roll_1
    a1.sinks.sk1.sink.rollInterval = 0
    
    a1.sinks.sk2.type = file_roll
    a1.sinks.sk2.sink.directory = /root/file_roll_2
    a1.sinks.sk2.sink.rollInterval = 0
    
    # Configure the Channels, which buffer the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = jdbc
    
    # Bind the components together
    a1.sources.s1.channels = c1 c2
    a1.sinks.sk1.channel = c1
    a1.sinks.sk2.channel = c2
    

    Equivalent configuration:

    # Declare the basic components Source Channel Sink  example14.properties
    a1.sources = s1
    a1.sinks = sk1 sk2
    a1.channels = c1 c2
    
    # Channel selector: replicating mode
    a1.sources.s1.selector.type = replicating
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink components: roll the received data into local files
    a1.sinks.sk1.type = file_roll
    a1.sinks.sk1.sink.directory = /root/file_roll_1
    a1.sinks.sk1.sink.rollInterval = 0
    
    a1.sinks.sk2.type = file_roll
    a1.sinks.sk2.sink.directory = /root/file_roll_2
    a1.sinks.sk2.sink.rollInterval = 0
    
    # Configure the Channels, which buffer the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = jdbc
    
    # Bind the components together
    a1.sources.s1.channels = c1 c2
    a1.sinks.sk1.channel = c1
    a1.sinks.sk2.channel = c2
    

    Multiplexing

    [Figure: multiplexing channel selector]

    # Declare the basic components Source Channel Sink  example15.properties
    a1.sources = s1
    a1.sinks = sk1 sk2
    a1.channels = c1 c2
    
    # Channel selector: multiplexing mode, route by the "level" header
    a1.sources.s1.selector.type = multiplexing
    a1.sources.s1.selector.header = level
    a1.sources.s1.selector.mapping.INFO = c1
    a1.sources.s1.selector.mapping.ERROR = c2
    a1.sources.s1.selector.default = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Extract the log level from the event body into the "level" header
    a1.sources.s1.interceptors = i1
    a1.sources.s1.interceptors.i1.type = regex_extractor
    a1.sources.s1.interceptors.i1.regex = ^(INFO|ERROR)
    a1.sources.s1.interceptors.i1.serializers = s1
    a1.sources.s1.interceptors.i1.serializers.s1.name = level
    
    
    # Configure the Sink components: roll the received data into local files
    a1.sinks.sk1.type = file_roll
    a1.sinks.sk1.sink.directory = /root/file_roll_1
    a1.sinks.sk1.sink.rollInterval = 0
    
    a1.sinks.sk2.type = file_roll
    a1.sinks.sk2.sink.directory = /root/file_roll_2
    a1.sinks.sk2.sink.rollInterval = 0
    
    # Configure the Channels, which buffer the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    a1.channels.c2.type = jdbc
    
    # Bind the components together
    a1.sources.s1.channels = c1 c2
    a1.sinks.sk1.channel = c1
    a1.sinks.sk2.channel = c2
    
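    A sketch of how to verify the routing (assuming the config above, and keeping the derby note below in mind): start the agent, send INFO and ERROR lines via telnet, and check that INFO and unmatched lines are written under /root/file_roll_1 (memory channel c1) while ERROR lines end up under /root/file_roll_2 (jdbc channel c2):

    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example15.properties -Dflume.root.logger=INFO,console
    [root@CentOS ~]# telnet CentOS 44444
    INFO user login success
    ERROR user login failed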

    Note: here you need to delete the derby driver jar under the Hive installation directory, because the JDBC channel uses Flume's embedded Derby.

    Sink Processors

    Flume uses a Sink Group to wrap multiple Sink instances into one logical Sink component; internally, a Sink Processor provides failover or load balancing across the sinks in the group.

    Load balancing Sink Processor

    [Figure: load balancing sink processor]

    # Declare the basic components Source Channel Sink  example16.properties
    a1.sources = s1
    a1.sinks = sk1 sk2
    a1.channels = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink components: roll the received data into local files
    a1.sinks.sk1.type = file_roll
    a1.sinks.sk1.sink.directory = /root/file_roll_1
    a1.sinks.sk1.sink.rollInterval = 0
    a1.sinks.sk1.sink.batchSize = 1
    
    a1.sinks.sk2.type = file_roll
    a1.sinks.sk2.sink.directory = /root/file_roll_2
    a1.sinks.sk2.sink.rollInterval = 0
    a1.sinks.sk2.sink.batchSize = 1
    
    # Configure the Sink Processor: round-robin load balancing across sk1 and sk2
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = sk1 sk2
    a1.sinkgroups.g1.processor.type = load_balance
    a1.sinkgroups.g1.processor.backoff = true
    a1.sinkgroups.g1.processor.selector = round_robin
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 1
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    a1.sinks.sk2.channel = c1
    

    To observe the load-balancing effect, sink.batchSize and transactionCapacity must both be set to 1.
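    A sketch of a quick check (assuming the config above): start the agent, push a few lines through the netcat source, then compare the two output directories; with the round_robin selector the events should alternate between them:

    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example16.properties
    [root@CentOS ~]# nc CentOS 44444
    [root@CentOS ~]# cat /root/file_roll_1/* /root/file_roll_2/*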

    Failover Sink Processor

    # Declare the basic components Source Channel Sink  example17.properties
    a1.sources = s1
    a1.sinks = sk1 sk2
    a1.channels = c1
    
    # Configure the Source component: receive text data from a socket (netcat)
    a1.sources.s1.type = netcat
    a1.sources.s1.bind = CentOS
    a1.sources.s1.port = 44444
    
    # Configure the Sink components: roll the received data into local files
    a1.sinks.sk1.type = file_roll
    a1.sinks.sk1.sink.directory = /root/file_roll_1
    a1.sinks.sk1.sink.rollInterval = 0
    a1.sinks.sk1.sink.batchSize = 1
    
    a1.sinks.sk2.type = file_roll
    a1.sinks.sk2.sink.directory = /root/file_roll_2
    a1.sinks.sk2.sink.rollInterval = 0
    a1.sinks.sk2.sink.batchSize = 1
    
    # Configure the Sink Processor: failover, the sink with the higher priority is used first
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = sk1 sk2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.sk1 = 20
    a1.sinkgroups.g1.processor.priority.sk2 = 10
    a1.sinkgroups.g1.processor.maxpenalty = 10000
    
    # Configure the Channel, which buffers the data
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 1
    
    # Bind the components together
    a1.sources.s1.channels = c1
    a1.sinks.sk1.channel = c1
    a1.sinks.sk2.channel = c1
    

    Application integration - API

    Native API integration

    
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.9.0</version>
    </dependency>
    
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    
    • Single-host connection

    Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.HashMap;
    
    public class RpcClientTests {
        private RpcClient client;
    
        @Before
        public void before(){
            // connect to the Avro source listening on CentOS:44444
            client = RpcClientFactory.getDefaultInstance("CentOS", 44444);
        }
    
        @Test
        public void testSend() throws EventDeliveryException {
            // build an event with a body and a custom header, then send it
            Event event = EventBuilder.withBody("this is body".getBytes());
            HashMap<String, String> header = new HashMap<String, String>();
            header.put("from", "baizhi");
            event.setHeaders(header);
            client.append(event);
        }
    
        @After
        public void after(){
            client.close();
        }
    }
    
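    For these client tests to actually deliver events, an agent with an Avro source must be listening on the target host and port; reusing example2.properties from above is one option (a sketch):

    [root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example2.properties -Dflume.root.logger=INFO,console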
    • Cluster connection

    ① Failover

    // Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.HashMap;
    import java.util.Properties;
    
    public class RpcClientTests02_FailoverClient {
        private RpcClient client;
    
        @Before
        public void before(){
            Properties props = new Properties();
            props.put("client.type", "default_failover");
            // List of hosts (space-separated list of user-chosen host aliases)
            props.put("hosts", "h1 h2 h3");
            // host/port pair for each host alias
            props.put("hosts.h1", "CentOSA:44444");
            props.put("hosts.h2", "CentOSB:44444");
            props.put("hosts.h3", "CentOSC:44444");
    
            client = RpcClientFactory.getInstance(props);
        }
    
        @Test
        public void testSend() throws EventDeliveryException {
            Event event = EventBuilder.withBody("this is body".getBytes());
            HashMap<String, String> header = new HashMap<String, String>();
            header.put("from", "zhangsan");
            event.setHeaders(header);
            client.append(event);
        }
    
        @After
        public void after(){
            client.close();
        }
    }
    
    

    ② Load balancing

    // Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.util.HashMap;
    import java.util.Properties;
    
    public class RpcClientTests02_LoadBalancing {
        private RpcClient client;
    
        @Before
        public void before(){
            Properties props = new Properties();
            props.put("client.type", "default_loadbalance");
    
            // List of hosts (space-separated list of user-chosen host aliases)
            props.put("hosts", "h1 h2 h3");
    
            // host/port pair for each host alias
            props.put("hosts.h1", "CentOSA:44444");
            props.put("hosts.h2", "CentOSB:44444");
            props.put("hosts.h3", "CentOSC:44444");
    
            props.put("host-selector", "random"); // For random host selection
            // props.put("host-selector", "round_robin"); // For round-robin host selection
            props.put("backoff", "true"); // Disabled by default.
    
            props.put("maxBackoff", "10000"); // Defaults 0, which effectively
            // becomes 30000 ms
    
            client = RpcClientFactory.getInstance(props);
        }
    
        @Test
        public void testSend() throws EventDeliveryException {
            Event event = EventBuilder.withBody("this is body".getBytes());
            HashMap<String, String> header = new HashMap<String, String>();
            header.put("from", "lisi");
            event.setHeaders(header);
            client.append(event);
        }
    
        @After
        public void after(){
            client.close();
        }
    }
    

    log4j integration (traditional)

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.5</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    
    log4j.appender.flume= org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
    log4j.appender.flume.Hosts = CentOSA:44444 CentOSB:44444 CentOSC:44444
    log4j.appender.flume.Selector = RANDOM
    
    log4j.logger.com.baizhi = DEBUG,flume
    log4j.appender.flume.layout=org.apache.log4j.PatternLayout
    log4j.appender.flume.layout.ConversionPattern=%p %d %c %m %n
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    public class TestLog {
        private static Log log= LogFactory.getLog(TestLog.class);
    
        public static void main(String[] args) {
            log.debug("你好!_debug");
            log.info("你好!_info");
            log.warn("你好!_warn");
            log.error("你好!_error");
        }
    } 
    

    √SpringBoot integration

    Reference: https://github.com/gilt/logback-flume-appender

    <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.1.5.RELEASE</version>
    </parent>
    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
      </dependency>
    
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>1.9.0</version>
      </dependency>
    </dependencies>
    
    
    <configuration scan="true" scanPeriod="60 seconds" debug="false">
    
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%p %c#%M %d{yyyy-MM-dd HH:mm:ss} %m%n</pattern>
                <charset>UTF-8</charset>
            </encoder>
        </appender>
    
        <appender name="flume" class="com.gilt.logback.flume.FlumeLogstashV1Appender">
            <flumeAgents>
                CentOS:44444,
                CentOS:44444,
                CentOS:44444
            </flumeAgents>
            <flumeProperties>
                connect-timeout=4000;
                request-timeout=8000
            </flumeProperties>
            <batchSize>1</batchSize>
            <reportingWindow>1</reportingWindow>
            <additionalAvroHeaders>
                myHeader=myValue
            </additionalAvroHeaders>
            <application>smapleapp</application>
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%p %c#%M %d{yyyy-MM-dd HH:mm:ss} %m%n</pattern>
            </layout>
        </appender>
    
        <root level="ERROR">
            <appender-ref ref="STDOUT" />
        </root>
    
        <logger name="com.baizhi.service" level="DEBUG" additivity="false">
            <appender-ref ref="STDOUT" />
            <appender-ref ref="flume" />
        </logger>
    </configuration>
    
    import com.baizhi.service.IUserSerivice;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    
    @Service
    public class UserService implements IUserSerivice {
        private static final Logger LOG= LoggerFactory.getLogger(UserService.class);
        @Override
        public String sayHello(String name) {
            LOG.info("hello "+name);
            return "hello "+name;
        }
    }
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class FlumeAplication {
        public static void main(String[] args) {
            SpringApplication.run(FlumeAplication.class, args);
        }
    }
    
    
    @SpringBootTest(classes = {KafkaSpringBootApplication.class})
    @RunWith(SpringRunner.class)
    public class KafkaTempolateTests {
        @Autowired
        private KafkaTemplate kafkaTemplate;
        @Autowired
        private IOrderService orderService;
    
        @Test
        public void testOrderService(){
            orderService.saveOrder("002","baizhi xxxxx ");
        }
        @Test
        public void testKafkaTemplate(){
            kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
                @Override
                public Object doInOperations(KafkaOperations kafkaOperations) {
                    return kafkaOperations.send(new ProducerRecord("topic01","002","this is a demo"));
                }
            });
        }
    
    }
    


    
  • Original article: https://blog.csdn.net/qq_42074949/article/details/119829020