• Flume(5个demo轻松入门)


    kafka整合 flume 

    3个问题
    海量订单 存入消息队列 前提 手动
    克隆表达式(定时器) springBoot整合 ack  简单发送 同步发送 异步发送加快速度(会丢失数据)
    分库数据库顶不住 
    flume 安装在分机上
    ​
    常用版本
    oragle 11g
    mysql 5.7
    tomcat 
    spring 5.2.12
    nacos 1.4.2
    setea 
    setinel 
    canal 1.1.6
    flume 1.6cdh 5.14.2
    redis 5
    kafka 2.0.0
    nginx 1.8.1
    zookerper 
    ​
    注意关注Agent内的部分,包含了Source、Channel和Sink三部分,Agent是一个JVM进程。
    在关注这三部分之前了解一个概念:Flume event。event将要传输数据进行封装,然后以event的形式来传输数据,是flume传输数据的基本单位。现在来关注三部分的功能。
    ​
    Source:用来收集数据。数据源可以是多种,例如web server。它可以处理各种类型的数据,包括自定义的类型;
    Channel:用来缓存数据。当Source收集到数据时,将数据缓存到 一个或多个 Channel中(可以在内存,也可以在磁盘文件中),直到Sink取出其中的数据;
    Sink:取出Channel中的数据,将数据发送到目的地。目的地例如HDFS,还可以是另一个Agent的Source。
    可靠性
    只有Channel中的缓存全部到达目的地或者下一个Agent时,Channel才会清空缓存。
    ​
    可恢复性
    Channel可以将缓存在文件或者内存中,在操作失败时可以进行数据恢复。
    ​
    配置AGENT
    Agent配置存放在一个本地配置文件中,它是一个遵循java属性配置的文件。一个配置文件可以配置一个或多个Agent。配置包含每个Sourcce,Channel和Sink的属性配置,以及他们如何注入以形成数据流。
    ​

    kafka基本常识(使用前提)

    ​
    kafka命令行查看topic
    ​
    查看当前服务器中的所有topic:
    bin/kafka-topics.sh --zookeeper 主机:2181 --list
    创建topic:
    bin/kafka-topics.sh --zookeeper 主机:2181 --create --replication-factor 3 --partitions 1 --topic 名
    字
    删除topic:
    bin/kafka-topics.sh --zookeeper 主机:2181 --delete --topic 名字
    需要server.properties中设置delete.topic.enable=true否则只是标记删除。
    发送消息:
    bin/kafka-console-producer.sh --broker-list 主机:9092 --topic 名字
    >hello world
    消费消息:
    bin/kafka-console-consumer.sh --zookeeper 主机:9092 --topic 名字
    bin/kafka-console-consumer.sh --bootstrap-server 主机:9092 --topic 名字
    bin/kafka-console-consumer.sh --bootstrap-server 主机:9092 --from-beginning --topic 名字
    查看某个topic的情况
    bin/kafka-topic.sh --zookeeper 主机:2181 --describe --topic 名字
    修改分区数:
    bin/kafka-topics.sh --zookeeper 主机:2181 --alter --topic 名字 --partitions 6

    demo1

     

    1.解压

    cd /opt/
    ls
    tar -zxf flume-ng-1.6.0-cdh5.14.2.tar.gz 
    ls
    mv apache-flume-1.6.0-cdh5.14.2-bin/ soft/flume160
    ls
    cd soft/flume160/conf/
    ls
    cp flume-conf.properties.template  flume-conf.properties
    ​
    ​
    ​

    2.编写hello

     

    #在opt创建flumecfg配置文件 first.conf,添加如下内容

    cd /opt/
    mkdir flumecfg
    cd flumecfg/
    ls
    vim first.conf
    #文件配置
    a1.sources=r1
    a1.sinks=k1
    a1.channels=c1
    ​
    a1.sources.r1.type=netcat
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=9999
    ​
    a1.sinks.k1.type=logger
    ​
    a1.channels.c1.type=memory
    ​
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    ​

     

     

    3.安装navcat

    #安装 netcat 工具
    yum install -y nc
    ​
    #判断 44444 端口是否被占用
    sudo netstat -nlp | grep 44444
    ​
    #判断 9999 端口是否被占用
    sudo netstat -nlp | grep 9999
    #等待3.1窗口启动 在运行一下代码
    nc localhost 9999

    3.1换窗口

    #开启flume监听端口

    cd /opt/soft/flume160/bin/
    #配置
    ./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/first.conf -Dflume.root.logger=INFO,console 

    注意先启动窗口3.1 在启动之前的窗口!!!

     

    demo2

    1.使用窗口3.1

     

    vim /opt/flumecfg/first.conf
    touch /opt/flumecfg/sec.conf
    vim sec.conf
    #添加以下内容
    #命名sources channels sinks
    a1.channels = c1
    a1.sources = s1
    a1.sinks = k1
    ​
    a1.sources.s1.type = spooldir
    #监听的内容来自/opt/data目录
    a1.sources.s1.spoolDir = /opt/data
    ​
    a1.channels.c1.type=memory
    #输出的目的地是控制台的logger类型
    a1.sinks.k1.type = logger
    ​
    #将sink source 和channel绑定
    a1.sinks.k1.channel = c1
    a1.sources.s1.channels = c1
    ​
    #保存
    :wq
    ​

    2.拖入文件

     

    3.开启flume监听端口

    cd /opt/soft/flume160/bin/
    #配置
    ./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/sec.conf -Dflume.root.logger=INFO,console 

     

    demo3

     

    1.配置

    cd /opt/flumecfg/
    ls
    cp sec.conf third.conf
    vim third.conf
    #配置添加以下内容
    a1.channels = c1
    a1.sources = s1
    a1.sinks = k1
    
    a1.sources.s1.type = spooldir
    a1.sources.s1.spoolDir = /opt/data
    
    a1.channels.c1.type=memory
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = mydemo
    a1.sinks.k1.kafka.bootstrap.servers = 192.168.64.138:9092
    a1.sinks.k1.kafka.producer.acks = 1
    
    a1.sinks.k1.channel = c1
    a1.sources.s1.channels = c1
    :wq
    
    
    

     

     

    2.新开窗口A启动 kafka

    #zookeper先启动
    zkServer.sh start
    
    #kafka启动
    kafka-server-start.sh /opt/soft/kafka200/config/server.properties
    
    #新开窗口b 开启consumer监控
    kafka-console-consumer.sh --bootstrap-server 192.168.64.138:9092 --topic mydemo
    
    

    3.日志文件改名

    .log
    
    

     

    4.新开窗口C 启动

    cd /opt/soft/flume160/bin/
    
    ./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/third.conf
    

     

    demo4(二次开发)

     

    1.新建工程

     

    1.1导入pom

    
    
    
      4.0.0
    
      org.example
      tcinterceptor
      1.0-SNAPSHOT
    
      tcinterceptor
      
      http://www.example.com
    
      
        UTF-8
        1.8
        1.8
      
    
      
        
          junit
          junit
          4.11
          test
        
        
          org.apache.kafka
          kafka-clients
          2.5.1
        
      
    
      
        
          
            maven-compiler-plugin
            2.3.2
            
              1.8
              1.8
            
          
          
            maven-assembly-plugin
            
              
                jar-with-dependencies
              
              
                
                  com.kgc.towercrane.intceptors.SimplePartitioner
                
              
            
            
              
                make-assembly
                package
                
                  single
                
              
            
          
        
    
      
    
    
    
    
    

    2.编写SimplePartitioner类


    3.拖入jar包

     

    4.配置正则

    cd /opt/flumecfg
    vim third.conf
    #添加
    a1.sources.s1.spoolDir = /opt/data
    a1.sources.s1.interceptors=i1
    a1.sources.s1.interceptors.i1.type=regex_extractor
    a1.sources.s1.interceptors.i1.regex=(TC[0-9]+).*
    a1.sources.s1.interceptors.i1.serializers=ssl
    a1.sources.s1.interceptors.i1.serializers.ssl.name=key
    a1.channels.c1.type=memory
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = mydemo
    a1.sinks.k1.kafka.bootstrap.servers = 192.168.64.138:9092
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.partitioner.class=com.kgc.towercrane.intceptor.SimplePartitioner
    
    a1.sinks.k1.channel = c1
    a1.sources.s1.channels = c1
    
    :wq
    

    5.配置zookeper del

    vim /opt/soft/kafka200/config/server.properties
    #必须配置为true才能删除
    delete.topic.enable=true
    :wq
    
    

     

    6.删除

    kafka-topics.sh --delete --zookeeper 192.168.64.138:2181 --topic mydemo
    
    #查看删除结果
    cd /opt/soft/kafka200/kafka-logs/
    ls
    

    7.创建mydemo

    #创建3个分区
    kafka-topics.sh --create --zookeeper 192.168.64.138:2181 --topic mydemo --replication-factor 1 --partitions 3
    
    #查看分区结果
    cd /opt/soft/kafka200/kafka-logs/
    ls
    #先建立窗口A B C D
    #1 zookeper先启动(窗口A)
    zkServer.sh start
    
    #2 kafka启动(窗口A)
    kafka-server-start.sh /opt/soft/kafka200/config/server.properties
    
    #3 监控kafka(窗口B)
    kafka-console-consumer.sh --bootstrap-server 192.168.64.138:9092 --topic mydemo
    
    #4进入目录(窗口C)
    cd /opt/soft/flume160/bin
    #4.1 启动flume(窗口C)
    ./flume-ng agent -n a1 -c /opt/soft/flume160/conf/ -f /opt/flumecfg/third.conf -Dflume.root.logger=INFO,console 
    
    #5 监控mydemo便于查看分区数量(窗口D)
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.64.138:9092 --topic mydemo
    
    #组重置
    ./kafka-consumer-groups.sh --bootstrap-server=192.168.64.138:9092 --execute --reset-offsets --topic=tc --group=cm --to-earliest
    

     

    8问题测试

     

     

    9.ideal配置MyConsumer类

    @Component
    public class MyConsumer {
        @KafkaListener(topics = "tc",groupId = "cm")
        public void readKafka(ConsumerRecord record, Acknowledgment ack){
            System.out.println(record.key()+"====>"+record.value()+"====>"+record.partition());
            ack.acknowledge();
        }
    }
    
    #application启动
    

    10.组重置

    ./kafka-consumer-groups.sh --bootstrap-server=192.168.64.138:9092 --execute --reset-offsets --topic=tc --group=cm --to-earliest
    

    demo5(springBoot整合kafka消费者 详细见下篇文章)

    1.ideal tcmanager配置TcinfoPushService

     

    1

    #启动nacos
    startup.sh 
    

     

  • 相关阅读:
    科目三:右转弯
    SpringCloud - 服务注册中心
    Docker 网络管理及资源控制
    2022.11.16-----leetcode.775
    08-BOM&DOM概念
    LQ0112 立方和【进制】
    前端面试,备考第 13 天 - 执行上下文 | 作用域链 | 闭包
    【面试】C++和python的区别
    去中心化与无平台成员:与 Nasheq.eth、Ivan Manchev和Rob Edwards开启 “智能钱包”系列对话!
    导出excel报错:fontConfigiration错误
  • 原文地址:https://blog.csdn.net/just_learing/article/details/126013000