• Flink学习20:算子介绍reduce


    1.reduce简介

    按照指定的方式,把每个元素进行累计执行。比如实现累加计算

     

    示例:

    import keyByNameTest.StockPrice
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object reduceTest {

      //defined the dataSource's data type
      case class StockPrice(stockId:String, timestamp: Long, price:Double)

      def main(args: Array[String]): Unit = {

        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        //create ds

        val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))

        val ds = env.fromCollection(pricesList)

        //transformation

    //update the stock's new time, and accumulate the price
        val reducedDs = ds.keyBy(0).reduce((t1, t2) => StockPrice(t1.stockId, t2.timestamp, t1.price + t2.price))

        reducedDs.print()

        env.execute()

      }

    }


    输出结果:

    自定义reduce func

    核心步骤:

    1.继承 ReduceFunction 类

    2.重写reduce 方法

    示例:

    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    
    
    object myReduceTest {
    
    
      //defined the dataSource's data type
      case class StockPrice(stockId:String, timestamp: Long, price:Double)
    
      //define my reduce func
      update the stock's new time, and accumulate the price
      class MyReduceFunc extends ReduceFunction[StockPrice] {
        override def reduce(t: StockPrice, t1: StockPrice): StockPrice = {
    
          //update the stock's new time, and accumulate the price
          StockPrice(t.stockId, t1.timestamp, t.price + t1.price)
    
        }
      }
    
      def main(args: Array[String]): Unit = {
    
        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //create ds
        val pricesList = List(StockPrice("stock1", 10, 1), StockPrice("stock1", 11, 2), StockPrice("stock2", 10, 666), StockPrice("stock3", 10, 888.23))
    
        val ds = env.fromCollection(pricesList)
    
        //transformation
        val keyByedDs = ds.keyBy(0)
    
          //use my reduce func
        val myReducedDs = keyByedDs.reduce(new MyReduceFunc)
    
        myReducedDs.print()
    
        env.execute()
      }
    
    
    
    }
    

    输出结果:

     

     

  • 相关阅读:
    java毕业设计MVC的时鲜蔬菜配送系统Mybatis+系统+数据库+调试部署
    【算法集训专题攻克篇】第十二篇之链表
    LeetCode75——Day29
    C++——string的封装
    Node.js的基本概念&&node -v 和npm -v 这两个命令的作用
    2022/11/20[指针] 通过函数,利用指针将数组a中前n个元素按相反顺序存放
    [github初学者教程] 分支管理-以及问题解决
    每日刷题记录 (四)
    语法分析出错,不是 GROUP BY 表达式
    xcode15出现大量Duplicate symbols
  • 原文地址:https://blog.csdn.net/hzp666/article/details/126269374