• Distributed ID Generation Service


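    The ID generation service in this article is modeled on "Leaf: Meituan's Distributed ID Generation Service Goes Open Source". The service can be used to generate business primary-key ids or business uids.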

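    Algorithm goals

    1. An id must be numeric and must never repeat
    2. Ids must not be plainly auto-incrementing, so that users cannot guess business volume
    3. Id generation must support distributed service deployment
    4. Ids must not repeat across different businesses either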

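    Algorithm design

    Ids are allocated in advance. At request time the next value is popped from a Redis queue, combined with the shard and business bits, and returned.

    The idea resembles the mutex behind MySQL's auto-increment primary key: for "simple inserts", a mutex guards the in-memory counter while it is incremented. Once the number of ids needed has been obtained, autoinc_lock is released without waiting for the statement to finish.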
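
    The "reserve a range under a short lock, then release" idea can be sketched in plain Java. This is only an illustration of the principle, not the author's code: the class name RangeReserver and its reserve method are hypothetical, and a ReentrantLock stands in for the database mutex.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration: reserve a block of counter values under a short-lived lock. */
final class RangeReserver {
    private final ReentrantLock lock = new ReentrantLock();
    private long next = 1; // next value that has not been reserved yet

    /** Reserves {@code count} consecutive values and returns the first one. */
    long reserve(int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        lock.lock();
        try {
            long start = next;
            next += count; // only the counter bump happens while the lock is held
            return start;
        } finally {
            lock.unlock(); // released before the caller starts using the reserved values
        }
    }

    public static void main(String[] args) {
        RangeReserver reserver = new RangeReserver();
        long first = reserver.reserve(1000);  // owns 1..1000
        long second = reserver.reserve(1000); // owns 1001..2000
        System.out.println(first + " " + second);
    }
}
```

    The caller that reserved a range can hand out its values without any further locking, which is the same reason the service below pre-loads whole segments into Redis.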

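    Details (name, then explanation)

    id length
        long, 64 bits

    id algorithm
        indexNext << APPID_BYTE_LEN << GATE_PASS_BYTE_LEN | shardId << APPID_BYTE_LEN | appId
        indexNext: the next number in the queue, explained in detail below
        appId: the business id assigned to a business
        shardId: Math.abs(gatePass.hashCode()) % MAX_GATE_PASS_VALUE;
        gatePass: a pass token; it can be a user name, phone number, email address, and so on

    Field widths (in bits, despite the BYTE_LEN names)
        GATE_PASS_BYTE_LEN = 6
        APPID_BYTE_LEN = 8

    indexNext
        long shardId = Math.abs(gatePass.hashCode()) % MAX_GATE_PASS_VALUE;
        String redisKey = "id.gen.idList_" + appId + "_" + shardId;
        Every appId is assigned 64 shardIds, and each appId+shardId pair maps to one Redis list. The gatePass hash code is reduced modulo the shard count, and the value is popped from the matching Redis list.
        MAX_GATE_PASS_VALUE = 2 << GATE_PASS_BYTE_LEN - 1 (that is, 2 << 5 = 64)

    Segment
        The Redis list holds the current segment of ids. When the number of ids left drops to the configured threshold, the next segment is loaded from the DB.
        The threshold is IDLE * step.

    maxLimitId
        The maximum id allowed for a business.
        Some business protocols use types such as unsigned int, so a maximum id can be configured per business to keep generated ids within that limit.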
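
    To make the bit layout and the shard routing above concrete, here is a small sketch. The class IdLayout and its decoding helpers (indexOf, shardIdOf, appIdOf) are not part of the original code; they are added only for illustration, and the sketch assumes appId fits in 8 bits (0..255) and shardId in 6 bits (0..63).

```java
/** Illustrative helper (not from the original service) for the id bit layout. */
final class IdLayout {
    static final int APPID_BYTE_LEN = 8;     // bits reserved for appId
    static final int GATE_PASS_BYTE_LEN = 6; // bits reserved for shardId
    static final int MAX_GATE_PASS_VALUE = 2 << GATE_PASS_BYTE_LEN - 1; // 64 shards

    /** Routes a gatePass to one of the 64 shards, as in the article. */
    static int shardOf(String gatePass) {
        return Math.abs(gatePass.hashCode()) % MAX_GATE_PASS_VALUE;
    }

    /** Name of the Redis list for an appId+shardId pair. */
    static String redisKey(int appId, int shardId) {
        return "id.gen.idList_" + appId + "_" + shardId;
    }

    /** Packs indexNext (high bits), shardId (6 bits) and appId (low 8 bits) into one long. */
    static long pack(long indexNext, int shardId, int appId) {
        return indexNext << APPID_BYTE_LEN << GATE_PASS_BYTE_LEN
                | (long) shardId << APPID_BYTE_LEN
                | appId;
    }

    static long indexOf(long id) {
        return id >>> (APPID_BYTE_LEN + GATE_PASS_BYTE_LEN);
    }

    static int shardIdOf(long id) {
        return (int) (id >>> APPID_BYTE_LEN) & (MAX_GATE_PASS_VALUE - 1);
    }

    static int appIdOf(long id) {
        return (int) (id & ((1 << APPID_BYTE_LEN) - 1));
    }

    public static void main(String[] args) {
        long id = pack(5L, 3, 1);
        System.out.println(id + " -> index=" + indexOf(id)
                + ", shard=" + shardIdOf(id) + ", app=" + appIdOf(id));
    }
}
```

    Because appId and shardId occupy fixed low bits, two ids can only collide if they share appId, shardId and indexNext, and indexNext is handed out at most once per appId+shardId queue.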

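    Algorithm implementation

    • Fetch an id from the Redis queue and compute the result
    • Periodically refresh the appId+shardId configuration from the DB
    • Periodically refill the Redis segments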
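
    The interplay of the three steps above can be tried out without Redis or MySQL. The following is a minimal in-memory sketch under stated assumptions: SegmentQueueSketch is a hypothetical class, an ArrayDeque stands in for one Redis list, and a plain field stands in for the max_id column. It only mirrors the threshold rule (refill when the queue size is at or below IDLE * step).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory stand-in for one appId+shardId queue; Redis and MySQL are simulated. */
final class SegmentQueueSketch {
    static final double IDLE = 0.2D;

    private final int step;
    private long maxId = 0;                               // simulates id_alloc.max_id
    private final Deque<Long> queue = new ArrayDeque<>(); // simulates the Redis list

    SegmentQueueSketch(int step) {
        if (step <= 0) {
            throw new IllegalArgumentException("step must be positive");
        }
        this.step = step;
    }

    /** Simulates "max_id = max_id + step" and pushes the newly reserved segment. */
    void loadNextSegment() {
        maxId += step;
        for (long i = maxId - step + 1; i <= maxId; i++) {
            queue.addLast(i);
        }
    }

    /** Simulates the periodic job: refill when the queue is at or below IDLE * step. */
    void reloadIfLow() {
        if (queue.size() <= IDLE * step) {
            loadNextSegment();
        }
    }

    /** Pops the next index, loading a segment first if the queue ran dry. */
    long next() {
        if (queue.isEmpty()) {
            loadNextSegment();
        }
        return queue.pollFirst();
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        SegmentQueueSketch q = new SegmentQueueSketch(10);
        q.reloadIfLow();              // queue now holds 1..10
        for (int i = 0; i < 8; i++) {
            q.next();                 // consume 1..8, two values remain
        }
        q.reloadIfLow();              // 2 <= 0.2 * 10, so 11..20 is appended
        System.out.println(q.size());
    }
}
```

    Refilling before the queue is empty keeps the request path from waiting on the database in the common case; the on-demand load inside next() is only a fallback.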

    MySQL table schema

    CREATE TABLE `id_alloc` (
      `app_id` int(11) NOT NULL COMMENT 'business id',
      `app_name` varchar(128) NOT NULL COMMENT 'business name',
      `max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT 'largest id already handed out',
      `shard_id` int(11) NOT NULL COMMENT 'shard id',
      `step` int(11) NOT NULL DEFAULT '1000' COMMENT 'segment size',
      `description` varchar(255) DEFAULT NULL,
      `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
      `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
      `max_limit_id` bigint(20) NOT NULL DEFAULT '0' COMMENT 'maximum id allowed for the business',
      PRIMARY KEY (`app_id`,`shard_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='id generation';
    

    Code implementation

    Based on spring-boot 2.6.6

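    import java.time.Duration;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;

    import javax.annotation.PostConstruct;

    import com.google.common.collect.Lists;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.BooleanUtils;
    import org.apache.commons.lang3.math.NumberUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Service;
    // Assumption: CollectionUtils is Spring's; the article does not say which library it comes from.
    import org.springframework.util.CollectionUtils;

    // Result and @JobLock are project-specific types that the article does not show;
    // IdAlloc, IdConstant and IdAllocRepository are listed further below.
    @Slf4j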
    @Service
    public class IdGenService {
    
        public static final String REDIS_KEY_PREFIX = "id.gen.idList_";
        public static final String LOCK_PREFIX = "id.gen.lock_";
        public static final Duration LOCK_TIME = Duration.ofSeconds(60);
        private volatile boolean initOK = false;
        public final static int RETRY_TIME = 2;
        public static final double IDLE = 0.2D;
        private Map<Integer, Set<Integer>> allAppShard = new ConcurrentHashMap<>();
        private Map<Integer, Long> maxLimitIdMap = new ConcurrentHashMap<>();
        @Autowired
        private IdAllocRepository idAllocRepository;
    
        @Autowired
        private StringRedisTemplate redis;
    
        @PostConstruct
        public void init() {
            log.info("Init ...");
            updateConfigFromDb(false);
            initOK = true;
        }
    
        /**
         * Refresh the appId+shardId configuration from the DB.
         *
         * @param cleanLock true if this process obtained the lock and should clear stale Redis queues
         */
        public void updateConfigFromDb(boolean cleanLock) {
            log.info("update config from db");
            try {
                List<IdAlloc> dbApps = idAllocRepository.getAllLeafAlloc();
                if (CollectionUtils.isEmpty(dbApps)) {
                    return;
                }
    
                Map<Integer, Set<Integer>> insertAppsMap = dbApps.stream().collect(Collectors.groupingBy(IdAlloc::getAppId,
                        ConcurrentHashMap::new,
                        Collectors.mapping(IdAlloc::getShardId, Collectors.toSet())));
                Set<Integer> cacheAppsSet = new HashSet<>(allAppShard.keySet());
    
                // Remove the Redis queues of appIds that are no longer configured
                if (cleanLock) {
                    for (int appId : cacheAppsSet) {
                        if (!insertAppsMap.containsKey(appId)) {
                            log.info("remove redis queue from appId={}", appId);
                            Set<String> removeKeys = allAppShard.get(appId).stream().map(shardId -> getRedisKey(appId, shardId))
                                    .collect(Collectors.toSet());
                            redis.delete(removeKeys);
                        }
                    }
                }
                allAppShard = insertAppsMap;
                maxLimitIdMap = dbApps.stream().collect(Collectors.toMap(IdAlloc::getAppId, IdAlloc::getMaxLimitId, (k1, k2) -> k1));
            } catch (Exception e) {
                log.warn("update config from db exception", e);
            }
        }
    
        public Result get(int appId, String gatePass) {
            if (!initOK) {
                return new Result(IdConstant.EXCEPTION_ID_IDCACHE_INIT_FALSE);
            }
            if (allAppShard.containsKey(appId)) {
                int shardId = Math.abs(gatePass.hashCode()) % IdConstant.MAX_GATE_PASS_VALUE;
                if (CollectionUtils.isEmpty(allAppShard.get(appId)) || !allAppShard.get(appId).contains(shardId)) {
                    return new Result(IdConstant.EXCEPTION_ID_SHARD_NOT_EXISTS);
                }
                return getIdFromQueue(appId, shardId);
            }
            return new Result(IdConstant.EXCEPTION_ID_KEY_NOT_EXISTS);
        }
    
        /**
         * Load the next segment from the DB into the Redis list.
         *
         * @param appId   business id
         * @param shardId shard id
         */
        public void updateSegmentFromDb(int appId, int shardId) {
            String lockKey = LOCK_PREFIX + appId + "_" + shardId;
            // Tracks whether this call acquired the lock, so that the finally block
            // never deletes a lock that is held by another instance.
            boolean locked = false;
            try {
                locked = BooleanUtils.isTrue(redis.opsForValue().setIfAbsent(lockKey, String.valueOf(System.currentTimeMillis()), LOCK_TIME));
                if (!locked) {
                    log.info("{} lock failed return", lockKey);
                    return;
                }

                // max_id has already been advanced by step, so the new segment is (max_id - step, max_id]
                IdAlloc idAlloc = idAllocRepository.updateAndGetMaxId(appId, shardId);
                List<String> idGenList = Lists.newArrayListWithExpectedSize(idAlloc.getStep());
                for (long i = idAlloc.getMaxId() - idAlloc.getStep() + 1; i <= idAlloc.getMaxId(); i++) {
                    idGenList.add(String.valueOf(i));
                }

                redis.opsForList().rightPushAll(getRedisKey(appId, shardId), idGenList);
                log.info("updateSegmentFromDb, appId:{}, idAlloc:{}", appId, idAlloc);
            } catch (Exception e) {
                log.warn("updateSegmentFromDb fail, lockKey:{}", lockKey, e);
            } finally {
                if (locked) {
                    redis.delete(lockKey);
                }
            }
        }
    
        /**
         * Periodically refill the Redis segments.
         */
        @Scheduled(cron = "0 */2 * * * ?")
        @JobLock(timeout = 1800)
        public void reloadSegment() {
            try {
                List<IdAlloc> dbApps = idAllocRepository.getAllLeafAlloc();
                if (CollectionUtils.isEmpty(dbApps)) {
                    return;
                }
    
                for (IdAlloc idAlloc : dbApps) {
                    String redisKey = getRedisKey(idAlloc.getAppId(), idAlloc.getShardId());
                    Long queueSize = redis.opsForList().size(redisKey);
                    log.info("queueSize:{}, appId:{}, shardId:{}", queueSize, idAlloc.getAppId(), idAlloc.getShardId());
                    if (queueSize == null || queueSize <= IDLE * idAlloc.getStep()) {
                        log.info("need to reload, now queueSize:{}, idAlloc:{}", queueSize, idAlloc);
                        updateSegmentFromDb(idAlloc.getAppId(), idAlloc.getShardId());
                    }
                }
            } catch (Exception e) {
                log.warn("reload segment exception", e);
            }
        }
    
        /**
         * Periodically refresh the appId+shardId configuration from the DB.
         */
        @Scheduled(fixedDelay = 2, initialDelay = 2, timeUnit = TimeUnit.MINUTES)
        public void scheduleUpdateConfig() {
            boolean cleanLock = BooleanUtils.isTrue(redis.opsForValue().setIfAbsent(REDIS_KEY_PREFIX,
                    String.valueOf(System.currentTimeMillis()), LOCK_TIME));
            log.info("updateConfigFromDb cleanLock:{}", cleanLock);
            updateConfigFromDb(cleanLock);
        }
    
        public Result getIdFromQueue(int appId, int shardId) {
            String redisKey = getRedisKey(appId, shardId);
            int cnt = 0;
            while (cnt++ < RETRY_TIME) {
                try {
                    String nextId = redis.opsForList().leftPop(redisKey);
                    if (NumberUtils.isDigits(nextId)) {
                        long id = NumberUtils.toLong(nextId) << IdConstant.APPID_BYTE_LEN << IdConstant.GATE_PASS_BYTE_LEN |
                                shardId << IdConstant.APPID_BYTE_LEN | appId;
                        // Apply the business's maximum id limit, if one is configured
                        id = maxLimitIdMap.getOrDefault(appId, 0L) > 0 ? id % maxLimitIdMap.get(appId) : id;
                        return new Result(IdConstant.SUCCESS, id);
                    }
    
                    updateSegmentFromDb(appId, shardId);
                } catch (Exception e) {
                    log.warn("getIdFromQueue appId:{}, shardId:{} exception", appId, shardId, e);
                }
            }
            return new Result(IdConstant.EXCEPTION_ID_QUEUE_EMPTY);
        }
    
        private String getRedisKey(int appId, int shardId) {
            return REDIS_KEY_PREFIX + appId + "_" + shardId;
        }
    }
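
    The segment refresh relies on a set-if-absent lock in Redis, and such a lock should only be released by the caller that acquired it. The sketch below illustrates that rule in isolation. It is an assumption-laden stand-in, not the service's code: OwnedLockSketch is a hypothetical class, a ConcurrentHashMap replaces Redis, and lock expiry is not modelled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory model of a set-if-absent lock with an owner token. */
final class OwnedLockSketch {
    // Stand-in for Redis: lock key -> owner token. Expiry is not modelled here.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Succeeds only if nobody holds the key, like SET with the NX option. */
    boolean tryLock(String key, String token) {
        return store.putIfAbsent(key, token) == null;
    }

    /** Removes the key only if it still holds our token. */
    boolean unlock(String key, String token) {
        return store.remove(key, token);
    }

    /** Runs the task under the lock; leaves the lock untouched when someone else holds it. */
    boolean runExclusive(String key, String token, Runnable task) {
        if (!tryLock(key, token)) {
            return false; // not the owner, so there is nothing to release
        }
        try {
            task.run();
            return true;
        } finally {
            unlock(key, token);
        }
    }

    public static void main(String[] args) {
        OwnedLockSketch locks = new OwnedLockSketch();
        locks.tryLock("id.gen.lock_1_0", "instance-a");
        boolean ran = locks.runExclusive("id.gen.lock_1_0", "instance-b", () -> { });
        System.out.println(ran); // instance-b could not run while instance-a holds the lock
    }
}
```

    With a real Redis the compare-and-delete step has to be atomic as well, which is commonly done with a short Lua script; that part is outside this sketch.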
    
    Constant
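    public class IdConstant {
        public static final int SUCCESS = 0;

        // Despite the BYTE_LEN names, these are widths in bits.
        public static final int APPID_BYTE_LEN = 8;
        public static final int GATE_PASS_BYTE_LEN = 6;
        // '-' binds tighter than '<<', so this is 2 << 5 = 64 shards per appId.
        public static final int MAX_GATE_PASS_VALUE = 2 << GATE_PASS_BYTE_LEN - 1;

        /**
         * Error code returned when the id cache has not finished initializing
         */
        public static final int EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
        /**
         * Error code returned when the appId does not exist
         */
        public static final int EXCEPTION_ID_KEY_NOT_EXISTS = -2;
        /**
         * Error code returned when the shardId does not exist
         */
        public static final int EXCEPTION_ID_SHARD_NOT_EXISTS = -3;
        /**
         * Error code returned when the queue has no data
         */
        public static final int EXCEPTION_ID_QUEUE_EMPTY = -4;
    }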
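
    The expression 2 << GATE_PASS_BYTE_LEN - 1 is easy to misread, because in Java the subtraction is evaluated before the shift. The short check below compares the expression as written with its explicit and misread forms; ShiftPrecedenceDemo and its method names are made up for this illustration.

```java
/** Illustrates how Java parses "2 << GATE_PASS_BYTE_LEN - 1". */
final class ShiftPrecedenceDemo {
    static final int GATE_PASS_BYTE_LEN = 6;

    /** The expression exactly as it appears in IdConstant. */
    static int asWritten() {
        return 2 << GATE_PASS_BYTE_LEN - 1;
    }

    /** How the compiler groups it: subtraction first, then shift. */
    static int explicit() {
        return 2 << (GATE_PASS_BYTE_LEN - 1);
    }

    /** A tempting but wrong reading: shift first, then subtract. */
    static int misread() {
        return (2 << GATE_PASS_BYTE_LEN) - 1;
    }

    /** An equivalent form that avoids the question altogether. */
    static int equivalent() {
        return 1 << GATE_PASS_BYTE_LEN;
    }

    public static void main(String[] args) {
        System.out.println(asWritten() + " " + explicit() + " " + misread() + " " + equivalent());
        // prints "64 64 127 64"
    }
}
```

    The value 64 matches the 64 shard rows that the initialization script creates per appId.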
    
    Model
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class IdAlloc {
        private int appId;
        private String appName;
        private int shardId;
        private long maxId;
        private int step;
        private String updateTime;
        private long maxLimitId;
    }
    
    Repository
    @Repository
    public class IdAllocRepository {
    
        @Resource
        private IDAllocMapper idAllocMapper;
    
        /**
         * Update and then read within the same transaction, so that this thread gets the segment it just advanced
         */
        @Transactional(transactionManager = YyzoneBaseConfig.TRANSACTION_MANAGER, rollbackFor = {RuntimeException.class, Exception.class})
        public IdAlloc updateAndGetMaxId(int appId, int shardId) {
            idAllocMapper.updateMaxId(appId, shardId);
            return idAllocMapper.getLeafAlloc(appId, shardId);
        }
    
        public List<IdAlloc> getAllLeafAlloc(){
            return idAllocMapper.getAllLeafAlloc();
        }
    
    }
    
    Mapper
    public interface IDAllocMapper {
    
        @Select("SELECT app_name, app_id, shard_id, max_id, step, update_time, max_limit_id FROM id_alloc")
        @Results(value = {
                @Result(column = "app_name", property = "appName"),
                @Result(column = "app_id", property = "appId"),
                @Result(column = "shard_id", property = "shardId"),
                @Result(column = "max_id", property = "maxId"),
                @Result(column = "step", property = "step"),
                @Result(column = "update_time", property = "updateTime"),
                @Result(column = "max_limit_id", property = "maxLimitId")
        })
        List<IdAlloc> getAllLeafAlloc();
    
        @Select("SELECT app_name, app_id, shard_id, max_id, step FROM id_alloc WHERE app_id = #{appId} and shard_id = #{shardId} limit 1")
        @Results(value = {
                @Result(column = "app_name", property = "appName"),
                @Result(column = "app_id", property = "appId"),
                @Result(column = "shard_id", property = "shardId"),
                @Result(column = "max_id", property = "maxId"),
                @Result(column = "step", property = "step")
        })
        IdAlloc getLeafAlloc(@Param("appId") Integer appId, @Param("shardId") Integer shardId);
    
        @Update("UPDATE id_alloc SET max_id = max_id + step WHERE app_id = #{appId} and shard_id = #{shardId}")
        void updateMaxId(@Param("appId") Integer appId, @Param("shardId") Integer shardId);
    }
    

    Data initialization script

    DROP PROCEDURE IF EXISTS alloc_initData;
    DELIMITER $
    CREATE PROCEDURE alloc_initData(
    IN appId INT(11),
    IN appName VARCHAR(128),
    IN description VARCHAR(255)
    )
    BEGIN
        DECLARE i INT DEFAULT 0;
        WHILE i<64 DO
            INSERT INTO id_alloc(app_id, app_name, max_id, shard_id, description) VALUES(appId,appName,0,i, description);
            SET i = i+1;
        END WHILE;
    END $
    DELIMITER ;
    
    CALL alloc_initData(1,'test','description');
    

    References:

    1. Leaf
    2. Leaf: Meituan-Dianping's Distributed ID Generation System
    3. Leaf: Meituan's Distributed ID Generation Service Goes Open Source
  • Original article: https://blog.csdn.net/why_still_confused/article/details/126614559