Goal: monitor a specified file for changes and deliver the changed file contents to a Kafka topic. The Flume agent configuration below is saved as conf/flume-conf.properties, the file referenced when the agent is started later:
# the core components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# set source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /export/server/flume/conf/eventlog.log

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = eventlog
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the Kafka broker on each node:

cd kafka/bin
kafka-server-start.sh -daemon ../config/server.properties
Start a console consumer to verify that data arrives in the eventlog topic:

./kafka-console-consumer.sh --topic eventlog --bootstrap-server node1:9092,node2:9092,node3:9092
Start the Flume agent with the configuration above:

./flume-ng agent -c ../conf/ -f ../conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
Simulate a change to the monitored file by renaming it away and back; tail -F detects the rotation and re-reads eventlog.log, so its contents are sent to Kafka:

mv eventlog.log eventlog.log2
mv eventlog.log2 eventlog.log
SourceKafka.scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

class SourceKafka {
  def getKafkaSource(topicName: String): FlinkKafkaConsumer[String] = {
    val props = new Properties()
    // Kafka broker addresses for the consumer
    props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    // Kafka consumer group id
    props.setProperty("group.id", "test-consumer-group")
    // key deserializer
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // value deserializer
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Offset reset strategy:
    //   none:     if no previous offset is found for the group (neither automatically nor
    //             manually committed), throw an exception
    //   earliest: if a committed offset exists for a partition, consume from that offset;
    //             otherwise consume from the beginning
    //   latest:   if a committed offset exists for a partition, consume from that offset;
    //             otherwise consume only newly arriving data
    props.setProperty("auto.offset.reset", "latest")
    new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
  }
}
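As an optional check, a few test lines can be published directly to the eventlog topic to confirm that the Flink job below receives data even without Flume running. This is a minimal sketch, assuming the kafka-clients dependency is on the classpath; EventLogTestProducer is a hypothetical helper, not part of the original project.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object EventLogTestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // send a few test lines to the eventlog topic
    (1 to 5).foreach { i =>
      producer.send(new ProducerRecord[String, String]("eventlog", s"test event $i"))
    }
    producer.flush()
    producer.close()
  }
}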
AdEventLog.scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object AdEventLog {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    env.setParallelism(1)
    // use event time as the time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // read the data that Flume pushed into Kafka
    val kafkaSource: FlinkKafkaConsumer[String] = new SourceKafka().getKafkaSource("eventlog")
    val eventLogStream: DataStream[String] = env.addSource(kafkaSource)

    eventLogStream.print()
    env.execute()
  }
}
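The job above enables the event-time characteristic but does not yet assign timestamps or watermarks, so downstream event-time operators would never fire. One possible extension is sketched below; it assumes (hypothetically) that each log line starts with an epoch-millisecond timestamp followed by a comma, and uses Flink's BoundedOutOfOrdernessTimestampExtractor to tolerate 5 seconds of out-of-order data. AdEventLogWithWatermarks is an illustrative name, not part of the original project.

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object AdEventLogWithWatermarks {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val kafkaSource = new SourceKafka().getKafkaSource("eventlog")
    val eventLogStream: DataStream[String] = env.addSource(kafkaSource)

    // Assumption: each line looks like "<epoch-millis>,<rest of the event>"
    val withTimestamps: DataStream[String] = eventLogStream.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5)) {
        override def extractTimestamp(line: String): Long = line.split(",")(0).toLong
      })

    withTimestamps.print()
    env.execute("AdEventLogWithWatermarks")
  }
}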
Result: