• 观察者模式


    如果大家觉得文章有错误内容,欢迎留言或者私信讨论~

    原理及应用场景

    Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

      观察者模式,也被称为发布订阅模式,上面的定义翻译成中文就是:在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。

      我们先来看其中最经典的一种实现方式,具体实现代码如下:

    public interface Subject {
    	void registerObserver(Observer observer);
    	void removeObserver(Observer observer);
    	void notifyObservers(Message message);
    }
    
    public interface Observer {
    	void update(Message message);
    }
    
    public class ConcreteSubject implements Subject {
    	private List<Observer> observers = new ArrayList<Observer>();
    	
    	@Override
    	public void registerObserver(Observer observer) {
    		observers.add(observer);
    	} 
    	
    	@Override
    	public void removeObserver(Observer observer) {
    		observers.remove(observer);
    	} 
    	
    	@Override
    	public void notifyObservers(Message message) {
    		for (Observer observer : observers) {
    			observer.update(message);
    		}
    	}
    }
    
    public class ConcreteObserverOne implements Observer {
    	@Override
    	public void update(Message message) {
    		//TODO: 获取消息通知,执行自己的逻辑...
    		System.out.println("ConcreteObserverOne is notified.");
    	}
    }
    
    public class ConcreteObserverTwo implements Observer {
    	@Override
    	public void update(Message message) {
    		//TODO: 获取消息通知,执行自己的逻辑...
    		System.out.println("ConcreteObserverTwo is notified.");
    	}
    }
    
    public class Demo {
    	public static void main(String[] args) {
    		ConcreteSubject subject = new ConcreteSubject();
    		subject.registerObserver(new ConcreteObserverOne());
    		subject.registerObserver(new ConcreteObserverTwo());
    		subject.notifyObservers(new Message());
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

      事实上,上面的代码只是观察者模式的一种“模板”,只能反映大体的设计思路。在真实的软件开发中,并不需要照搬上面的模板代码,因为观察者的实现方式各式各样,函数、类的命名则会根据业务的不同有很大的差别。

      代码实现和原理都比较简单,那么重点就来了。什么情况下需要用到这种设计模式呢?或者说这种设计模式是用来解决什么问题的?

      假设我们在开发一个 P2P 投资理财系统,用户注册成功之后,我们会给用户发放投资体验金:

    public class UserController {
    	private UserService userService; // 依赖注入
    	private PromotionService promotionService; // 依赖注入
    	
    	public Long register(String telephone, String password) {
    		//省略输入参数的校验代码
    		//省略userService.register()异常的try-catch代码
    		long userId = userService.register(telephone, password);
    		promotionService.issueNewUserExperienceCash(userId);
    		return userId;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

      虽然注册接口做了两件事情,注册和发放体验金,违反单一职责原则,但是,如果没有扩展和修改的需求,现在的代码实现是可以接受的。如果非得用观察者模式,就需要引入更多的类和更加复杂的代码结构,反倒是一种过度设计。

      但是随着业务状态,需求频繁变动。比如,用户注册成功之后,不再发放体验金,而是改为发放优惠券,并且还要给用户发送一封“欢迎注册成功”的站内信。这种情况下,我们就需要频繁地修改 register() 函数中的代码,违反开闭原则。而且,如果注册成功之后需要执行的后续操作越来越多,那 register() 函数的逻辑会变得越来越复杂,也就影响到代码的可读性和可维护性。

      这时候观察者模式就会派上用场,使用观察者模式重构如下代码:

    public interface RegObserver {
    	void handleRegSuccess(long userId);
    }
    
    public class RegPromotionObserver implements RegObserver {
    	private PromotionService promotionService; // 依赖注入
    	
    	@Override
    	public void handleRegSuccess(long userId) {
    		promotionService.issueNewUserExperienceCash(userId);
    	}
    }
    
    public class RegNotificationObserver implements RegObserver {
    	private NotificationService notificationService;
    	
    	@Override
    	public void handleRegSuccess(long userId) {
    		notificationService.sendInboxMessage(userId, "Welcome...");
    	}
    }
    
    public class UserController {
    	private UserService userService; // 依赖注入
    	private List<RegObserver> regObservers = new ArrayList<>();
    	
    	// 一次性设置好,之后也不可能动态的修改
    	public void setRegObservers(List<RegObserver> observers) {
    		regObservers.addAll(observers);
    	} 
    	
    	public Long register(String telephone, String password) {
    		//省略输入参数的校验代码
    		//省略userService.register()异常的try-catch代码
    		long userId = userService.register(telephone, password);
    		for (RegObserver observer : regObservers) {
    			observer.handleRegSuccess(userId);
    		} 
    		return userId;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

      当我们有新的后续的追加业务,基于观察者模式的代码实现,UserController 类的 register() 函数完全不需要修改,只需要再添加一个实现了 RegObserver 接口的类,并且通过 setRegObservers()函数将它注册到 UserController 类中即可。

    基于不同的应用场景的不同实现

      之前的应用场景下我们是基于同步阻塞的方式实现的,观察者和被观察者代码在同一个线程内执行,被观察者一直阻塞,直到所有的观察者代码都执行完成之后,才执行后续的代码。如果注册接口是一个调用比较频繁的接口,对性能非常敏感,希望接口的响应时间尽可能短,那我们可以将同步阻塞的实现方式改为异步非阻塞的实现方式,以此来减少响应时间(实际上,大家看到这里应该都想到了消息队列,虽然引入消息队列被观察者和观察者解耦更加彻底,两部分的耦合更小,但对开发者的本身素质也有提高)。

    新线程与线程池

    // 第一种
    public class RegPromotionObserver implements RegObserver {
        private PromotionService promotionService;
    
        @Override
        public void handleRegSuccess(long userId) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    promotionService.issueNewUserExperienceCash(userId);
                }
            });
            thread.start();
        }
    }
    
    // 第二种
    public class UserController {
        private UserService userService; // 依赖注入
        private List<RegObserver> regObservers = new ArrayList<>();
        private Executor executor;
    
        public UserController(Executor executor) {
            this.executor = executor;
        }
    
        public void setRegObservers(List<RegObserver> observers) {
            regObservers.addAll(observers);
        }
    
        public Long register(String telephone, String password) {
            //省略输入参数的校验代码
            //省略userService.register()异常的try-catch代码
            long userId = userService.register(telephone, password);
            for (RegObserver observer : regObservers) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        observer.handleRegSuccess(userId);
                    }
                });
            }
           return userId;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

      对于第一种实现方式,频繁地创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。第二种实现方式,尽管利用了线程池解决了第一种实现方式的问题,但线程池、异步执行逻辑都耦合在了 register() 函数中,增加了这部分业务代码的维护成本。

      如果我们的产品经理需求更加极端一点,需要在同步阻塞和异步非阻塞之间灵活切换,那就要不停地修改 UserController 的代码。

      我们知道,框架的作用有:隐藏实现细节,降低开发难度,做到代码复用,解耦业务与非业务代码,让程序员聚焦业务开发。针对异步非阻塞观察者模式,我们也可以将它抽象成框架来达到这样的效果,而这个框架就是接下来要讲的 EventBus

    EventBus

      EventBus 翻译为“事件总线”,它提供了实现观察者模式的骨架代码,不仅仅支持异步非阻塞模式,同时也支持同步阻塞模式。我们用它再来重构一次上面的代码:

    public class UserController {
    	private UserService userService; // 依赖注入
        private EventBus eventBus;
        private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;
    
        public UserController() {
            eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE));
        }
    
        public void setRegObservers(List<Object> observers) {
            for (Object observer : observers) {
                eventBus.register(observer);
            }
        }
    
        public Long register(String telephone, String password) {
            //省略输入参数的校验代码
            //省略userService.register()异常的try-catch代码
            long userId = userService.register(telephone, password);
            eventBus.post(userId);
            return userId;
        }
    }
    
    public class RegPromotionObserver {
    	private PromotionService promotionService; // 依赖注入
    	
    	@Subscribe
    	public void handleRegSuccess(long userId) {
    		promotionService.issueNewUserExperienceCash(userId);
    	}
    }
    
    public class RegNotificationObserver {
    	private NotificationService notificationService;
    	
    	@Subscribe
    	public void handleRegSuccess(long userId) {
    		notificationService.sendInboxMessage(userId, "...");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

      利用 EventBus 框架实现的观察者模式,跟从零开始编写的观察者模式相比,从大的流程上来说,实现思路大致一样,都需要定义 Observer,并且通过 register() 函数注册Observer,也都需要通过调用某个函数(比如,EventBus 中的 post() 函数)来给 Observer 发送消息(在 EventBus 中消息被称作事件 event)。

      但在实现细节方面,它们又有些区别。基于 EventBus,我们不需要定义 Observer 接口,任意类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中哪个函数可以接收被观察者发送的消息。

      接下来,详细讲一下 EventBus 的几个主要的类和函数。

    • EventBus、AsyncEventBus

      EventBus 实现了同步阻塞的观察者模式,AsyncEventBus 继承自 EventBus,提供了异步非阻塞的观察者模式。具体使用方式如下所示:

    EventBus eventBus = new EventBus(); // 同步阻塞模式
    // 异步阻塞
    EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(8));
    
    • 1
    • 2
    • 3
    • register() 函数

      EventBus 类提供了 register() 函数用来注册观察者。具体的函数定义如下所示。它可以接受任何类型(Object)的观察者。而在经典的观察者模式的实现中,register() 函数必须接受实现了同一 Observer 接口的类对象。

    public void register(Object object);
    
    • 1
    • unregister() 函数

      顾名思义,它的功能就是从 EventBus 中删除某个观察者,它的定义如下:

    public void unregister(Object object);
    
    • 1
    • post() 函数

      用来给观察者发送消息。具体的函数定义如下所示:

    public void post(Object event);
    
    • 1

      这里 post 发动的消息是发送给可匹配的观察者,由@Subscribe 注解来标明

    • @Subscribe 注解

      EventBus 通过 @Subscribe 注解来标明,某个函数能接收哪种类型的消息。具体的使用代码如下所示:

      当通过 register() 函数将 DObserver 类对象注册到 EventBus 的时候,EventBus 会根据@Subscribe 注解找到 f1() 和 f2(),并且将两个函数能接收的消息类型记录下来(PMsg->f1,QMsg->f2)。当我们通过 post() 函数发送消息(比如 QMsg 消息)的时候,EventBus 会通过之前的记录(QMsg->f2),调用相应的函数(f2)。

    手动实现一个 EventBus

      了解了 EventBus 的功能之后,我们就可以尝试 “山寨” 一个 EventBus 出来。我们重点来看,EventBus 中两个核心函数 register() 和 post() 的实现原理。弄懂了它们,基本上就弄懂了整个 EventBus 框架。下面两张图是这两个函数的实现原理图:

    在这里插入图片描述

    在这里插入图片描述

      从图中我们可以看出,最关键的一个数据结构是 Observer 注册表,记录了消息类型和可接收消息函数的对应关系。当调用 register() 函数注册观察者的时候,EventBus 通过解析 @Subscribe 注解,生成 Observer 注册表。当调用 post() 函数发送消息的时候,
    EventBus 通过注册表找到相应的可接收消息的函数,然后通过 Java 的反射语法来动态地创建对象、执行函数。对于同步阻塞模式,EventBus 在一个线程内依次执行相应的函数。对于异步非阻塞模式,EventBus 通过一个线程池来执行相应的函数。
      明白了之后,我们就来实现它们吧。

    1. Subscribe

      Subscribe 是一个注解,用于标明观察者中的哪个函数可以接收消息。

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @Beta
    public @interface Subscribe {}
    
    • 1
    • 2
    • 3
    • 4
    1. ObserverAction

      ObserverAction 类用来表示 @Subscribe 注解的方法,其中,target 表示观察者类,method 表示方法。它主要用在 ObserverRegistry 观察者注册表中。

    public class ObserverAction {
    	private Object target;
    	private Method method;
    	
    	public ObserverAction(Object target, Method method) {
    		this.target = Preconditions.checkNotNull(target);
    		this.method = method;
    		this.method.setAccessible(true);
    	} 
    	
    	public void execute(Object event) { // event是method方法的参数
    		try {
    			method.invoke(target, event);
    		} catch (InvocationTargetException | IllegalAccessException e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    1. ObserverRegistry

      ObserverRegistry 类就是前面讲到的 Observer 注册表,是最复杂的一个类,框架中几乎所有的核心逻辑都在这个类中。这个类大量使用了 Java 的反射语法,不过代码整体来说都不难理解,其中,一个比较有技巧的地方是 CopyOnWriteArraySet 的使用。

    public class ObserverRegistry {
    	// 在写入数据的时候,会创建一个新的 set,并且将原始数据 clone 到新的 set 中,在新的 set 中写入数据完成之后,再用新的 set 替换老的set
    	// 这样就能保证在写入数据的时候,不影响数据的读取操作,以此来解决读写并发问题
         private ConcurrentHashMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> register = new ConcurrentHashMap<>();
    
        public void register(Object observer) {
            Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);
            for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
                Class<?> eventType = entry.getKey();
                Collection<ObserverAction> eventActions = entry.getValue();
                CopyOnWriteArraySet<ObserverAction> registeredEventActions = register.get(eventType);
                if (registeredEventActions == null) {
                    register.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
                    registeredEventActions = register.get(eventType);
                }
                registeredEventActions.addAll(eventActions);
            }
        }
    
        public List<ObserverAction> getMatchedObserverActions(Object event) {
            List<ObserverAction> matchedObservers = new ArrayList<>();
            Class<?> postedEventType = event.getClass();
            for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : register.entrySet()) {
                Class<?> eventType = entry.getKey();
                CopyOnWriteArraySet<ObserverAction> eventActions = entry.getValue();
                if (postedEventType.isAssignableFrom(eventType)) {
                    matchedObservers.addAll(eventActions);
                }
            }
            return matchedObservers;
        }
    
        public Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
            Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
            Class<?> clazz = observer.getClass();
    
            for (Method method : getAnnotatedMethods(clazz)) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                Class<?> eventType = parameterTypes[0];
                if (!observerActions.containsKey(eventType)) {
                    observerActions.put(eventType, new ArrayList<>());
                }
                observerActions.get(eventType).add(new ObserverAction(observer, method));
            }
    
            return observerActions;
        }
    
        private List<Method> getAnnotatedMethods(Class<?> clazz) {
            List<Method> annotatedMethods = new ArrayList<>();
            for (Method method : clazz.getDeclaredMethods()) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                Preconditions.checkArgument(parameterTypes.length == 1,
                        "Method %s has @Subscribe annotation but has %s parameters."
                        + "Subscriber methods must have exactly 1 parameter.",
                        method, parameterTypes.length);
                annotatedMethods.add(method);
            }
    
            return annotatedMethods;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    1. EventBus

      EventBus 实现的是阻塞同步的观察者模式。看代码你可能会有些疑问,这明明就用到了线程池 Executor 啊。实际上,MoreExecutors.directExecutor() 是 Google Guava 提供的工具类,看似是多线程,实际上是单线程。之所以要这么实现,主要还是为了跟 AsyncEventBus 统一代码逻辑,做到代码复用:

    public class EventBus {
        private Executor executor;
        private ObserverRegistry registry = new ObserverRegistry();
    
        public EventBus() {
            this(MoreExecutors.directExecutor());
        }
    
        protected EventBus(Executor executor) {
            this.executor = executor;
        }
    
        public void register(Object object) {
            registry.register(object);
        }
    
        public void post(Object event) {
            List<ObserverAction> observerActions = registry.getMatchedObserverActions(event);
            for (ObserverAction observerAction : observerActions) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        observerAction.execute(event);
                    }
                });
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    1. AsyncEventBus

      有了 EventBus,AsyncEventBus 的实现就非常简单了。为了实现异步非阻塞的观察者模式,它就不能再继续使用 MoreExecutors.directExecutor() 了,而是需要在构造函数中,由调用者注入线程池。

    public class AsyncEventBus extends EventBus{
    
        public AsyncEventBus(Executor executor) {
            super(executor);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

      至此,我们的EventBus就实现了,不过我还是建议你去看一下它的源码,在实现细节方面,相对上面的“山寨版”,做了很多的优化,比如在注册表中查找消息可匹配函数的算法等。

  • 相关阅读:
    2023-11-08 monetdb-事务-只有RR隔离级别-原因分析
    systrace/perfetto如何看surfaceflinger的vsync信号方法-android framework实战车载手机系统开发
    1002:输出第二个整数
    Go基础-文件、字符
    傻瓜式快速下载TCGA数据(win x86版本)
    Java 泛型中的通配符
    【华为OD机试真题 python】叠积木【2022 Q4 | 200分】
    java+springboot基于性别网上学习特征问卷调查及可视化系统
    复现Thinkphp 2.x 任意代码执行漏洞
    idea常用快捷键 idea搜索快捷键
  • 原文地址:https://blog.csdn.net/qq_43654226/article/details/126677833