一、定义实体类
- package com.lihaiwei.text1.model;
-
- import com.lihaiwei.text1.util.TimeUtil;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
-
- import java.util.Date;
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class VideoOrder {
- private String tradeNo;
- private String title;
- private int money;
- private int userId;
- private Date createTime;
-
- @Override
- public String toString() {
- return "VideoOrder{" +
- "tradeNo='" + tradeNo + '\'' +
- ", title='" + title + '\'' +
- ", money=" + money +
- ", userId=" + userId +
- ", createTime=" + TimeUtil.format(createTime) +
- '}';
- }
- }
二、自定义数据源:
- package com.lihaiwei.text1.source;
-
- import com.lihaiwei.text1.model.VideoOrder;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-
- import java.util.*;
-
- public class VideoOrderSource extends RichParallelSourceFunction
{ -
-
- private volatile Boolean flag = true;
-
- private Random random = new Random();
-
- private static List
list = new ArrayList<>(); - static {
- list.add("spark课程");
- list.add("oracle课程");
- list.add("RabbitMQ消息队列");
- list.add("Kafka课程");
- list.add("hadoop课程");
- list.add("Flink流式技术课程");
- list.add("工业级微服务项目大课训练营");
- list.add("Linux课程");
- }
-
-
- /**
- * run 方法调用前 用于初始化连接
- * @param parameters
- * @throws Exception
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- System.out.println("-----open-----");
- }
-
- /**
- * 用于清理之前
- * @throws Exception
- */
- @Override
- public void close() throws Exception {
- System.out.println("-----close-----");
- }
-
-
- /**
- * 产生数据的逻辑
- * @param ctx
- * @throws Exception
- */
- @Override
- public void run(SourceContext
ctx) throws Exception { -
- while (flag){
- Thread.sleep(1000);
- String id = UUID.randomUUID().toString();
- int userId = random.nextInt(10);
- int money = random.nextInt(100);
- int videoNum = random.nextInt(list.size());
- String title = list.get(videoNum);
- VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());
- //VideoOrderSourceV2
- ctx.collect(videoOrder);
- }
-
-
- }
-
- /**
- * 控制任务取消
- */
- @Override
- public void cancel() {
-
- flag = false;
- }
- }