reduce 算子:在 keyBy 之后,按照指定的方式对同一 key 的元素进行滚动聚合(每来一个新元素就与之前的累计结果执行一次 reduce),例如实现累加计算。
import keyByNameTest.StockPrice
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
 * Demonstrates the `reduce` transformation on a keyed Flink DataStream:
 * for each stock id, roll up incoming elements by keeping the latest
 * timestamp and accumulating the price.
 */
object reduceTest {
  // Schema of each stream element: stock id, event timestamp, and price.
  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build a bounded test stream from an in-memory collection.
    val pricesList = List(
      StockPrice("stock1", 10, 1),
      StockPrice("stock1", 11, 2),
      StockPrice("stock2", 10, 666),
      StockPrice("stock3", 10, 888.23)
    )
    val ds = env.fromCollection(pricesList)

    // Key by stockId using a type-safe key selector (the position-based
    // keyBy(0) is deprecated), then reduce pairwise per key:
    // t1 is the accumulated value, t2 the incoming element — keep the
    // newest timestamp and sum the prices.
    val reducedDs = ds
      .keyBy(_.stockId)
      .reduce((t1, t2) => StockPrice(t1.stockId, t2.timestamp, t1.price + t2.price))

    reducedDs.print()
    // Trigger job execution; transformations above are lazy until this call.
    env.execute()
  }
} // closing brace for the object was missing in the original snippet
自定义 ReduceFunction 的实现步骤:
1. 继承 ReduceFunction 类;
2. 重写 reduce 方法。
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Same rolling aggregation as `reduceTest`, but implemented with an explicit
 * ReduceFunction subclass instead of a lambda.
 */
object myReduceTest {
  // Schema of each stream element: stock id, event timestamp, and price.
  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  /**
   * Custom reduce function: `t` is the accumulated value, `t1` the incoming
   * element — keep the newest timestamp and accumulate the price.
   */
  class MyReduceFunc extends ReduceFunction[StockPrice] {
    override def reduce(t: StockPrice, t1: StockPrice): StockPrice = {
      StockPrice(t.stockId, t1.timestamp, t.price + t1.price)
    }
  }

  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build a bounded test stream from an in-memory collection.
    val pricesList = List(
      StockPrice("stock1", 10, 1),
      StockPrice("stock1", 11, 2),
      StockPrice("stock2", 10, 666),
      StockPrice("stock3", 10, 888.23)
    )
    val ds = env.fromCollection(pricesList)

    // Key by stockId using a type-safe key selector (the position-based
    // keyBy(0) is deprecated), then apply the custom reduce function per key.
    val keyByedDs = ds.keyBy(_.stockId)
    val myReducedDs = keyByedDs.reduce(new MyReduceFunc)

    myReducedDs.print()
    // Trigger job execution; transformations above are lazy until this call.
    env.execute()
  }
}