• Kafka实战 - 03 Kafka消费消息实现数据的处置状态同步


    相关文章:

    Java设计模式实战 - 02 工厂类获取设备连接器 DeviceConnectorFactory
    Java设计模式实战 - 03 工厂类获取数据同步器 OpenDataSynchronizerFactory
    Kafka实战 - 01 自定义 SpringBoot Starter 实现 Kafka 的自动配置

    项目背景:

    SIR平台上,从XDR平台接入的安全告警数据,为了保证两个平台安全告警数据的处置状态一致,SIR平台需要将已经处置闭环的安全告警数据的处置状态同步给XDR平台,从而更新XDR平台上安全告警数据的处置状态。

    1. 依赖管理

    引入 ngsoc-common-kafka 这个自定义starter:

    <dependency>
        <groupId>com.hhgroupId>
        <artifactId>ngsoc-common-kafkaartifactId>
        <version>3.0.1version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. kafka 消费者配置

    ngsoc:
      kafka:
        clusters:
          - name: ngsoc
            #{{$data := json (getv "/ngsoc/kafka/common/cluster/conn_info")}}
            bootstrap-servers: #{{range $data.route}}
              - '127.0.0.1:9092' #{{end}}
            consumers:
              - name: incidentDealStatus
                group-id: incident_deal_status_group
                topics:
                  - INCIDENT_DEAL_STATUS_SYNC
                enable-auto-commit: false
                max-poll-records: 1000
              - name: alertDealStatus
                group-id: alert_deal_status_group
                topics:
                  - ALERT_DEAL_STATUS_SYNC
                enable-auto-commit: false
                max-poll-records: 1000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    3. 定时任务 AlertDealStatusSyncTask

    /**
     * 安全告警处置状态同步通用定时任务
     */
    @Log4j2
    @Component
    public class AlertDealStatusSyncTask implements InitializingBean {
    
        @Setter(onMethod_ = { @Autowired })
        private RestHighLevelClient restHighLevelClient;
    
        /**
         *  数据同步器
         */
        @Setter(onMethod_ = @Autowired)
        private OpenDataSyncDispatchService<List<KafkaDealStatusSyncDto>, ApiResponse<?>> openDataSyncDispatchService;
    
        /**
         * kafka消费者
         */
        @Setter(onMethod_ = @Autowired)
        private KafkaConsumer<String, String> alertDealStatusKafkaConsumer;
    
        /**
         * 支持的设备
         */
        private final static SourceDeviceEnum[] SUPPORT_DEVICES = { SourceDeviceEnum.XDR };
    
        /**
         * 获取SourceDeviceEnum枚举类中设备类型的名称集合
         */
        private final static List<String> SRC_DEVICES = Arrays.stream(SUPPORT_DEVICES).map(Enum::name).collect(Collectors.toList());
    
        /**
         * ElasticSearch存放安全事件的索引
         */
        private final static String INDEX = "futurex-incident-index";
    
        /**
         * 设备ip名称
         */
        private final static String FIELD_DEV_IP_NAME = "devIpName";
    
        /**
         * 安全告警uuid
         */
        private final static String FIELD_UUID = "uuId";
    
        /**
         * 原子类
         */
        private AtomicBoolean isRunning;
    
        /**
         * Spring容器启动时执行该方法
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            isRunning = new AtomicBoolean(false);
        }
    
        /**
         * Spring bean销毁前执行该方法
         */
        @PreDestroy
        public void destroy() {
            isRunning.set(false);
        }
        /**
         * 定时任务每隔10s拉取kafka中的消息向XDR平台同步安全告警的处置状态
         */
        @Scheduled(fixedDelayString = "PT10S")
        public void doTask() {
            if (!isRunning.get()) {
                try {
                    isRunning.set(true);
                    StopWatch watch = new StopWatch();
                    watch.start();
                    // 配置文件已经限制了每次拉取1000条
                    ConsumerRecords<String, String> records = alertDealStatusKafkaConsumer.poll(Duration.ofMillis(1000));
                    if (records.count() > 0) {
                        log.info("[安全告警状态同步]alert dealStatus task pull {} records", records.count());
                        // 失败重试三次
                        int retryTimes = 3;
                        for (int i = 0; i < retryTimes; i++) {
                            try {
                                // 同步数据
                                syncData(records);
                                break;
                            } catch (Exception e) {
                                log.error("[安全告警状态同步]sync alert failed, retry {} times", i + 1, e);
                                TimeUnit.SECONDS.sleep(2);
                            }
                        }
                        // 在consumer端消费消息操作完成以后再提交offset
                        alertDealStatusKafkaConsumer.commitSync();
                        watch.stop();
                        log.info("[安全告警状态同步]sync alert dealStatus task finished, total millis {}", watch.getTotalTimeMillis());
                    }
                } catch (Exception e) {
                    // do something
                    log.error("[安全告警状态同步]sync alert failed, with errors", e);
                } finally {
                    isRunning.set(false);
                }
            }
        }
    
        /**
         * 数据同步
         * @param records kafka数据
         */
        private void syncData(ConsumerRecords<String, String> records) throws IOException {
            if (!records.isEmpty()) {
                // 将kafka中的json串转为 Pair, List>
                Pair<List<KafkaDealStatusSyncDto>, List<String>> pair = getBeansAndIds(records);
    
                // 获取同步实体列表 List
                List<KafkaDealStatusSyncDto> dealStatusSyncDtos = pair.getFirst();
    
                // 获取数据uuids列表 List uuids 
                List<String> uuids = pair.getSecond();
    
                // 根据uuids查ElasticSearch数据库获取日志来源信息
                List<AlertDealStatusDto> datas = query(uuids);
    
                if (!CollectionUtils.isEmpty(datas)) {
                    // 生成UUID和数据库返回信息的映射关系
                    Map<String, AlertDealStatusDto> dataMap = datas.stream().collect(Collectors.toMap(AlertDealStatusDto::getUuId, o -> o));
                    // 数据按设备分类,且保证顺序
                    Map<SourceDeviceEnum, List<KafkaDealStatusSyncDto>> syncDataWithDeviceMap = new HashMap<>(SRC_DEVICES.size());
                    // 按顺序处理Kafka数据
                    dealStatusSyncDtos.forEach((kafkaDealStatusSyncDto) -> {
                        AlertDealStatusDto alertDealStatusDto = dataMap.get(kafkaDealStatusSyncDto.getUuId());
                        if (Objects.nonNull(alertDealStatusDto)) {
                            // 告警的日志来源是个数组,需要分别处理
                            List<String> devIpNames = alertDealStatusDto.getDevIpName();
                            if (!CollectionUtils.isEmpty(devIpNames)) {
                                for (String devIpName : devIpNames) {
                                    if (SRC_DEVICES.contains(devIpName)) {
                                        SourceDeviceEnum sourceDevice = SourceDeviceEnum.build(devIpName);
                                        // 将ID替换成第三方同步的UUID
                                        kafkaDealStatusSyncDto.setUuId(alertDealStatusDto.getOriginalUuid());
                                        List<KafkaDealStatusSyncDto> dataList = syncDataWithDeviceMap.get(sourceDevice);
                                        if (CollectionUtils.isEmpty(dataList)) {
                                            dataList = new ArrayList<>();
                                            dataList.add(kafkaDealStatusSyncDto);
                                            syncDataWithDeviceMap.put(sourceDevice, dataList);
                                        } else {
                                            dataList.add(kafkaDealStatusSyncDto);
                                        }
                                    }
                                }
                            }
                        }
                    });
                    // 根据不同的设备类型分发同步数据到不同的同步器
                    dispatchData(syncDataWithDeviceMap);
                }
            }
        }
    
        /**
         * 根据不同的设备类型同步处置状态
         * 
         * @param syncDataWithDeviceMap 设备和同步数据映射
         */
        private void dispatchData(Map<SourceDeviceEnum, List<KafkaDealStatusSyncDto>> syncDataWithDeviceMap) {
            Set<Map.Entry<SourceDeviceEnum, List<KafkaDealStatusSyncDto>>> entrySet = syncDataWithDeviceMap.entrySet();
            for (Map.Entry<SourceDeviceEnum, List<KafkaDealStatusSyncDto>> entry : entrySet) {
                SyncDataWrapper<List<KafkaDealStatusSyncDto>> syncDataWrapper = SyncDataWrapper
                    .<List<KafkaDealStatusSyncDto>>builder()
                    .deviceInfo(DeviceInfo.builder().sourceDevice(entry.getKey()).build())
                    .syncDataType(SyncDataTypeEnum.ALERT_DEAL_STATUS)
                    .data(entry.getValue())
                    .build();
                ApiResponse<?> response = openDataSyncDispatchService.dispatch(syncDataWrapper);
                if (Objects.isNull(response) || response.getCode() != ApiResponse.CODE_OK) {
                    throw new DataSyncException("sync incident dealStatus failed");
                }
            }
        }
    
        /**
         * 根据UUID查询数据
         * 
         * @param uuids ElasticSearch中查询参数
         * @return  List 数据列表
         */
        @NotNull
        private List<AlertDealStatusDto> query(List<String> uuids) throws IOException {
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
            boolQueryBuilder.must(QueryBuilders.termsQuery(FIELD_UUID, uuids));
            boolQueryBuilder.must(QueryBuilders.termsQuery(FIELD_DEV_IP_NAME, SRC_DEVICES));
            searchSourceBuilder.query(boolQueryBuilder);
            // 设置一次查询数据的大小为10000
            searchSourceBuilder.size(10000);
            SearchRequest searchRequest = new SearchRequest(INDEX);
            searchRequest.source(searchSourceBuilder);
            SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            List<AlertDealStatusDto> alertDealStatusDtos = new ArrayList<>();
            for (SearchHit hit : response.getHits()) {
                for (SearchHit hit : response.getHits()) {
                    alertDealStatusDtos.add(JsonTranscodeUtil.transcode(hit.getSourceAsString(), AlertDealStatusDto.class));
                }
                return alertDealStatusDtos;
            }
    
            /**
         * 获取转换为实体后的列表和ID列表
         * 
         * @param records kafka记录
         * @return 实体数据和ids pair
         */
            private Pair<List<KafkaDealStatusSyncDto>, List<String>> getBeansAndIds(ConsumerRecords<String, String> records) {
                // 同步数据实体列表
                List<KafkaDealStatusSyncDto> dealStatusSyncDtos = new ArrayList<>();
                // 同步数据uuid列表
                List<String> uuids = new ArrayList<>();
    
                records.iterator().forEachRemaining(record -> {
                    // 将kafka中的json数据反序列化为KafkaDealStatusSyncDto
                    KafkaDealStatusSyncDto dealStatusSyncDto = convertToBean(record.value());
                    if (Objects.nonNull(dealStatusSyncDto)) {
                        dealStatusSyncDtos.add(dealStatusSyncDto);
                        uuids.add(dealStatusSyncDto.getUuId());
                    }
                });
    
                Collections.sort(dealStatusSyncDtos);
                // Map
                return Pair.of(dealStatusSyncDtos, uuids);
            }
    
       /**
         * 将kafka数据反序列化
         * 
         * @param value json串
         * @return KafkaDealStatusSyncDto实体
         */
            private KafkaDealStatusSyncDto convertToBean(String value) {
                try {
                    return JsonTranscodeUtil.transcode(value, KafkaDealStatusSyncDto.class);
                } catch (JsonProcessingException e) {
                    return null;
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249

    4. 数据同步任务分发接口 OpenDataSyncDispatchService

    /**
     * 数据同步任务分发服务
     *
     * DATA: 同步数据类型
     * DEVICE_CONFIG: 设备配置类型
     * RETURN: 返回值类型
     */
    public interface OpenDataSyncDispatchService<DATA, RETURN> {
    
        /**
         * 分发数据同步任务 -- 同步方式
         * 
         * @param syncDataWrapper 数据体
         * @return 分发结果
         */
        RETURN dispatch(SyncDataWrapper<DATA> syncDataWrapper);
    
        /**
         * 分发数据同步任务 -- 异步方式调用
         * 
         * @param syncDataWrapper 数据体
         * @return 分发结果
         */
        Future<RETURN> dispatchAsync(SyncDataWrapper<DATA> syncDataWrapper);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    /**
     * 同步数据实体包装
     */
    @Builder
    @Data
    public class SyncDataWrapper<DATA> {
    
        /**
         * 设备信息
         */
        private DeviceInfo deviceInfo;
    
        /**
         * 同步的数据类型
         */
        private SyncDataTypeEnum syncDataType;
    
        /**
         * 同步的具体内容
         */
        private DATA data;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    5. 数据同步分发器 OpenDataSyncDispatchServiceImpl

    /**
     * 数据同步分发器
     */
    @Service
    public class OpenDataSyncDispatchServiceImpl<DATA, RETURN> implements OpenDataSyncDispatchService<DATA, RETURN> {
    
        @Setter(onMethod_ = @Autowired)
        private DeviceConnectorFactory deviceConnectorFactory;
    
        @Setter(onMethod_ = @Autowired)
        private OpenDataSynchronizerFactory<DATA, RETURN> openDataSynchronizerFactory;
    
        @Setter(onMethod_ = @Autowired)
        private ThreadPoolTaskExecutor executor;
    
        @Override
        public RETURN dispatch(SyncDataWrapper<DATA> syncDataWrapper) {
            // 设备类型
            SourceDeviceEnum sourceDevice = syncDataWrapper.getDeviceInfo().getSourceDevice();
            // 同步的数据类型
            SyncDataTypeEnum syncDataType = syncDataWrapper.getSyncDataType();
            // 通过日志来源设备和同步数据类型获取一个同步器
            OpenDataSynchronizer<DATA, RETURN> synchronizer = openDataSynchronizerFactory.get(sourceDevice, syncDataType);
            // 通过日志来源设备获取一个设备连接器
            DeviceConnector connector = deviceConnectorFactory.get(sourceDevice);
            // 通过设备连接器打开一个连接
            DeviceConnection connection = connector.connect(syncDataWrapper.getDeviceInfo());
            // 同步数据
            return synchronizer.sync(connection, syncDataWrapper.getData());
        }
    
        @Override
        public Future<RETURN> dispatchAsync(SyncDataWrapper<DATA> syncDataWrapper) {
            return executor.submit(() -> dispatch(syncDataWrapper));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
  • 相关阅读:
    Kotlin 协程 (7/7篇) - 在Android中的使用
    Python 画二分类的AUC
    intellij debug模式提示 : Method breakpoints may dramatically slow down debugging
    Flutter 实现背景图片毛玻璃效果
    C# 统计指定文件夹下的文件
    基于YOLOv8的目标跟踪——汽车跟踪和计数
    HBase学习笔记(一)
    格式工厂多个图片合并成一个PDF的报错
    最长字段和问题
    英语语法 - 定语从句
  • 原文地址:https://blog.csdn.net/qq_42764468/article/details/127355006