顾名思义,分布式锁,就是分布式系统中的锁,是为了解决分布式场景下控制一些共享资源访问的问题,即某时刻同一个方法只有一个线程在运行。
分布式锁的设计原则:互斥性、原子性、安全性、容错性、可重用性、公平性、高可用性、高性能、持久性、支持阻塞和非阻塞
实现方式:
1. 基于数据库排他锁
2. 基于表的唯一索引
缺点:性能比较差
实现方式:
1. setnx+expire命令(错误):
直接分开使用命令其实是有问题的,因为不具备原子性,若执行了setnx的时候程序异常了,那么锁将永远不会过期
2. lua脚本:
通过lua脚本保证setnx和expire的原子性
3. SET key uniqueId [EX time] [NX|XX]
- nx:当key不存在时设置值
- xx:当key存在时设置值
4. 使用实现redlock算法的redisson
缺点:CAP中满足AP模型,存在一致性问题
缺点:性能上不如使用缓存实现的分布式锁,因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能
etcd像是专门为集群环境的服务发现和注册而设计,它提供了数据TTL失效、数据改变监视、多值、目录监听、分布式锁原子操作等功能,可以方便的跟踪并管理集群节点的状态
因为 etcd 使用 Raft 算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁。锁服务有两种使用方式,一是保持独占,二是控制时序
保持独占,即所有试图获取锁的用户最终只有一个可以得到。etcd为此提供了一套实现分布式锁原子操作CAS(CompareAndSwap)的API。通过设置prevExist值,可以保证在多个节点同时创建某个目录时,只有一个成功,而该用户即可认为是获得了锁。
控制时序,即所有试图获取锁的用户都会进入等待队列,获得锁的顺序是全局唯一的,同时决定了队列执行顺序。etcd为此也提供了一套API(自动创建有序键),对一个目录建值时指定为POST动作,这样etcd会自动在目录下生成一个当前最大的值为键,存储这个新的值(客户端编号)。同时还可以使用API按顺序列出所有当前目录下的键值。此时这些键的值就是客户端的时序,而这些键中存储的值可以是代表客户端的编号
实现方式:
实现方式:
public class SharedReentrantLockTest {
private static final String lockPath = "/test/sharedreentrantlock";
private static final Integer clientNums = 5;
final static FakeLimitedResource resource = new FakeLimitedResource(); // 共享的资源
private static CountDownLatch countDownLatch = new CountDownLatch(clientNums);
@Autowired
private static BaseZookeeper baseZookeeper;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < clientNums; i++) {
String clientName = "client#" + i;
new Thread(new Runnable() {
@Override
public void run() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client= CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
client.start();
Random random = new Random();
try {
final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// 每个客户端请求10次共享资源
for (int j = 0; j < 10; j++) {
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(j + ". " + clientName + " 不能得到互斥锁");
}
try {
System.out.println(j + ". " + clientName + " 已获取到互斥锁");
resource.use(); // 使用资源
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(j + ". " + clientName + " 不能再次得到互斥锁");
}
System.out.println(j + ". " + clientName + " 已再次获取到互斥锁");
lock.release(); // 申请几次锁就要释放几次锁
} finally {
System.out.println(j + ". " + clientName + " 释放互斥锁");
lock.release();
}
Thread.sleep(random.nextInt(100));
}
} catch (Throwable e) {
System.out.println(e.getMessage());
} finally {
CloseableUtils.closeQuietly(client);
System.out.println(clientName + " client close!");
countDownLatch.countDown();
}
}
}).start();
}
countDownLatch.await();
System.out.println("ending");
}
}
public class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
// 模拟只能单线程操作的资源
public void use() throws InterruptedException {
if (!inUse.compareAndSet(false, true)) {
// 在正确使用锁的情况下,此异常不可能抛出
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
Thread.sleep((long) (100 * Math.random()));
} finally {
inUse.set(false);
}
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < clientNums; i++) {
final String clientName = "client#" + i;
new Thread(new Runnable() {
@Override
public void run() {
//CuratorFramework client = ZKUtils.getClient();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client= CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
client.start();
final InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);
final InterProcessMutex readLock = lock.readLock();
final InterProcessMutex writeLock = lock.writeLock();
try {
// 注意只能先得到写锁再得到读锁,不能反过来
if (!writeLock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(clientName + " 不能得到写锁");
}
System.out.println(clientName + " 已得到写锁");
if (!readLock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException(clientName + " 不能得到读锁");
}
System.out.println(clientName + " 已得到读锁");
try {
resource.use(); // 使用资源
} finally {
System.out.println(clientName + " 释放读写锁");
readLock.release();
writeLock.release();
}
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
CloseableUtils.closeQuietly(client);
countDownLatch.countDown();
}
}
}).start();
}
countDownLatch.await();
System.out.println("ending。。。");
}
public class SharedSemaphoreTest {
private static final int MAX_LEASE = 10;
private static final String PATH = "/test/semaphore";
private static final FakeLimitedResource resource = new FakeLimitedResource();
public static void main(String[] args) throws Exception {
CuratorFramework client = ZKUtils.getClient();
client.start();
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
Collection<Lease> leases = semaphore.acquire(5);
System.out.println("获取租约数量:" + leases.size());
Lease lease = semaphore.acquire();
System.out.println("获取单个租约");
resource.use(); // 使用资源
// 再次申请获取5个leases,此时leases数量只剩4个,不够,将超时
Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
System.out.println("获取租约,如果超时将为null: " + leases2);
System.out.println("释放租约");
semaphore.returnLease(lease);
// 再次申请获取5个,这次刚好够
leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
System.out.println("获取租约,如果超时将为null: " + leases2);
System.out.println("释放集合中的所有租约");
semaphore.returnAll(leases);
semaphore.returnAll(leases2);
client.close();
System.out.println("结束!");
}
}
public class MultiSharedLockTest {
private static final String lockPath1 = "/test/MSLock1";
private static final String lockPath2 = "/test/MSLock2";
private static final FakeLimitedResource resource = new FakeLimitedResource();
public static void main(String[] args) throws Exception {
CuratorFramework client = ZKUtils.getClient();
client.start();
InterProcessLock lock1 = new InterProcessMutex(client, lockPath1); // 可重入锁
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, lockPath2); // 不可重入锁
// 组锁,多锁
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("不能获取多锁");
}
System.out.println("已获取多锁");
System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
try {
resource.use(); // 资源操作
} finally {
System.out.println("释放多个锁");
lock.release(); // 释放多锁
}
System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess());
System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess());
client.close();
System.out.println("结束!");
}
}
ZAB(Zookeeper Atomic Broadcast) 协议是为分布式协调服务zookeeper专门设计的一种支持崩溃恢复的原子广播协议。在zookeeper中,主要依赖ZAB协议来实现分布式数据一致性,基于该协议,zookeeper实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性
ZAB协议包含两种基本模式:
在崩溃恢复完成选举以后,就开始进行数据同步了,在选举过程中,通过投票已经确认 Leader 服务器是最大Zxid 的节点,同步阶段就是利用 Leader 前一阶段获得的最新Proposal历史,同步集群中所有的副本