🚀 作者 :“大数据小禅”
🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战
🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬
Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象
第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理
第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发
第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差
第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式
注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层
Flink编程模型
Source来源
元素集合
文件/文件系统
基于Socket
自定义Source,实现接口自定义数据源,rich相关的api更丰富
并行度为1
并行度大于1
Connectors与第三方系统进行对接(用于source或者sink都可以)
Apache Bahir连接器
总结 和外部系统进行读取写入的
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//相同类型元素的数据流 source
DataStream<String> stringDS1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
stringDS1.print("stringDS1");
DataStream<String> stringDS2 = env.fromCollection(Arrays.asList("微服务项目大课,java","alibabacloud,rabbitmq","hadoop,hbase"));
stringDS2.print("stringDS2");
DataStreamSource<Long> longDS3 = env.fromSequence(0,10);
longDS3.print("longDS3");
//DataStream需要调用execute,可以取个名称
env.execute("xdclass job");
}
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> textDS = env.readTextFile("/Users/xdclass/Desktop/xdclass_access.log");
//DataStream textDS = env.readTextFile("hdfs://xdclass_node:8010/file/log/words.txt");
textDS.print();
env.execute("xdclass job");
}
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888);
stringDataStream.print();
env.execute(" job");
}
自定义Source,实现接口自定义数据源
并行度为1
并行度大于1
Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
创建接口
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
private String tradeNo;
private String title;
private int money;
private int userId;
private Date createTime;
}
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<String> list = new ArrayList<>();
static {
list.add("spring boot2.x课程");
list.add("微服务SpringCloud课程");
list.add("RabbitMQ消息队列");
list.add("Kafka课程");
list.add("Flink流式技术课程");
list.add("工业级微服务项目大课训练营");
list.add("Linux课程");
}
@Override
public void run(SourceContext<VideoOrder> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
String id = UUID.randomUUID().toString();
int userId = random.nextInt(10);
int money = random.nextInt(100);
int videoNum = random.nextInt(list.size());
String title = list.get(videoNum);
ctx.collect(new VideoOrder(id,title,money,userId,new Date()));
}
}
/**
* 取消任务
*/
@Override
public void cancel() {
flag = false;
}
}
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<VideoOrder> videoOrderDataStream = env.addSource(new VideoOrderSource());
videoOrderDataStream.print();
//DataStream需要调用execute,可以取个名称
env.execute("custom source job");
}
不断产生很多订单