介绍Flink定时输出到外部存储介质,有两种办法实现,在RichSinkFunction中实现SinkFunction的方法,在其中open()方法中引入java的定时任务。
另一种实现,基于Flink window窗口机制,将结果定时sink到ElasticSearch中。
经过flink清洗后的数据,要求每500毫秒sink一次数据到ES中(该文件内容是String格式,需要进行追加,不属于大家可以用Java实现,具体代码我就不细讲了)。
- /**
- * 自定义序列化器
- */
- public class CustomDeserialization implements DebeziumDeserializationSchema
{ -
- @Override
- public void deserialize(SourceRecord sourceRecord, Collector
collector) - throws Exception {
-
- JSONObject res = new JSONObject();
- // 获取数据库和表名称
- String topic = sourceRecord.topic();
- String[] fields = topic.split("\\.");
- String database = fields[1];
- String tableName = fields[2];
- Struct value = (Struct) sourceRecord.value();
- // 获取before数据
- Struct before = value.getStruct("before");
- JSONObject beforeJson = new JSONObject();
- if (before != null) {
- Schema beforeSchema = before.schema();
- List
beforeFields = beforeSchema.fields(); - for (Field field : beforeFields) {
- Object beforeValue = before.get(field);
- beforeJson.put(field.name(), beforeValue);
- }
- }
- // 获取after数据
- Struct after = value.getStruct("after");
- JSONObject afterJson = new JSONObject();
- if (after != null) {
- Schema afterSchema = after.schema();
- List
afterFields = afterSchema.fields(); - for (Field field : afterFields) {
- Object afterValue = after.get(field);
- afterJson.put(field.name(), afterValue);
- }
- }
- //获取操作类型 READ DELETE UPDATE CREATE
- Envelope.Operation operation = Envelope.operationFor(sourceRecord);
- String type = operation.toString().toLowerCase();
- if ("create".equals(type)) {
- type = "insert";
- }
-
- // 将字段写到json对象中
- res.put("database", database);
- res.put("tableName", tableName);
- res.put("before", beforeJson);
- res.put("after", afterJson);
- res.put("type", type);
- //输出数据
- collector.collect(res.toString());
- }
-
- @Override
- public TypeInformation
getProducedType() { - return BasicTypeInfo.STRING_TYPE_INFO;
- }
- }
以上时序列号方法,大家可以随意定义因为我这块用了 Flink CDC
- /**
- * 按时间开窗收集更新全量不会丢数
- */
- DataStream
> streamList = streamSource
- .windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
- .process(new ProcessAllWindowFunction
, TimeWindow>() { - @Override
- public void process(Context context, Iterable
iterable, Collector> collector)
throws Exception { - List
arrayList = new ArrayList(); - iterable.forEach(single -> {
- arrayList.add(single);
- });
- if (arrayList.size() > 0) {
- collector.collect(arrayList);
- }
- }
- });
- @Override
- public void invoke(List
values, Context context) throws Exception { -
- try {
-
- List
- String tName = "";
-
- for (String value : values) {
-
- JSONObject jsonObject = JSON.parseObject(value.toString());
- // String arrayslist = Arrays.asList(pgConnection.getTableList()).toString();
- String schemaName = jsonObject.get("database").toString();
- String tableName = jsonObject.get("tableName").toString();
-
- //多表流需要判断处理,不一样流写入到ES索引也是不一样的
- //if (Arrays.asList(pgConnection.getTableList()).contains(schemaName + "." + tableName)) {
- tName = schemaName + "_" + tableName;
- JSONObject jsonAfter = JSON.parseObject(jsonObject.get("after").toString());
-
- //System.out.println(esLogAppendServer.getFields());
- if (jsonObject != null) {
- Map
map = new HashMap(); - for (Map.Entry
entry : jsonAfter.entrySet()) { - //这里处理一下日期变成时间戳问题,以下进行遍历执行
- map.put(entry.getKey(), entry.getValue());
- }
- list.add(map);
- }
-
- }
-
- saveElasticSearch(tName, list);
-
- } catch (Exception ex) {
- log.info(DateUtils.getDate() + "---" + ex.toString());
- }
-
- }
方便测试,先将时间改为每100毫秒执行,Time.milliseconds(100),通过开窗获取100毫秒的数据:
第1个时间窗口到达:Iterable中集合了这100毫秒接收的所有实时数据,统一处理
Flink是实时处理,window机制可以认为是flink的批处理实现,因为需要等待水位线对齐触发timer。一般还基于时间窗口做一些批量处理不会丢数据,所以比较适合数据表全量更新。