• Flink学习20:聚合算子(sum,max,min)


    1.聚合算子简介

    常见的聚合算子 sum,max,min等

    聚合算子可以在在keyedStream 流上进行滚动的聚合(即累计的操作),而且同一个 keyedStream 流上只能调用一次 聚合算子

     

    sum 示例:

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    
    object aggregationTest {
    
    //defined the dataSource's type
      case class StockPrice(stockId:String, timeStamp:Long, price:Double)
    
    
      def main(args: Array[String]): Unit = {
    
        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //generate ds
    
        val stockList = List(StockPrice("stock_1", 66666, 1)
          , StockPrice("stock_1", 8888, 2)
          , StockPrice("stock_2", 77777, 1)
          , StockPrice("stock_2", 999, 3)
          , StockPrice("stock_3", 3333, 1)
        )
    
        val ds = env.fromCollection(stockList)
    
        //transformation
    
        val keyedStream = ds.keyBy("stockId")
    
        val sumedStream = keyedStream.sum(2)
    
        sumedStream.print()
    
        env.execute()
    
      }
    
    }

    输出结果:

    max示例:

    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    
    //defined the dataSource's type
    case class StockPrice(stockId:String, timeStamp:Long, price:Double)
    
    object maxTest {
    
      def main(args: Array[String]): Unit = {
    
        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //generate ds
        val stockList = List(StockPrice("stock_1", 66666, 1)
          , StockPrice("stock_1", 8888, 2)
          , StockPrice("stock_2", 77777, 1)
          , StockPrice("stock_2", 999, 3)
          , StockPrice("stock_3", 3333, 1)
        )
    
        val ds = env.fromCollection(stockList)
    
        //transformation
        val keyedStream = ds.keyBy("stockId")
    
        val maxedStream = keyedStream.max(2)
    
        maxedStream.print()
    
        env.execute()
    
      }
    
    
    }
    

    输出结果:

     

     

  • 相关阅读:
    【软考软件评测师】第三十章 操作系统(PV操作与死锁)
    力扣287. 寻找重复数
    13 如何利用缓存实现万级并发扣减
    昂首资本通过套期保值,MT4和MT5这样选
    SpringBoot的搭建(两种方式)
    为mysql添加TCMalloc库,以提升性能!
    MFC文本输出学习
    java计算机毕业设计Vue框架校园相约健康运动平台源码+mysql数据库+系统+lw文档+部署
    《Java笔记——基础语法》
    axios封装
  • 原文地址:https://blog.csdn.net/hzp666/article/details/126296749