相关文章:
Java设计模式实战 - 02 工厂类获取设备连接器 DeviceConnectorFactory
Java设计模式实战 - 03 工厂类获取数据同步器 OpenDataSynchronizerFactory
Kafka实战 - 01 自定义 SpringBoot Starter 实现 Kafka 的自动配置
项目背景:
SIR平台上,从XDR平台接入的安全告警数据,为了保证两个平台安全告警数据的处置状态一致,SIR平台需要将已经处置闭环的安全告警数据的处置状态同步给XDR平台,从而更新XDR平台上安全告警数据的处置状态。
引入 ngsoc-common-kafka 这个自定义starter:
<dependency>
    <groupId>com.hh</groupId>
    <artifactId>ngsoc-common-kafka</artifactId>
    <version>3.0.1</version>
</dependency>
ngsoc:
kafka:
clusters:
- name: ngsoc
#{{$data := json (getv "/ngsoc/kafka/common/cluster/conn_info")}}
bootstrap-servers: #{{range $data.route}}
- '127.0.0.1:9092' #{{end}}
consumers:
- name: incidentDealStatus
group-id: incident_deal_status_group
topics:
- INCIDENT_DEAL_STATUS_SYNC
enable-auto-commit: false
max-poll-records: 1000
- name: alertDealStatus
group-id: alert_deal_status_group
topics:
- ALERT_DEAL_STATUS_SYNC
enable-auto-commit: false
max-poll-records: 1000
/**
* 安全告警处置状态同步通用定时任务
*/
@Log4j2
@Component
public class AlertDealStatusSyncTask implements InitializingBean {
@Setter(onMethod_ = { @Autowired })
private RestHighLevelClient restHighLevelClient;
/**
* 数据同步器
*/
@Setter(onMethod_ = @Autowired)
private OpenDataSyncDispatchService<List<KafkaDealStatusSyncDto>, ApiResponse<?>> openDataSyncDispatchService;
/**
* kafka消费者
*/
@Setter(onMethod_ = @Autowired)
private KafkaConsumer<String, String> alertDealStatusKafkaConsumer;
/**
* 支持的设备
*/
private final static SourceDeviceEnum[] SUPPORT_DEVICES = { SourceDeviceEnum.XDR };
/**
* 获取SourceDeviceEnum枚举类中设备类型的名称集合
*/
private final static List<String> SRC_DEVICES = Arrays.stream(SUPPORT_DEVICES).map(Enum::name).collect(Collectors.toList());
/**
* ElasticSearch存放安全事件的索引
*/
private final static String INDEX = "futurex-incident-index";
/**
* 设备ip名称
*/
private final static String FIELD_DEV_IP_NAME = "devIpName";
/**
* 安全告警uuid
*/
private final static String FIELD_UUID = "uuId";
/**
* 原子类
*/
private AtomicBoolean isRunning;
/**
* Spring容器启动时执行该方法
*/
@Override
public void afterPropertiesSet() throws Exception {
isRunning = new AtomicBoolean(false);
}
/**
* Spring bean销毁前执行该方法
*/
@PreDestroy
public void destroy() {
isRunning.set(false);
}
/**
* 定时任务每隔10s拉取kafka中的消息向XDR平台同步安全告警的处置状态
*/
@Scheduled(fixedDelayString = "PT10S")
public void doTask() {
if (!isRunning.get()) {
try {
isRunning.set(true);
StopWatch watch = new StopWatch();
watch.start();
// 配置文件已经限制了每次拉取1000条
ConsumerRecords<String, String> records = alertDealStatusKafkaConsumer.poll(Duration.ofMillis(1000));
if (records.count() > 0) {
log.info("[安全告警状态同步]alert dealStatus task pull {} records", records.count());
// 失败重试三次
int retryTimes = 3;
for (int i = 0; i < retryTimes; i++) {
try {
// 同步数据
syncData(records);
break;
} catch (Exception e) {
log.error("[安全告警状态同步]sync alert failed, retry {} times", i + 1, e);
TimeUnit.SECONDS.sleep(2);
}
}
// 在consumer端消费消息操作完成以后再提交offset
alertDealStatusKafkaConsumer.commitSync();
watch.stop();
log.info("[安全告警状态同步]sync alert dealStatus task finished, total millis {}", watch.getTotalTimeMillis());
}
} catch (Exception e) {
// do something
log.error("[安全告警状态同步]sync alert failed, with errors", e);
} finally {
isRunning.set(false);
}
}
}
/**
* 数据同步
* @param records kafka数据
*/
private void syncData(ConsumerRecords<String, String> records) throws IOException {
if (!records.isEmpty()) {
// 将kafka中的json串转为 Pair, List>
Pair<List<KafkaDealStatusSyncDto>, List<String>> pair = getBeansAndIds(records);
// 获取同步实体列表 List
List<KafkaDealStatusSyncDto> dealStatusSyncDtos = pair.getFirst();
// 获取数据uuids列表 List uuids
List<String> uuids = pair.getSecond();
// 根据uuids查ElasticSearch数据库获取日志来源信息
List<AlertDealStatusDto> datas = query(uuids);
if (!CollectionUtils.isEmpty(datas)) {
// 生成UUID和数据库返回信息的映射关系
Map<String, AlertDealStatusDto> dataMap = datas.stream().collect(Collectors.toMap(AlertDealStatusDto::getUuId, o -> o));
// 数据按设备分类,且保证顺序
Map<SourceDeviceEnum, List<KafkaDealStatusSyncDto>> syncDataWithDeviceMap = new HashMap<>(SRC_DEVICES.size());
// 按顺序处理Kafka数据
dealStatusSyncDtos.forEach((kafkaDealStatusSyncDto) -> {
AlertDealStatusDto alertDealStatusDto = dataMap.get(kafkaDealStatusSyncDto.getUuId());
if (Objects.nonNull(alertDealStatusDto)) {
// 告警的日志来源是个数组,需要分别处理
List<String> devIpNames = alertDealStatusDto.getDevIpName();
if (!CollectionUtils.isEmpty(devIpNames)) {
for (String devIpName : devIpNames) {
if (SRC_DEVICES.contains(devIpName)) {
SourceDeviceEnum sourceDevice = SourceDeviceEnum.build(devIpName);
// 将ID替换成第三方同步的UUID
kafkaDealStatusSyncDto.setUuId(alertDealStatusDto.getOriginalUuid());
List<KafkaDealStatusSyncDto> dataList = syncDataWithDeviceMap.get(sourceDevice);
if (CollectionUtils.isEmpty(dataList)) {
dataList = new ArrayList<>();
dataList.add(kafkaDealStatusSyncDto);
syncDataWithDeviceMap.put(sourceDevice, dataList);
} else {
dataList.add(kafkaDealStatusSyncDto);
}
}
}
}
}
});
// 根据不同的设备类型分发同步数据到不同的同步器
dispatchData(syncDataWithDeviceMap);
}
}
}
/**
* 根据不同的设备类型同步处置状态
*
* @param syncDataWithDeviceMap 设备和同步数据映射
*/
private void dispatchData(Map<SourceDeviceEnum, List<KafkaDealStatusSyncDto>> syncDataWithDeviceMap) {
Set<Map.Entry<SourceDeviceEnum, List<KafkaDealStatusSyncDto>>> entrySet = syncDataWithDeviceMap.entrySet();
for (Map.Entry<SourceDeviceEnum, List<KafkaDealStatusSyncDto>> entry : entrySet) {
SyncDataWrapper<List<KafkaDealStatusSyncDto>> syncDataWrapper = SyncDataWrapper
.<List<KafkaDealStatusSyncDto>>builder()
.deviceInfo(DeviceInfo.builder().sourceDevice(entry.getKey()).build())
.syncDataType(SyncDataTypeEnum.ALERT_DEAL_STATUS)
.data(entry.getValue())
.build();
ApiResponse<?> response = openDataSyncDispatchService.dispatch(syncDataWrapper);
if (Objects.isNull(response) || response.getCode() != ApiResponse.CODE_OK) {
throw new DataSyncException("sync incident dealStatus failed");
}
}
}
/**
* 根据UUID查询数据
*
* @param uuids ElasticSearch中查询参数
* @return List 数据列表
*/
@NotNull
private List<AlertDealStatusDto> query(List<String> uuids) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.must(QueryBuilders.termsQuery(FIELD_UUID, uuids));
boolQueryBuilder.must(QueryBuilders.termsQuery(FIELD_DEV_IP_NAME, SRC_DEVICES));
searchSourceBuilder.query(boolQueryBuilder);
// 设置一次查询数据的大小为10000
searchSourceBuilder.size(10000);
SearchRequest searchRequest = new SearchRequest(INDEX);
searchRequest.source(searchSourceBuilder);
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
List<AlertDealStatusDto> alertDealStatusDtos = new ArrayList<>();
for (SearchHit hit : response.getHits()) {
for (SearchHit hit : response.getHits()) {
alertDealStatusDtos.add(JsonTranscodeUtil.transcode(hit.getSourceAsString(), AlertDealStatusDto.class));
}
return alertDealStatusDtos;
}
/**
* 获取转换为实体后的列表和ID列表
*
* @param records kafka记录
* @return 实体数据和ids pair
*/
private Pair<List<KafkaDealStatusSyncDto>, List<String>> getBeansAndIds(ConsumerRecords<String, String> records) {
// 同步数据实体列表
List<KafkaDealStatusSyncDto> dealStatusSyncDtos = new ArrayList<>();
// 同步数据uuid列表
List<String> uuids = new ArrayList<>();
records.iterator().forEachRemaining(record -> {
// 将kafka中的json数据反序列化为KafkaDealStatusSyncDto
KafkaDealStatusSyncDto dealStatusSyncDto = convertToBean(record.value());
if (Objects.nonNull(dealStatusSyncDto)) {
dealStatusSyncDtos.add(dealStatusSyncDto);
uuids.add(dealStatusSyncDto.getUuId());
}
});
Collections.sort(dealStatusSyncDtos);
// Map
return Pair.of(dealStatusSyncDtos, uuids);
}
/**
* 将kafka数据反序列化
*
* @param value json串
* @return KafkaDealStatusSyncDto实体
*/
private KafkaDealStatusSyncDto convertToBean(String value) {
try {
return JsonTranscodeUtil.transcode(value, KafkaDealStatusSyncDto.class);
} catch (JsonProcessingException e) {
return null;
}
}
}
}
/**
 * Dispatch service for data synchronisation tasks.
 *
 * DATA: type of the payload being synchronised
 * RETURN: type of the dispatch result
 */
public interface OpenDataSyncDispatchService<DATA, RETURN> {
/**
 * Dispatches a synchronisation task and blocks until it completes.
 *
 * @param syncDataWrapper payload wrapper (device info, data type, data)
 * @return the dispatch result
 */
RETURN dispatch(SyncDataWrapper<DATA> syncDataWrapper);
/**
 * Dispatches a synchronisation task asynchronously.
 *
 * @param syncDataWrapper payload wrapper (device info, data type, data)
 * @return a future resolving to the dispatch result
 */
Future<RETURN> dispatchAsync(SyncDataWrapper<DATA> syncDataWrapper);
}
/**
 * Wrapper around a payload to be synchronised, carrying the target device
 * and the kind of data being synchronised.
 */
@Builder
@Data
public class SyncDataWrapper<DATA> {
/**
 * Target device information
 */
private DeviceInfo deviceInfo;
/**
 * Kind of data being synchronised
 */
private SyncDataTypeEnum syncDataType;
/**
 * The payload itself
 */
private DATA data;
}
/**
 * Default dispatcher: resolves the synchronizer and device connector matching
 * a sync request, opens a connection to the device and runs the sync.
 */
@Service
public class OpenDataSyncDispatchServiceImpl<DATA, RETURN> implements OpenDataSyncDispatchService<DATA, RETURN> {

    @Setter(onMethod_ = @Autowired)
    private DeviceConnectorFactory deviceConnectorFactory;

    @Setter(onMethod_ = @Autowired)
    private OpenDataSynchronizerFactory<DATA, RETURN> openDataSynchronizerFactory;

    @Setter(onMethod_ = @Autowired)
    private ThreadPoolTaskExecutor executor;

    @Override
    public RETURN dispatch(SyncDataWrapper<DATA> syncDataWrapper) {
        DeviceInfo deviceInfo = syncDataWrapper.getDeviceInfo();
        SourceDeviceEnum device = deviceInfo.getSourceDevice();
        // Pick the synchronizer registered for this (device, data-type) pair.
        OpenDataSynchronizer<DATA, RETURN> synchronizer =
                openDataSynchronizerFactory.get(device, syncDataWrapper.getSyncDataType());
        // Open a connection through the connector matching the source device,
        // then push the payload through the synchronizer.
        DeviceConnection connection = deviceConnectorFactory.get(device).connect(deviceInfo);
        return synchronizer.sync(connection, syncDataWrapper.getData());
    }

    @Override
    public Future<RETURN> dispatchAsync(SyncDataWrapper<DATA> syncDataWrapper) {
        // Same dispatch logic, executed on the shared task executor.
        return executor.submit(() -> dispatch(syncDataWrapper));
    }
}