本文的ID生成服务参考自Leaf:美团分布式ID生成服务开源,该分布式服务可用于生成业务主键id或业务uid
id预先分配,使用时直接从Redis队列pop出并计算返回
设计想法类似于MySQL自增主键的互斥量:对于“simple inserts”,MySQL会用互斥量(mutex)对内存中的计数器进行累加操作;在获取到需要增加的ID数量后,autoinc_lock就会被释放,不必等到语句执行结束。
细节 | 说明 | 名词解释 |
---|---|---|
id的长度 | long类型,64 位(8 字节) | |
id的算法 | `indexNext << APPID_BYTE_LEN << GATE_PASS_BYTE_LEN \| shardId << APPID_BYTE_LEN \| appId` | indexNext:队列中的下一个数字,下文详细解释;appId:分配给业务的业务id;shardId:`Math.abs(gatePass.hashCode()) % MAX_GATE_PASS_VALUE`;gatePass:通行证,可以是用户名、手机号、邮箱等等;位长度(单位为 bit,常量名虽为 BYTE_LEN):GATE_PASS_BYTE_LEN = 6,APPID_BYTE_LEN = 8 |
indexNext | `long shardId = Math.abs(gatePass.hashCode()) % MAX_GATE_PASS_VALUE; String redisKey = "id.gen.idList_" + appId + "_" + shardId;` | 每一个 appid 都会分配 64 个 shardId,每对 appid+shardId 对应一个 redis list 队列;对 gatePass 的 hashcode 取余后,从对应的 redis 队列 pop 出下一个数字。MAX_GATE_PASS_VALUE = 2 << GATE_PASS_BYTE_LEN - 1(减法优先于移位,即 2 << 5 = 64) |
号段 | redis list存放着号段,在号段数量小于设定值时从DB load下一号段 | 阈值为IDLE * step |
maxLimitId | 业务最大限制id | 由于部分业务协议可能存在unsigned int等类型,可为对应的业务设置最大限制id,防止生成的id超出业务限制 |
-- Segment-allocation table for the ID generation service.
-- One row per (app_id, shard_id) pair, as enforced by the composite primary key.
-- max_id is advanced by step each time a new segment is handed out
-- (see the mapper statement "UPDATE id_alloc SET max_id = max_id + step ...").
CREATE TABLE `id_alloc` (
`app_id` int(11) NOT NULL COMMENT '业务id',
`app_name` varchar(128) NOT NULL COMMENT '业务名',
-- Highest id already handed out for this (app_id, shard_id).
`max_id` bigint(20) NOT NULL DEFAULT '1' COMMENT '已用最大id',
`shard_id` int(11) NOT NULL COMMENT '分片id',
-- Number of ids in one segment.
`step` int(11) NOT NULL DEFAULT '1000' COMMENT '号段数量',
`description` varchar(255) DEFAULT NULL,
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
-- Per-business upper bound for generated ids; the service only applies it when it is > 0.
`max_limit_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '业务最大限制id',
PRIMARY KEY (`app_id`,`shard_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='id生成'
基于spring-boot 2.6.6版本
/**
 * Redis-backed ID generation service.
 *
 * <p>Each (appId, shardId) pair owns one Redis list holding pre-allocated sequence numbers.
 * A request pops the next number from the list and packs it together with the shard id and
 * app id into a single {@code long}. Lists are refilled from the database one segment
 * ({@code step} numbers) at a time, guarded by a per-shard Redis lock.
 *
 * <p>The two configuration maps are replaced wholesale by the scheduled refresh, so they are
 * declared {@code volatile} and request paths read them once into a local snapshot.
 */
@Slf4j
@Service
public class IdGenService {
    public static final String REDIS_KEY_PREFIX = "id.gen.idList_";
    public static final String LOCK_PREFIX = "id.gen.lock_";
    /** Expiry of the Redis locks taken by this service. */
    public static final Duration LOCK_TIME = Duration.ofSeconds(60);
    /** Number of pop attempts made by {@link #getIdFromQueue(int, int)}. */
    public static final int RETRY_TIME = 2;
    /** A queue is refilled when its size drops to {@code IDLE * step} or below. */
    public static final double IDLE = 0.2D;

    private volatile boolean initOK = false;
    /** appId -> shard ids configured in the database; replaced as a whole on refresh. */
    private volatile Map<Integer, Set<Integer>> allAppShard = new ConcurrentHashMap<>();
    /** appId -> business upper bound for ids; replaced as a whole on refresh. */
    private volatile Map<Integer, Long> maxLimitIdMap = new ConcurrentHashMap<>();

    @Autowired
    private IdAllocRepository idAllocRepository;
    @Autowired
    private StringRedisTemplate redis;

    /**
     * Loads the app/shard configuration once at startup and marks the service as initialized.
     * The flag is set even when the load fails, because {@link #updateConfigFromDb(boolean)}
     * logs and swallows its own exceptions.
     */
    @PostConstruct
    public void init() {
        log.info("Init ...");
        updateConfigFromDb(false);
        initOK = true;
    }

    /**
     * Refreshes the cached appId/shardId configuration and max-limit ids from the database.
     * An empty database result leaves the current cache untouched.
     *
     * @param cleanLock when {@code true}, this process also deletes the Redis queues of apps
     *                  that are cached locally but no longer present in the database
     */
    public void updateConfigFromDb(boolean cleanLock) {
        log.info("update config from db");
        try {
            List<IdAlloc> dbApps = idAllocRepository.getAllLeafAlloc();
            if (CollectionUtils.isEmpty(dbApps)) {
                return;
            }
            Map<Integer, Set<Integer>> insertAppsMap = dbApps.stream().collect(Collectors.groupingBy(IdAlloc::getAppId,
                    ConcurrentHashMap::new,
                    Collectors.mapping(IdAlloc::getShardId, Collectors.toSet())));
            Map<Integer, Long> insertMaxLimitIdMap = dbApps.stream()
                    .collect(Collectors.toMap(IdAlloc::getAppId, IdAlloc::getMaxLimitId, (k1, k2) -> k1));

            // Work on one snapshot so the cleanup sees a consistent view of the old configuration.
            Map<Integer, Set<Integer>> cachedAppShard = allAppShard;
            if (cleanLock) {
                // Drop the queues of apps that disappeared from the database.
                for (int appId : new HashSet<>(cachedAppShard.keySet())) {
                    if (!insertAppsMap.containsKey(appId)) {
                        log.info("remove redis queue from appId={}", appId);
                        Set<String> removeKeys = cachedAppShard.get(appId).stream()
                                .map(shardId -> getRedisKey(appId, shardId))
                                .collect(Collectors.toSet());
                        redis.delete(removeKeys);
                    }
                }
            }
            allAppShard = insertAppsMap;
            maxLimitIdMap = insertMaxLimitIdMap;
        } catch (Exception e) {
            log.warn("update config from db exception", e);
        }
    }

    /**
     * Generates the next id for an app, routing by the hash of {@code gatePass}.
     *
     * @param appId    business id
     * @param gatePass routing key whose hash code selects the shard; must not be {@code null}
     *                 ({@code hashCode()} is invoked on it once the app id is known)
     * @return a success result carrying the id, or a result with one of the
     *         {@code IdConstant.EXCEPTION_*} codes
     */
    public Result get(int appId, String gatePass) {
        if (!initOK) {
            return new Result(IdConstant.EXCEPTION_ID_IDCACHE_INIT_FALSE);
        }
        // Single snapshot: the field may be swapped by the scheduled refresh at any time.
        Map<Integer, Set<Integer>> appShards = allAppShard;
        if (!appShards.containsKey(appId)) {
            return new Result(IdConstant.EXCEPTION_ID_KEY_NOT_EXISTS);
        }
        // Reduce first, then take abs: Math.abs(Integer.MIN_VALUE) is still negative, whereas the
        // remainder always lies strictly between -MAX and MAX. Same mapping for all other hashes.
        int shardId = Math.abs(gatePass.hashCode() % IdConstant.MAX_GATE_PASS_VALUE);
        Set<Integer> shards = appShards.get(appId);
        if (CollectionUtils.isEmpty(shards) || !shards.contains(shardId)) {
            return new Result(IdConstant.EXCEPTION_ID_SHARD_NOT_EXISTS);
        }
        return getIdFromQueue(appId, shardId);
    }

    /**
     * Loads the next segment from the database and appends it to the Redis queue of the shard.
     * Only one caller at a time does this per (appId, shardId); callers that fail to take the
     * Redis lock return immediately. Failures are logged, not propagated.
     *
     * @param appId   business id
     * @param shardId shard id
     */
    public void updateSegmentFromDb(int appId, int shardId) {
        String lockKey = LOCK_PREFIX + appId + "_" + shardId;
        boolean locked = false;
        try {
            locked = BooleanUtils.isTrue(redis.opsForValue()
                    .setIfAbsent(lockKey, String.valueOf(System.currentTimeMillis()), LOCK_TIME));
            if (!locked) {
                log.info("{} lock failed return", lockKey);
                return;
            }
            IdAlloc idAlloc = idAllocRepository.updateAndGetMaxId(appId, shardId);
            if (idAlloc == null || idAlloc.getStep() <= 0) {
                // No row for this shard, or a step that yields an empty segment: nothing to push.
                log.warn("updateSegmentFromDb skipped, appId:{}, shardId:{}, idAlloc:{}", appId, shardId, idAlloc);
                return;
            }
            List<String> idGenList = Lists.newArrayListWithExpectedSize(idAlloc.getStep());
            // The freshly reserved segment is (maxId - step, maxId].
            for (long i = idAlloc.getMaxId() - idAlloc.getStep() + 1; i <= idAlloc.getMaxId(); i++) {
                idGenList.add(String.valueOf(i));
            }
            redis.opsForList().rightPushAll(getRedisKey(appId, shardId), idGenList);
            log.info("updateSegmentFromDb, appId:{}, idAlloc:{}", appId, idAlloc);
        } catch (Exception e) {
            log.warn("updateSegmentFromDb fail, lockKey:{}", lockKey, e);
        } finally {
            // Release only a lock this call actually took; otherwise we would delete the lock of
            // the process that is currently loading the segment.
            if (locked) {
                redis.delete(lockKey);
            }
        }
    }

    /**
     * Scheduled job: refills every queue whose size has dropped to {@code IDLE * step} or below.
     */
    @Scheduled(cron = "0 */2 * * * ?")
    @JobLock(timeout = 1800)
    public void reloadSegment() {
        try {
            List<IdAlloc> dbApps = idAllocRepository.getAllLeafAlloc();
            if (CollectionUtils.isEmpty(dbApps)) {
                return;
            }
            for (IdAlloc idAlloc : dbApps) {
                String redisKey = getRedisKey(idAlloc.getAppId(), idAlloc.getShardId());
                Long queueSize = redis.opsForList().size(redisKey);
                log.info("queueSize:{}, appId:{}, shardId:{}", queueSize, idAlloc.getAppId(), idAlloc.getShardId());
                if (queueSize == null || queueSize <= IDLE * idAlloc.getStep()) {
                    log.info("need to reload, now queueSize:{}, idAlloc:{}", queueSize, idAlloc);
                    updateSegmentFromDb(idAlloc.getAppId(), idAlloc.getShardId());
                }
            }
        } catch (Exception e) {
            log.warn("reload segment exception", e);
        }
    }

    /**
     * Scheduled job: refreshes the appId/shardId configuration from the database. The process
     * that wins the Redis key {@link #REDIS_KEY_PREFIX} (expiring after {@link #LOCK_TIME}) is
     * the one that also removes the queues of deleted apps.
     */
    @Scheduled(fixedDelay = 2, initialDelay = 2, timeUnit = TimeUnit.MINUTES)
    public void scheduleUpdateConfig() {
        boolean cleanLock = BooleanUtils.isTrue(redis.opsForValue().setIfAbsent(REDIS_KEY_PREFIX,
                String.valueOf(System.currentTimeMillis()), LOCK_TIME));
        log.info("updateConfigFromDb cleanLock:{}", cleanLock);
        updateConfigFromDb(cleanLock);
    }

    /**
     * Pops the next sequence number of a shard and packs it into an id. When the pop yields no
     * numeric value, a segment reload is triggered and the pop is retried, for at most
     * {@link #RETRY_TIME} attempts in total.
     *
     * @param appId   business id
     * @param shardId shard id
     * @return a success result carrying the id, or {@code IdConstant.EXCEPTION_ID_QUEUE_EMPTY}
     *         when every attempt failed
     */
    public Result getIdFromQueue(int appId, int shardId) {
        String redisKey = getRedisKey(appId, shardId);
        for (int attempt = 0; attempt < RETRY_TIME; attempt++) {
            try {
                String nextId = redis.opsForList().leftPop(redisKey);
                if (NumberUtils.isDigits(nextId)) {
                    // Layout: sequence | shardId | appId, using the widths from IdConstant.
                    // NOTE(review): an appId wider than APPID_BYTE_LEN bits would overlap the
                    // shard bits - confirm the allowed appId range.
                    long id = (NumberUtils.toLong(nextId) << IdConstant.APPID_BYTE_LEN << IdConstant.GATE_PASS_BYTE_LEN)
                            | (shardId << IdConstant.APPID_BYTE_LEN)
                            | appId;
                    // Business upper bound: applied only when configured with a positive value.
                    // One lookup, so a concurrent map swap cannot yield a null between two reads.
                    Long maxLimitId = maxLimitIdMap.get(appId);
                    if (maxLimitId != null && maxLimitId > 0) {
                        id = id % maxLimitId;
                    }
                    return new Result(IdConstant.SUCCESS, id);
                }
                updateSegmentFromDb(appId, shardId);
            } catch (Exception e) {
                log.warn("getIdFromQueue appId:{}, shardId:{} exception", appId, shardId, e);
            }
        }
        return new Result(IdConstant.EXCEPTION_ID_QUEUE_EMPTY);
    }

    /** Builds the Redis list key for one (appId, shardId) pair. */
    private String getRedisKey(int appId, int shardId) {
        return REDIS_KEY_PREFIX + appId + "_" + shardId;
    }
}
/**
 * Constants shared by the ID generation service: the widths used when packing an id
 * and the result codes returned to callers.
 */
public class IdConstant {

    /** Result code of a successful id generation. */
    public static final int SUCCESS = 0;

    /** Shift width reserved for the app id when packing an id (used as a bit count). */
    public static final int APPID_BYTE_LEN = 8;

    /** Shift width reserved for the shard id when packing an id (used as a bit count). */
    public static final int GATE_PASS_BYTE_LEN = 6;

    /** Number of shards a gate-pass hash is reduced to: 2^GATE_PASS_BYTE_LEN. */
    public static final int MAX_GATE_PASS_VALUE = 1 << GATE_PASS_BYTE_LEN;

    /** Error code: the id cache has not finished initializing. */
    public static final int EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;

    /** Error code: the requested appId does not exist. */
    public static final int EXCEPTION_ID_KEY_NOT_EXISTS = -2;

    /** Error code: the computed shardId does not exist for the app. */
    public static final int EXCEPTION_ID_SHARD_NOT_EXISTS = -3;

    /** Error code: the queue holds no data. */
    public static final int EXCEPTION_ID_QUEUE_EMPTY = -4;
}
/**
 * Row of the {@code id_alloc} table: the segment-allocation state of one (appId, shardId) pair.
 * Field order matters: the Lombok all-args constructor takes its parameters in declaration order.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class IdAlloc {
// Business id (column app_id).
private int appId;
// Business name (column app_name).
private String appName;
// Shard id (column shard_id).
private int shardId;
// Highest id already handed out (column max_id).
private long maxId;
// Number of ids in one segment (column step).
private int step;
// Last update time (column update_time), kept as a string.
private String updateTime;
// Business upper bound for generated ids (column max_limit_id).
private long maxLimitId;
}
/**
 * Repository wrapping {@link IDAllocMapper} for access to the segment-allocation rows.
 */
@Repository
public class IdAllocRepository {

    @Resource
    private IDAllocMapper idAllocMapper;

    /**
     * Returns every allocation row the mapper finds.
     *
     * @return the rows as returned by {@code IDAllocMapper#getAllLeafAlloc()}
     */
    public List<IdAlloc> getAllLeafAlloc() {
        final List<IdAlloc> allocations = idAllocMapper.getAllLeafAlloc();
        return allocations;
    }

    /**
     * Advances the segment of one shard and reads the row back inside the same transaction,
     * so the caller sees the segment reserved by its own update.
     *
     * @param appId   business id
     * @param shardId shard id
     * @return the row as read after the update
     */
    @Transactional(transactionManager = YyzoneBaseConfig.TRANSACTION_MANAGER, rollbackFor = {RuntimeException.class, Exception.class})
    public IdAlloc updateAndGetMaxId(int appId, int shardId) {
        idAllocMapper.updateMaxId(appId, shardId);
        final IdAlloc refreshed = idAllocMapper.getLeafAlloc(appId, shardId);
        return refreshed;
    }
}
/**
 * MyBatis mapper for the {@code id_alloc} table.
 */
public interface IDAllocMapper {

    /**
     * Adds {@code step} to {@code max_id} for one (appId, shardId) row.
     *
     * @param appId   business id
     * @param shardId shard id
     */
    @Update("UPDATE id_alloc SET max_id = max_id + step WHERE app_id = #{appId} and shard_id = #{shardId}")
    void updateMaxId(@Param("appId") Integer appId, @Param("shardId") Integer shardId);

    /**
     * Reads the allocation row of one (appId, shardId) pair.
     * Only name, ids, max id and step are selected; the other properties keep their defaults.
     *
     * @param appId   business id
     * @param shardId shard id
     * @return the mapped row
     */
    @Select("SELECT app_name, app_id, shard_id, max_id, step FROM id_alloc WHERE app_id = #{appId} and shard_id = #{shardId} limit 1")
    @Results({
            @Result(property = "appId", column = "app_id"),
            @Result(property = "shardId", column = "shard_id"),
            @Result(property = "appName", column = "app_name"),
            @Result(property = "maxId", column = "max_id"),
            @Result(property = "step", column = "step")
    })
    IdAlloc getLeafAlloc(@Param("appId") Integer appId, @Param("shardId") Integer shardId);

    /**
     * Reads all allocation rows, including update time and max-limit id.
     *
     * @return the mapped rows
     */
    @Select("SELECT app_name, app_id, shard_id, max_id, step, update_time, max_limit_id FROM id_alloc")
    @Results({
            @Result(property = "appId", column = "app_id"),
            @Result(property = "shardId", column = "shard_id"),
            @Result(property = "appName", column = "app_name"),
            @Result(property = "maxId", column = "max_id"),
            @Result(property = "step", column = "step"),
            @Result(property = "updateTime", column = "update_time"),
            @Result(property = "maxLimitId", column = "max_limit_id")
    })
    List<IdAlloc> getAllLeafAlloc();
}
-- Seeds the id_alloc table for one business: inserts 64 rows, shard_id 0..63, each with max_id 0.
-- Parameters: appId (business id), appName (business name), description (free text).
DROP PROCEDURE IF EXISTS alloc_initData;
-- Switch the client delimiter so the ';' inside the procedure body does not end the statement.
DELIMITER $
CREATE PROCEDURE alloc_initData(
    IN appId INT(11),
    IN appName VARCHAR(128),
    IN description VARCHAR(255)
)
BEGIN
    DECLARE i INT DEFAULT 0;
    WHILE i<64 DO
        INSERT INTO id_alloc(app_id, app_name, max_id, shard_id, description) VALUES(appId,appName,0,i, description);
        SET i = i+1;
    END WHILE;
END $
-- Restore the default delimiter; without this the ';' after the CALL below would not
-- terminate the statement in the mysql client.
DELIMITER ;
CALL alloc_initData(1,'test','描述');
参考资料: