单机环境下可以利用jvm级别的锁,比如synchronized、Lock等来实现锁,如果是多机部署就需要一个共享数据存储区域来实现分布式锁
可以用数据库唯一索引来实现

redis实现的分布式锁始终会有一些问题,即便使用多数写入,主节点挂了,数据丢失还是会存在加锁问题,就是主节点宕机,客户端无法感知

使用临时节点创建成功获取锁,否则监听临时节点,有个问题,比如1000个线程只有一个会加锁成功,当删除临时节点时999个线程都会去竞争

公平锁的实现

从InterProcessMutex类找到acquire()加锁方法
- public void acquire() throws Exception {
- if (!this.internalLock(-1L, (TimeUnit)null)) {
- throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
- }
- }
- private boolean internalLock(long time, TimeUnit unit) throws Exception {
- // 获取当前线程
- Thread currentThread = Thread.currentThread();
- // threadData类型是ConcurrentMap,从threadData中去拿LockData加锁对象
- InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
- // 如果拿到了 证明之前加锁了,lockCount重入次数+1
- if (lockData != null) {
- lockData.lockCount.incrementAndGet();
- return true;
- } else {
- // 没有拿到开始从zookeeper上创建lock节点
- String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
- // 创建成功 加锁成功把对象放到threadData中
- if (lockPath != null) {
- InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
- this.threadData.put(currentThread, newLockData);
- return true;
- } else {
- // 加锁失败
- return false;
- }
- }
- }
-
- private static class LockData {
- // 持有锁的线程
- final Thread owningThread;
- // 在zookeeper的锁路径
- final String lockPath;
- // 线程加锁次数
- final AtomicInteger lockCount;
-
- private LockData(Thread owningThread, String lockPath) {
- this.lockCount = new AtomicInteger(1);
- this.owningThread = owningThread;
- this.lockPath = lockPath;
- }
- }
- String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
- long startMillis = System.currentTimeMillis();
- Long millisToWait = unit != null ? unit.toMillis(time) : null;
- byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
- // 重试次数
- int retryCount = 0;
- String ourPath = null;
- boolean hasTheLock = false;
- boolean isDone = false;
-
- while(!isDone) {
- isDone = true;
-
- try {
- // 创建临时有序节点
- ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
- // 创建的节点是否为最小节点
- hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
- } catch (NoNodeException var14) {
- // 加锁失败 重试设置重试策略
- if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
- throw var14;
- }
-
- isDone = false;
- }
- }
-
- return hasTheLock ? ourPath : null;
- }
- public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
- String ourPath;
- // 是否要给节点设置属性 创建的都是临时有序节点
- if (lockNodeBytes != null) {
- ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);
- } else {
- ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);
- }
-
- return ourPath;
- }
- private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
- boolean haveTheLock = false;
- boolean doDelete = false;
-
- try {
- if (this.revocable.get() != null) {
- ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
- }
-
- while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
- // 将子节点进行排序
- List
children = this.getSortedChildren(); - // 截取创建的节点的序号
- String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
- // 判断是否为最小序号
- PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
- if (predicateResults.getsTheLock()) {
- // 加锁成功
- haveTheLock = true;
- } else {
- // 拿到上一个节点的路径
- String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
- synchronized(this) {
- try {
- // 监听上一个节点
- ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
- if (millisToWait == null) {
- // 等待
- this.wait();
- } else {
- millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
- startMillis = System.currentTimeMillis();
- if (millisToWait > 0L) {
- // 超时等待
- this.wait(millisToWait);
- } else {
- doDelete = true;
- break;
- }
- }
- } catch (NoNodeException var19) {
- }
- }
- }
- }
- } catch (Exception var21) {
- ThreadUtils.checkInterrupted(var21);
- doDelete = true;
- throw var21;
- } finally {
- if (doDelete) {
- this.deleteOurPath(ourPath);
- }
-
- }
-
- return haveTheLock;
- }
- public void release() throws Exception {
- Thread currentThread = Thread.currentThread();
- InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
- if (lockData == null) {
- // 分布式场景下什么情况都可能有 所以判断一下
- throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
- } else {
- // 重入次数减1
- int newLockCount = lockData.lockCount.decrementAndGet();
- if (newLockCount <= 0) {
- if (newLockCount < 0) {
- throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
- } else {
- try {
- // 这里之前应该还有个大于0的判断 在curator5.1.0&zookeeper 3.9.0版本去掉了
- this.internals.releaseLock(lockData.lockPath);
- } finally {
- this.threadData.remove(currentThread);
- }
-
- }
- }
- }
- }
优点:Zookeeper分布式锁(如InterProcessMutex),具备高可用、可重入、阻塞锁特性,可解决失效死锁问题
缺点:因为需要频繁的创建和删除节点,性能上不如redis
在高性能、高并发场景下,不建议用Zookeeper的分布式锁。而由于Zookeeper的高可靠性,因此在并发量不是太高的应用场景下,还是推荐使用Zookeeper的分布式锁

优点:
缺点:
第一步:在父pom文件中指定Spring Cloud版本
- <parent>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-parentartifactId>
- <version>2.3.2.RELEASEversion>
- <relativePath/>
- parent>
- <properties>
- <java.version>1.8java.version>
- <spring-cloud.version>Hoxton.SR8spring-cloud.version>
- properties>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-dependenciesartifactId>
- <version>${spring-cloud.version}version>
- <type>pomtype>
- <scope>importscope>
- dependency>
- dependencies>
- dependencyManagement>
注意: springboot和springcloud的版本兼容问题
第二步:微服务pom文件中引入Spring Cloud Zookeeper注册中心依赖
- <dependency>
- <groupId>org.springframework.cloudgroupId>
- <artifactId>spring-cloud-starter-zookeeper-discoveryartifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.zookeepergroupId>
- <artifactId>zookeeperartifactId>
- exclusion>
- exclusions>
- dependency>
-
- <dependency>
- <groupId>org.apache.zookeepergroupId>
- <artifactId>zookeeperartifactId>
- <version>3.8.0version>
- dependency>
注意: zookeeper客户端依赖和zookeeper sever的版本兼容问题
Spring Cloud整合Zookeeper注册中心核心源码入口: ZookeeperDiscoveryClientConfiguration
第三步: 微服务配置文件application.yml中配置zookeeper注册中心地址
- spring:
- cloud:
- zookeeper:
- connect-string: localhost:2181
- discovery:
- instance-host: 127.0.0.1
注册到zookeeper的服务实例元数据信息如下:

注意:如果address有问题,会出现找不到服务的情况,可以通过instance-host配置指定
第四步:整合feign进行服务调用
- @RequestMapping(value = "/findOrderByUserId/{id}")
- public R findOrderByUserId(@PathVariable("id") Integer id) {
- log.info("根据userId:"+id+"查询订单信息");
- //feign调用
- R result = orderFeignService.findOrderByUserId(id);
- return result;
- }