• 观察者(observer)模式(二) —— 实现线程安全的监听器


    1. observer模式 VS 事件监听模式

    • 之前有总结过Java的时间监听机制:java的事件监听

    • 上一篇博客中,也明确指出observer模式又叫 source - listener模式,即事件监听模式

    • 仔细对比observer模式和事件监听模式,不难发现:subject对应事件源,observer对应listener,subject状态变化时向observer传递的数据对应event

    • 下图来自博客:设计模式之 —— 观察者模式进阶:监听器模式,很好地展示了各部分之间的对应关系

    • 可以说,事件监听模式,就是observer模式的另一种应用形式

    2 事件监听模式的不同实现方法

    • 博客java的事件监听中,通过实现EventListener接口,实现了对动物进食事件的监听
    • 还可以使用匿名内部类、lambda表达式自定义监听器

    2.1 匿名内部类

    • 使用匿名内部类实现对就诊时叫号的监听

      // 自定义事件监听器接口
      public interface CallEventListener {
          void onCallEvent(CallEvent event);
      }
      
      // 基于EventObject,定义事件
      public class CallEvent extends EventObject {
          public CallEvent(Object source) {
              super(source);
              System.out.println(Thread.currentThread().getName() + " ---- 生成callEvent, 事件源: " + source);
          }
      }
      
      // 定义事件源,支持注册、移除、触发事件监听器
      public class Caller {
          private final int room;
          private int number;
          private final List<CallEventListener> listeners;
      
          public Caller(int room) {
              this.room = room;
              this.listeners = new ArrayList<>();
          }
      
          public void addCallEventListener(CallEventListener listener) {
              if (listener != null && !listeners.contains(listener)) {
              	System.out.println(Thread.currentThread().getName() + " ---- 开始添加listener");
                  listeners.add(listener);
              }
          }
      
          public void deleteCallEventListener(CallEventListener listener) {
              listeners.remove(listener);
          }
      
          public void call(int number) {
         		System.out.println(Thread.currentThread().getName() + " ---- 开始叫号" + number);
              if (number != this.number) {
                  this.number = number;
                  // 自身作为source,新建新建CallEvent,通知所有注册的listener
                  CallEvent callEvent = new CallEvent(this);
                  System.out.println(Thread.currentThread().getName() + " ---- 触发callEvent, 事件源: " + this);
                  for (CallEventListener listener : listeners) {
                      listener.onCallEvent(callEvent);
                  }
              }
          }
      
          @Override
          public String toString() {
              return "Caller@" + this.hashCode() + "{" +
                      "room=" + room + ", number=" + number + '}';
          }
      
          public int getRoom() {
              return room;
          }
      
          public int getNumber() {
              return number;
          }
      }
      
      // 使用匿名内部类注册事件监听器,测试整个程序
      public class Main {
          public static void main(String[] args) {
              Caller caller = new Caller(3);
              // 通过匿名内部类,注册监听器
              caller.addCallEventListener(new CallEventListener() {
                  @Override
                  public void onCallEvent(CallEvent event) {
                      if (event.getSource() instanceof Caller) {
                          Caller source = (Caller) event.getSource();
                          if (source.getNumber() == 2) {
                              System.out.printf(Thread.currentThread().getName() + " ---- 我是%d号病人,马上去%d诊室就诊\n", source.getNumber(), source.getRoom());
                          }
                      }
                  }
              });
              // 开始叫号
              caller.call(1);
              caller.call(2);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
    • 最终执行结果如下:

    • 匿名内部类的实现方式,是Java GUI实现事件监听最常用的实现方式

    2.2 使用lambda表达式

    • 从代码可知,CallEventListener就是一个函数式接口,可以使用lambda表达式进行实现

      // 通过lambda表达式,注册监听器
      caller.addCallEventListener(event -> {
          if (event.getSource() instanceof Caller) {
              Caller source = (Caller) event.getSource();
              if (source.getNumber() == 1) {
                  logger.info("我是{}号病人,马上去{}诊室就诊", source.getNumber(), source.getRoom());
              }
          }
      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

    2.3 listener内存泄漏

    • 不知读者是否发现一个问题:通过匿名内部类或者lambda表达式实现的listener,主程序无法获得其引用,也就无法调用事件源Caller的deleteCallEventListener()方法注销listener

    • 这样的实现,将存在上一篇博客提到的Lapsed listener problem带来的内存泄漏问题

    • 可以将addCallEventListener()方法稍作修改,使其返回注册后的listener

      public CallEventListener addCallEventListener(CallEventListener listener) {
          if (listener != null && !listeners.contains(listener)) {
              listeners.add(listener);
          }
      
          return listener;
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

    3. 线程安全的监听器

    3.1 线程不安全

    • 以上代码单线程运行,不会出现任何问题,但在多线程环境下就会出现各种意想不到的错误

    • 例如,一个线程在添加listener时,另一个线程在执行叫号操作,新添加的listener可能会收到/收不到这次的叫号通知。

    • 直接实现Patient类,作为listener

      public class Patient implements CallEventListener{
          private final int number;
      
          public Patient(int number) {
              this.number = number;
          }
      
          @Override
          public void onCallEvent(CallEvent event) {
              if (event.getSource() instanceof Caller) {
                  Caller source = (Caller) event.getSource();
                  if (source.getNumber() == number) {
                      System.out.printf(Thread.currentThread().getName() + " ---- 我是%d号病人,马上去%d诊室就诊\n", number, source.getRoom());
                  }
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
    • 多线程添加listener、叫号,程序执行结果多种多样,甚至执行失败

      public static void main(String[] args) {
          Caller caller = new Caller(3);
          // 添加病人的同时,进行叫号操作,病人可能没法收到叫号通知,从而错过叫号
          new Thread(() -> caller.addCallEventListener(new Patient(3))).start();
          new Thread(() -> caller.addCallEventListener(new Patient(1))).start();
          new Thread(() -> caller.call(1)).start();
       
          new Thread(() -> caller.call(2)).start();
          new Thread(() -> caller.addCallEventListener(new Patient(2))).start();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
    • 例如,下面的执行结果中,没有一个病人被成功叫号

    • 甚至,可能因为注册listener的同时迭代listener list,出现ConcurrentModificationException异常

    3.2 synchronized保证线程安全

    • 多线程同时访问Caller中各方法时,存在线程安全问题

    • 最简单的解决办法,为每个方法添加synchronized关键字,保证多线程间的同步

      public synchronized void addCallEventListener(CallEventListener listener)
      
      public synchronized void deleteCallEventListener(CallEventListener listener)
      
      public synchronized void call(int number)
      
      • 1
      • 2
      • 3
      • 4
      • 5
    • 期望的执行结果如下:

    • 使用synchronized关键字,保证同一时刻只有一个线程访问Caller,不会因为多线程交替执行而产生各种奇怪的执行结果

    • synchronized可以看做是一个重量级锁、互斥锁

      • 进行注册、删除listener(即病人)这样的写操作时,互斥是必要的。
      • 但通知listener这样的读操作(call()方法),没必要互斥,可以多线程同时执行(叫号这样要求有顺序的场景是不行的,都怪自己一开始给错了需求场景 😢
      • 而且,迭代调用所有的listener的事件处理方法(这里为onCallEvent()方法)需要一定的时间:可能是listener很多,也可能是listener执行事件处理方法需要一定的时间
    • 同时,synchronized不保证操作的执行顺序。

      • 例如,实际执行时,Thread A叫1号的操作早于Thread B叫2号的操作
      • 但是由于synchronized锁被其他线程占有,使得两个线程都将阻塞并在同步队列中等待synchronized锁
      • 等到synchronized锁释放后,Thread B因为竞争synchronized锁成功,使得call 2号病人先于call 1号病人

    3.3 公平的ReentrantReadWriteLock

    • 上面的场景中,注册、删除listener这样的写操作,必须和其他线程的读写操作互斥;通知listener的操作,得按照叫号顺序依次执行
    • 这时,可以考虑使用公平ReentrantReadWriteLock,满足以上需求,解决synchronized关键字存在的问题
    • 由于作者是个菜鸡,需求场景没给对,导致这里没法基于已有的代码给出一个正确的示例
    • 具体可以参考博客The Observer Pattern Using Java 8中,关于Ordered Notification of Listeners的示例程序
    • 关键思想: 只要Thread 1获取锁的操作早于Thread 2, Thread 1一定能早于Thread 2获取到锁,而非像使用synchronized关键字一样,两个线程靠运气去竞争锁

    3.5 另一篇文章推荐

    • Thread safe Observer design pattern in Java,这篇文章及其后面的评论都值得一读
    • 从单线程的observer模式实现为例,探讨了如何一步一步实现一个线程安全的observer模式
      • 单线程存在各种无法预料的问题,例如存放observer的set,多次初始化、以及observer丢失等
      • 基于ConcurrentHashMap创建存储observer的即时初始化(相对 lazy initialization而言) 的set,保证了线程安全,但不保证调用notifyObservers()时,基于最新的observer集合进行迭代
      • 使用synchronized关键字,有两个问题:① 容易因为notifyObservers()迭代通知所有的observer时间太长,其他操作阻塞;② observer执行自身的notify()方法时,如果尝试获取其内部的synchronized锁会被阻塞(暂时不太能理解)
      • 更新synchronized的锁范围:先复制set,然后基于set副本迭代通知observer;其中,对set的复制操作需要加锁
    • 评论中的建议:
      • 基于ConcurrentHashMap创建存储observer的set,改为直接使用CopyOnWriteArraySet,可以免除复制set的操作
      • 使用ReentrantReadWriteLock + 公平队列,仍需复制存储observer的set

    5. 后记

    5.1 其他

    • 博客The Observer Pattern Using Java 8,还介绍了listener的其他小技巧
    • 技巧一: 一个listener接口需要处理多种事件时(多个事件处理方法),可以考虑基于Listener接口实现Adapter类;用户创建listener时,只需要继承Adapter类,并按照需求实现某个方法,而无需重写所有方法 —— Java AWT中的MouseListenerMouseAdapter就是采用了这种方法
      • JDK 8以后,可以将Listener接口中的方法定义为default方法
      • 将Listener接口改为所有方法体为空的Listener类,但其后期不支持继承多个Listene类(Java的单继承、多实现
    • 技巧二: 如果listener中的事件处理方法是复杂的,甚至是阻塞的,该如何处理?
      • 为每个listener分配一个线程,避免串行执行带来的副作用
      • 为事件源(observer中叫subject)的notifyListeners()方法分配一个线程
      • Queue the listener function invocations and have a set of threads execute the listener functions,暂时还不是特别懂

    5.2 参考链接

  • 相关阅读:
    Java后端模拟面试,题集①
    什么是阿里云轻量应用服务器?
    基于SpringBoot+Redis的前后端分离外卖项目-苍穹外卖(五)
    【PostgreSQL内核学习(十六)—— (pg_statistic 表)】
    如何去掉照片中多余路人?一分钟帮你搞定
    linux sed命令:sed替换命令使用环境变量,且环境变量中有文件分隔符时,sed命令不work (unknown option to s)
    vue2+element-ui实现表格分页显示
    【探索Linux】—— 强大的命令行工具 P.7(进程 · 进程的概念)
    CVE-2022-41622 & CVE-2022-41800 F5 BIG-IP和BIG-IQ远程代码执行漏洞复现
    C++编程法则365天一天一条(14)sizeof运算符使用
  • 原文地址:https://blog.csdn.net/u014454538/article/details/126083660