• Flink基于时间窗口定时输出到ElasticSearch中并做到真正不丢数据


    Flink时间窗口运用

    介绍Flink定时输出到外部存储介质,有两种办法实现,在RichSinkFunction中实现SinkFunction的方法,在其中open()方法中引入java的定时任务

    另一种实现,基于Flink window窗口机制,将结果定时sink到ElasticSearch中。

    需求:


    经过flink清洗后的数据,要求每500毫秒sink一次数据到ES中(该文件内容是String格式,需要进行追加,不属于大家可以用Java实现,具体代码我就不细讲了)。

    实现:

    1、序列化方法:

    1. /**
    2. * 自定义序列化器
    3. */
    4. public class CustomDeserialization implements DebeziumDeserializationSchema {
    5. @Override
    6. public void deserialize(SourceRecord sourceRecord, Collector collector)
    7. throws Exception {
    8. JSONObject res = new JSONObject();
    9. // 获取数据库和表名称
    10. String topic = sourceRecord.topic();
    11. String[] fields = topic.split("\\.");
    12. String database = fields[1];
    13. String tableName = fields[2];
    14. Struct value = (Struct) sourceRecord.value();
    15. // 获取before数据
    16. Struct before = value.getStruct("before");
    17. JSONObject beforeJson = new JSONObject();
    18. if (before != null) {
    19. Schema beforeSchema = before.schema();
    20. List beforeFields = beforeSchema.fields();
    21. for (Field field : beforeFields) {
    22. Object beforeValue = before.get(field);
    23. beforeJson.put(field.name(), beforeValue);
    24. }
    25. }
    26. // 获取after数据
    27. Struct after = value.getStruct("after");
    28. JSONObject afterJson = new JSONObject();
    29. if (after != null) {
    30. Schema afterSchema = after.schema();
    31. List afterFields = afterSchema.fields();
    32. for (Field field : afterFields) {
    33. Object afterValue = after.get(field);
    34. afterJson.put(field.name(), afterValue);
    35. }
    36. }
    37. //获取操作类型 READ DELETE UPDATE CREATE
    38. Envelope.Operation operation = Envelope.operationFor(sourceRecord);
    39. String type = operation.toString().toLowerCase();
    40. if ("create".equals(type)) {
    41. type = "insert";
    42. }
    43. // 将字段写到json对象中
    44. res.put("database", database);
    45. res.put("tableName", tableName);
    46. res.put("before", beforeJson);
    47. res.put("after", afterJson);
    48. res.put("type", type);
    49. //输出数据
    50. collector.collect(res.toString());
    51. }
    52. @Override
    53. public TypeInformation getProducedType() {
    54. return BasicTypeInfo.STRING_TYPE_INFO;
    55. }
    56. }

    以上时序列号方法,大家可以随意定义因为我这块用了 Flink CDC

    2、时间窗口的启用:

    1. /**
    2. * 按时间开窗收集更新全量不会丢数
    3. */
    4. DataStream> streamList = streamSource
    5. .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
    6. .process(new ProcessAllWindowFunction, TimeWindow>() {
    7. @Override
    8. public void process(Context context, Iterable iterable, Collector> collector) throws Exception {
    9. List arrayList = new ArrayList();
    10. iterable.forEach(single -> {
    11. arrayList.add(single);
    12. });
    13. if (arrayList.size() > 0) {
    14. collector.collect(arrayList);
    15. }
    16. }
    17. });

    3、Sink下层处理

    1. @Override
    2. public void invoke(List values, Context context) throws Exception {
    3. try {
    4. List> list = new ArrayList<>();
    5. String tName = "";
    6. for (String value : values) {
    7. JSONObject jsonObject = JSON.parseObject(value.toString());
    8. // String arrayslist = Arrays.asList(pgConnection.getTableList()).toString();
    9. String schemaName = jsonObject.get("database").toString();
    10. String tableName = jsonObject.get("tableName").toString();
    11. //多表流需要判断处理,不一样流写入到ES索引也是不一样的
    12. //if (Arrays.asList(pgConnection.getTableList()).contains(schemaName + "." + tableName)) {
    13. tName = schemaName + "_" + tableName;
    14. JSONObject jsonAfter = JSON.parseObject(jsonObject.get("after").toString());
    15. //System.out.println(esLogAppendServer.getFields());
    16. if (jsonObject != null) {
    17. Map map = new HashMap();
    18. for (Map.Entry entry : jsonAfter.entrySet()) {
    19. //这里处理一下日期变成时间戳问题,以下进行遍历执行
    20. map.put(entry.getKey(), entry.getValue());
    21. }
    22. list.add(map);
    23. }
    24. }
    25. saveElasticSearch(tName, list);
    26. } catch (Exception ex) {
    27. log.info(DateUtils.getDate() + "---" + ex.toString());
    28. }
    29. }

    测试:

     方便测试,先将时间改为每100毫秒执行,Time.milliseconds(100),通过开窗获取100毫秒的数据:

    第1个时间窗口到达:Iterable中集合了这100毫秒接收的所有实时数据,统一处理

    总结:

         Flink是实时处理,window机制可以认为是flink的批处理实现,因为需要等待水位线对齐触发timer。一般还基于时间窗口做一些批量处理不会丢数据,所以比较适合数据表全量更新。

  • 相关阅读:
    Educational Codeforces Round 137 (Rated for Div. 2) DE
    神经网络-非线性激活
    如何查看所有员工电脑访问网站记录?
    python库-dotenv包 | .env配置文件
    【高级RAG技巧】在大模型知识库问答中增强文档分割与表格提取
    详解clickhouse分区目录的合并过程
    【微信小程序】一文解决button、input、image组件
    8 AVL树的判断---来源刘H同学
    软件设计模式学习笔记(七)
    .net第二章数据类型、变量和常量
  • 原文地址:https://blog.csdn.net/Angel_asp/article/details/126540610