我们都知道简单的生产者和消费者的实现就是通过Object内的wait和notify实现的。至于例子暂时不写了。
一、前言
常见面试题:
为什么wait和notify方法是Object类中的方法,而不是在Thread里,毕竟通常我们都是在多线程的时候在使用wait和notify方法。
解释:
1.首先我们知道生产者和消费者模型里,我们使用的是一个Object lock = new Object()。然后在新起的线程内使用的是lock.wait()和lock.notify()。并且我们配合使用的还有一个东西synchronize关键字,保证同步的。它们有可能是有联系的,毕竟我们从来没见过在同步之外去使用wait和notify。
2.看wait方法的注释:
/** * <p> * The current thread must own this object's monitor. * <p> */ public final native void wait(long timeout) throws InterruptedException;
我截图了最关键的一句话。当前线程必须拥有这个object的monitor。monitor我们在分析synchronize的时候已经有所了解。它就是一个监视器,存在于对象内。那么这就能解释了,为什么它在Object内?因为它依赖于对象,而对象的最高父类就是Object。
3.为什么需要配合synchronize使用呢?其实synchronize的底层代码中使用了monitorenter和monitorexit指令,看见没?这就解释了必须配合它才能使用wait和notify。你不进入怎么拥有这个对象的monitor啊~~~~~~~~~~~~~
二、分析和学习
需要了解wait和notify,jdk源码已经没啥用了。因为都是native方法,必须去学习HotSpot源码,还是那句话,HotSpot源码只能大致看看,不可能了解很细,扣的很细的话会另学习效率非常低,不是专业人士读起来非常费劲。
1.monitor
monitor是个很抽象的东西,在HotSpot中,monitor的实现是ObjectMonitor。其实我后来读过之后有个感受,就是它和AQS很像,超级像,AQS就是C.U.T中的一个等待队列,用于存放获取锁失败的线程,后续唤醒也是从AQS中根据一定规则进行唤醒动作。而ObjectMonitor的结构差不多也是如此。
// initialize the monitor, exception the semaphore, all other fields // are simple integers or pointers ObjectMonitor() { _header = NULL; _count = 0; _waiters = 0, _recursions = 0; _object = NULL; _owner = NULL; _WaitSet = NULL; _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; FreeNext = NULL ; _EntryList = NULL ; _SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ; _previous_owner_tid = 0; }
ObjectMonitor中存在两个队列,_WaitSet和_EntryList,都是用来保存ObjectWaiter对象,而_owner就是用来保存获取到ObjectMonitor对象的线程。下图是网络盗图一张(但是我对于此图的真实性不太相信,有些东西解释的不清楚,只是copy)
private: ObjectWaiter * volatile _cxq ; // LL of recently-arrived threads blocked on entry. // The list is actually composed of WaitNodes, acting // as proxies for Threads. protected: ObjectWaiter * volatile _EntryList ; // Threads blocked on entry or reentry. protected: ObjectWaiter * volatile _WaitSet; // LL of threads wait()ing on the monitor
1.1处于wait状态的线程会加入到Wait队列。
1.2等待锁状态的会加入到EntryList中。
1.3最近刚被阻塞的线程进入到_cxq,字面解释。
2.wait方法的HotSpot实现
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { ... // create a node to be put into the queue // Critically, after we reset() the event but prior to park(), we must check // for a pending interrupt. ObjectWaiter node(Self); //1.构造ObjectWaiter对象 node.TState = ObjectWaiter::TS_WAIT ; //线程状态是WAIT Self->_ParkEvent->reset() ; OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag // Enter the waiting queue, which is a circular doubly linked list in this case // but it could be a priority queue or any data structure. // _WaitSetLock protects the wait queue. Normally the wait queue is accessed only // by the the owner of the monitor *except* in the case where park() // returns because of a timeout of interrupt. Contention is exceptionally rare // so we use a simple spin-lock instead of a heavier-weight blocking lock. Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ; AddWaiter (&node) ; //2.加入到ObjectWaiter中 Thread::SpinRelease (&_WaitSetLock) ; exit (true, Self) ; // exit the monitor //3.释放掉当前的锁,保证后续线程能够获取到锁 if (node._notified == 0) { if (millis <= 0) { Self->_ParkEvent->park () ; } else { ret = Self->_ParkEvent->park (millis) ; //4.挂起自己 } } .... }
AddWaiter:(在AddWaiter中注意到是放入到_Waitset队列中)
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev == NULL, "node already in list"); assert(node->_next == NULL, "node already in list"); // put node at end of queue (circular doubly linked list) if (_WaitSet == NULL) { _WaitSet = node; node->_prev = node; node->_next = node; } else { ObjectWaiter* head = _WaitSet ; ObjectWaiter* tail = head->_prev; assert(tail->_next == head, "invariant check"); tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail; } }
ObjectWaiter:它是一个双向链表结构,保存的是当前线程和状态信息,每个等待锁的线程都会被封装成ObjectWaiter对象。
class ObjectWaiter : public StackObj { public: enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ; enum Sorted { PREPEND, APPEND, SORTED } ; ObjectWaiter * volatile _next; ObjectWaiter * volatile _prev; Thread* _thread; jlong _notifier_tid; ParkEvent * _event; volatile int _notified ; volatile TStates TState ; Sorted _Sorted ; // List placement disposition bool _active ; // Contention monitoring is enabled public: ObjectWaiter(Thread* thread); void wait_reenter_begin(ObjectMonitor *mon); void wait_reenter_end(ObjectMonitor *mon); };
Exit方法释放锁并唤醒其他线程
if (QMode == 2 && _cxq != NULL) { // QMode == 2 : cxq has precedence over EntryList. // Try to directly wake a successor from the cxq. // If successful, the successor will need to unlink itself from cxq. w = _cxq ; assert (w != NULL, "invariant") ; assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ; ExitEpilog (Self, w) ; return ; } if (QMode == 3 && _cxq != NULL) { // Aggressively drain cxq into EntryList at the first opportunity. // This policy ensure that recently-run threads live at the head of EntryList. // Drain _cxq into EntryList - bulk transfer. // First, detach _cxq. // The following loop is tantamount to: w = swap (&cxq, NULL) w = _cxq ; for (;;) { assert (w != NULL, "Invariant") ; ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ; if (u == w) break ; w = u ; } assert (w != NULL , "invariant") ; ObjectWaiter * q = NULL ; ObjectWaiter * p ; for (p = w ; p != NULL ; p = p->_next) { guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ; p->TState = ObjectWaiter::TS_ENTER ; p->_prev = q ; q = p ; } // Append the RATs to the EntryList // TODO: organize EntryList as a CDLL so we can locate the tail in constant-time. ObjectWaiter * Tail ; for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ; if (Tail == NULL) { _EntryList = w ; } else { Tail->_next = w ; w->_prev = Tail ; } // Fall thru into code that tries to wake a successor from EntryList } if (QMode == 4 && _cxq != NULL) { // Aggressively drain cxq into EntryList at the first opportunity. // This policy ensure that recently-run threads live at the head of EntryList. // Drain _cxq into EntryList - bulk transfer. // First, detach _cxq. // The following loop is tantamount to: w = swap (&cxq, NULL) w = _cxq ; for (;;) { assert (w != NULL, "Invariant") ; ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ; if (u == w) break ; w = u ; } assert (w != NULL , "invariant") ; ObjectWaiter * q = NULL ; ObjectWaiter * p ; for (p = w ; p != NULL ; p = p->_next) { guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ; p->TState = ObjectWaiter::TS_ENTER ; p->_prev = q ; q = p ; } // Prepend the RATs to the EntryList if (_EntryList != NULL) { q->_next = _EntryList ; _EntryList->_prev = q ; } _EntryList = w ; // Fall thru into code that tries to wake a successor from EntryList } if (QMode == 1) { // QMode == 1 : drain cxq to EntryList, reversing order // We also reverse the order of the list. ObjectWaiter * s = NULL ; ObjectWaiter * t = w ; ObjectWaiter * u = NULL ; while (t != NULL) { guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ; t->TState = ObjectWaiter::TS_ENTER ; u = t->_next ; t->_prev = u ; t->_next = s ; s = t; t = u ; } _EntryList = s ; assert (s != NULL, "invariant") ; } else { // QMode == 0 or QMode == 2 _EntryList = w ; ObjectWaiter * q = NULL ; ObjectWaiter * p ; for (p = w ; p != NULL ; p = p->_next) { guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ; p->TState = ObjectWaiter::TS_ENTER ; p->_prev = q ; q = p ; } }
具体的解释一下:
QMode=2:取_cxq排头位置的ObjectWaiter对象,调用ExitEpilog方法,唤醒ObjectWaiter所记录的线程,然后return。
QMode=3:取_cxq中的线程放入EntryList尾部。
QMode=4:取_cxq中的线程放入EntryList头部。
基本分析这几个,可以看见只是唤醒的动作有所区别,主要目的还是在唤醒线程,策略的不同可能导致的结果就是唤醒的线程的不同。
整理一下wait()四点:
1.包装成ObjectWaiter对象,并且状态是WAIT。
2.ObjectWaiter对象被放入到_WaitSet中,注意_WaitSet是一个环形双向链表。
3.释放锁(monitor),执行Exit方法
根据QMode(QMode是什么,咱不知道)不同,将ObjectWaiter从_cxq或者EntryList中唤醒。那么被唤醒的线程会继续执行挂起之前的代码,就会通过CAS去竞争锁,此时因为当前线程已经释放了锁,可以去竞争,至于哪一个去竞争成功,不知道。
4.挂起当前线程。
3.notify方法的HotSpot实现
void ObjectMonitor::notify(TRAPS) { ... ObjectWaiter * iterator = DequeueWaiter() ; ... }采用DequeueWaiter方法取出WaitSet中的第一个ObjectWaiter节点,具体看注释,very first waiter。其实虽说notify是随机唤醒一个线程,其实在HotSpot中还是有顺序的,只要我们找到一个点,那么就可以指定唤醒哪一个线程了。当然了,这样做,得不偿失
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() { // dequeue the very first waiter ObjectWaiter* waiter = _WaitSet; if (waiter) { DequeueSpecificWaiter(waiter); } return waiter; }
取出第一个ObjectWaiter节点之后,根据策略将节点加入EntryList或者_cxq中。
Policy=0:放入EntryList的排头位置
Policy=1:放入EntryList的尾部。
Policy=2:EntryList为空就放入EntryList,否则放入_cxq排头位置
Policy=3: 放入_cxq的尾部,注意放入的时候做过并发兼容,正常来说,只要紧跟着Tail放入Tail-next即可,但是考虑并发的话,需要有个循环,取Tail-next,然后放入,和J.C.U.T包里有些操作类似,读过的小伙伴肯定眼前一亮吧。
if (Policy == 0) { // prepend to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { List->_prev = iterator ; iterator->_next = List ; iterator->_prev = NULL ; _EntryList = iterator ; } } else if (Policy == 1) { // append to EntryList if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { // CONSIDER: finding the tail currently requires a linear-time walk of // the EntryList. We can make tail access constant-time by converting to // a CDLL instead of using our current DLL. ObjectWaiter * Tail ; for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ; assert (Tail != NULL && Tail->_next == NULL, "invariant") ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; } } else if (Policy == 2) { // prepend to cxq // prepend to cxq if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } } else if (Policy == 3) { // append to cxq iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Tail ; Tail = _cxq ; if (Tail == NULL) { iterator->_next = NULL ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) { break ; } } else { while (Tail->_next != NULL) Tail = Tail->_next ; Tail->_next = iterator ; iterator->_prev = Tail ; iterator->_next = NULL ; break ; } } } else { ParkEvent * ev = iterator->_event ; iterator->TState = ObjectWaiter::TS_RUN ; OrderAccess::fence() ; ev->unpark() ; } if (Policy < 4) { iterator->wait_reenter_begin(this); }整理一下notify()两点:
1.从_WaitSet中取出第一个ObjectWaiter节点。
2.根据Policy的不同,将这个线程放入_EntryList或者_cxq中的起始或者尾部位置。
3.notifyAll()方法的HotSpot实现
void ObjectMonitor::notifyAll(TRAPS) { ... for (;;) { iterator = DequeueWaiter () ; if (iterator == NULL) break ; TEVENT (NotifyAll - Transfer1) ; ++Tally ; ... }
notifyAll的方法和notify方法类似,区别就是有一个for循环,将_WaitSet里面的ObjectWaiter节点全部取出来,然后也是根据Policy规则放入_EntryList和_cxq中。
4.根据例子去完善自己的理解
public class Test { public static void main(String[] args) throws Exception{ new Thread(new Thread1()).start(); try{ Thread.sleep(2000); }catch (Exception e){ e.printStackTrace(); } new Thread(new Thread2()).start(); } } class Thread1 implements Runnable{ @Override public void run() { System.out.println("Thread1 start"); synchronized (Test.class){ System.out.println("Thread1 in lock"); try { Thread.sleep(5000); //1 System.out.println("Thread1 start wait"); Test.class.wait(); }catch (Exception e){ e.printStackTrace(); } System.out.println("Thread1 end wait and process on"); //5 } } } class Thread2 implements Runnable{ @Override public void run() { System.out.println("Thread2 start"); //2 synchronized (Test.class){ System.out.println("Thread2 in lock"); //3 try { System.out.println("Thread2 start notify"); Test.class.notify(); for(int i = 0;i<100;i++){ //4 System.out.print(i+"_"); if(i==50){ System.out.println("\n"); } } System.out.println("\n"); System.out.println("Thread2 end notify"); }catch (Exception e){ e.printStackTrace(); } } } }
Thread1 start Thread1 in lock //发生停顿 Thread2 start //发生停顿 Thread1 start wait Thread2 in lock Thread2 start notify 0_1_2_3_4_5_6_7_8_9_10_11_12_13_14_15_16_17_18_19_20_21_22_23_24_25_26_27_28_29_30_31_32_33_34_35_36_37_38_39_40_41_42_43_44_45_46_47_48_49_50_ 51_52_53_54_55_56_57_58_59_60_61_62_63_64_65_66_67_68_69_70_71_72_73_74_75_76_77_78_79_80_81_82_83_84_85_86_87_88_89_90_91_92_93_94_95_96_97_98_99_ Thread2 end notify Thread1 end wait and process on
在只有两个线程的时候,wait和notify的结果比较清晰。要注意的几点:
1.Thread1进行wait之后,就释放掉了当前的对象锁,然后Thread2此时就可以获取到对象锁。
2.可以发现Thread2在执行完notify之后仍然在继续运行余下的代码,当结束之后Thread1才继续执行它剩下的代码。也就是说,此时的锁仍然在Thread2,Thread1没有获取到锁。所以执行不了。因此notify只是起到一个通知的作用。
参考博客: