• Kafka+redis分布式锁结合使用心得总结


    #kafka部分

    @KafkaListener(topics = "#{'${vsmart_alert_detection_tms_send_message_topic}'.split(',')}", groupId = "${vsmart.alert.detection.consumer.group}")
    public void vsmartAlertDetectionTmsSendMessage(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        doSendMessage(record,ack);
    }

    private void doSendMessage(ConsumerRecord record, Acknowledgment ack) {
        Optional message = Optional.ofNullable(record.value());
        String key = record.topic() + "-" + record.partition() + "-offset:" + record.offset();
        if (RedisUtils.isExistsKey(key)) {
            ack.acknowledge();
            return;
        }

        try {
            if (message.isPresent() && (record.timestamp() > (System.currentTimeMillis() - kafkaConsumerDelayTime))) {
                JSONObject msg = JSONObject.parseObject(record.value().toString());
                msg.put(VSMART_KAFKA_MSG_POSITION_INFO, key);

                //具体操作
            }
        }catch (Exception e){
            
        }finally {
            ack.acknowledge();
        }
    }    

    #redis部分 

    public Boolean handler(JSONObject msg) {
        //解析
        Boolean isOk = jsonToDetectionInfos(msg);

        if (!isOk) {
            return false;
        }

        //加锁 associatedKey()
        
        String lockKey = associatedKey();
        if (StrUtil.isEmpty(lockKey)) {
            return false;
        }
        RLock lock = SpringUtils.getBean(RedissonClient.class).getLock(lockKey);
        //锁的时间 根据业务需要进行调整
        try {
            boolean flag_2 = lock.tryLock(10, 300, TimeUnit.SECONDS);

            if (flag_2) {        
                //加锁后执行前判断是否已经处理过kafka中相同位置的信息了
                if (ObjectUtil.isNotNull(msg) &&
                        ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO)) &&
                        RedisUtils.isExistsKey(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO))) {            
                    return false;
                }

                //具体业务操作
                //...
                    
                return true;
            } else {
                detectionRuleBo.getLogText().append(StrUtil.format("{}-获取锁失败;", detectionRuleBo.getName())).append("
    ");
                return false;
            }

        } catch (Exception e) {
            
        } finally {
            ///释放锁
            if (null != lock && lock.isHeldByCurrentThread()) {        
                if (ObjectUtil.isNotNull(msg) &&
                        ObjectUtil.isNotNull(msg.get(VSMART_KAFKA_MSG_POSITION_INFO))) {
                    RedisUtils.setCacheStrExpire(msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), msg.getString(VSMART_KAFKA_MSG_POSITION_INFO), 60 * 60);
                }
                //解锁
                lock.unlock();
            }
            return true;
        }
    }

  • 相关阅读:
    HC小区管理系统房产业主商铺业主账户操作说明
    Android中的屏幕刷新机制(动画视频形象说明机制)
    libopenssl 实现私钥加密公钥解密
    灵性图书馆:好书推荐-《当下的力量》
    kubernetes之资源限制及QOS服务质量
    Ubuntu镜像源cn.arichinve.ubuntu.com不可用原因分析和解决
    openlayer 加载4547坐标系 以及 wfs服务数据(或其他坐标系)
    做社交媒体营销应该注意些什么?Shopline卖家的成功秘笈在这里!
    Java集合面试题整理(超详细)
    PHP活动报名微信小程序系统源码
  • 原文地址:https://blog.csdn.net/weixin_40976261/article/details/134282511