• zookeeper应用场景(二)


    单机环境下可以利用jvm级别的锁,比如synchronized、Lock等来实现锁,如果是多机部署就需要一个共享数据存储区域来实现分布式锁

    一、分布式锁实现方式

    1、基于数据库实现分布式锁

    可以用数据库唯一索引来实现

    2、基于redis实现分布式锁

    redis实现的分布式锁始终会有一些问题,即便使用多数写入,主节点挂了,数据丢失还是会存在加锁问题,就是主节点宕机,客户端无法感知

    3、基于zookeeper实现分布式锁

    1)实现方式一

    使用临时节点创建成功获取锁,否则监听临时节点,有个问题,比如1000个线程只有一个会加锁成功,当删除临时节点时999个线程都会去竞争

    2)实现方式二

    公平锁的实现

    4、Curator可重入分布式锁工作流程

    从InterProcessMutex类找到acquire()加锁方法

    1. public void acquire() throws Exception {
    2. if (!this.internalLock(-1L, (TimeUnit)null)) {
    3. throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
    4. }
    5. }
    1)加锁 
    1. private boolean internalLock(long time, TimeUnit unit) throws Exception {
    2. // 获取当前线程
    3. Thread currentThread = Thread.currentThread();
    4. // threadData类型是ConcurrentMap,从threadData中去拿LockData加锁对象
    5. InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
    6. // 如果拿到了 证明之前加锁了,lockCount重入次数+1
    7. if (lockData != null) {
    8. lockData.lockCount.incrementAndGet();
    9. return true;
    10. } else {
    11. // 没有拿到开始从zookeeper上创建lock节点
    12. String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
    13. // 创建成功 加锁成功把对象放到threadData中
    14. if (lockPath != null) {
    15. InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
    16. this.threadData.put(currentThread, newLockData);
    17. return true;
    18. } else {
    19. // 加锁失败
    20. return false;
    21. }
    22. }
    23. }
    24. private static class LockData {
    25. // 持有锁的线程
    26. final Thread owningThread;
    27. // 在zookeeper的锁路径
    28. final String lockPath;
    29. // 线程加锁次数
    30. final AtomicInteger lockCount;
    31. private LockData(Thread owningThread, String lockPath) {
    32. this.lockCount = new AtomicInteger(1);
    33. this.owningThread = owningThread;
    34. this.lockPath = lockPath;
    35. }
    36. }
     2)创建节点返回路径
    1. String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    2. long startMillis = System.currentTimeMillis();
    3. Long millisToWait = unit != null ? unit.toMillis(time) : null;
    4. byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
    5. // 重试次数
    6. int retryCount = 0;
    7. String ourPath = null;
    8. boolean hasTheLock = false;
    9. boolean isDone = false;
    10. while(!isDone) {
    11. isDone = true;
    12. try {
    13. // 创建临时有序节点
    14. ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
    15. // 创建的节点是否为最小节点
    16. hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
    17. } catch (NoNodeException var14) {
    18. // 加锁失败 重试设置重试策略
    19. if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
    20. throw var14;
    21. }
    22. isDone = false;
    23. }
    24. }
    25. return hasTheLock ? ourPath : null;
    26. }
    1. public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
    2. String ourPath;
    3. // 是否要给节点设置属性 创建的都是临时有序节点
    4. if (lockNodeBytes != null) {
    5. ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);
    6. } else {
    7. ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);
    8. }
    9. return ourPath;
    10. }
    1. private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    2. boolean haveTheLock = false;
    3. boolean doDelete = false;
    4. try {
    5. if (this.revocable.get() != null) {
    6. ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
    7. }
    8. while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
    9. // 将子节点进行排序
    10. List children = this.getSortedChildren();
    11. // 截取创建的节点的序号
    12. String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
    13. // 判断是否为最小序号
    14. PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
    15. if (predicateResults.getsTheLock()) {
    16. // 加锁成功
    17. haveTheLock = true;
    18. } else {
    19. // 拿到上一个节点的路径
    20. String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
    21. synchronized(this) {
    22. try {
    23. // 监听上一个节点
    24. ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
    25. if (millisToWait == null) {
    26. // 等待
    27. this.wait();
    28. } else {
    29. millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
    30. startMillis = System.currentTimeMillis();
    31. if (millisToWait > 0L) {
    32. // 超时等待
    33. this.wait(millisToWait);
    34. } else {
    35. doDelete = true;
    36. break;
    37. }
    38. }
    39. } catch (NoNodeException var19) {
    40. }
    41. }
    42. }
    43. }
    44. } catch (Exception var21) {
    45. ThreadUtils.checkInterrupted(var21);
    46. doDelete = true;
    47. throw var21;
    48. } finally {
    49. if (doDelete) {
    50. this.deleteOurPath(ourPath);
    51. }
    52. }
    53. return haveTheLock;
    54. }
     3)解锁
    1. public void release() throws Exception {
    2. Thread currentThread = Thread.currentThread();
    3. InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
    4. if (lockData == null) {
    5. // 分布式场景下什么情况都可能有 所以判断一下
    6. throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
    7. } else {
    8. // 重入次数减1
    9. int newLockCount = lockData.lockCount.decrementAndGet();
    10. if (newLockCount <= 0) {
    11. if (newLockCount < 0) {
    12. throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
    13. } else {
    14. try {
    15. // 这里之前应该还有个大于0的判断 在curator5.1.0&zookeeper 3.9.0版本去掉了
    16. this.internals.releaseLock(lockData.lockPath);
    17. } finally {
    18. this.threadData.remove(currentThread);
    19. }
    20. }
    21. }
    22. }
    23. }

     

    5、总结

    优点:Zookeeper分布式锁(如InterProcessMutex),具备高可用、可重入、阻塞锁特性,可解决失效死锁问题

    缺点:因为需要频繁的创建和删除节点,性能上不如redis

    在高性能、高并发场景下,不建议用Zookeeper的分布式锁。而由于Zookeeper的高可靠性,因此在并发量不是太高的应用场景下,还是推荐使用Zookeeper的分布式锁

    二、服务注册与发现

    1、设计思路

    2、实现注册中心的优缺点

    优点:

    • 高可用性:ZooKeeper是一个高可用的分布式系统,可以通过配置多个服务器实例来提供容错能力。如果其中一个实例出现故障,其他实例仍然可以继续提供服务。
    • 强一致性:ZooKeeper保证了数据的强一致性。当一个更新操作完成时,所有的服务器都将具有相同的数据视图。这使得ZooKeeper非常适合作为服务注册中心,因为可以确保所有客户端看到的服务状态是一致的。
    • 实时性:ZooKeeper的监视器(Watcher)机制允许客户端监听节点的变化。当服务提供者的状态发生变化时(例如,上线或下线),客户端会实时收到通知。这使得服务消费者能够快速响应服务的变化,从而实现动态服务发现。

    缺点:

    • 性能限制:ZooKeeper的性能可能不如一些专为服务注册中心设计的解决方案,如nacos或Consul。尤其是在大量的读写操作或大规模集群的情况下,ZooKeeper可能会遇到性能瓶颈。

    3、整合Spring Cloud Zookeeper实现微服务注册中心 

    Spring Cloud Zookeeper

    第一步:在父pom文件中指定Spring Cloud版本

    1. <parent>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-parentartifactId>
    4. <version>2.3.2.RELEASEversion>
    5. <relativePath/>
    6. parent>
    7. <properties>
    8. <java.version>1.8java.version>
    9. <spring-cloud.version>Hoxton.SR8spring-cloud.version>
    10. properties>
    11. <dependencyManagement>
    12. <dependencies>
    13. <dependency>
    14. <groupId>org.springframework.cloudgroupId>
    15. <artifactId>spring-cloud-dependenciesartifactId>
    16. <version>${spring-cloud.version}version>
    17. <type>pomtype>
    18. <scope>importscope>
    19. dependency>
    20. dependencies>
    21. dependencyManagement>

    注意: springboot和springcloud的版本兼容问题

    第二步:微服务pom文件中引入Spring Cloud Zookeeper注册中心依赖

    1. <dependency>
    2. <groupId>org.springframework.cloudgroupId>
    3. <artifactId>spring-cloud-starter-zookeeper-discoveryartifactId>
    4. <exclusions>
    5. <exclusion>
    6. <groupId>org.apache.zookeepergroupId>
    7. <artifactId>zookeeperartifactId>
    8. exclusion>
    9. exclusions>
    10. dependency>
    11. <dependency>
    12. <groupId>org.apache.zookeepergroupId>
    13. <artifactId>zookeeperartifactId>
    14. <version>3.8.0version>
    15. dependency>

    注意: zookeeper客户端依赖和zookeeper sever的版本兼容问题

    Spring Cloud整合Zookeeper注册中心核心源码入口: ZookeeperDiscoveryClientConfiguration

    第三步: 微服务配置文件application.yml中配置zookeeper注册中心地址

    1. spring:
    2. cloud:
    3. zookeeper:
    4. connect-string: localhost:2181
    5. discovery:
    6. instance-host: 127.0.0.1

    注册到zookeeper的服务实例元数据信息如下:

    注意:如果address有问题,会出现找不到服务的情况,可以通过instance-host配置指定

    第四步:整合feign进行服务调用

    1. @RequestMapping(value = "/findOrderByUserId/{id}")
    2. public R findOrderByUserId(@PathVariable("id") Integer id) {
    3. log.info("根据userId:"+id+"查询订单信息");
    4. //feign调用
    5. R result = orderFeignService.findOrderByUserId(id);
    6. return result;
    7. }
  • 相关阅读:
    leetcode669. 修剪二叉搜索树(java)
    线性回归(概念+实例)
    基于Hadoop平台的大数据可视化分析实现与应用
    【全开源】JAVA打车小程序APP打车顺风车滴滴车跑腿源码微信小程序打车源码
    RabbitMQ的工作模式
    vcenter跨版本升级
    深度学习与总结JVM专辑(二):垃圾回收基础(图文+代码)
    详解FreeRTOS:FreeRTOS任务调度器开启过程源码分析(进阶篇—5)
    理解JS函数之call,apply,bind
    Rust 语法
  • 原文地址:https://blog.csdn.net/gp3056/article/details/133840229