• Flink Study Notes 20: The reduce Operator


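    1. Introduction to reduce

    reduce combines the elements of a keyed stream one at a time using a function you supply, carrying the accumulated result forward. A typical use is a running sum.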
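    As a rough illustration of what "accumulating" means here, the following plain-Scala sketch (no Flink dependency) emits every intermediate result of folding a function over a sequence. The names RollingReduceSketch and rollingReduce are made up for this illustration and are not part of Flink's API; the assumption is that Flink's keyed reduce behaves like this per key in its default streaming mode.

```scala
object RollingReduceSketch {

  // Emits every intermediate result of combining the elements of `xs`
  // from left to right with `f`. The first element is emitted unchanged.
  def rollingReduce[T](xs: Seq[T])(f: (T, T) => T): Seq[T] =
    if (xs.isEmpty) Seq.empty
    else xs.tail.scanLeft(xs.head)(f)

  def main(args: Array[String]): Unit = {
    // Running sum of three prices
    println(rollingReduce(Seq(1.0, 2.0, 3.5))(_ + _)) // List(1.0, 3.0, 6.5)
  }
}
```

    Note that the result type is the same as the element type: the function takes two values of type T and returns one value of type T.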

     

    Example:

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object reduceTest {

      //data type of the source records
      case class StockPrice(stockId:String, timestamp: Long, price:Double)

      def main(args: Array[String]): Unit = {

        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        //create ds

        val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))

        val ds = env.fromCollection(pricesList)

        //transformation

        //key by stockId (positional keyBy(0) is deprecated in newer Flink releases),
        //then keep the latest timestamp and accumulate the price
        val reducedDs = ds.keyBy(_.stockId).reduce((t1, t2) => StockPrice(t1.stockId, t2.timestamp, t1.price + t2.price))

        reducedDs.print()

        env.execute()

      }

    }


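    Output: in the default streaming execution mode, reduce prints one rolling result per input record. For stock1 the price is 1.0 and then 3.0; stock2 and stock3 each produce a single record (666.0 and 888.23). Printed lines may carry a subtask prefix, and the order across keys can vary.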
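    To trace the sample data by hand, here is a minimal single-threaded simulation of keyBy followed by reduce, written in plain Scala without Flink. The helper keyedRollingReduce and the object KeyedReduceSketch are hypothetical names for this sketch only; a real Flink job distributes keys across subtasks, so its printed lines may be prefixed and ordered differently.

```scala
import scala.collection.mutable

object KeyedReduceSketch {

  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  // Simulates keyBy(key) followed by reduce(f): keeps one running value
  // per key and emits the updated value for every incoming record.
  def keyedRollingReduce[K, T](records: Seq[T])(key: T => K)(f: (T, T) => T): Seq[T] = {
    val state = mutable.Map.empty[K, T]
    records.map { r =>
      val k = key(r)
      // The first record of a key is emitted as-is; later ones are merged
      val updated = state.get(k).map(prev => f(prev, r)).getOrElse(r)
      state(k) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val prices = List(
      StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2),
      StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))

    // Same logic as the example: latest timestamp, accumulated price
    keyedRollingReduce(prices)(_.stockId) { (a, b) =>
      StockPrice(a.stockId, b.timestamp, a.price + b.price)
    }.foreach(println)
  }
}
```

    Run on its own, this sketch prints StockPrice(stock1,10,1.0), StockPrice(stock1,11,3.0), StockPrice(stock2,10,666.0) and StockPrice(stock3,10,888.23), in that order.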

    Custom reduce function

    Core steps:

    1. Extend ReduceFunction (an interface in Flink's Java API)

    2. Override the reduce method
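    The two steps can be sketched without a Flink dependency. In the snippet below, the Reducer trait is a hypothetical stand-in that only mirrors the shape of Flink's ReduceFunction (one reduce method taking two values of type T and returning a T); MaxPriceReducer is likewise an invented example, showing that a custom function is not limited to summation.

```scala
object CustomReducerSketch {

  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  // Hypothetical stand-in for Flink's ReduceFunction[T]: one method that
  // merges two values of type T into a single value of the same type.
  trait Reducer[T] {
    def reduce(a: T, b: T): T
  }

  // Step 1: extend the interface. Step 2: override reduce.
  // This variant keeps whichever record has the higher price.
  class MaxPriceReducer extends Reducer[StockPrice] {
    override def reduce(a: StockPrice, b: StockPrice): StockPrice =
      if (b.price > a.price) b else a
  }

  def main(args: Array[String]): Unit = {
    val reducer = new MaxPriceReducer
    val prices = List(
      StockPrice("stock1", 10, 5), StockPrice("stock1", 11, 2), StockPrice("stock1", 12, 9))

    // Apply the reducer pairwise over records that share one key
    println(prices.reduce((a, b) => reducer.reduce(a, b))) // StockPrice(stock1,12,9.0)
  }
}
```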

    Example:

    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    
    
    object myReduceTest {
    
    
      //data type of the source records
      case class StockPrice(stockId:String, timestamp: Long, price:Double)
    
      //custom reduce function: keeps the latest timestamp and accumulates the price
      class MyReduceFunc extends ReduceFunction[StockPrice] {
        override def reduce(t: StockPrice, t1: StockPrice): StockPrice = {
    
          //t is the value accumulated so far, t1 is the newly arrived record
          StockPrice(t.stockId, t1.timestamp, t.price + t1.price)
    
        }
      }
    
      def main(args: Array[String]): Unit = {
    
        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //create ds
        val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))
    
        val ds = env.fromCollection(pricesList)
    
        //transformation: key by stockId (positional keyBy(0) is deprecated in newer Flink releases)
        val keyByedDs = ds.keyBy(_.stockId)
    
        //use my reduce func
        val myReducedDs = keyByedDs.reduce(new MyReduceFunc)
    
        myReducedDs.print()
    
        env.execute()
      }
    
    
    
    }
    

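    Output: identical to what the lambda version above produces, because MyReduceFunc applies the same logic. stock1 yields a price of 1.0 and then 3.0; stock2 and stock3 yield 666.0 and 888.23.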

     

     

  • Original article: https://blog.csdn.net/hzp666/article/details/126269374