• Flink 侧输出流(SideOutput)


    🌸在平时大部分的 DataStream API 的算子的输出是单一输出,也就是某一种或者说某一类数据流,流向相同的地方。

    🌸在处理不同的流中,除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。ProcessFunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。

    当使用旁路输出时,首先需要定义一个OutputTag来标识一个旁路输出流

    val OutPut=OutputTag[String]("side-output")

    注意:OutputTag是如何根据旁路输出流包含的元素类型typed的    

     ✨可以通过以下几种函数发射数据到旁路输出

            ProcessFunction

            CoProcessFunction

            ProcessWindowFunction

            ProcessAllWindowFunction

    1. //将含有特殊字符串的流区分开,数据由两个定义好的工具类向Kafka灌入不同内容的数据,
    2. //然后通过侧输出流(SideOutput)将不同的流进行分离,得到不同的输出
    3. import com.alibaba.fastjson.JSON
    4. import com.tech.bean.Person_t
    5. import com.tech.util.KafkaSourceUtil
    6. import org.apache.flink.configuration.Configuration
    7. import org.apache.flink.streaming.api.datastream.DataStream
    8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    9. import org.apache.flink.streaming.api.functions.ProcessFunction
    10. import org.apache.flink.streaming.api.scala._
    11. import org.apache.flink.util.Collector
    12. object sideOutputPerson_t {
    13. def main(args: Array[String]): Unit = {
    14. // UI地址访问:http://localhost:8081/#/job/running
    15. val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
    16. val ksu = new KafkaSourceUtil("person_t", "test-consumer-group")
    17. val dstream = env.addSource(ksu.getSouceInfo())
    18. // 首先需要定义一个OutputTag来标识一个旁路输出流
    19. val outputTag = new OutputTag[String]("person_t_side-output")
    20. val mainDataStream = dstream
    21. .map(line => {
    22. JSON.parseObject(line, classOf[Person_t])
    23. })
    24. val sideOutput = mainDataStream.process(new ProcessFunction[Person_t, String] {
    25. override def processElement(
    26. value: Person_t,
    27. ctx: ProcessFunction[Person_t, String]#Context,
    28. out: Collector[String]): Unit = {
    29. if (!value.getName.contains("_side")) {
    30. out.collect(value.toString)
    31. } else {
    32. // 测输出流输出的部分
    33. ctx.output(outputTag, "sideOutput-> 带有_side标识的数据名称" + value.getName)
    34. }
    35. }
    36. })
    37. val sideOutputStream: DataStream[String] = sideOutput.getSideOutput(outputTag)
    38. // 测输出流处理
    39. sideOutputStream.print("测输出流")
    40. // 常规数据处理
    41. sideOutput.print("常规数据")
    42. env.execute("outSideput")
    43. }
    44. }

  • 相关阅读:
    QGIS 导入文本数据(WKT)
    STC单片机选择外部晶振烧录程序无法切换回内部晶振导致单片机不能使用
    OA系统登录界面(比较好看)
    @Component 和 @Bean的区别
    传统 API 管理与测试过程正面临严峻的挑战
    Vue学习:理解数据代理
    300分钟吃透分布式缓存-16讲:常用的缓存组件Redis是如何运行的?
    学习大数据之后可以做什么?未来发展怎么样?
    ERP为化工企业的采购决策提供技术支持
    超图聚类论文阅读1:Kumar算法
  • 原文地址:https://blog.csdn.net/weixin_61070671/article/details/136223855