

// Steps to build a custom Flink data source:
// 1. Define a case class that describes the element type of the stream.
// 2. Create a source class that extends RichSourceFunction (or a similar base class)
//    and overrides the run() and cancel() methods.
// 3. In main(), after creating the execution environment, attach the custom
//    source to it via addSource.
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import java.util.Calendar
import scala.util.Random
// Defines the element type carried by the stream.
case class StockPrice(stockID:String, timestamp:Long, price:Double)
// Custom data source emitting simulated stock prices.
/** Custom Flink source that emits randomly drifting stock prices.
  *
  * Maintains a list of five stock prices; on each iteration it picks one
  * stock at random, applies a small Gaussian price move, and emits the
  * updated price with a current timestamp, then sleeps up to 10 ms.
  */
class StockPriceSource extends RichSourceFunction[StockPrice] {
  // @volatile: cancel() is invoked by Flink from a different thread than
  // run(); without it the emission loop might never observe the flag change.
  @volatile var isRunning: Boolean = true
  val rand = new Random()
  // Current price per stock; index i holds the price of "stock_i".
  private var priceList = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)

  /** Emission loop: runs until cancel() flips isRunning to false. */
  override def run(sourceContext: SourceFunction.SourceContext[StockPrice]): Unit = {
    while (isRunning) {
      // Pick a random stock and apply a small Gaussian price move.
      val stockId = rand.nextInt(priceList.size)
      val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05
      // Persist the new price so the next move starts from it.
      priceList = priceList.updated(stockId, curPrice)
      val curTime = Calendar.getInstance.getTimeInMillis
      sourceContext.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))
      // Throttle emission: sleep 0-9 ms between records.
      Thread.sleep(rand.nextInt(10))
    }
  }

  /** Requests the run() loop to stop emitting. */
  override def cancel(): Unit = {
    isRunning = false
  }
}
object myDataSourceTest {
  /** Entry point: wires the custom StockPriceSource into a Flink streaming job. */
  def main(args: Array[String]): Unit = {
    // Obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // A single task keeps the printed output in emission order.
    env.setParallelism(1)
    // Register the custom source and print every record it emits.
    env
      .addSource(new StockPriceSource)
      .print()
    // Launch the job; blocks until the job terminates.
    env.execute("stock price streaming")
  }
}