• Flink自定义网课数据源


    一、定义实体类

    1. package com.lihaiwei.text1.model;
    2. import com.lihaiwei.text1.util.TimeUtil;
    3. import lombok.AllArgsConstructor;
    4. import lombok.Data;
    5. import lombok.NoArgsConstructor;
    6. import org.apache.flink.streaming.api.functions.source.SourceFunction;
    7. import java.util.Date;
    8. @Data
    9. @AllArgsConstructor
    10. @NoArgsConstructor
    11. public class VideoOrder {
    12. private String tradeNo;
    13. private String title;
    14. private int money;
    15. private int userId;
    16. private Date createTime;
    17. @Override
    18. public String toString() {
    19. return "VideoOrder{" +
    20. "tradeNo='" + tradeNo + '\'' +
    21. ", title='" + title + '\'' +
    22. ", money=" + money +
    23. ", userId=" + userId +
    24. ", createTime=" + TimeUtil.format(createTime) +
    25. '}';
    26. }
    27. }

    二、自定义数据源:

    1. package com.lihaiwei.text1.source;
    2. import com.lihaiwei.text1.model.VideoOrder;
    3. import org.apache.flink.configuration.Configuration;
    4. import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    5. import java.util.*;
    6. public class VideoOrderSource extends RichParallelSourceFunction {
    7. private volatile Boolean flag = true;
    8. private Random random = new Random();
    9. private static List list = new ArrayList<>();
    10. static {
    11. list.add("spark课程");
    12. list.add("oracle课程");
    13. list.add("RabbitMQ消息队列");
    14. list.add("Kafka课程");
    15. list.add("hadoop课程");
    16. list.add("Flink流式技术课程");
    17. list.add("工业级微服务项目大课训练营");
    18. list.add("Linux课程");
    19. }
    20. /**
    21. * run 方法调用前 用于初始化连接
    22. * @param parameters
    23. * @throws Exception
    24. */
    25. @Override
    26. public void open(Configuration parameters) throws Exception {
    27. System.out.println("-----open-----");
    28. }
    29. /**
    30. * 用于清理之前
    31. * @throws Exception
    32. */
    33. @Override
    34. public void close() throws Exception {
    35. System.out.println("-----close-----");
    36. }
    37. /**
    38. * 产生数据的逻辑
    39. * @param ctx
    40. * @throws Exception
    41. */
    42. @Override
    43. public void run(SourceContext ctx) throws Exception {
    44. while (flag){
    45. Thread.sleep(1000);
    46. String id = UUID.randomUUID().toString();
    47. int userId = random.nextInt(10);
    48. int money = random.nextInt(100);
    49. int videoNum = random.nextInt(list.size());
    50. String title = list.get(videoNum);
    51. VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());
    52. //VideoOrderSourceV2
    53. ctx.collect(videoOrder);
    54. }
    55. }
    56. /**
    57. * 控制任务取消
    58. */
    59. @Override
    60. public void cancel() {
    61. flag = false;
    62. }
    63. }

  • 相关阅读:
    J2EE基础:Spring及ioc
    制作 U 盘操作系统安装盘后空间变小如何解决
    Windows安装Erlang
    [附源码]Python计算机毕业设计SSM晋中学院教室管理系统(程序+LW)
    JVM 面试题
    Metabase学习教程:视图-4
    用 ZEGO Avatar 做一个虚拟人|虚拟主播直播解决方案
    随手记录第五话 -- SpringCache搭配Redis的实现以及设置每个key的过期时间
    生态系统长期观测数据产品体系
    【JavaSE】初识泛型
  • 原文地址:https://blog.csdn.net/w13716207404/article/details/127108585