• Flume集成Kafka


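    之前提到Flume可以直接采集数据并存储到HDFS中,那为什么还要引入Kafka这个中间件呢?这是因为在实际应用场景中,我们往往既需要实时计算,也需要离线计算:Flume先把采集到的数据写入Kafka,实时计算程序(如Spark Streaming、Flink)直接消费Kafka中的数据;同时再由另一个Flume agent把同一份数据从Kafka写入HDFS,供离线计算使用。Kafka在这里起到了数据缓冲和解耦的作用。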

    image-20240313120335406

    Kafka to HDFS配置

    # Kafka -> HDFS agent: consume topic test_r2p5 and write it to HDFS,
    # one directory per day.
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe the source: Kafka consumer
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    # Max events per batch written to the channel, and the max time (ms) to
    # wait before a partial batch is flushed.
    a1.sources.r1.batchSize = 10
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
    a1.sources.r1.kafka.topics = test_r2p5
    a1.sources.r1.kafka.consumer.group.id = flume-group1
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    # Should be at least as large as the source and sink batch sizes.
    a1.channels.c1.transactionCapacity = 1000
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    # %Y-%m-%d is filled in from the agent's local clock (useLocalTimeStamp = true).
    a1.sinks.k1.hdfs.path = hdfs://192.168.52.100:9000/kafkaout/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = access
    a1.sinks.k1.hdfs.writeFormat = Text
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # DataStream writes the raw event bodies (no SequenceFile wrapping).
    a1.sinks.k1.hdfs.fileType = DataStream
    # Roll a new file every hour (3600 s) or at 128 MB, whichever comes first;
    # rollCount = 0 disables rolling by event count.
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.rollInterval = 3600
    a1.sinks.k1.hdfs.rollSize = 134217728
    
    # Bind the source and sink to the channel (one binding per component;
    # a duplicate key would silently override an earlier one).
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    File to Kafka配置

    # File -> Kafka agent: follow a local log file and publish each new line
    # to Kafka topic test_r2p5.
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe the source: tail -F keeps following the file across rotation.
    # NOTE: the exec source gives no delivery guarantee - events can be lost if
    # the agent or channel fails. Consider the TAILDIR source for production.
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /home/log/test.log
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 1000
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = test_r2p5
    a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
    # Events taken from the channel per batch. This is a sink-level key
    # (flumeBatchSize); it is not read under the "kafka." prefix.
    a1.sinks.k1.flumeBatchSize = 10
    # Everything under kafka.producer.* is passed through to the Kafka producer.
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    创建Topic

    # Create topic test_r2p5: 5 partitions, replication factor 2.
    # --bootstrap-server (Kafka >= 2.2) replaces the deprecated --zookeeper
    # option, which no longer exists in Kafka 3.x.
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --create --bootstrap-server hadoop01:9092 --partitions 5 --replication-factor 2 --topic test_r2p5
    
    
    • 1
    • 2

    启动flume(两个agent都在前台运行,需要在Flume安装目录下分别打开两个终端启动)

    # Run both commands from the Flume home directory: bin/ and the conf-*
    # directories are relative paths. Each agent stays in the foreground
    # (logging to the console), so use a separate terminal for each.
    # Agent 1: Kafka -> HDFS
    [root@hadoop04 apache-flume-1.11.0-bin]# bin/flume-ng agent --name a1 --conf conf-kafka-hdfs --conf-file conf-kafka-hdfs/kafka-to-hdfs.conf -Dflume.root.logger=INFO,console
    # Agent 2: File -> Kafka
    [root@hadoop04 apache-flume-1.11.0-bin]# bin/flume-ng agent --name a1 --conf conf-file-kafka --conf-file conf-file-kafka/file-to-kafka.conf -Dflume.root.logger=INFO,console
    
    
    • 1
    • 2
    • 3

    向test.log写入一行测试数据(文件不存在时 >> 会自动创建它)

    # Append one test line to the file followed by the exec source (tail -F).
    [root@hadoop04 log]# printf '%s\n' 'hello world' >> /home/log/test.log
    
    
    • 1
    • 2

    验证

    # The line written to test.log arrives in HDFS via Kafka. The .tmp suffix
    # means the HDFS sink still has the file open; it is renamed when the file
    # rolls (rollInterval = 3600 s or rollSize = 128 MB in kafka-to-hdfs.conf).
    [root@hadoop01 kafka_2.12-2.4.0]# hdfs dfs -cat /kafkaout/2024-03-13/access.1710307375351.tmp
    hello world
    
    
    • 1
    • 2
    • 3

    [root@hadoop01 kafka_2.12-2.4.0]# hdfs dfs -cat /kafkaout/2024-03-13/access.1710307375351.tmp
    hello world

    
    
    
    
    • 1
    • 2
    • 3
  • 相关阅读:
    安装vue vue-server-renderer报错
    移远通信推出六款新型天线,为物联网客户带来更丰富的产品选择
    Java核心知识经典面试题来啦(基础语法篇)
    改进的Salp Swarm优化算法(ISSA)(Matlab代码实现)
    基于Spring Boot的体育馆管理系统的设计与实现
    Nuxt 常见问题与解决方案
    3_docker部署mysql主主备份
    5. 【非递归版】先序、中序、后序遍历 + 求数的深度(用层序遍历实现)
    前端面试宝典React篇16 React Hook 的使用限制有哪些?
    直流多功能电表的接线方法介绍
  • 原文地址:https://blog.csdn.net/Grady00/article/details/136677725