• Flink + Kafka: using Flume to monitor changes to a specified file


    Scenario:

    Monitor a specified file for changes and ship the changed contents to a Kafka topic.

    Step 1: edit flume-conf.properties

    # the core components

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # set source

    a1.sources.r1.type = exec

    a1.sources.r1.command = tail -F /export/server/flume/conf/eventlog.log

    # Describe the sink

    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

    a1.sinks.k1.kafka.topic = eventlog

    a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092

    # Use a channel which buffers events in memory

    a1.channels.c1.type = memory

    a1.channels.c1.capacity = 20000

    a1.channels.c1.transactionCapacity = 10000

    # Bind the source and sink to the channel

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    Step 2: start the Kafka cluster (run this on each broker node)

    cd kafka/bin

    kafka-server-start.sh -daemon ../config/server.properties
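    If the eventlog topic does not exist yet, create it before consuming. A minimal sketch; the partition and replication-factor values here are assumptions, and older Kafka releases take --zookeeper instead of --bootstrap-server:

    ./kafka-topics.sh --create --topic eventlog --bootstrap-server node1:9092 --partitions 3 --replication-factor 2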

    Step 3: start a Kafka console consumer (add --from-beginning if you also want messages produced before the consumer started)

    ./kafka-console-consumer.sh --topic eventlog --bootstrap-server node1:9092,node2:9092,node3:9092

    Step 4: start the Flume agent (-c is the config directory, -f the agent config file, -n the agent name defined above; the -D flag turns on console logging)

    ./flume-ng agent -c ../conf/ -f ../conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console

    Step 5: modify the eventlog.log file under /export/server/flume/conf/. Because tail -F follows the file by name, moving it away and back forces tail to reopen the file and re-emit its contents from the beginning:

    mv eventlog.log eventlog.log2

    mv eventlog.log2 eventlog.log
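    Simply appending a line also triggers the pipeline (the sample text below is arbitrary):

    echo "hello eventlog" >> /export/server/flume/conf/eventlog.log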

    Result: the console consumer prints the re-emitted contents of eventlog.log (screenshot omitted).

    Add a Flink program to consume the data from Kafka

    SourceKafka.scala

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    class SourceKafka {

      def getKafkaSource(topicName: String): FlinkKafkaConsumer[String] = {
        val props = new Properties()
        // Kafka broker addresses
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
        // Kafka consumer group id
        props.setProperty("group.id", "test-consumer-group")
        // key deserializer
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        // value deserializer
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        // Offset reset strategy:
        //   none:     throw an exception if the group has no committed offset
        //   earliest: use the committed offset per partition; if none, read from the beginning
        //   latest:   use the committed offset per partition; if none, read only new records
        props.setProperty("auto.offset.reset", "latest")
        new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
      }
    }
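    For reference, FlinkKafkaConsumer comes from Flink's Kafka connector. A minimal sbt sketch; the version numbers are assumptions, so match them to your cluster:

    libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.13.6"
    libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.13.6"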

    AdEventLog.scala

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    object AdEventLog {

      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        // use event time as the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // read the data that Flume shipped into Kafka
        val kafkaSource: FlinkKafkaConsumer[String] = new SourceKafka().getKafkaSource("eventlog")
        val eventLogStream: DataStream[String] = env.addSource(kafkaSource)
        eventLogStream.print()
        env.execute("AdEventLog")
      }
    }
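    One detail worth knowing: FlinkKafkaConsumer commits offsets back to Kafka only on completed checkpoints (without checkpointing it falls back to the Kafka client's periodic auto-commit). A minimal sketch, with an arbitrary 5-second interval:

    // in main(), before addSource
    env.enableCheckpointing(5000)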

    Result: the Flink job prints each Kafka record to stdout (screenshot omitted).

  • Original article: https://blog.csdn.net/bababuzaijia/article/details/126495888