• Java多线程篇(4)——wait/notify和park/unPark


    Object - wait/notify

    object.wait()

    在这里插入图片描述
    ObjectSynchronizer::wait
    在这里插入图片描述
    从这段代码可以得到两个信息
    1:wait() 底层是对象锁(就是synchronized底层实现的那个对象锁)。这也正是 wait/notify 要在同步代码块内的原因。
    2:wait() 的调用会使得对象锁立马膨胀成重量级锁(因为需要使用mutex阻塞线程)。

    ObjectMonitor::wait

    void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
       //...
       
       // 封装成ObjectWaiter对象
       ObjectWaiter node(Self);
       node.TState = ObjectWaiter::TS_WAIT ;
       Self->_ParkEvent->reset() ;
       OrderAccess::fence();
       
       //加入 WaitSet 
       Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
       AddWaiter (&node) ;
       Thread::SpinRelease (&_WaitSetLock) ;
    
       //...
       
       //释放当前线程占用的对象锁
       exit (true, Self) ;
       
       //...
       
       	   //阻塞当前线程
           if (node._notified == 0) {
             if (millis <= 0) {
                Self->_ParkEvent->park () ;
             } else {
                ret = Self->_ParkEvent->park (millis) ;
             }
           }
    
    //...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    wait主要干了三件事:
    1:封装objectWaiter对象并加入 WaitSet
    2:释放对象锁
    3:调用 ParkEvent.park() 阻塞当前线程(底层调用 pthread_mutex_lock )


    object.notify()

    同理
    ObjectSynchronizer::notify
    在这里插入图片描述
    ObjectMonitor::notify

    void ObjectMonitor::notify(TRAPS) {
      //...
    
      //Policy 移动策略,默认为 2 
      int Policy = Knob_MoveNotifyee ;
    
      //取出waitSet的第一个
      Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
      ObjectWaiter * iterator = DequeueWaiter() ;
    
      //根据 Policy 策略移动ObjectWaiter到cxq或者entryList或直接唤醒
      //  Policy == 0 :头插entrylist
      //  Policy == 1 :尾插entrylist
      //  Policy == 2 :如果entrylist为空,那么插入entrylist,否则插入cxq队列
      //  Policy == 3 :直接插入cxq 
      //  其他:直接唤醒线程,让线程直接调用enterI
      if (iterator != NULL) {
         //...
    
    	 // Policy == 0 :头插entrylist
         if (Policy == 0) {
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                 List->_prev = iterator ;
                 iterator->_next = List ;
                 iterator->_prev = NULL ;
                 _EntryList = iterator ;
            }
         }
         // Policy == 1 :尾插entrylist
         else if (Policy == 1) {
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                ObjectWaiter * Tail ;
                for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
                assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
                Tail->_next = iterator ;
                iterator->_prev = Tail ;
                iterator->_next = NULL ;
            }
         } 
         // Policy == 2 :如果entrylist为空,那么插入entrylist,否则插入cxq队列
         else if (Policy == 2) {
             if (List == NULL) {
                 iterator->_next = iterator->_prev = NULL ;
                 _EntryList = iterator ;
             } else {
                iterator->TState = ObjectWaiter::TS_CXQ ;
                for (;;) {
                    ObjectWaiter * Front = _cxq ;
                    iterator->_next = Front ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                        break ;
                    }
                }
             }
         }
         // Policy == 3 :直接插入cxq 
         else if (Policy == 3) {
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Tail ;
                Tail = _cxq ;
                if (Tail == NULL) {
                    iterator->_next = NULL ;
                    if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
                       break ;
                    }
                } else {
                    while (Tail->_next != NULL) Tail = Tail->_next ;
                    Tail->_next = iterator ;
                    iterator->_prev = Tail ;
                    iterator->_next = NULL ;
                    break ;
                }
            }
         }
         // 否则直接唤醒线程,让线程直接调用enterI
         else {
            ParkEvent * ev = iterator->_event ;
            iterator->TState = ObjectWaiter::TS_RUN ;
            OrderAccess::fence() ;
            ev->unpark() ;
         }
    
         //...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    notify主要就是根据 Policy 策略来决定以何种方式唤醒目标线程:
    Policy = 0 :头插entrylist
    Policy = 1 :尾插entrylist
    Policy = 2 :如果entrylist为空,那么插入entrylist,否则插入cxq队列(默认策略)
    Policy = 3 :直接插入cxq
    其他:直接唤醒线程,让线程直接调用enterI


    LockSupport - park/unpark

    核心设计思想是 ”许可“,park消费许可,unpark生产许可(同一时间最多最有一个许可)。
    _counter:许可
    _con:条件变量
    _mutex:互斥锁

    LockSupport.park()

    在这里插入图片描述
    每个线程都内置了一个 parker,通过 Parker.park() 方法进行阻塞

    Parker与ParkEvent的功能类型,在源码的注释中也提了,计划将Parker合并到ParkEvent
    注释原文: In the future we’ll want to think about eliminating Parker and using ParkEvent instead. There’s considerable duplication between the two services.

    Parker::park

    void Parker::park(bool isAbsolute, jlong time) {
      //原子替换_counter为0,如果之前_counter为1则直接返回,不阻塞当前线程
      if (Atomic::xchg(0, &_counter) > 0) return;
    
      //...
      
      //如果线程被终止,也直接返回
      if (Thread::is_interrupted(thread, false)) {
        return;
      }
    
      //解析时间参数
      timespec absTime;
      if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
        return;
      }
      if (time > 0) {
        unpackTime(&absTime, isAbsolute, time);
      }
    
      //...
    
      //如果线程被终止或者获取mutex锁失败直接返回
      if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
        return;
      }
    
      //走到这里说明获mutex锁成功
    
      //获锁后再次检查_counter是否大于0,如果是直接消费许可,无需等待。
      int status ;
      if (_counter > 0)  { // no wait needed
        _counter = 0;
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
        OrderAccess::fence();
        return;
      }
    
      //...
      
      //调用 pthread_cond_wait 通过 _con 和 _mutex 配合使用阻塞当前线程直至满足_con条件
      if (time == 0) {
        _cur_index = REL_INDEX;
        status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
      }
      //有超时时间的话,就调用 safe_cond_timedwait , 不管有没有满足条件,一旦超时都会唤醒
      else {
        _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
        status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
        if (status != 0 && WorkAroundNPTLTimedWaitHang) {
          pthread_cond_destroy (&_cond[_cur_index]) ;
          pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
        }
      }
      
      //设置_counter为0并释放 mutex 锁
      _counter = 0 ;
      status = pthread_mutex_unlock(_mutex) ;
      
      //...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    pthread_cond_wait :阻塞当前线程,直至条件满足。并且阻塞后会自动释放锁,唤醒后又会自动获取锁。

    为什么 pthread_cond_wait 需要搭配互斥锁使用?
    条件不成立会进入阻塞,假如在进入阻塞的这个期间,条件又成立了,此时最终的结果就是条件成立,但一直在阻塞。同理,唤醒的时候也需搭配互斥锁才能保证不漏掉临界条件。

    总结:
    1:将许可置为0,同时检查之前许可是否为1,如果为1直接返回
    2:获取mutex互斥锁
    3:在条件变量上阻塞当前线程(阻塞会自动释放锁,唤醒会自动获取锁)
    4:线程被唤醒后重置许可为0,并释放互斥锁


    LockSupport.unPark()

    同理
    Parker::unpark

    void Parker::unpark() {
      int s, status ;
    
      //获取mutex锁
      status = pthread_mutex_lock(_mutex);
      
      assert (status == 0, "invariant") ;
      s = _counter;
      
      //设置许可为1
      _counter = 1;
      
      //如果许可原本小于1表示线程可能被阻塞(parked),需要唤醒线程。
      if (s < 1) {
        
    	//如果线程确实被阻塞(即 _cur_index 不等于-1)
    	//调用 pthread_cond_signal 唤醒线程
        if (_cur_index != -1) {	
          if (WorkAroundNPTLTimedWaitHang) {
            status = pthread_cond_signal (&_cond[_cur_index]);
            assert (status == 0, "invariant");
            status = pthread_mutex_unlock(_mutex);
            assert (status == 0, "invariant");
          } else {
            int index = _cur_index;
            status = pthread_mutex_unlock(_mutex);
            assert (status == 0, "invariant");
            status = pthread_cond_signal (&_cond[index]);
            assert (status == 0, "invariant");
          }
        }
        
        //反之什么都不做,直接释放锁返回 
        else {
          pthread_mutex_unlock(_mutex);
          assert (status == 0, "invariant") ;
        }
      }
      
      //如果许可原本为1,什么都不做,直接释放锁返回 
      else {
        pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant") ;
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    总结:
    1:获取互斥锁
    2:置许可为1
    3:唤醒在条件变量上等待的线程
    4:释放互斥锁


    wait/notify是一种等待/通知机制。等待啥?线程对互斥资源的等待。通知啥?通知线程互斥资源已经没人在用可以去抢占了。因为描述的是对互斥资源的竞争,所以wait/notify在object上(任何对象都可以是互斥资源)。
    而park/unPark是一种对线程的精准控制,他更多的是描述线程之间的先后顺序(比如生产者线程和消费者线程)。

    park/unPark相对于wait/notify
    更直观,以thread为操作对象。
    更精准,可以指定唤醒那个线程。
    更简单,无需在synchronized代码块内。
    更灵活,unpark方法可以在park方法前调用。

  • 相关阅读:
    【Vue】V-if成立时,元素出现;不成立时,元素不显示。
    基于随机油漆优化器 (MOSPO)求解多目标优化问题附matlab代码
    [附源码]计算机毕业设计springboot咖啡销售平台
    入门力扣自学笔记125 C++ (题目编号1656)
    基于SpringBoot的大学生体质测试管理系统
    故障分析 | MySQL 无监听端口故障排查
    [数据结构-线性表1.2] 链表与 LinkedList<T>(.NET 源码学习)
    MySQL软件常见操作
    uniApp笔记
    大端 小端?
  • 原文地址:https://blog.csdn.net/qq_43196360/article/details/132419235