• flink消费kafka时获取元数据信息


    flink消费kafka时,只需要简单配置就能使用并正常运行

    1. val env = StreamExecutionEnvironment.getExecutionEnvironment
    2. val props = new Properties()
    3. props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
    4. props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")
    5. val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
    6. val stream1 = env.addSource(consumer1)
    7. stream1.print()
    8. env.execute("KafkaSourceStreaming")

    但是,这里用的是最简单的SimpleStringSchema,所以接收到的数据只是我们所理解的一条消息里的值,其包含的时间戳、offset、topic、partition等元信息都不能正常获取,当需要该部分信息时,可以利用KafkaDeserializationSchema 接口来实现自定义的反序列化逻辑。

    1. object KafkaSourceStreaming {
    2. def main(args: Array[String]): Unit = {
    3. val env = StreamExecutionEnvironment.getExecutionEnvironment
    4. val props = new Properties()
    5. props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
    6. props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")
    7. val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
    8. val stream1 = env.addSource(consumer1)
    9. stream1.print()
    10. val consumer = new FlinkKafkaConsumer("mytest",new CustomKafkaDeserializationSchema(), props)
    11. val stream = env.addSource(consumer)
    12. stream.print()
    13. env.execute("KafkaSourceStreaming")
    14. }
    15. /**
    16. * 获取kafka元数据信息
    17. */
    18. class CustomKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {
    19. override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
    20. val key = if (record.key() == null) null else new String(record.key())
    21. val value = new String(record.value())
    22. new ConsumerRecord[String, String](
    23. record.topic(),
    24. record.partition(),
    25. record.offset(),
    26. record.timestamp(),
    27. record.timestampType(),
    28. record.checksum(),
    29. record.serializedKeySize(),
    30. record.serializedValueSize(),
    31. key,
    32. value,
    33. record.headers(),
    34. record.leaderEpoch()
    35. )
    36. }
    37. override def isEndOfStream(nextElement: ConsumerRecord[String, String]): Boolean = false
    38. override def getProducedType: TypeInformation[ConsumerRecord[String, String]] = {
    39. TypeInformation.of(new TypeHint[ConsumerRecord[String, String]]() {})
    40. }
    41. }
    42. }

  • 相关阅读:
    UART串口及Linux实现
    .NET 反向代理 YARP 自定义配置提供程序(Configuration Providers)
    Linux-JVM-CPU爆表调优
    过滤器和监听器
    从0搭建Vue3组件库(五): 如何使用Vite打包组件库
    COIG:开源四类中文指令语料库
    6.S081-9线程切换 - Thread Switching
    react常用hooks总结
    多媒体操作流程
    R语言使用purrr包的pmap函数或者map2函数来向量化普通标量函数来处理向量数据、计算两个向量之间的按元素的最大公约数GCD
  • 原文地址:https://blog.csdn.net/SuperBoy_Liang/article/details/139654728