经常遇到这样的情况,项目中需要使用分布式锁,发现没有完整的产品可以使用,就临时写个分布式锁,花费许多时间和精力来开发和测试一个简单的分布式锁,实现部分功能就使用了。后面遇到这种情况,又去重新开发和测试,重复建设。
这里分享一个分布式锁的产品,实现完整功能,接入简单,方便业务方使用,大家如果自己研发分布式锁工具可以参考,但由于是内部项目,地址也访问不到仅供参考。
只接入需要的pom依赖,不需要的就不用接了。隐式依赖 slf4j-api-1.7.5 。
如下的pom,就是可用的最新版本。
<!-- tair lock -->
<dependency>
<groupId>com.alibaba.gtool</groupId>
<artifactId>xlock-tair</artifactId>
<version>1.2.1</version>
</dependency>
<!-- redis lock -->
<dependency>
<groupId>com.alibaba.gtool</groupId>
<artifactId>xlock-redis</artifactId>
<version>1.2.1</version>
</dependency>
//tair 锁管理器(通过 incr 实现)
TairLockManagerByIncr lockManager = new TairLockManagerByIncr();
lockManager.setTairManager(tairManager);//参数
lockManager.setTairNamespace(tairNamespace);//参数
lockManager.setEnableLogWarn(true);//是否使用logWarn输出。默认false
//tair 锁管理器(通过 version 实现)
TairLockManagerByVersion lockManager = new TairLockManagerByVersion();
lockManager.setTairManager(tairManager);//参数
lockManager.setTairNamespace(tairNamespace);//参数
lockManager.setEnableLogWarn(true);//是否使用logWarn输出。默认false
//redis 锁管理器
RedisLockManager lockManager = new RedisLockManager();
lockManager.setJedisPool(jedisPool);//JedisPool是线程安全的。
lockManager.setJedisDBIndex(0);//redis的库。默认0
lockManager.setEnableLogWarn(true);//是否使用logWarn输出。默认false
//由锁管理器创建锁
XLock lock = lockManager.createLock("lock_taobao_user_1200000000");
建议至少要配置事件 LOSE_LOCK(被动丢失锁) 的监听器。
//这里举个例子。实际可以配置多个不同的事件监听器。
YourBizListener listener = new YourBizListener(XLockEventEnum.LOSE_LOCK);
lock.addEventListener(listener);
加锁,解锁。为了安全,建议使用 try-catch-finally 结构。
boolean lockResult = false;
try {
/**
* 尝试一次加锁。不阻塞,不等待。
* 加锁成功返回true,否则返回false。
*/
lockResult = lock.tryLock();
if (lockResult) {
//do something
Thread.sleep(2000L);
}
} catch (Exception e) {
logger.error("ex", e);
} finally {
if (lockResult) {
/**
* 解锁
*/
lock.unLock();
}
}
try {
/**
* 加锁。
* 加锁成功,返回true。
* 否则,一直等待,直到加锁成功。
*/
lock.lock();
//do something
Thread.sleep(2000L);
} catch (Exception e) {
logger.error("ex", e);
} finally {
/**
* 解锁
*/
lock.unLock();
}
【分布式锁】
【原子操作型锁】
【Tair锁】
【Redis锁】
【分布式锁的管理器】
/**
* 分布式锁的管理器
*
* @author gumao
* @since 2016-11-21
*/
public interface XLockManager<A extends XLock> {
/**
* 创建锁对象。
*/
public A createLock(String lockName);
/**
* 移除锁对象。由使用方自己调用,谨慎使用。
*/
public boolean removeLock(A lock);
}
【分布式锁,指定锁的基础功能】
/**
* 分布式锁。指定锁的基础功能。
*
* @author gumao
* @since 2016-11-15
*/
public interface XLock {
/**
* 尝试一次加锁。不阻塞,不等待。
* 加锁成功返回true,否则返回false。
*/
public boolean tryLock() throws Exception;
/**
* 尝试一次加锁。
* 加锁成功,返回true。
* 加锁失败,就等待最大时长。最终加锁成功,返回true;否则,返回false。
*/
public boolean tryLock(long maxWaitMillis) throws Exception;
/**
* 加锁。
* 加锁成功,返回true。
* 否则,一直等待,直到加锁成功。
*/
public void lock() throws Exception;
/**
* 解锁
*/
public boolean unLock() throws Exception;
/**
* 锁的名称
*/
public String getName();
/**
* 线程是否加锁成功
*/
public boolean isThreadSuccess(Thread thread);
/**
* 添加一个事件监听器
*/
public void addEventListener(XLockEventListener listener);
/**
* 删除一个事件监听器
*/
public void removeEventListener(XLockEventListener listener);
}
【分布式锁的事件】
/**
* 分布式锁的事件。
*
* @author gumao
* @since 2016-11-20
*/
public enum XLockEventEnum {
LOCK_SUCCESS("加锁成功"), LOCK_FAIL("加锁失败"), UNLOCK_SUCCESS("解锁成功"), UNLOCK_FAIL("解锁失败"), LOSE_LOCK("被动丢失锁");
private String name;
private XLockEventEnum(String name) {
this.name = name;
}
}
【分布式锁的事件监听器】
/**
* 分布式锁的事件监听器
*
* @author gumao
* @since 2016-11-20
*/
public interface XLockEventListener {
/**
* 事件
*/
public XLockEventEnum getEvent();
/**
* 触发监听器
*/
public void fire(Thread thread, XLock xlock);
}
【原子操作型分布式锁的实现】
/**
* atomic 锁管理器
*
* @author gumao
* @since 2016-11-16
*/
public abstract class AtomicLockManager implements XLockManager<AtomicLock> {
private static Logger logger = LoggerFactory.getLogger(AtomicLockManager.class);
//心跳秒值
private int heartbeatSeconds = 2;
//会话秒值
private int sessionSeconds = heartbeatSeconds * 10;
//检查锁空闲的毫秒值
private long checkLockFreeMillis = 200L;
//锁名称和锁对象的映射
private Map<String, AtomicLock> lockMap = new HashMap<String, AtomicLock>(20);
//是否启用warn日志。很多应用把级别设置为warn。
private boolean enableLogWarn = false;
/**
* 创建锁对象。
*/
public AtomicLock createLock(String lockName) {
synchronized (this.lockMap) {
AtomicLock lock = this.lockMap.get(lockName);
if (null != lock) {
return lock;
}
lock = new AtomicLock(lockName);
lock.setAtomicLockManager(this);
this.lockMap.put(lockName, lock);
return lock;
}
}
/**
* 移除锁对象。由使用方自己调用,谨慎使用。
*/
public boolean removeLock(AtomicLock lock) {
synchronized (this.lockMap) {
atomicKill(lock);
this.lockMap.remove(lock.getName());
}
return true;
}
/**
* 删除锁的处理
*/
protected void atomicKill(AtomicLock lock) {
}
/**
* 加锁。
* 成功返回true,失败返回false
*/
protected abstract boolean atomicLock(Thread thread, AtomicLock lock);
/**
* 让锁的有效期延长。
* 返回线程是否有锁。
*/
protected abstract boolean atomicTouch(Thread thread, AtomicLock lock);
/**
* 线程是否有锁。
*/
protected abstract boolean atomicIsLocked(Thread thread, AtomicLock lock);
/**
* 锁是否被锁住了。
*/
protected abstract boolean atomicIsLocked(AtomicLock lock);
/**
* 解锁
* 成功返回true,失败返回false
*/
protected abstract boolean atomicUnlock(Thread thread, AtomicLock lock);
/**
* 某线程尝试一次加锁。加锁成功返回true,否则返回false。
*/
public boolean tryLock(Thread thread, AtomicLock lock) {
boolean lockSuccess = false;
String extra = "";
//如果已经有锁了,就返回成功,不用重复加锁
if (lock.isThreadSuccess(thread)) {
lockSuccess = true;
extra = "alreadyHaveLock . no need to lock again . ";
} else {
//加锁成功
if (this.atomicLock(thread, lock)) {
lockSuccess = true;
//保持锁的任务
lock.startRemainTask(thread);
//设置线程拥有锁
lock.markThreadSuccess(thread);
} else {
//设置线程没有锁
lock.markThreadFail(thread);
}
}
if (this.enableLogWarn) {
StringBuilder buf = new StringBuilder(130);
buf.append(" >> ").append(XlockUtil.nowDatetime()).append(" tryLock");
buf.append(" lockResult=").append(lockSuccess);
buf.append(" thread=").append(thread.getName());
buf.append(" lock=").append(lock.getName());
buf.append(" extra=").append(extra);
logger.warn(buf.toString());
}
return lockSuccess;
}
/**
* 加锁。里面用到了线程阻塞,必须用当前线程来调用此方法。
* maxWaitMillis <= 0 时,加锁失败会一直阻塞。直到成功。
* maxWaitMillis > 0 时,加锁失败会最多阻塞maxWaitMillis毫秒。
*
* @param lock 锁
* @param maxWaitMillis 最大等待毫秒值
* @return 加锁成功true, 加锁失败false
* @throws Exception
*/
public boolean tryLock(AtomicLock lock, long maxWaitMillis) throws Exception {
Thread thread = Thread.currentThread();
//最终的毫秒值
long finalMillis = System.currentTimeMillis() + maxWaitMillis;
//--------------------------------------------------------
//--------------------------------------------------------
try {
//加锁
lock.getReentrantLock().lock();
//把线程加入锁的等待队列
lock.addToWaitQueue(thread);
//----------------------------------------------------------
//无限阻塞
if (maxWaitMillis <= 0) {
while (true) {
//如果指定线程获得了锁,就成功
if (this.tryLock(thread, lock)) {
break;
}
//等待可以重试这个条件
lock.getCanTryLockCondition().await();
}
return true;
}
//----------------------------------------------------------
//指定超时时间
else {
boolean lockSuccess = false;
while (true) {
//如果指定线程获得了锁,就成功
if (this.tryLock(thread, lock)) {
lockSuccess = true;
break;
}
//可以休眠的毫秒值
long sleepMillis = finalMillis - System.currentTimeMillis();
//如果超过最大等待时间,就退出
if (sleepMillis <= 0) {
break;
}
//等待可以重试这个条件
lock.getCanTryLockCondition().await(sleepMillis, TimeUnit.MILLISECONDS);
}
return lockSuccess;
}
//----------------------------------------------------------
} catch (Exception e) {
logger.error("tryLock", e);
throw e;
} finally {
// 把线程移出锁的等待队列
lock.removeFromWaitQueue(thread);
//解锁
lock.getReentrantLock().unlock();
}
}
/**
* 构建日志。统一格式。
*/
protected String buildLog(String title, Thread thread, AtomicLock lock, String extra) {
StringBuilder buf = new StringBuilder(200);
buf.append("\n ==============================");
buf.append("\n ").append(title);
buf.append("\n now =").append(XlockUtil.nowDatetime());
if (null != thread) {
buf.append("\n thread =").append(thread.getName()).append(" ").append(thread.getId());
}
if (null != lock) {
buf.append("\n lock =").append(lock.getName());
}
if (null != extra) {
buf.append("\n extra =").append(extra);
}
buf.append("\n ==============================");
return buf.toString();
}
/**
* 解锁。必须是当前线程解锁。
*/
public boolean unLock(AtomicLock lock) {
Thread thread = Thread.currentThread();
//如果已经没有该锁,就不能解锁
if (!lock.isThreadSuccess(thread)) {
String log = this.buildLog("AtomicLockManager unLock fail !", thread, lock,
"Current thread does not have this lock , so cannot unlock this lock .");
logger.error(log);
return false;
}
//解锁
boolean result = this.atomicUnlock(thread, lock);
//如果解锁成功
if (result) {
//设置当前线程没有锁
lock.markThreadFail(thread);
//停止持有锁
lock.stopRemainTask(thread);
//唤醒正在等待此锁的线程
lock.signalAll();
}
return result;
}
/**
* 保持锁。
*/
public void remainLock(AtomicLockRemainTask task) {
//锁对象
AtomicLock lock = task.getAtomicLock();
//指定的线程
Thread thread = task.getThread();
//----------------------------------------------------------
// 该key是否被锁定
boolean isLocked = this.atomicTouch(thread, lock);
if (this.enableLogWarn) {
StringBuilder buf = new StringBuilder(120);
buf.append(" >> ").append(XlockUtil.nowDatetime()).append(" remainLock thread=").append(thread.getName()).append(" lock=").append(lock
.getName()).append(" isLocked=").append(isLocked);
logger.warn(buf.toString());
}
//----------------------------------------------------------
//如果指定的线程,不在加锁成功的线程队列中,就终止任务。这种情况在线程主动unlock后会发生。
if (!lock.isThreadSuccess(thread)) {
//终止任务
task.getFuture().cancel(true);
if (this.enableLogWarn) {
String log = this.buildLog("AtomicLockRemainTask is cancelled ", thread, lock,
"thread not lock success anymore , this may be normal !");
logger.warn(log);
}
return;
}
//----------------------------------------------------------
//如果没有锁。就尝试重新加锁。(正常情况不应该发生此事)
if (!isLocked) {
StringBuilder buf = new StringBuilder(100);
//尝试重新加锁。加锁成功。
if (this.atomicLock(thread, lock)) {
buf.append(" >> retry lock success ! GOOD !");
//加锁成功。
}
//加锁失败。
else {
buf.append(" >> retry lock fail , then cancel task , this may be normal ! Please find reason !");
//标记线程加锁失败
lock.markThreadFail(thread);
//终止任务
task.getFuture().cancel(true);
//某线程被动丢失了锁
AtomicLockFireEventTask task2 = new AtomicLockFireEventTask(XLockEventEnum.LOSE_LOCK, thread, lock);
XlockUtil.getScheduler().execute(task2);
}
String log = this.buildLog("AtomicLockRemainTask # lose lock , so retry lock again ", thread, lock,
buf.toString());
logger.error(log);
}
}
/**
* 创建线程的uuid,全局标识一个线程。
* 必须全局唯一。
*/
public String createThreadUUID(Thread thread, AtomicLock lock) {
return UUID.randomUUID().toString();
}
/**
* 如果锁空闲,就唤醒等待队列里面的线程,去尝试加锁
*/
public void signalAllIfFree(AtomicLock atomicLock) {
//锁是否被锁住了
boolean isLocked = this.atomicIsLocked(atomicLock);
//如果锁没有被锁,可以尝试加锁
if (!isLocked) {
//唤醒正在等待此锁的线程
atomicLock.signalAll();
//------------------------------------------------
if (this.enableLogWarn) {
StringBuilder buf = new StringBuilder(120);
buf.append(" >> ").append(XlockUtil.nowDatetime()).append(" AtomicLockCheckFreeTask ").append(" lock=").append(atomicLock
.getName()).append(" isLocked=").append(isLocked).append(" . So signalAll to try lock !");
logger.warn(buf.toString());
}
}
}
//--------------------------------------------------
public int getHeartbeatSeconds() {
return heartbeatSeconds;
}
public void setHeartbeatSeconds(int heartbeatSeconds) {
this.heartbeatSeconds = heartbeatSeconds;
}
public int getSessionSeconds() {
return sessionSeconds;
}
public void setSessionSeconds(int sessionSeconds) {
this.sessionSeconds = sessionSeconds;
}
public boolean isEnableLogWarn() {
return enableLogWarn;
}
public void setEnableLogWarn(boolean enableLogWarn) {
this.enableLogWarn = enableLogWarn;
}
public long getCheckLockFreeMillis() {
return checkLockFreeMillis;
}
public void setCheckLockFreeMillis(long checkLockFreeMillis) {
this.checkLockFreeMillis = checkLockFreeMillis;
}
}
/**
* atomic 锁对象。
*
* @author gumao
* @since 2016-11-15
*/
public class AtomicLock implements XLock {
private static Logger logger = LoggerFactory.getLogger(AtomicLock.class);
//名称
private String name;
//锁管理器
private AtomicLockManager atomicLockManager;
//加锁成功的线程
private Set<Thread> successThreads = new HashSet<Thread>(4);
//等待队列
private Set<Thread> waitQueue = new HashSet<Thread>(10);
//并发锁
private Lock reentrantLock = new ReentrantLock();
//可以重试加锁的条件
private Condition canTryLockCondition = reentrantLock.newCondition();
//校验锁是否空闲
private AtomicLockCheckFreeTask checkFreeTask = new AtomicLockCheckFreeTask();
//事件监听器
private Map<XLockEventEnum, Set<XLockEventListener>> eventListenerMap = new ConcurrentHashMap<XLockEventEnum, Set<XLockEventListener>>(8);
//线程保持锁的任务
private Map<Thread, AtomicLockRemainTask> remainTaskMap = new HashMap<Thread, AtomicLockRemainTask>();
//线程的唯一键
private Map<Thread, String> threadUUIDMap = new HashMap<Thread, String>();
//线程的扩展数据
private Map<Thread, Object> threadExtendMap = new HashMap<Thread, Object>();
//--------------------------------------------------
public AtomicLock(String name) {
String regexp = "^[\\w\\d\\._\\-\\+/,;:#=]*$";
if (!name.matches(regexp)) {
throw new IllegalArgumentException("AtomicLock name must match regexp " + regexp);
}
this.name = name;
init();
}
/**
* 某些初始化
*/
private void init() {
for (XLockEventEnum event : XLockEventEnum.values()) {
eventListenerMap.put(event, new HashSet<XLockEventListener>(4));
}
}
/**
* 添加一个事件监听器
*/
public void addEventListener(XLockEventListener listener) {
Set<XLockEventListener> listeners = eventListenerMap.get(listener.getEvent());
synchronized (listeners) {
listeners.add(listener);
}
}
/**
* 删除一个事件监听器
*/
public void removeEventListener(XLockEventListener listener) {
Set<XLockEventListener> listeners = eventListenerMap.get(listener.getEvent());
synchronized (listeners) {
listeners.remove(listener);
}
}
/**
* 触发某事件
*/
public void fireEvent(XLockEventEnum event, Thread thread) {
Set<XLockEventListener> listeners = eventListenerMap.get(event);
synchronized (listeners) {
for (XLockEventListener listener : listeners) {
try {
listener.fire(thread, this);
} catch (Exception e) {
logger.error("fireEvent", e);
continue;
}
}
}
}
/**
* 把线程放入等待队列
*/
public void addToWaitQueue(Thread thread) {
synchronized (this.waitQueue) {
//加入等待队列
this.waitQueue.add(thread);
//如果为1,表示刚开始有线程在等待,就启动任务去检测锁是否空闲
int size = this.waitQueue.size();
if (1 == size) {
//重试加锁的任务
checkFreeTask.setAtomicLockManager(atomicLockManager);
checkFreeTask.setAtomicLock(this);
Future taskFuture = XlockUtil.getScheduler().scheduleAtFixedRate(checkFreeTask, atomicLockManager.getCheckLockFreeMillis(),
atomicLockManager.getCheckLockFreeMillis(), TimeUnit.MILLISECONDS);
checkFreeTask.setFuture(taskFuture);
if (this.atomicLockManager.isEnableLogWarn()) {
logger.warn(" >> " + XlockUtil.nowDatetime() + " waitQueue size is 1 , so start AtomicLockCheckFreeTask ");
}
}
}
}
/**
* 把线程移出等待队列
*/
public void removeFromWaitQueue(Thread thread) {
synchronized (this.waitQueue) {
//移出等待队列
this.waitQueue.remove(thread);
//如果小于等于0,表示没有线程在等待,就不需要检查锁的状态
int size = this.waitQueue.size();
if (size <= 0) {
//终止任务
checkFreeTask.getFuture().cancel(true);
if (this.atomicLockManager.isEnableLogWarn()) {
logger.warn(" >> " + XlockUtil.nowDatetime() + " waitQueue size is 0 , so stop AtomicLockCheckFreeTask ");
}
}
}
}
/**
* 标记线程加锁成功
*/
public void markThreadSuccess(Thread thread) {
synchronized (successThreads) {
successThreads.add(thread);
}
}
/**
* 标记线程加锁失败
*/
public void markThreadFail(Thread thread) {
synchronized (successThreads) {
successThreads.remove(thread);
}
}
/**
* 线程是否加锁成功
*/
public boolean isThreadSuccess(Thread thread) {
synchronized (successThreads) {
return successThreads.contains(thread);
}
}
/**
* 触发某事件
*/
private void fireEventTask(XLockEventEnum event, Thread thread, AtomicLock lock) {
//如果没有监听器,就不需要触发
Set<XLockEventListener> listeners = eventListenerMap.get(event);
synchronized (listeners) {
if (listeners.isEmpty()) {
return;
}
}
//异步执行事件触发器
AtomicLockFireEventTask task = new AtomicLockFireEventTask(event, thread, lock);
XlockUtil.getScheduler().execute(task);
}
/**
* 尝试一次加锁。不阻塞,不等待。
* 加锁成功返回true,否则返回false。
*/
public boolean tryLock() throws Exception {
Thread thread = Thread.currentThread();
boolean result = this.atomicLockManager.tryLock(thread, this);
if (result) {
fireEventTask(XLockEventEnum.LOCK_SUCCESS, thread, this);
} else {
fireEventTask(XLockEventEnum.LOCK_FAIL, thread, this);
}
return result;
}
/**
* 尝试一次加锁。
* 加锁成功,返回true。
* 加锁失败,就等待最大时长。最终加锁成功,返回true;否则,返回false。
*/
public boolean tryLock(long maxWaitMillis) throws Exception {
Thread thread = Thread.currentThread();
boolean result = this.atomicLockManager.tryLock(this, maxWaitMillis);
if (result) {
fireEventTask(XLockEventEnum.LOCK_SUCCESS, thread, this);
} else {
fireEventTask(XLockEventEnum.LOCK_FAIL, thread, this);
}
return result;
}
/**
* 加锁。
* 加锁成功,返回true。
* 否则,一直等待,直到加锁成功。
*/
public void lock() throws Exception {
this.atomicLockManager.tryLock(this, 0);
Thread thread = Thread.currentThread();
fireEventTask(XLockEventEnum.LOCK_SUCCESS, thread, this);
}
/**
* 解锁
*/
public boolean unLock() throws Exception {
Thread thread = Thread.currentThread();
boolean result = this.atomicLockManager.unLock(this);
if (result) {
fireEventTask(XLockEventEnum.UNLOCK_SUCCESS, thread, this);
} else {
fireEventTask(XLockEventEnum.UNLOCK_FAIL, thread, this);
}
return result;
}
/**
* 唤醒正在等待此锁的线程
*/
public void signalAll() {
try {
this.reentrantLock.lock();
//唤醒等待线程
this.canTryLockCondition.signalAll();
} catch (Exception e) {
logger.error("signalAll", e);
} finally {
this.reentrantLock.unlock();
}
}
/**
* 启动线程保持锁
*/
public void startRemainTask(Thread thread) {
synchronized (this.remainTaskMap) {
AtomicLockRemainTask task = this.remainTaskMap.get(thread);
if (null == task) {
//保持锁的任务
task = new AtomicLockRemainTask();
task.setAtomicLockManager(this.atomicLockManager);
task.setAtomicLock(this);
task.setThread(thread);
//放入缓存
this.remainTaskMap.put(thread, task);
}
//定时器执行
Future taskFuture = XlockUtil.getScheduler().scheduleAtFixedRate(task, 1, this.atomicLockManager.getHeartbeatSeconds(), TimeUnit.SECONDS);
task.setFuture(taskFuture);
}
}
/**
* 停止线程保持锁
*/
public void stopRemainTask(Thread thread) {
synchronized (this.remainTaskMap) {
AtomicLockRemainTask task = this.remainTaskMap.remove(thread);
if (null != task) {
//取消任务
task.getFuture().cancel(true);
}
}
}
/**
* 找到线程的唯一键
*/
public String findThreadUUID(Thread thread) {
synchronized (this.threadUUIDMap) {
String uuid = this.threadUUIDMap.get(thread);
if (null == uuid) {
//创建线程的uuid
uuid = this.atomicLockManager.createThreadUUID(thread, this);
//放入缓存
this.threadUUIDMap.put(thread, uuid);
}
return uuid;
}
}
/**
* 删除线程的唯一键
*/
public void deleteThreadUUID(Thread thread) {
synchronized (this.threadUUIDMap) {
this.threadUUIDMap.remove(thread);
}
}
/**
* 线程的扩展对象
*/
public void setThreadExtend(Thread thread, Object obj) {
synchronized (this.threadExtendMap) {
this.threadExtendMap.put(thread, obj);
}
}
/**
* 线程的扩展对象
*/
public Object getThreadExtend(Thread thread) {
synchronized (this.threadExtendMap) {
return this.threadExtendMap.get(thread);
}
}
/**
* 线程的扩展对象
*/
public void deleteThreadExtend(Thread thread) {
synchronized (this.threadExtendMap) {
this.threadExtendMap.remove(thread);
}
}
/**
* 锁的名称
*/
public String getName() {
return name;
}
public AtomicLockManager getAtomicLockManager() {
return atomicLockManager;
}
public void setAtomicLockManager(AtomicLockManager atomicLockManager) {
this.atomicLockManager = atomicLockManager;
}
public Lock getReentrantLock() {
return reentrantLock;
}
public void setReentrantLock(Lock reentrantLock) {
this.reentrantLock = reentrantLock;
}
public Condition getCanTryLockCondition() {
return canTryLockCondition;
}
public void setCanTryLockCondition(Condition canTryLockCondition) {
this.canTryLockCondition = canTryLockCondition;
}
}
Tair是蚂蚁内部的缓存工具
【注意事项】
只能使用 ldb 。mdb可能导致不同的机房都写成功,但是值不一样。
【Tair version 方法】
ResultCode put(int namespace, Serializable key, Serializable value, int version, int expireTime)
初始version设置为大于1的整数,比如100 。多线程同时执行这个操作,只会有一个成功。
加锁成功后,客户端保存线程的这个锁的version,每次touch锁,本地version加1,和Tair锁的version保持一致。version一致,才能更新成功。
把线程的uuid写到Tair value中,则客户端和Tair锁都有同一个uuid。用uuid判断线程是否拥有这个锁。
【Tair incr 方法】
Result incr(int namespace, Serializable key, int value, int defaultValue, int expireTime, int lowBound, int upperBound)
value设为1,defaultValue设为0 。判断返回值,如果为1,成功;否则,失败。
tair 锁管理器。抽象父类,保存一些公共方法。
/**
* tair 锁管理器
*
* @author gumao
* @since 2016-11-16
*/
public abstract class AbstractTairLockManager extends AtomicLockManager {
private static Logger logger = LoggerFactory.getLogger(AbstractTairLockManager.class);
//超时次数的最大值
private static int maxTimeoutTimes = 3;
//tair namespace
protected int tairNamespace;
//建议使用ldb。不能使用mdb。
protected TairManager tairManager;
//重试次数
protected int tairRetryTimes = 3;
/**
* 是否超时
*/
protected boolean isTimeout(Result result) {
if (result == null) {
return true;
}
ResultCode code = result.getRc();
return this.isTimeout(code);
}
/**
* 是否超时
*/
protected boolean isTimeout(ResultCode code) {
if (ResultCode.CONNERROR.equals(code) || ResultCode.TIMEOUT.equals(code) || ResultCode.UNKNOW.equals(code)) {
return true;
}
return false;
}
/**
* 超时发生时,是否应该继续。
*/
protected boolean shouldTimeoutContinue(Result result, Integer timeoutCount) {
if (this.isTimeout(result)) {
timeoutCount++;
if (timeoutCount > maxTimeoutTimes) {
String log = ">> Tair timeoutCount beyond . maxTimeoutTimes=" + maxTimeoutTimes + " timeoutCount="
+ timeoutCount + " Result=" + result;
logger.error(log);
return false;
}
return true;
}
return false;
}
/**
* 超时发生时,是否应该继续。
*/
protected boolean shouldTimeoutContinue(ResultCode code, Integer timeoutCount) {
if (this.isTimeout(code)) {
timeoutCount++;
if (timeoutCount > maxTimeoutTimes) {
String log = ">> Tair timeoutCount beyond . maxTimeoutTimes=" + maxTimeoutTimes + " timeoutCount="
+ timeoutCount + " ResultCode=" + code;
logger.error(log);
return false;
}
return true;
}
return false;
}
/**
* 取版本号
*/
protected int getTairElementVersion(String key) {
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
Result<DataEntry> result = tairManager.get(tairNamespace, key);
//---------------------------------------------------
//判断超时
if (this.isTimeout(result)) {
String log = " Tair timeout key=" + key + " Result=" + result;
logger.error(log);
continue;
}
//---------------------------------------------------
if (null != result && null != result.getValue()) {
DataEntry entry = result.getValue();
int version = entry.getVersion();
return version;
} else {
break;
}
} catch (Exception e) {
logger.error("getTairElementVersion", e);
continue;
}
}
String log = " Tair getTairElementVersion Error ! key=" + key;
logger.error(log);
return -1;
}
public int getTairNamespace() {
return tairNamespace;
}
public void setTairNamespace(int tairNamespace) {
this.tairNamespace = tairNamespace;
}
public TairManager getTairManager() {
return tairManager;
}
public void setTairManager(TairManager tairManager) {
this.tairManager = tairManager;
}
public int getTairRetryTimes() {
return tairRetryTimes;
}
public void setTairRetryTimes(int tairRetryTimes) {
this.tairRetryTimes = tairRetryTimes;
}
}
/**
* tair 锁管理器(通过 version 实现)
*
* @author gumao
* @since 2016-11-16
*/
public class TairLockManagerByVersion extends AbstractTairLockManager {
private static Logger logger = LoggerFactory.getLogger(TairLockManagerByVersion.class);
//除0和1之外的正整数。version初始值是0,写一次变成1,所以要排除这2个。
private static int VERSION_LOCK = 40000;
/**
* 加锁。
* 成功返回true,失败返回false
*/
protected boolean atomicLock(Thread thread, AtomicLock lock) {
//先看下是否已经有锁
boolean lockSuccess = this.atomicIsLocked(thread, lock);
//如果已经有锁,就直接返回成功
if (lockSuccess) {
return true;
}
//---------------------------------------------------
String lockName = lock.getName();
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
//线程的uuid
String uuid = lock.findThreadUUID(thread);
String val = uuid + "__" + XlockUtil.nowDatetime();
//用version put。当key不存在,此操作才会成功。
//把uuid作为锁值,方便匹配线程。
ResultCode ret = tairManager.put(tairNamespace, lockName, val, VERSION_LOCK, this.getSessionSeconds());
//---------------------------------------------------
//判断超时
if (this.isTimeout(ret)) {
String log = this.buildLog(" Tair timeout ", thread, lock, " " + ret);
logger.error(log);
continue;
}
//---------------------------------------------------
if (null != ret && ret.isSuccess()) {
//加锁成功
lockSuccess = true;
} else {
//因为超时的原因,有可能tair已经加锁成功,但是返回值是失败,所以这里多校验一次。
lockSuccess = this.atomicIsLocked(thread, lock);
}
//---------------------------------------------------
//如果加锁成功
if (lockSuccess) {
//key被更新了一次,当前version是1
AtomicInteger version = new AtomicInteger(1);
//保存
lock.setThreadExtend(thread, version);
return true;
}
return false;
} catch (Exception e) {
logger.error("atomicLock", e);
continue;
}
}
return false;
}
/**
* 让锁的有效期延长。
* 返回线程是否有锁。
*/
protected boolean atomicTouch(Thread thread, AtomicLock lock) {
//线程是否有锁
boolean hasLock2 = this.atomicIsLocked(thread, lock);
//没有锁,就不能操作。
if (!hasLock2) {
return false;
}
//---------------------------------------------------
String lockName = lock.getName();
//线程对应的版本
AtomicInteger version = (AtomicInteger) lock.getThreadExtend(thread);
//没有version,表示已经主动解锁了
if (null == version) {
return false;
}
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
//线程的uuid
String uuid = lock.findThreadUUID(thread);
String val = uuid + "__" + XlockUtil.nowDatetime();
//用version put。version匹配成功,才能写成功
//把uuid作为锁值,方便匹配线程。
// if(version.get()%1000==0){
// logger.error(" localVersion="+version.get());
// }
ResultCode ret = tairManager.put(tairNamespace, lockName, val, version.get(), this.getSessionSeconds());
//---------------------------------------------------
//判断超时
if (this.isTimeout(ret)) {
String log = this.buildLog(" Tair timeout ", thread, lock, " " + ret);
logger.error(log);
continue;
}
//---------------------------------------------------
if (null != ret && ret.isSuccess()) {
//版本加1
version.incrementAndGet();
return true;
} else {
//1、超时的原因,有可能tair已经更新成功,但是返回值是失败,所以这里多校验一次。
//2、tair version最大值是65534,之后会变成1 。所以重新设置version。
//线程是否有锁
boolean hasLock = this.atomicIsLocked(thread, lock);
if (hasLock) {
//取远程版本号
int remoteVersion = this.getTairElementVersion(lockName);
String log = this.buildLog(" Reset local version ", thread, lock,
" localVersion=" + version.get() + " remoteVersion=" + remoteVersion);
logger.error(log);
//更新版本号
version.set(remoteVersion);
return true;
}
}
return false;
} catch (Exception e) {
logger.error("atomicIsLocked", e);
continue;
}
}
return false;
}
/**
* 线程是否有锁。
*/
protected boolean atomicIsLocked(Thread thread, AtomicLock lock) {
String lockName = lock.getName();
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
Result<DataEntry> result = tairManager.get(tairNamespace, lockName);
//---------------------------------------------------
//判断超时
if (this.isTimeout(result)) {
String log = this.buildLog(" Tair timeout ", thread, lock, " " + result);
logger.error(log);
continue;
}
//---------------------------------------------------
if (null != result && null != result.getValue()) {
DataEntry entry = result.getValue();
String val = (String) entry.getValue();
//线程的uuid
String uuid = lock.findThreadUUID(thread);
//线程的uuid,匹配锁值。
if (null != val && val.startsWith(uuid)) {
return true;
}
}
return false;
} catch (Exception e) {
logger.error("atomicIsLocked", e);
continue;
}
}
return false;
}
/**
* 锁是否被锁住了。
*/
protected boolean atomicIsLocked(AtomicLock lock) {
String lockName = lock.getName();
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
Result<DataEntry> result = tairManager.get(tairNamespace, lockName);
//---------------------------------------------------
//判断超时
if (this.isTimeout(result)) {
String log = this.buildLog(" Tair timeout ", null, lock, " " + result);
logger.error(log);
continue;
}
//---------------------------------------------------
if (null != result && null != result.getValue()) {
DataEntry entry = result.getValue();
String val = (String) entry.getValue();
//有值,就是锁被加了
if (null != val && val.length() > 0) {
return true;
}
}
return false;
} catch (Exception e) {
logger.error("atomicIsLocked", e);
continue;
}
}
return false;
}
/**
* 解锁
* 成功返回true,失败返回false
*/
protected boolean atomicUnlock(Thread thread, AtomicLock lock) {
String lockName = lock.getName();
boolean unlockSuccess = false;
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
//先看线程是否有锁
boolean hasLock = this.atomicIsLocked(thread, lock);
//如果线程没有锁,就不能继续
if (!hasLock) {
return false;
}
//----------------------------------------------------------
//删除锁
ResultCode result = tairManager.invalid(this.tairNamespace, lockName);
//---------------------------------------------------
//判断超时
if (this.isTimeout(result)) {
String log = this.buildLog(" Tair timeout ", thread, lock, " " + result);
logger.error(log);
continue;
}
//---------------------------------------------------
if (result != null && result.isSuccess()) {
unlockSuccess = true;
} else {
//因为超时的原因,有可能tair已经操作成功,但是返回值是失败,所以这里多校验一次。
//线程是否有锁
boolean hasLock2 = this.atomicIsLocked(thread, lock);
// 如果线程已经没有锁了,表示解锁成功了。
if (!hasLock2) {
unlockSuccess = true;
}
}
//---------------------------------------------------
//如果解锁成功
if (unlockSuccess) {
//删除线程的uuid。下次加锁的时候,可以重新生成uuid。
lock.deleteThreadUUID(thread);
//删除线程对应的锁version。下次加锁的时候,可以重新生成version。
lock.deleteThreadExtend(thread);
return true;
}
return false;
} catch (Exception e) {
logger.error("atomicUnlock", e);
continue;
}
}
return false;
}
}
/**
* tair 锁管理器(通过 incr 实现)
*
* @author gumao
* @since 2016-11-16
*/
public class TairLockManagerByIncr extends AbstractTairLockManager {
private static Logger logger = LoggerFactory.getLogger(TairLockManagerByIncr.class);
private static int INCR_DEFAULT = 0;
private static int LOCK_INCR_MIN = INCR_DEFAULT;
private static int LOCK_INCR_MAX = 5;
/**
* 加锁。
* 成功返回true,失败返回false
*/
protected boolean atomicLock(Thread thread, AtomicLock lock) {
//原子加1
Integer val = this.tairIncr(lock.getName(), 1);
//从0到1,表示加锁成功。其他,表示失败。
if (1 == val) {
return true;
}
return false;
}
/**
* 让锁的有效期延长。
* 返回线程是否有锁。
*/
protected boolean atomicTouch(Thread thread, AtomicLock lock) {
//加0。
Integer val = this.tairIncr(lock.getName(), 0);
//大于0,表示有锁
if (val > 0) {
return true;
}
return false;
}
/**
* 线程是否有锁。
*/
protected boolean atomicIsLocked(Thread thread, AtomicLock lock) {
String lockName = lock.getName();
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
Result<DataEntry> result = tairManager.get(tairNamespace, lockName);
//---------------------------------------------------
//判断超时
if (this.isTimeout(result)) {
String log = this.buildLog(" Tair timeout ", thread, lock, " " + result);
logger.error(log);
continue;
}
//---------------------------------------------------
if (null != result && null != result.getValue()) {
DataEntry entry = result.getValue();
Integer val = (Integer) entry.getValue();
if (null != val && val > 0) {
return true;
}
}
return false;
} catch (Exception e) {
logger.error("atomicIsLocked", e);
continue;
}
}
return false;
}
/**
* 锁是否被锁住了。
*/
protected boolean atomicIsLocked(AtomicLock lock) {
String lockName = lock.getName();
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
Result<DataEntry> result = tairManager.get(tairNamespace, lockName);
//---------------------------------------------------
//判断超时
if (this.isTimeout(result)) {
String log = this.buildLog(" Tair timeout ", null, lock, " " + result);
logger.error(log);
continue;
}
//---------------------------------------------------
if (null != result && null != result.getValue()) {
DataEntry entry = result.getValue();
Integer val = (Integer) entry.getValue();
//值为正,就是锁被加了
if (null != val && val > 0) {
return true;
}
}
return false;
} catch (Exception e) {
logger.error("atomicIsLocked", e);
continue;
}
}
return false;
}
/**
* 解锁
* 成功返回true,失败返回false
*/
protected boolean atomicUnlock(Thread thread, AtomicLock lock) {
String lockName = lock.getName();
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
//先看线程是否有锁
boolean hasLock = this.atomicIsLocked(thread, lock);
//如果线程没有锁,就失败
if (!hasLock) {
return false;
}
//----------------------------------------------------------
//删除锁
ResultCode result = tairManager.invalid(this.tairNamespace, lockName);
//---------------------------------------------------
//判断超时
if (this.isTimeout(result)) {
String log = this.buildLog(" Tair timeout ", thread, lock, " " + result);
logger.error(log);
continue;
}
//---------------------------------------------------
if (result != null && result.isSuccess()) {
return true;
}
return false;
} catch (Exception e) {
logger.error("atomicUnlock", e);
continue;
}
}
return false;
}
private Integer tairIncr(String key, int value) {
for (int i = 1; i <= tairRetryTimes; ++i) {
try {
//原子加数
Result<Integer> result = this.tairManager.incr(this.tairNamespace, key, value, INCR_DEFAULT, this.getSessionSeconds(),
LOCK_INCR_MIN, LOCK_INCR_MAX);
//---------------------------------------------------
//判断超时
if (this.isTimeout(result)) {
String log = this.buildLog(" Tair timeout ", null, null, " " + result);
logger.error(log);
continue;
}
//---------------------------------------------------
if (result != null && result.isSuccess()) {
return result.getValue();
}
return INCR_DEFAULT;
} catch (Exception e) {
logger.error("tairManager # incr", e);
continue;
}
}
return INCR_DEFAULT;
}
}
【 Redis set 方法】
public String set(final String key, final String value, final String nxxx, final String expx, final long time)
String nxxx, 使用这个参数。值如下。
NX – Only set the key if it does not already exist. 不存在时,操作才会成功。
XX – Only set the key if it already exist. 存在时,操作才会成功。
把线程的uuid写到Redis value中,则客户端和Redis锁都有同一个uuid。用uuid判断线程是否拥有这个锁。
依赖jedis jar版本,比这个版本大,应该也可以。当然也可以考虑用redission替换jedis实现一个版本
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.6.0</version>
</dependency>
/**
* redis 锁管理器
*
* @author gumao
* @since 2016-11-16
*/
public class RedisLockManager extends AtomicLockManager {
private static Logger logger = LoggerFactory.getLogger(RedisLockManager.class);
private static String REDIS_OK = "OK";
//redis 客户端池,线程安全
private JedisPool jedisPool;
//redis 库
private int jedisDBIndex = 0;
//重试次数
private int redisRetryTimes = 3;
//状态是成功
private static boolean isRedisSuccess(String statusCode) {
if (null != statusCode && REDIS_OK.equals(statusCode)) {
return true;
}
return false;
}
/**
* public String set(final String key, final String value, final String nxxx, final String expx, final long time)
*
* NX -- Only set the key if it does not already exist.
* XX -- Only set the key if it already exist.
*
* EX = seconds
* PX = milliseconds
*/
/**
* 加锁。
* 成功返回true,失败返回false
*/
protected boolean atomicLock(Thread thread, AtomicLock lock) {
Jedis jedis = null;
try {
String uuid = lock.findThreadUUID(thread);
jedis = jedisPool.getResource();
jedis.select(this.jedisDBIndex);
String lockName = lock.getName();
for (int i = 1; i <= redisRetryTimes; ++i) {
try {
//NX,如果key不存在,才能set成功。
String val = uuid + "__" + XlockUtil.nowDatetime();
//更新值和过期时间
String ret = jedis.set(lockName, val, "NX", "EX", this.getSessionSeconds());
//操作是否成功
boolean isSucc = isRedisSuccess(ret);
return isSucc;
} catch (Exception e) {
logger.error("atomicLock", e);
continue;
}
}
} catch (Exception e) {
logger.error("jedis", e);
} finally {
jedisPool.returnResource(jedis);
}
return false;
}
/**
* 让锁的有效期延长。
* 返回线程是否有锁。
*/
protected boolean atomicTouch(Thread thread, AtomicLock lock) {
Jedis jedis = null;
try {
String uuid = lock.findThreadUUID(thread);
jedis = jedisPool.getResource();
jedis.select(this.jedisDBIndex);
String lockName = lock.getName();
for (int i = 1; i <= redisRetryTimes; ++i) {
try {
//先校验当前线程是否有锁
boolean hasLock = this.atomicIsLocked(thread, lock);
//没有锁就不能后续操作
if (!hasLock) {
return false;
}
//-------------------------------------------------
//XX,如果key已经存在,才能set成功。
String val = uuid + "__" + XlockUtil.nowDatetime();
//更新值和过期时间
String ret = jedis.set(lockName, val, "XX", "EX", this.getSessionSeconds());
//操作是否成功
boolean isSucc = isRedisSuccess(ret);
return isSucc;
} catch (Exception e) {
logger.error(" xx", e);
continue;
}
}
} catch (Exception e) {
logger.error("jedis", e);
} finally {
jedisPool.returnResource(jedis);
}
return false;
}
/**
* 线程是否有锁。
*/
protected boolean atomicIsLocked(Thread thread, AtomicLock lock) {
Jedis jedis = null;
try {
String uuid = lock.findThreadUUID(thread);
jedis = jedisPool.getResource();
jedis.select(this.jedisDBIndex);
String lockName = lock.getName();
for (int i = 1; i <= redisRetryTimes; ++i) {
try {
//有值就表示被锁住了
String val = jedis.get(lockName);
if (null != val) {
//如果是线程uuid前缀,表示线程拥有锁
if (val.startsWith(uuid)) {
return true;
}
}
return false;
} catch (Exception e) {
logger.error("atomicIsLocked", e);
continue;
}
}
} catch (Exception e) {
logger.error("jedis", e);
} finally {
jedisPool.returnResource(jedis);
}
return false;
}
/**
* 锁是否被锁住了。
*/
protected boolean atomicIsLocked(AtomicLock lock) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.select(this.jedisDBIndex);
String lockName = lock.getName();
for (int i = 1; i <= redisRetryTimes; ++i) {
try {
//有值就表示被锁住了
String val = jedis.get(lockName);
//有值就表示被锁了
if (null != val) {
return true;
}
return false;
} catch (Exception e) {
logger.error("atomicIsLocked", e);
continue;
}
}
} catch (Exception e) {
logger.error("jedis", e);
} finally {
jedisPool.returnResource(jedis);
}
return false;
}
/**
* 解锁
* 成功返回true,失败返回false
*/
protected boolean atomicUnlock(Thread thread, AtomicLock lock) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.select(this.jedisDBIndex);
String lockName = lock.getName();
for (int i = 1; i <= redisRetryTimes; ++i) {
try {
//先看线程是否有锁
boolean hasLock = this.atomicIsLocked(thread, lock);
//如果线程没有锁,就失败
if (!hasLock) {
return false;
}
//----------------------------------------------------------
//解锁
Long ret = jedis.del(lockName);
//如果删除成功
if (null != ret && 1 == ret) {
//删除线程的uuid
lock.deleteThreadUUID(thread);
return true;
}
return false;
} catch (Exception e) {
logger.error("atomicUnlock", e);
continue;
}
}
} catch (Exception e) {
logger.error("jedis", e);
} finally {
jedisPool.returnResource(jedis);
}
return false;
}
public JedisPool getJedisPool() {
return jedisPool;
}
public void setJedisPool(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
public int getRedisRetryTimes() {
return redisRetryTimes;
}
public void setRedisRetryTimes(int redisRetryTimes) {
this.redisRetryTimes = redisRetryTimes;
}
public int getJedisDBIndex() {
return jedisDBIndex;
}
public void setJedisDBIndex(int jedisDBIndex) {
this.jedisDBIndex = jedisDBIndex;
}
}
未实现,待补充
注意:如下的技术要点,并不是每个版本的分布式锁都会用到。比如,zookeeper内置了连接和状态管理,就不需要调度线程池执行保持锁的任务。
“线程+锁” 标识一个分布式锁单元。
底层实现代码中,许多方法都有 thread 和 lock 参数。
【Tair version】
ResultCode put(int namespace, Serializable key, Serializable value, int version, int expireTime)
初始version设置为大于1的整数,比如100 。多线程同时执行这个操作,只会有一个成功。
【Tair incr】
Result incr(int namespace, Serializable key, int value, int defaultValue, int expireTime, int lowBound, int upperBound)
value设为1,defaultValue设为0 。判断返回值,如果为1,成功;否则,失败。
【Redis set】
public String set(final String key, final String value, final String nxxx, final String expx, final long time)
String nxxx, 使用这个参数。值如下。
NX – Only set the key if it does not already exist. 不存在时,操作才会成功。
XX – Only set the key if it already exist. 存在时,操作才会成功。
【Zookeeper】
Zookeeper : 序列子节点创建,同步通知。(暂时略)
【Mysql : 乐观锁更新】
update table_lock set expire_time = aaa , version_field = bbb+1, lock_value = eee where lock_key = ccc and version_field = bbb
【客户端的锁对象】
一个客户端的锁对象,存在于一个进程中。
锁对象保存许多信息。比如,线程的保持锁任务,线程的自定义uuid,线程扩展信息,线程等待队列,等。
锁的线程等待队列,使用在等待性加锁。实现线程阻塞和唤醒,最大等待时长。
【调度线程池】
使用 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
【等待性加锁的控制】
等待性加锁: tryLock(long maxWaitMillis) , lock()
ReentrantLock和Condition。一个锁创建一个"可以尝试加锁的条件",canTryLockCondition。
await和signalAll。线程等待性加锁,失败就放入锁的等待队列,并且await。某线程解锁或检测到锁可用,就signalAll,唤醒等待队列,都尝试加锁。
zookeeper,用tcp连接,心跳,会话。
其他,通过 “定时任务+心跳+会话” 来实现。会话时长,就是锁的有效期,是心跳时长*N 。加锁成功后,创建保持锁的定时任务,间隔心跳时长来执行,重置锁的会话时长。如果机器挂掉,或网络故障,锁超过有效期,就失效,然后丢失锁。
自定义事件枚举XLockEventEnum,和监听器XLockEventListener。
需要注意的是事件LOSE_LOCK(“被动丢失锁”)。被动指,不是主动的unlock,但是丢失了锁。比如网络断了30秒,锁已经失效,然后被别的机器上的线程加锁成功,之前的线程就被动丢失了锁。业务方根据需要加点逻辑处理,比如打印日志,终止业务流程,尝试重新加锁。
【超时策略】
【封装性和易用性】
内部逻辑还是比较复杂的。上层提供接口,方便替换不同的实现方式。
提供给外部的功能方法,少而简单,易用性好。
【测试】
功能点比较多,测试起来不容易。
目前主要依靠自己测试。
缺少复杂业务场景,需要更多检验。
logback的pom配置有点繁琐。如下是一个logback配置。
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.0.13</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
</dependency>
如下是一点测试
>> 2016-11-24 10:52:14,298 waitQueue size is 1 , so start AtomicLockCheckFreeTask
>> 2016-11-24 10:52:14,386 tryLock lockResult=true thread=TXX_1 lock=lockf extra=
>> 2016-11-24 10:52:14,386 waitQueue size is 0 , so stop AtomicLockCheckFreeTask
>> 2016-11-24 10:52:14,388 waitQueue size is 1 , so start AtomicLockCheckFreeTask
### listener 2016-11-24 10:52:14,394 LOCK_SUCCESS TXX_1 lockf
>> 2016-11-24 10:52:14,419 tryLock lockResult=false thread=TXX_2 lock=lockf extra=
>> 2016-11-24 10:52:14,459 tryLock lockResult=false thread=TXX_3 lock=lockf extra=
>> 2016-11-24 10:52:15,397 remainLock thread=TXX_1 lock=lockf isLocked=true
### listener 2016-11-24 10:52:16,409 UNLOCK_SUCCESS TXX_1 lockf
>> 2016-11-24 10:52:16,422 tryLock lockResult=true thread=TXX_2 lock=lockf extra=
### listener 2016-11-24 10:52:16,422 LOCK_SUCCESS TXX_2 lockf
>> 2016-11-24 10:52:16,439 tryLock lockResult=false thread=TXX_3 lock=lockf extra=
>> 2016-11-24 10:52:17,461 remainLock thread=TXX_2 lock=lockf isLocked=true
### listener 2016-11-24 10:52:18,475 UNLOCK_SUCCESS TXX_2 lockf
>> 2016-11-24 10:52:18,527 tryLock lockResult=true thread=TXX_3 lock=lockf extra=
>> 2016-11-24 10:52:18,527 waitQueue size is 0 , so stop AtomicLockCheckFreeTask
### listener 2016-11-24 10:52:18,528 LOCK_SUCCESS TXX_3 lockf
>> 2016-11-24 10:52:19,546 remainLock thread=TXX_3 lock=lockf isLocked=true
### listener 2016-11-24 10:52:20,562 UNLOCK_SUCCESS TXX_3 lockf
>> 2016-11-24 10:52:26,414 waitQueue size is 1 , so start AtomicLockCheckFreeTask
>> 2016-11-24 10:52:26,428 tryLock lockResult=true thread=TXX_1 lock=lockf extra=
>> 2016-11-24 10:52:26,428 waitQueue size is 0 , so stop AtomicLockCheckFreeTask
### listener 2016-11-24 10:52:26,429 LOCK_SUCCESS TXX_1 lockf
>> 2016-11-24 10:52:27,445 remainLock thread=TXX_1 lock=lockf isLocked=true
>> 2016-11-24 10:52:28,480 waitQueue size is 1 , so start AtomicLockCheckFreeTask
>> 2016-11-24 10:52:28,516 tryLock lockResult=false thread=TXX_2 lock=lockf extra=
>> 2016-11-24 10:52:29,482 remainLock thread=TXX_1 lock=lockf isLocked=true
>> 2016-11-24 10:52:30,596 tryLock lockResult=false thread=TXX_3 lock=lockf extra=
>> 2016-11-24 10:52:31,444 remainLock thread=TXX_1 lock=lockf isLocked=true
>> 2016-11-24 10:52:31,907 AtomicLockCheckFreeTask lock=lockf isLocked=false . So signalAll to try lock !
>> 2016-11-24 10:52:31,926 tryLock lockResult=true thread=TXX_2 lock=lockf extra=
### listener 2016-11-24 10:52:31,926 LOCK_SUCCESS TXX_2 lockf
>> 2016-11-24 10:52:31,943 tryLock lockResult=false thread=TXX_3 lock=lockf extra=
>> 2016-11-24 10:52:32,939 remainLock thread=TXX_2 lock=lockf isLocked=true
>> 2016-11-24 10:52:33,438 remainLock thread=TXX_1 lock=lockf isLocked=false
### listener 2016-11-24 10:52:33,456 LOSE_LOCK TXX_1 lockf
==============================
AtomicLockRemainTask # lose lock , so retry lock again
now =2016-11-24 10:52:33,456
thread =TXX_1 24
lock =lockf
extra = >> retry lock fail , then cancel task , this may be normal ! Please find reason !
==============================
>> 2016-11-24 10:52:34,940 remainLock thread=TXX_2 lock=lockf isLocked=true
>> 2016-11-24 10:52:36,941 remainLock thread=TXX_2 lock=lockf isLocked=true
>> 2016-11-24 10:52:38,938 remainLock thread=TXX_2 lock=lockf isLocked=true
针对分布式锁的实现,目前比较常用的有以下几种方案:
在分析这几种实现方案之前我们先来想一下,我们需要的分布式锁应该是怎么样的?(这里以方法锁为例,资源锁同理)
可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。
这把锁要是一把可重入锁(避免死锁)
这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
有高可用的获取锁和释放锁功能
获取锁和释放锁的性能要好
【基于数据库表】
要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。
当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。
创建这样一张数据库表:
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
当我们想要锁住某个方法时,执行以下SQL:
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:
delete from methodLock where method_name ='method_name'
上面这种简单的实现有以下几个问题:
1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
4、这把锁是不可重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
当然,我们也可以有其他方式解决上面的问题。
【基于数据库排他锁】
除了可以通过增删操作数据表中的记录以外,其实还可以借助数据中自带的锁来实现分布式的锁。
我们还用刚刚创建的那张数据库表。可以通过数据库的排他锁来实现分布式锁。
基于MySql的InnoDB引擎,可以使用以下方法来实现加锁操作:
public boolean lock(){
connection.setAutoCommit(false)
while(true){
try{
result = select * from methodLock where method_name=xxx for update;
if(result==null){
return true;
}
}catch(Exception e){
}
sleep(1000);
}
return false;
}
在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。(这里再多提一句,InnoDB引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给method_name添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上)
我们可以认为获得排它锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,再通过以下方法解锁:
public void unlock(){
connection.commit();
}
通过connection.commit()操作来释放锁。
这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。
这里还可能存在另外一个问题,虽然我们对method_name 使用了唯一索引,并且显示使用for update来使用行级锁。但是,MySql会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了。。。
还有一个问题,就是我们要使用排他锁来进行分布式锁的lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆。
【数据库锁方案总结】
总结一下使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是通过表中的记录的存在情况确定当前是否有锁存在,另外一种是通过数据库的排他锁来实现分布式锁。
相比较于基于数据库实现分布式锁的方案来说,基于缓存来实现在性能方面会表现的更好一点,而且很多缓存是可以集群部署的,可以解决单点问题。
目前有很多成熟的缓存产品,包括Redis,memcached以及阿里内部的Tair。
这里以Tair为例来分析下使用缓存实现分布式锁的方案。关于Redis和memcached在网络上有很多相关的文章,并且也有一些成熟的框架及算法可以直接使用。
基于Tair的实现分布式锁在内网中有很多相关文章,其中主要的实现方式是使用TairManager.put方法来实现。
public boolean trylock(String key) {
ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, 0);
if (ResultCode.SUCCESS.equals(code))
return true;
else
return false;
}
public boolean unlock(String key) {
ldbTairManager.invalid(NAMESPACE, key);
}
以上实现方式同样存在几个问题:
当然,同样有方式可以解决。
【缓存分布式锁方案总结】
可以使用缓存来代替数据库来实现分布式锁,这个可以提供更好的性能,同时,很多缓存服务都是集群部署的,可以避免单点问题。并且很多缓存服务都提供了可以用来实现分布式锁的方法,比如Tair的put方法,redis的setnx方法等。并且,这些缓存服务也都提供了对数据的过期自动删除的支持,可以直接设置超时时间来控制锁的释放。
使用缓存实现分布式锁的优点
性能好,实现起来较为方便。
使用缓存实现分布式锁的缺点
通过超时时间来控制锁的失效时间并不是十分的靠谱。
基于zookeeper临时有序节点可以实现的分布式锁。
大致思想即为:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。
判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。
当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
来看下Zookeeper能不能解决前面提到的问题。
可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。
public boolean tryLock(long timeout, TimeUnit unit) throws Exception {
return interProcessMutex.acquire(timeout, unit);
}
public void unlock() throws Exception {
interProcessMutex.release();
}
Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。
使用ZK实现的分布式锁好像完全符合了本文开头我们对一个分布式锁的所有期望。但是,其实并不是,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。
其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。)
【Zookeeper实现分布式锁方案总结】
上面几种方式,哪种方式都无法做到完美。就像CAP一样,在复杂性、可靠性、性能等方面无法同时满足,所以,根据不同的应用场景选择最适合自己的才是王道。
从理解的难易程度角度(从低到高)
数据库 > 缓存 > Zookeeper
从实现的复杂性角度(从低到高)
Zookeeper >= 缓存 > 数据库
从性能角度(从高到低)
缓存 > Zookeeper >= 数据库
从可靠性角度(从高到低)
Zookeeper > 缓存 > 数据库