This example implements a word-frequency count over the messages in a Kafka topic. It demonstrates how a Flink streaming program consumes messages from Kafka, so that beginners can learn how to write such a consumer and use the running demo to study Flink further.
Experiment environment: Flink 1.10.1 with the Kafka 0.11 connector (Scala 2.12). Add the following dependency to the project's pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
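Note that this connector targets Kafka 0.11 brokers. For Kafka 1.0 and later, Flink 1.10 also ships a universal connector (artifact flink-connector-kafka_2.12) with a nearly identical API; the consumer class is FlinkKafkaConsumer instead of FlinkKafkaConsumer011. The complete consumer program is shown below.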
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import java.util.Properties

object SourceFromKafka {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "slave01:9092") // Kafka broker address
    properties.setProperty("group.id", "consumer-group")        // consumer group id
    properties.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    // start from the latest offsets when the group has no committed offsets
    properties.setProperty("auto.offset.reset", "latest")

    // Consume the "sensor" topic, deserializing each record as a plain string
    val stream3 = env.addSource(
      new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

    // Word count: split each message on spaces, emit (word, 1),
    // key by the word, and keep a running sum of the counts
    stream3.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()

    env.execute("SourceFromKafka Job")
  }
}
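In the pipeline above, keyBy(0) and sum(1) address the tuple fields by position. This compiles and runs on Flink 1.10, but later Flink releases deprecate position-based keying in favor of typed key selectors; a minimal sketch of the same pipeline with a key selector (reusing stream3 from above) could look like:

stream3.flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(_._1) // key by the word itself rather than by tuple index
  .sum(1)
  .print()

One more detail worth knowing: FlinkKafkaConsumer011 commits offsets back to Kafka only on completed checkpoints when checkpointing is enabled (e.g. env.enableCheckpointing(5000)); with checkpointing disabled, it falls back to the periodic auto-commit configured in the Kafka properties.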
Start the Kafka broker (ZooKeeper must already be running on the cluster), create the topic, and open a console producer:

kafka-server-start.sh ./server.properties &
kafka-topics.sh --create --zookeeper master:2181,slave01:2181,slave02:2181 --replication-factor 1 --partitions 2 --topic sensor
kafka-console-producer.sh --broker-list slave01:9092 --topic sensor

Then type some test messages into the producer console:

hello world
hello world
hello world
hello world
hello world
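If the job is running while these lines are typed, the print() sink should show the counts growing with each message, roughly like the following (the exact interleaving and the "n> " subtask prefix depend on parallelism):

(hello,1)
(world,1)
(hello,2)
(world,2)
(hello,3)
(world,3)
...

Because sum holds state per key, the counts continue to accumulate for as long as the job runs.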
