• 分布式锁&kafka事务提交等编码技巧


    一、前言

    在开发过程中,遇到了一些比较实用的编码技巧,故记录以加深印象。因为每个技巧的篇幅较短,故不做拆分,只合在一篇小文章中阐述。以下会涉及kafka的事务提交方法、redis分布式锁简化以及多key情况下应该怎么加锁、业务日志如何解耦。

    二、kafka的事务提交方法

    • kafka我们常用于削峰填谷,以及系统间解耦等场景。这里我们会常遇到一种情况,就是上游系统在处理完成业务后,需要通知其它的系统,假如我们不考虑事务提交失败的情况下,就可以像下面这样写。但是假如出现网络异常或者数据库异常等情况,就会出现事务提交失败从而回滚,但是消息却已经发生给其它服务了,那么就会导致整条调用链的异常
    1. @Autowired
    2. private KafkaTemplate kafkaTemplate;
    3. @Transactional(rollbackFor = Exception.class)
    4. public void saveServiceOrder(ServiceOrder serviceOrder){
    5. // do something
    6. NoticeListDTO notice = NoticeListDTO.builder().build();
    7. // 通知其它服务
    8. kafkaTemplate.send(TopicNameConstants.SERVICE_ORDER_CHANGE_NOTIFY, JSONObject.toJSONString(notice));
    9. }
    • 所以,我们可以进行进一步优化,就是将消息通知后置到事务提交后,这样系统的可靠度就会更高。我们增加一个kafka帮助类,如下:
    1. @Component
    2. @Slf4j
    3. public class KafkaTemplateHelper {
    4. @Autowired
    5. private KafkaTemplate kafkaTemplate;
    6. /**
    7. * 事务提交后发送kafka消息
    8. * @param topic
    9. * @param data
    10. * @param <T>
    11. */
    12. public <T> void send(String topic, Object data) {
    13. // 是否开启事务判断
    14. if (TransactionSynchronizationManager.isSynchronizationActive()) {
    15. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
    16. @Override
    17. public void afterCommit() {
    18. log.info("事务提交成功后发送消息:topic:{},data:{}",topic, JSONObject.toJSONString(data));
    19. kafkaTemplate.send(topic,data);
    20. }
    21. });
    22. } else {
    23. log.info("没有开启事务直接发送消息:topic:{},data:{}",topic, JSONObject.toJSONString(data));
    24. kafkaTemplate.send(topic,data);
    25. }
    26. }
    27. }
    • kafka调用如下,它就会保证在事务结束后再通知其它系统,同理,很多需要后置的操作也可以这么玩。其实kafka还有一套可靠性应用方案可以分享,待有空再写
    1. @Autowired
    2. private KafkaTemplateHelper kafkaTemplateHelper;
    3. @Transactional(rollbackFor = Exception.class)
    4. public void saveServiceOrder(ServiceOrder serviceOrder){
    5. // do something
    6. NoticeListDTO notice = NoticeListDTO.builder().build();
    7. // 通知a服务
    8. kafkaTemplateHelper.send(TopicNameConstants.SERVICE_ORDER_CHANGE_NOTIFY, JSONObject.toJSONString(notice));
    9. }

    三、redis分布式锁代码简化

    • 我们使用redis分布式锁就离不开redission组件,举个栗子,我们一般在服务集群的情况下,为了保证并发不出现问题,会如下加锁,用一段字符串加上入参中的唯一编号(如用户id、订单编号等等)来保证接口幂等性(PS:redissonDistributedLocker只是redission的简单封装)。这样写很好,没有问题,但是我们不禁会想,好像每个创建、更新等业务操作都得给接口加这些重复代码,那么有没有更加优雅的方式呢,没错,我们要追求的就是极致的优雅
    1. @ApiOperation("服务单更新")
    2. @Transactional(rollbackFor = Exception.class)
    3. public ApiResult serviceOrderUpdate(@RequestBody @Validated ServiceOrder req){
    4. log.info("服务单更新:time={},params={}", System.currentTimeMillis(), JSONObject.toJSONString(req));
    5. String lockKey = "mh:scs:serviceOrderUpdate:"+req.getServiceOrderId();
    6. boolean lock = redissonDistributedLocker.tryLock(lockKey,0L,10L);
    7. AssertUtil.businessInvalid(!lock,"操作过于频繁,请稍后再试");
    8. try {
    9. // do something
    10. return ApiResult.success();
    11. }finally {
    12. redissonDistributedLocker.unlock(lockKey);
    13. }
    14. }
    • 使用Aop为接口加锁,添加一个注解anno,并写个实现
    1. /**
    2. * 为方法加锁,处理完成再释放
    3. */
    4. @Retention(RetentionPolicy.RUNTIME)
    5. @Target({ElementType.METHOD})
    6. public @interface AopLock {
    7. /**
    8. * SpEL表达式,用于计算lockKey.
    9. */
    10. String value();
    11. /**
    12. * 单位秒
    13. */
    14. int waitTime() default 0;
    15. /**
    16. * 单位秒
    17. */
    18. int leaseTime() default 6;
    19. int errorCode() default 2733;
    20. String errorMsg() default "操作过于频繁,请稍后再试";
    21. }
    1. /**
    2. * 为方法加锁,处理完成再释放
    3. *
    4. */
    5. @Slf4j
    6. @Aspect
    7. @Order(3)
    8. @ConditionalOnBean(RedissonClient.class)
    9. public class AopLockAspect {
    10. @Autowired
    11. private RedissonClient redissonClient;
    12. @Value("${spring.application.name}")
    13. private String lockKeyPrefix;
    14. @Around("@annotation(common.aop.annos.AopLock)")
    15. public Object lock(ProceedingJoinPoint joinPoint) throws Throwable {
    16. Object[] args = joinPoint.getArgs();
    17. MethodSignature signature = (MethodSignature) joinPoint.getSignature();
    18. Method method = signature.getMethod();
    19. EvaluationContext context = initEvaluationContext(joinPoint);
    20. AopLock aopLock = method.getAnnotation(AopLock.class);
    21. String spEl = aopLock.value();
    22. String expressionValue = lockKeyPrefix + ":" + PARSER.parseExpression(spEl).getValue(context);
    23. RLock lock = redissonClient.getLock(expressionValue);
    24. try {
    25. boolean getterLock = lock.tryLock(aopLock.waitTime(), aopLock.leaseTime(), TimeUnit.SECONDS);
    26. if (!getterLock) {
    27. throw new ServiceException(aopLock.errorCode(), aopLock.errorMsg());
    28. }
    29. return joinPoint.proceed(args);
    30. } finally {
    31. try {
    32. lock.unlock();
    33. } catch (Exception e) {
    34. log.warn("unlock error:" + e.getMessage() + "," + e.getClass().getName());
    35. }
    36. }
    37. }
    38. }
    • 那么我们的加锁就可以简单的加个@AopLock注解就可以了,是不是很棒呢
    1. @ApiOperation("服务单更新")
    2. @AopLock(value="'mh:scs:serviceOrderUpdate:' + #req.serviceOrderId",leaseTime = 60*30)
    3. @Transactional(rollbackFor = Exception.class)
    4. public ApiResult serviceOrderUpdate(@RequestBody @Validated ServiceOrder req){
    5. log.info("服务单更新:time={},params={}", System.currentTimeMillis(), JSONObject.toJSONString(req));
    6. // do something
    7. return ApiResult.success();
    8. }

    四、redission在多key情况下应该怎么加锁

    • 上面的例子很好地将简单的分布式锁代码简化,但是我们会有一些场景是无法这样加锁的,比如一些批处理的场景,用户A批量操作了单据a、b、c,同一时间,用户B批量操作了单据b、c、d,这时bc单据就会有并发问题,在这种场景下,我们是不能简单地根据某个单据的订单编号进行加锁的,要思考换一种方式,如下:
    • 订单实体类
    1. @Data
    2. public class UpdateServiceOrdersReq implements Serializable {
    3. private static final long serialVersionUID = 1L;
    4. @Valid
    5. private List<ServiceOrder> serviceOrderList;
    6. }
    • 接口实现,对每个订单的id都加锁,假如有其中一个订单的锁获取失败的话则返回重试信息,在更新操作结束后释放所有的锁
    1. @ApiOperation("批量更新服务单信息")
    2. @PostMapping("/xxxx/updateServiceOrders")
    3. public ResponseBean updateServiceOrders(@RequestBody @Validated UpdateServiceOrdersReq req) {
    4. List<String> redisKeys = new ArrayList<>();
    5. List<ServiceOrder> list = new ArrayList<>();
    6. for (ServiceOrder serviceOrder : list) {
    7. redisKeys.add("mh:scs:updateServiceOrders:" + serviceOrder.getServiceOrderId());
    8. }
    9. try {
    10. for (String redisKey : redisKeys) {
    11. boolean lock = redissonDistributedLocker.tryLock(redisKey, 5L, 30L);
    12. if(!lock){
    13. AssertUtil.businessInvalid("批量更新服务单获取锁失败,请稍后尝试!");
    14. }
    15. }
    16. ResponseBean responseBean = ResponseBean.success();
    17. // do something
    18. return responseBean;
    19. } catch (Exception ex){
    20. throw ex;
    21. } finally {
    22. redisKeys.forEach(redisKey->{
    23. try {
    24. redissonDistributedLocker.unlock(redisKey);
    25. } catch (Exception e) {
    26. log.error("updateServiceOrders:释放redis锁失败:{}", redisKey, e);
    27. }
    28. });
    29. }
    30. }

    五、业务日志如何解耦

    • 在写业务系统的过程中,我们难免要进行一些业务日志操作记录,这里就会涉及业务日志字符串的数据组装,比如产品要求记录车辆出发时间、更新日期时间巴拉巴拉之类的,但同时会存在一个问题,因为业务日志记录是非主业务流程操作(类似消息通知之类的),故不可因为复杂的日志数据拼接去影响接口的响应速度,从而影响用户体验;这里就要思考如何解耦的问题。我思考了两种场景下的处理方案,可以分享出来给大家分享
    • 情况一,假如只是简单的进行文字记录,我们可以使用线程池的方式去对日志记录进行解耦
      • 使用线程池创建其它线程进行日志操作,这样就不会影响到主线程了
    1. /**
    2. * @author ppz
    3. * @date 2022年04月12日 17:00
    4. * @Description 服务单日志操作工具类
    5. */
    6. public class ServiceOrderLogUtils {
    7. private static ScsServiceOrderLogService logService = ApplicationContextUtils.getBean(ScsServiceOrderLogService.class);
    8. private static int corePoolSize = Runtime.getRuntime().availableProcessors();
    9. private static ExecutorService executor = new ThreadPoolExecutor(
    10. corePoolSize,
    11. corePoolSize*2,
    12. 10L,
    13. TimeUnit.SECONDS,
    14. new LinkedBlockingQueue<>(Integer.MAX_VALUE),
    15. new ThreadPoolExecutor.CallerRunsPolicy());
    16. private ServiceOrderLogUtils() {
    17. throw new IllegalStateException("Utility class");
    18. }
    19. /**
    20. * 日志公共线程池中执行线程
    21. * @param runnable 可运行对象
    22. */
    23. public static void execute(Runnable runnable) {
    24. executor.execute(runnable);
    25. }
    26. /**
    27. * 保存订单操作日志
    28. * @param serviceOrderId 订单编号
    29. * @param operType 日志操作类型
    30. * @param operContent 操作内容
    31. * @return: void
    32. */
    33. public static void saveLog(Long serviceOrderId, OperTypeEnum operType, String operContent){
    34. saveLog(createLog(serviceOrderId, operType, operContent));
    35. }
    36. /**
    37. * 保存订单操作日志
    38. * @param serviceOrderId 订单编号
    39. * @param operType 日志操作类型
    40. * @param operContent 操作内容
    41. * @param operUserId 操作人登录名
    42. * @param operUserName 操作人名称
    43. * @return: void
    44. */
    45. public static void saveLog(Long serviceOrderId, OperTypeEnum operType, String operContent, Integer operUserId, String operUserName){
    46. saveLog(createLog(serviceOrderId, operType, operContent, operUserId, operUserName));
    47. }
    48. public static ScsServiceOrderLog createLog(Long serviceOrderId, OperTypeEnum operType, String operContent) {
    49. AuthUser userInfo = WebUtils.getCurrentUser();
    50. return createLog(serviceOrderId, operType, operContent, StringUtil.toInt(userInfo.getLoginName()), userInfo.getName());
    51. }
    52. /**
    53. * 封装订单日志实体
    54. * @param serviceOrderId
    55. * @param operType
    56. * @param operContent
    57. * @param operUserId
    58. * @param operUserName
    59. * @return: ScsServiceOrderLog
    60. */
    61. public static ScsServiceOrderLog createLog(Long serviceOrderId, OperTypeEnum operType, String operContent, Integer operUserId, String operUserName){
    62. ScsServiceOrderLog log = new ScsServiceOrderLog();
    63. log.setServiceOrderId(serviceOrderId);
    64. log.setOperContent(operContent);
    65. log.setOperType(operType.getCode());
    66. log.setOperatorId(operUserId);
    67. log.setOperatorName(operUserName);
    68. return log;
    69. }
    70. /**
    71. * 保存订单操作日志
    72. * @param log 日志对象
    73. * @return: void
    74. */
    75. public static void saveLog(ScsServiceOrderLog log){
    76. List<ScsServiceOrderLog> list = Lists.newArrayList();
    77. list.add(log);
    78. saveLog(list);
    79. }
    80. /**
    81. * 批量保存订单操作日志
    82. * @param list
    83. * @return: void
    84. */
    85. public static void saveLog(List<ScsServiceOrderLog> list){
    86. if(CollectionUtils.isEmpty(list)) {
    87. return;
    88. }
    89. Date now = new Date();
    90. for(ScsServiceOrderLog log : list) {
    91. if(log.getOperatorTime() == null) {
    92. log.setOperatorTime(now);
    93. }
    94. if(StrUtil.length(log.getOperContent()) > 512) {
    95. log.setOperContent(StrUtil.subWithLength(log.getOperContent(), 0, 512));
    96. }
    97. }
    98. if(!list.isEmpty()) {
    99. execute(new SaveLogThread(list));
    100. }
    101. }
    102. /**
    103. * 订单日志保存线程
    104. * @author: xiecy
    105. * @date: 2019年4月29日 下午12:03:35
    106. */
    107. static class SaveLogThread implements Runnable {
    108. private List<ScsServiceOrderLog> list = null;
    109. public SaveLogThread(List<ScsServiceOrderLog> list) {
    110. super();
    111. this.list = list;
    112. }
    113. @Override
    114. public void run() {
    115. if(list != null && !list.isEmpty()) {
    116. logService.batchInsert(list);
    117. }
    118. }
    119. }
    120. /**
    121. * 同步批量保存日志
    122. * @param list
    123. * @return: void
    124. */
    125. public static void saveLogSync(List<ScsServiceOrderLog> list){
    126. if(list.isEmpty()) {
    127. return;
    128. }
    129. Date now = new Date();
    130. AuthUser userInfo = WebUtils.getCurrentUser();
    131. for(ScsServiceOrderLog log : list) {
    132. if(log.getOperatorTime() == null) {
    133. log.setOperatorTime(now);
    134. }
    135. if(log.getOperatorId() == null && userInfo!=null) {
    136. log.setOperatorId(StringUtil.toInt(userInfo.getLoginName()));
    137. log.setOperatorName(userInfo.getName());
    138. }
    139. if(StrUtil.length(log.getOperContent()) > 512) {
    140. log.setOperContent(StrUtil.subWithLength(log.getOperContent(), 0, 512));
    141. }
    142. }
    143. if(list != null && !list.isEmpty()) {
    144. logService.batchInsert(list);
    145. }
    146. }
    147. }
    • 业务代码中进行使用
    1. @Transactional(rollbackFor = Exception.class)
    2. public boolean updateShippingDemandStatus(UpdateShippingDemandStatusReq req) {
    3. // todo something
    4. ServiceOrderLogUtils.saveLog(serviceOrderId, OperTypeEnum.CANCEL_SHIPPING_DEMAND,"用户取消运输需求");
    5. }
    • 情况二:假如日志记录需要对数据进行复杂组件的话,可以把使用到的数据组装到一个实体,然后通过发送给kafka或者redis进行解耦,在另外的线程中进行数据组装,具体就不展示了

    最后的最后

    看到看到这了,不来个赞吗~

  • 相关阅读:
    macOS 13 Ventura后,打开软件显示“XXapp已损坏,无法打开”如何解决?
    人工智能之地形导航系统
    【建议背诵】软考高项考试案例简答题汇总~(2)
    serveless 思想 Midway.js 框架使用教程(一)
    数据库页已标记为 RestorePending,可能表明磁盘已损坏。要从此状态恢复,请执行还原操作。
    Amazon图片下载器:利用Scrapy库完成图像下载任务
    Redis
    倒计时c#/unity
    SSH 远程管理软件 SecureCRT 下载安装教程
    如何在 Windows 10 中安装 VMware Workstation Player 17
  • 原文地址:https://blog.csdn.net/Firstlucky77/article/details/125447532