• nacos 源码阅读 发布订阅模式


     nacos中很多地方都用到了发布订阅模式,这点我们可以从Subscriber继承树上来看到

     这里发布订阅模式的使用也很有借鉴意义,我们首先看下核心代码

    1. package com.alibaba.nacos.common.notify;
    2. import com.alibaba.nacos.common.notify.listener.Subscriber;
    3. import com.alibaba.nacos.common.utils.CollectionUtils;
    4. import com.alibaba.nacos.common.utils.ConcurrentHashSet;
    5. import com.alibaba.nacos.common.utils.ThreadUtils;
    6. import org.slf4j.Logger;
    7. import org.slf4j.LoggerFactory;
    8. import java.util.concurrent.ArrayBlockingQueue;
    9. import java.util.concurrent.BlockingQueue;
    10. import java.util.concurrent.Executor;
    11. import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    12. import static com.alibaba.nacos.common.notify.NotifyCenter.ringBufferSize;
    13. /**
    14. * The default event publisher implementation.
    15. *
    16. *

      Internally, use {@link ArrayBlockingQueue } as a message staging queue.

    17. *
    18. * @author liaochuntao
    19. * @author zongtanghu
    20. */
    21. public class DefaultPublisher extends Thread implements EventPublisher {
    22. private volatile boolean initialized = false;
    23. private volatile boolean shutdown = false;
    24. private Classextends Event> eventType;
    25. protected final ConcurrentHashSet subscribers = new ConcurrentHashSet<>();
    26. private int queueMaxSize = -1;
    27. private BlockingQueue queue;
    28. protected volatile Long lastEventSequence = -1L;
    29. private static final AtomicReferenceFieldUpdater UPDATER = AtomicReferenceFieldUpdater
    30. .newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
    31. @Override
    32. public void init(Class type, int bufferSize) {
    33. setDaemon(true);
    34. setName("nacos.publisher-" + type.getName());
    35. this.eventType = type;
    36. this.queueMaxSize = bufferSize;
    37. this.queue = new ArrayBlockingQueue<>(bufferSize);
    38. start();
    39. }
    40. @Override
    41. public synchronized void start() {
    42. if (!initialized) {
    43. // start just called once
    44. super.start();
    45. if (queueMaxSize == -1) {
    46. queueMaxSize = ringBufferSize;
    47. }
    48. initialized = true;
    49. }
    50. }
    51. @Override
    52. public void run() {
    53. openEventHandler();
    54. }
    55. void openEventHandler() {
    56. try {
    57. // This variable is defined to resolve the problem which message overstock in the queue.
    58. int waitTimes = 60;
    59. // To ensure that messages are not lost, enable EventHandler when
    60. // waiting for the first Subscriber to register
    61. for (; ; ) {
    62. if (shutdown || hasSubscriber() || waitTimes <= 0) {
    63. break;
    64. }
    65. ThreadUtils.sleep(1000L);
    66. waitTimes--;
    67. }
    68. for (; ; ) {
    69. if (shutdown) {
    70. break;
    71. }
    72. final Event event = queue.take();
    73. receiveEvent(event);
    74. UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
    75. }
    76. } catch (Throwable ex) {
    77. LOGGER.error("Event listener exception : ", ex);
    78. }
    79. }
    80. /**
    81. * Receive and notifySubscriber to process the event.
    82. *
    83. * @param event {@link Event}.
    84. */
    85. void receiveEvent(Event event) {
    86. final long currentEventSequence = event.sequence();
    87. if (!hasSubscriber()) {
    88. LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
    89. return;
    90. }
    91. // Notification single event listener
    92. for (Subscriber subscriber : subscribers) {
    93. if (!subscriber.scopeMatches(event)) {
    94. continue;
    95. }
    96. // Whether to ignore expiration events
    97. if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
    98. LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
    99. event.getClass());
    100. continue;
    101. }
    102. // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
    103. // Remove original judge part of codes.
    104. notifySubscriber(subscriber, event);
    105. }
    106. }
    107. @Override
    108. public void notifySubscriber(final Subscriber subscriber, final Event event) {
    109. LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
    110. final Runnable job = () -> subscriber.onEvent(event);
    111. final Executor executor = subscriber.executor();
    112. if (executor != null) {
    113. executor.execute(job);
    114. } else {
    115. try {
    116. job.run();
    117. } catch (Throwable e) {
    118. LOGGER.error("Event callback exception: ", e);
    119. }
    120. }
    121. }
    122. }

    这里省略了一些常规代码和注释。

    我们可以看到,DefaultPublisher这继承了Thread类,并覆写了start()run()方法。在run()方法的具体内容中,通过for(;;){...}来持续遍历事件队列queue

    对于线程的生命管理,有通过一个shutdown线程阀门,当其为true时即线程结束。

    这里比较优秀的一点是在存储Event时使用了BlockingQueue,可以在线程的执行过程中避免无意义的notify()。

    同时我们可以看到,在notifySubscriber()方法中,判断了消息订阅者是否使用了线程池。这里的实现方式对发布者处理效率至关重要。

    欢迎小伙伴们在评论区分享自己的见解。

  • 相关阅读:
    力扣打卡之三个数的最大乘积
    1412. 查找成绩处于中游的学生
    JVM——二、内存结构
    go context 源码刨析(一)
    【python与数据分析】Matplotlib数据可视化(续)
    JavaScript的字符串介绍
    JavaScript-bind实现原理
    1.http和https
    Sentinel整合Gateway
    JAVA设计模式之模板方法模式
  • 原文地址:https://blog.csdn.net/zcy_wxy/article/details/127116787