主要作用:把 key 相同的数据汇总到同一个分区中。
(数据原本分布在不同的 slot 中,keyBy 会根据 key 的哈希值,把 key 相同的数据发送到同一个下游并行子任务(subtask,运行在某个 slot 中)。)
在使用 keyBy 的时候,需要向 keyBy 传递一个参数,指明按照哪个字段对数据进行分组。
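有2种传递参数的方式:一是按字段位置(下标)传参,适用于元组类型,例如 keyBy(0);二是按字段名称传参,适用于 case class 等类型,例如 keyBy("stockId")。示例如下(注:这两种写法在较新版本的 Flink 中已被标记为 deprecated,推荐改用 KeySelector,例如 keyBy(_._1) 或 keyBy(_.stockId)):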
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Demonstrates `keyBy` with a positional key.
 *
 * The source emits `(name, value)` tuples; keying by tuple position 0 groups
 * them by name, so records sharing a name end up in the same keyed partition.
 */
object keyByTest {

  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Create a bounded source of (name, value) tuples.
    val ds = env.fromElements(
      ("张三", 4),
      ("张三", 2),
      ("leo", 5),
      ("leo", 1),
      ("raj", 8),
      ("giao", 7)
    )

    // Key by tuple position 0, i.e. the name field.
    // NOTE: positional keyBy is deprecated in newer Flink releases; the
    // equivalent KeySelector form is `ds.keyBy(_._1)`.
    val keyByedDs = ds.keyBy(0)

    keyByedDs.print()
    env.execute()
  }
}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Demonstrates `keyBy` with a field-name key.
 *
 * The source emits `StockPrice` records; keying by the field name "stockId"
 * groups records that share a stock id into the same keyed partition.
 */
object keyByNameTest {

  /** Element type of the data source: a stock id, a timestamp and a price. */
  final case class StockPrice(stockId: String, timestamp: Long, price: Double)

  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Create a bounded source from an in-memory list; stock1 appears twice.
    val pricesList = List(
      StockPrice("stock1", 154545454, 1212.23),
      StockPrice("stock1", 154545454, 1212.23),
      StockPrice("stock2", 154545454, 666.23),
      StockPrice("stock3", 154545454, 888.23)
    )
    val ds = env.fromCollection(pricesList)

    // Transformation: key by the case-class field named "stockId".
    // NOTE: field-name keyBy is deprecated in newer Flink releases; the
    // equivalent KeySelector form is `ds.keyBy(_.stockId)`.
    val keyByedDs = ds.keyBy("stockId")

    keyByedDs.print()
    env.execute()
  }
}