nacos中很多地方都用到了发布订阅模式,这点我们可以从Subscriber继承树上来看到
这里发布订阅模式的使用也很有借鉴意义,我们首先看下核心代码
- package com.alibaba.nacos.common.notify;
-
- import com.alibaba.nacos.common.notify.listener.Subscriber;
- import com.alibaba.nacos.common.utils.CollectionUtils;
- import com.alibaba.nacos.common.utils.ConcurrentHashSet;
- import com.alibaba.nacos.common.utils.ThreadUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.Executor;
- import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
- import static com.alibaba.nacos.common.notify.NotifyCenter.ringBufferSize;
-
- /**
- * The default event publisher implementation.
- *
- *
Internally, use {@link ArrayBlockingQueue } as a message staging queue.
- *
- * @author liaochuntao
- * @author zongtanghu
- */
- public class DefaultPublisher extends Thread implements EventPublisher {
- private volatile boolean initialized = false;
- private volatile boolean shutdown = false;
- private Class extends Event> eventType;
- protected final ConcurrentHashSet
subscribers = new ConcurrentHashSet<>(); - private int queueMaxSize = -1;
- private BlockingQueue
queue; - protected volatile Long lastEventSequence = -1L;
- private static final AtomicReferenceFieldUpdater
UPDATER = AtomicReferenceFieldUpdater - .newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
-
- @Override
- public void init(Class extends Event> type, int bufferSize) {
- setDaemon(true);
- setName("nacos.publisher-" + type.getName());
- this.eventType = type;
- this.queueMaxSize = bufferSize;
- this.queue = new ArrayBlockingQueue<>(bufferSize);
- start();
- }
-
-
- @Override
- public synchronized void start() {
- if (!initialized) {
- // start just called once
- super.start();
- if (queueMaxSize == -1) {
- queueMaxSize = ringBufferSize;
- }
- initialized = true;
- }
- }
-
-
- @Override
- public void run() {
- openEventHandler();
- }
-
- void openEventHandler() {
- try {
-
- // This variable is defined to resolve the problem which message overstock in the queue.
- int waitTimes = 60;
- // To ensure that messages are not lost, enable EventHandler when
- // waiting for the first Subscriber to register
- for (; ; ) {
- if (shutdown || hasSubscriber() || waitTimes <= 0) {
- break;
- }
- ThreadUtils.sleep(1000L);
- waitTimes--;
- }
-
- for (; ; ) {
- if (shutdown) {
- break;
- }
- final Event event = queue.take();
- receiveEvent(event);
- UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
- }
- } catch (Throwable ex) {
- LOGGER.error("Event listener exception : ", ex);
- }
- }
-
-
- /**
- * Receive and notifySubscriber to process the event.
- *
- * @param event {@link Event}.
- */
- void receiveEvent(Event event) {
- final long currentEventSequence = event.sequence();
-
- if (!hasSubscriber()) {
- LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
- return;
- }
-
- // Notification single event listener
- for (Subscriber subscriber : subscribers) {
- if (!subscriber.scopeMatches(event)) {
- continue;
- }
-
- // Whether to ignore expiration events
- if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
- LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
- event.getClass());
- continue;
- }
-
- // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
- // Remove original judge part of codes.
- notifySubscriber(subscriber, event);
- }
- }
-
- @Override
- public void notifySubscriber(final Subscriber subscriber, final Event event) {
-
- LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
-
- final Runnable job = () -> subscriber.onEvent(event);
- final Executor executor = subscriber.executor();
-
- if (executor != null) {
- executor.execute(job);
- } else {
- try {
- job.run();
- } catch (Throwable e) {
- LOGGER.error("Event callback exception: ", e);
- }
- }
- }
- }
这里省略了一些常规代码和注释。
我们可以看到,DefaultPublisher这继承了Thread类,并覆写了start()和run()方法。在run()方法的具体内容中,通过for(;;){...}来持续遍历事件队列queue。
对于线程的生命管理,有通过一个shutdown的线程阀门,当其为true时即线程结束。
这里比较优秀的一点是在存储Event时使用了BlockingQueue,可以在线程的执行过程中避免无意义的notify()。
同时我们可以看到,在notifySubscriber()方法中,判断了消息订阅者是否使用了线程池。这里的实现方式对发布者处理效率至关重要。
欢迎小伙伴们在评论区分享自己的见解。