• Retrieving Kafka metadata when Flink consumes from Kafka


    When Flink consumes from Kafka, a minimal configuration is enough to get a job up and running:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")
    val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
    val stream1 = env.addSource(consumer1)
    stream1.print()
    env.execute("KafkaSourceStreaming")
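
    For reference, these snippets assume the universal Kafka connector is on the classpath. A sketch of the sbt dependencies, assuming Flink 1.12 on Scala 2.12 (the versions are illustrative; match them to your cluster):

    // build.sbt -- versions are assumptions, adjust to your environment
    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-streaming-scala" % "1.12.7" % Provided,
      "org.apache.flink" %% "flink-connector-kafka" % "1.12.7"
    )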

    However, this uses the simplest SimpleStringSchema, so what we receive is only the message value as we usually understand it; the metadata each record carries, such as its timestamp, offset, topic, and partition, is not accessible. When that information is needed, the KafkaDeserializationSchema interface can be used to implement custom deserialization logic:

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}

    object KafkaSourceStreaming {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val props = new Properties()
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")

        // Value-only stream: SimpleStringSchema exposes just the message value
        val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
        val stream1 = env.addSource(consumer1)
        stream1.print()

        // Metadata-aware stream: the custom schema emits the whole ConsumerRecord
        val consumer = new FlinkKafkaConsumer("mytest", new CustomKafkaDeserializationSchema(), props)
        val stream = env.addSource(consumer)
        stream.print()

        env.execute("KafkaSourceStreaming")
      }

      /**
       * Deserializes each Kafka record together with its metadata
       * (topic, partition, offset, timestamp, headers, ...).
       */
      class CustomKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {
        override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
          // A record's key may be absent, so guard against null before decoding
          val key = if (record.key() == null) null else new String(record.key())
          val value = new String(record.value())
          // Rebuild the record with String key/value, carrying every metadata field over
          // (note: this constructor and checksum() are deprecated in kafka-clients 2.x and removed in 3.x)
          new ConsumerRecord[String, String](
            record.topic(),
            record.partition(),
            record.offset(),
            record.timestamp(),
            record.timestampType(),
            record.checksum(),
            record.serializedKeySize(),
            record.serializedValueSize(),
            key,
            value,
            record.headers(),
            record.leaderEpoch()
          )
        }

        // The Kafka source is unbounded, so no element ever ends the stream
        override def isEndOfStream(nextElement: ConsumerRecord[String, String]): Boolean = false

        // Tells Flink which type this schema produces
        override def getProducedType: TypeInformation[ConsumerRecord[String, String]] = {
          TypeInformation.of(new TypeHint[ConsumerRecord[String, String]]() {})
        }
      }
    }
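
    Once the stream carries full ConsumerRecord objects, the metadata is available to any downstream operator like an ordinary field. A minimal sketch of one way to use it (the accessors come from the Kafka ConsumerRecord API; the formatting itself is only illustrative):

    // Continuing from `stream` above: project the metadata into a readable string
    stream
      .map(r => s"topic=${r.topic()} partition=${r.partition()} offset=${r.offset()} " +
                s"timestamp=${r.timestamp()} value=${r.value()}")
      .print()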

• Original article: https://blog.csdn.net/SuperBoy_Liang/article/details/139654728