在开发过程中,遇到了一些比较实用的编码技巧,故记录下来以加深印象。因为每个技巧的篇幅较短,所以不做拆分,合在一篇小文章中阐述。下文会涉及 Kafka 消息如何在事务提交后再发送、Redis 分布式锁的简化写法、多 key 情况下应该怎样加锁,以及业务日志如何解耦。
- @Autowired
- private KafkaTemplate kafkaTemplate;
-
- @Transactional(rollbackFor = Exception.class)
- public void saveServiceOrder(ServiceOrder serviceOrder){
- // do something
-
- NoticeListDTO notice = NoticeListDTO.builder().build();
- // 通知其它服务
- kafkaTemplate.send(TopicNameConstants.SERVICE_ORDER_CHANGE_NOTIFY, JSONObject.toJSONString(notice));
- }
- @Component
- @Slf4j
- public class KafkaTemplateHelper {
- @Autowired
- private KafkaTemplate kafkaTemplate;
-
- /**
- * 事务提交后发送kafka消息
- * @param topic
- * @param data
- * @param <T>
- */
- public <T> void send(String topic, Object data) {
- // 是否开启事务判断
- if (TransactionSynchronizationManager.isSynchronizationActive()) {
- TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
- @Override
- public void afterCommit() {
- log.info("事务提交成功后发送消息:topic:{},data:{}",topic, JSONObject.toJSONString(data));
- kafkaTemplate.send(topic,data);
- }
- });
- } else {
- log.info("没有开启事务直接发送消息:topic:{},data:{}",topic, JSONObject.toJSONString(data));
- kafkaTemplate.send(topic,data);
- }
- }
- }
-
- @Autowired
- private KafkaTemplateHelper kafkaTemplateHelper;
-
- @Transactional(rollbackFor = Exception.class)
- public void saveServiceOrder(ServiceOrder serviceOrder){
- // do something
-
- NoticeListDTO notice = NoticeListDTO.builder().build();
- // 通知a服务
- kafkaTemplateHelper.send(TopicNameConstants.SERVICE_ORDER_CHANGE_NOTIFY, JSONObject.toJSONString(notice));
- }
- @ApiOperation("服务单更新")
- @Transactional(rollbackFor = Exception.class)
- public ApiResult serviceOrderUpdate(@RequestBody @Validated ServiceOrder req){
- log.info("服务单更新:time={},params={}", System.currentTimeMillis(), JSONObject.toJSONString(req));
- String lockKey = "mh:scs:serviceOrderUpdate:"+req.getServiceOrderId();
- boolean lock = redissonDistributedLocker.tryLock(lockKey,0L,10L);
- AssertUtil.businessInvalid(!lock,"操作过于频繁,请稍后再试");
- try {
- // do something
- return ApiResult.success();
- }finally {
- redissonDistributedLocker.unlock(lockKey);
- }
- }
/**
 * Marks a method that must run under a lock: the lock is taken before the method
 * executes and released once it has finished.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AopLock {

    /**
     * SpEL expression that is evaluated to compute the lock key.
     *
     * @return the SpEL expression
     */
    String value();

    /**
     * How long to wait for the lock, in seconds.
     *
     * @return the wait time in seconds; defaults to 0
     */
    int waitTime() default 0;

    /**
     * How long the lock is leased for, in seconds.
     *
     * @return the lease time in seconds; defaults to 6
     */
    int leaseTime() default 6;

    /**
     * Error code reported when the lock cannot be acquired.
     *
     * @return the error code; defaults to 2733
     */
    int errorCode() default 2733;

    /**
     * Error message reported when the lock cannot be acquired.
     *
     * @return the error message
     */
    String errorMsg() default "操作过于频繁,请稍后再试";

}
- /**
- * 为方法加锁,处理完成再释放
- *
- */
- @Slf4j
- @Aspect
- @Order(3)
- @ConditionalOnBean(RedissonClient.class)
- public class AopLockAspect {
- @Autowired
- private RedissonClient redissonClient;
- @Value("${spring.application.name}")
- private String lockKeyPrefix;
-
- @Around("@annotation(common.aop.annos.AopLock)")
- public Object lock(ProceedingJoinPoint joinPoint) throws Throwable {
- Object[] args = joinPoint.getArgs();
- MethodSignature signature = (MethodSignature) joinPoint.getSignature();
- Method method = signature.getMethod();
- EvaluationContext context = initEvaluationContext(joinPoint);
-
- AopLock aopLock = method.getAnnotation(AopLock.class);
- String spEl = aopLock.value();
- String expressionValue = lockKeyPrefix + ":" + PARSER.parseExpression(spEl).getValue(context);
- RLock lock = redissonClient.getLock(expressionValue);
- try {
- boolean getterLock = lock.tryLock(aopLock.waitTime(), aopLock.leaseTime(), TimeUnit.SECONDS);
- if (!getterLock) {
- throw new ServiceException(aopLock.errorCode(), aopLock.errorMsg());
- }
- return joinPoint.proceed(args);
- } finally {
- try {
- lock.unlock();
- } catch (Exception e) {
- log.warn("unlock error:" + e.getMessage() + "," + e.getClass().getName());
- }
- }
- }
- }
- @ApiOperation("服务单更新")
- @AopLock(value="'mh:scs:serviceOrderUpdate:' + #req.serviceOrderId",leaseTime = 60*30)
- @Transactional(rollbackFor = Exception.class)
- public ApiResult serviceOrderUpdate(@RequestBody @Validated ServiceOrder req){
- log.info("服务单更新:time={},params={}", System.currentTimeMillis(), JSONObject.toJSONString(req));
-
- // do something
- return ApiResult.success();
- }
- @Data
- public class UpdateServiceOrdersReq implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Valid
- private List<ServiceOrder> serviceOrderList;
- }
- @ApiOperation("批量更新服务单信息")
- @PostMapping("/xxxx/updateServiceOrders")
- public ResponseBean updateServiceOrders(@RequestBody @Validated UpdateServiceOrdersReq req) {
- List<String> redisKeys = new ArrayList<>();
- List<ServiceOrder> list = new ArrayList<>();
- for (ServiceOrder serviceOrder : list) {
- redisKeys.add("mh:scs:updateServiceOrders:" + serviceOrder.getServiceOrderId());
- }
- try {
- for (String redisKey : redisKeys) {
- boolean lock = redissonDistributedLocker.tryLock(redisKey, 5L, 30L);
- if(!lock){
- AssertUtil.businessInvalid("批量更新服务单获取锁失败,请稍后尝试!");
- }
- }
- ResponseBean responseBean = ResponseBean.success();
- // do something
- return responseBean;
- } catch (Exception ex){
- throw ex;
- } finally {
- redisKeys.forEach(redisKey->{
- try {
- redissonDistributedLocker.unlock(redisKey);
- } catch (Exception e) {
- log.error("updateServiceOrders:释放redis锁失败:{}", redisKey, e);
- }
- });
- }
- }
- /**
- * @author ppz
- * @date 2022年04月12日 17:00
- * @Description 服务单日志操作工具类
- */
- public class ServiceOrderLogUtils {
- private static ScsServiceOrderLogService logService = ApplicationContextUtils.getBean(ScsServiceOrderLogService.class);
- private static int corePoolSize = Runtime.getRuntime().availableProcessors();
- private static ExecutorService executor = new ThreadPoolExecutor(
- corePoolSize,
- corePoolSize*2,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(Integer.MAX_VALUE),
- new ThreadPoolExecutor.CallerRunsPolicy());
-
- private ServiceOrderLogUtils() {
- throw new IllegalStateException("Utility class");
- }
-
- /**
- * 日志公共线程池中执行线程
- * @param runnable 可运行对象
- */
- public static void execute(Runnable runnable) {
- executor.execute(runnable);
- }
-
- /**
- * 保存订单操作日志
- * @param serviceOrderId 订单编号
- * @param operType 日志操作类型
- * @param operContent 操作内容
- * @return: void
- */
- public static void saveLog(Long serviceOrderId, OperTypeEnum operType, String operContent){
- saveLog(createLog(serviceOrderId, operType, operContent));
- }
-
- /**
- * 保存订单操作日志
- * @param serviceOrderId 订单编号
- * @param operType 日志操作类型
- * @param operContent 操作内容
- * @param operUserId 操作人登录名
- * @param operUserName 操作人名称
- * @return: void
- */
- public static void saveLog(Long serviceOrderId, OperTypeEnum operType, String operContent, Integer operUserId, String operUserName){
- saveLog(createLog(serviceOrderId, operType, operContent, operUserId, operUserName));
- }
-
- public static ScsServiceOrderLog createLog(Long serviceOrderId, OperTypeEnum operType, String operContent) {
- AuthUser userInfo = WebUtils.getCurrentUser();
- return createLog(serviceOrderId, operType, operContent, StringUtil.toInt(userInfo.getLoginName()), userInfo.getName());
- }
-
- /**
- * 封装订单日志实体
- * @param serviceOrderId
- * @param operType
- * @param operContent
- * @param operUserId
- * @param operUserName
- * @return: ScsServiceOrderLog
- */
- public static ScsServiceOrderLog createLog(Long serviceOrderId, OperTypeEnum operType, String operContent, Integer operUserId, String operUserName){
- ScsServiceOrderLog log = new ScsServiceOrderLog();
- log.setServiceOrderId(serviceOrderId);
- log.setOperContent(operContent);
- log.setOperType(operType.getCode());
- log.setOperatorId(operUserId);
- log.setOperatorName(operUserName);
- return log;
- }
-
- /**
- * 保存订单操作日志
- * @param log 日志对象
- * @return: void
- */
- public static void saveLog(ScsServiceOrderLog log){
- List<ScsServiceOrderLog> list = Lists.newArrayList();
- list.add(log);
- saveLog(list);
- }
-
- /**
- * 批量保存订单操作日志
- * @param list
- * @return: void
- */
- public static void saveLog(List<ScsServiceOrderLog> list){
- if(CollectionUtils.isEmpty(list)) {
- return;
- }
- Date now = new Date();
- for(ScsServiceOrderLog log : list) {
- if(log.getOperatorTime() == null) {
- log.setOperatorTime(now);
- }
- if(StrUtil.length(log.getOperContent()) > 512) {
- log.setOperContent(StrUtil.subWithLength(log.getOperContent(), 0, 512));
- }
- }
- if(!list.isEmpty()) {
- execute(new SaveLogThread(list));
- }
- }
-
- /**
- * 订单日志保存线程
- * @author: xiecy
- * @date: 2019年4月29日 下午12:03:35
- */
- static class SaveLogThread implements Runnable {
- private List<ScsServiceOrderLog> list = null;
-
- public SaveLogThread(List<ScsServiceOrderLog> list) {
- super();
- this.list = list;
- }
-
- @Override
- public void run() {
- if(list != null && !list.isEmpty()) {
- logService.batchInsert(list);
- }
- }
- }
-
- /**
- * 同步批量保存日志
- * @param list
- * @return: void
- */
- public static void saveLogSync(List<ScsServiceOrderLog> list){
- if(list.isEmpty()) {
- return;
- }
- Date now = new Date();
- AuthUser userInfo = WebUtils.getCurrentUser();
- for(ScsServiceOrderLog log : list) {
- if(log.getOperatorTime() == null) {
- log.setOperatorTime(now);
- }
- if(log.getOperatorId() == null && userInfo!=null) {
- log.setOperatorId(StringUtil.toInt(userInfo.getLoginName()));
- log.setOperatorName(userInfo.getName());
- }
- if(StrUtil.length(log.getOperContent()) > 512) {
- log.setOperContent(StrUtil.subWithLength(log.getOperContent(), 0, 512));
- }
- }
- if(list != null && !list.isEmpty()) {
- logService.batchInsert(list);
- }
- }
-
- }
- @Transactional(rollbackFor = Exception.class)
- public boolean updateShippingDemandStatus(UpdateShippingDemandStatusReq req) {
- // todo something
- ServiceOrderLogUtils.saveLog(serviceOrderId, OperTypeEnum.CANCEL_SHIPPING_DEMAND,"用户取消运输需求");
- }
都看到这了,不来个赞吗~