• Flink Study Notes 20: The reduce Operator


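    1. Introduction to reduce

    reduce combines the elements of a keyed stream using a function you supply: each incoming element is merged with the result accumulated so far for its key, and the updated result is emitted. A running sum is a typical use.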

     

    Example:

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object reduceTest {

      //data type of the records in the source
      case class StockPrice(stockId:String, timestamp: Long, price:Double)

      def main(args: Array[String]): Unit = {

        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        //create ds

        val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))

        val ds = env.fromCollection(pricesList)

        //transformation

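        //per stock: keep the latest timestamp and accumulate the price
        //(a key selector is used here; positional keyBy(0) is deprecated in recent Flink versions)
        val reducedDs = ds
          .keyBy(_.stockId)
          .reduce((t1, t2) => StockPrice(t1.stockId, t2.timestamp, t1.price + t2.price))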

        reducedDs.print()

        env.execute()

      }

    }


    Output:
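    As a rough guide to what the job prints, the sketch below imitates a keyed rolling reduce in plain Scala, without Flink. The names RollingReduceSketch, merge, and rollingReduce are hypothetical helpers introduced only for this illustration; they are not part of the Flink API. The sketch assumes streaming semantics, where one result is emitted per input element.

```scala
// Plain-Scala imitation of a keyed rolling reduce (no Flink dependency).
// RollingReduceSketch, merge and rollingReduce are illustrative names only.
object RollingReduceSketch {

  // Same shape as the StockPrice case class in the example above
  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  // Same logic as the lambda passed to reduce:
  // keep the key, take the newer timestamp, add the prices
  def merge(acc: StockPrice, next: StockPrice): StockPrice =
    StockPrice(acc.stockId, next.timestamp, acc.price + next.price)

  // For every input element, return the value a keyed reduce would emit at that point
  def rollingReduce(input: List[StockPrice]): List[StockPrice] = {
    // one accumulated value per key, like Flink's per-key reduce state
    val state = scala.collection.mutable.Map.empty[String, StockPrice]
    input.map { p =>
      val updated = state.get(p.stockId) match {
        case Some(acc) => merge(acc, p) // key seen before: combine with accumulated value
        case None      => p             // first element for this key is emitted unchanged
      }
      state(p.stockId) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val pricesList = List(
      StockPrice("stock1", 10, 1),
      StockPrice("stock1", 11, 2),
      StockPrice("stock2", 10, 666),
      StockPrice("stock3", 10, 888.23)
    )
    rollingReduce(pricesList).foreach(println)
    // StockPrice(stock1,10,1.0)
    // StockPrice(stock1,11,3.0)
    // StockPrice(stock2,10,666.0)
    // StockPrice(stock3,10,888.23)
  }
}
```

    In the real job, print() prefixes each record with the subtask index when the parallelism is greater than 1, and the order of records across different keys can vary between runs. Within one key the sequence should match the sketch.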

    Custom reduce function

    Key steps:

    1. Extend ReduceFunction (an interface in the Flink API)

    2. Override its reduce method

    Example:

    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    
    
    object myReduceTest {
    
    
      //data type of the records in the source
      case class StockPrice(stockId:String, timestamp: Long, price:Double)
    
      //custom reduce function:
      //per stock, keep the latest timestamp and accumulate the price
      class MyReduceFunc extends ReduceFunction[StockPrice] {
        //t is the value accumulated so far, t1 is the newly arrived element
        override def reduce(t: StockPrice, t1: StockPrice): StockPrice = {
    
          StockPrice(t.stockId, t1.timestamp, t.price + t1.price)
    
        }
      }
    
      def main(args: Array[String]): Unit = {
    
        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //create ds
        val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))
    
        val ds = env.fromCollection(pricesList)
    
        //transformation
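        //key by stock id (positional keyBy(0) is deprecated in recent Flink versions)
        val keyByedDs = ds.keyBy(_.stockId)
    
        //apply the custom reduce function
        val myReducedDs = keyByedDs.reduce(new MyReduceFunc)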
    
        myReducedDs.print()
    
        env.execute()
      }
    
    
    
    }
    

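    Output: the job emits the same per-key records as the lambda-based example in section 1, since the input data and the reduce logic are identical.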

     

     

  • Original article: https://blog.csdn.net/hzp666/article/details/126269374