当 Flink 消费 Kafka 时,只需要简单的配置就可以正常运行:
- // Kafka connection settings: broker address and consumer group id.
- val kafkaProps = new Properties()
- kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
- kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- // Read the "mytest" topic through SimpleStringSchema and print every element.
- env
-   .addSource(new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), kafkaProps))
-   .print()
-
- env.execute("KafkaSourceStreaming")
但是,这里用的是最简单的 SimpleStringSchema,所以接收到的数据只是一条消息里的值(value),消息的时间戳、offset、topic、partition 等元信息都无法获取。当需要这部分信息时,可以实现 KafkaDeserializationSchema 接口来自定义反序列化逻辑:
- /**
-  * Example job that consumes the "mytest" topic with two sources:
-  * one using SimpleStringSchema (values only) and one using
-  * [[KafkaSourceStreaming.CustomKafkaDeserializationSchema]], which keeps the
-  * Kafka metadata (topic, partition, offset, timestamp, headers, ...).
-  */
- object KafkaSourceStreaming {
-
-   import java.nio.charset.StandardCharsets
-
-   /** Builds both Kafka sources, prints their elements and runs the job. */
-   def main(args: Array[String]): Unit = {
-
-     val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-     val props = new Properties()
-     props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
-     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")
-
-     // Source 1: plain string values, no metadata.
-     val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
-     val stream1 = env.addSource(consumer1)
-     stream1.print()
-
-     // Source 2: full ConsumerRecord, including metadata.
-     val consumer = new FlinkKafkaConsumer[ConsumerRecord[String, String]](
-       "mytest", new CustomKafkaDeserializationSchema(), props)
-     val stream = env.addSource(consumer)
-     stream.print()
-
-     env.execute("KafkaSourceStreaming")
-   }
-
-
-   /**
-    * Deserialization schema that exposes the Kafka record metadata by
-    * re-wrapping every raw record as a `ConsumerRecord[String, String]`.
-    *
-    * Key and value bytes are decoded as UTF-8; a null key or a null value
-    * (e.g. a tombstone record) is passed through as null instead of failing.
-    */
-   class CustomKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {
-
-     /**
-      * Copies all metadata of `record` into a new record whose key and value
-      * are decoded strings.
-      *
-      * @param record raw record as delivered by the Kafka consumer
-      * @return record with identical metadata and String key/value
-      */
-     override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
-       val key = decode(record.key())
-       val value = decode(record.value())
-       // NOTE(review): checksum() and this constructor overload are deprecated in
-       // kafka-clients 2.x and no longer exist in 3.x; when upgrading, switch to the
-       // overload without the checksum argument - confirm against the client version in use.
-       new ConsumerRecord[String, String](
-         record.topic(),
-         record.partition(),
-         record.offset(),
-         record.timestamp(),
-         record.timestampType(),
-         record.checksum(),
-         record.serializedKeySize(),
-         record.serializedValueSize(),
-         key,
-         value,
-         record.headers(),
-         record.leaderEpoch()
-       )
-     }
-
-     /** The stream is unbounded: no element ever terminates it. */
-     override def isEndOfStream(nextElement: ConsumerRecord[String, String]): Boolean = false
-
-     /** Type information for the produced `ConsumerRecord[String, String]` elements. */
-     override def getProducedType: TypeInformation[ConsumerRecord[String, String]] = {
-       TypeInformation.of(new TypeHint[ConsumerRecord[String, String]]() {})
-     }
-
-     /** Decodes `bytes` as UTF-8, mapping null to null (ConsumerRecord is a Java API that uses null for absence). */
-     private def decode(bytes: Array[Byte]): String =
-       Option(bytes).map(new String(_, StandardCharsets.UTF_8)).orNull
-   }
- }