暂且把面试的事搁下,咱们今天来聊 LockSupport.park() 和 LockSupport.unpark() 的底层原理。
为什么会聊到这两个方法呢?原因是在阅读 AQS 的源码的时候发现这两个方法调用的次数非常多!所以在继续深入阅读 AQS 源码之前,先来熟悉一下 LockSupport.park() 和 LockSupport.unpark() 的底层实现,为后续 AQS 的学习打下基础。
park 翻译成中文是 "停放" 的意思,在代码中的该方法含义是 "挂起" 当前线程。
实际上,LockSupport 类中所提供的方法有许多,常用的有下面这几个:
其中 unpark 用于唤醒线程,其他三个均用于挂起线程。
挂起线程又分为无限期和有限期挂起,对应到线程的状态是 WAITING(无限期等待)和 TIMED_WAITING(限期等待)。
其他线程调用了 unpark 方法,参数为被挂起的线程
The call spuriously (that is, for no reason) returns.
由于虚假唤醒的存在,在调用 park 时一般采用自旋的方式,伪代码如下:
while (!canProceed()) { ... LockSupport.park(this); } 复制代码
This class associates, with each thread that uses it, a permit. 复制代码
Java 文档里说到了,每个线程都关联一个许可(permit)。
当许可可用时,调用 park 会立即返回,否则可能(虚假唤醒则不会被挂起)被挂起;如果许可不可用时,调用 unpark 会使得许可变成可用,而如果许可本身是可用时,调用 unpark 不会有任何影响。
public static void exampleOne() { Thread thread = new Thread(() -> { while (flag) { } System.out.println("before first park"); LockSupport.park(); System.out.println("after first park"); LockSupport.park(); System.out.println("after second park"); }); thread.start(); flag = false; sleep(20); System.out.println("before unpark."); LockSupport.unpark(thread); } 复制代码
before first park before unpark after first park 复制代码
首先,许可初始是不可用的;所以在调用 park 后 thread 被挂起,后续主线程调用了 unpark 方法唤醒了被挂起的 thread,输出 after first park
,紧接着 thread 调用 park 继续被挂起。
private static void exampleTwo() { Thread thread = new Thread(() -> { while (flag) { } System.out.println("before first park"); LockSupport.park(); System.out.println("after first park"); LockSupport.park(); System.out.println("after second park"); }); thread.start(); LockSupport.unpark(thread); LockSupport.unpark(thread); flag = false; } 复制代码
before first park after first park 复制代码
主线程先对 thread 执行两次 unpark 操作,然后 thread 再连续调用两次 park 方法,结果发现,第二个 park 会挂起 thread;这里主要体现了 unpark 效果不会被累积,当许可可用时,调用 unpark 方法不会产生任何效果。
private static void exampleThree() { Thread thread = new Thread(() -> { System.out.println("before first park"); LockSupport.park(); System.out.println("after first park"); LockSupport.park(); System.out.println("after second park"); System.out.println("isInterrupted is " + Thread.interrupted()); System.out.println("isInterrupted is " + Thread.interrupted()); LockSupport.park(); System.out.println("after third park"); }); thread.start(); sleep(200); thread.interrupt(); } 复制代码
before first park after first park after second park isInterrupted is true isInterrupted is false 复制代码
thread 先后共调用了三次 park,前两次调用没啥区别,在第三次调用之前调用了两次 Thread.interrupted();从输出结果来看,发现前两次 park 并没有生效,只有第三次 park 将线程挂起了,Why?
我们先来看 Thread.interrupted() 的作用:**判断当前线程的中断状态,同时将中断状态清除。**实际上这里只需要调用一次 Thread.interrupted() 即可,调用了两次是为了查看线程中断状态的变化。
当线程的中断状态为 true 时,park 失去了效果,不会挂起线程;而当调用了 Thread.interrupted() 将中断状态清除之后,park 又恢复了效果。
所以这里可以得出的结论:线程中断会使 park 失效。
重头戏来啦!上面讲到了 park 和 unpark 的几个方法,其实它们最终对应于 UNSAFE.park(boolean isAbsolute, long time) 和 UNSAFE.unpark(Object thread) 这两个 native 方法。
下面我们就去看看这两个 native 方法是如何实现的。
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) UnsafeWrapper("Unsafe_Park"); EventThreadPark event; #ifndef USDT2 HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time); #else /* USDT2 */ HOTSPOT_THREAD_PARK_BEGIN( (uintptr_t) thread->parker(), (int) isAbsolute, time); #endif /* USDT2 */ JavaThreadParkedState jtps(thread, time != 0); thread->parker()->park(isAbsolute != 0, time); #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker()); #else /* USDT2 */ HOTSPOT_THREAD_PARK_END( (uintptr_t) thread->parker()); #endif /* USDT2 */ ... UNSAFE_END 复制代码
这里有几个分支判断,不过可以看出无论是哪个分支,最终都会走 park(bool isAbsolute, jlong time)
void Parker::park(bool isAbsolute, jlong time) { ... if (Atomic::xchg(0, &_counter) > 0) return; Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; ... if (Thread::is_interrupted(thread, false)) { return; } // Next, demultiplex/decode time arguments timespec absTime; if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all return; } if (time > 0) { unpackTime(&absTime, isAbsolute, time); } ... if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { return; } int status ; if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; // Paranoia to ensure our locked and lock-free paths interact // correctly with each other and Java-level accesses. OrderAccess::fence(); return; } assert(_cur_index == -1, "invariant"); if (time == 0) { _cur_index = REL_INDEX; // arbitrary choice when not timed status = pthread_cond_wait (&_cond[_cur_index], _mutex) ; } else { _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { pthread_cond_destroy (&_cond[_cur_index]) ; pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr()); } } ... _counter = 0 ; status = pthread_mutex_unlock(_mutex) ; OrderAccess::fence(); // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } } 复制代码
首先通过 Atomic::xchg(0, &_counter) 方法将 _counter 置 0,如果原来的 _counter > 0,说明原来的许可是可用的,直接返回;
Thread::is_interrupted(thread, false) 只会判断线程的中断状态,不会重置其中断状态;Thread.interrupted() 调用的是 Thread::is_interrupted(thread, true),判断线程的中断状态同时将其重置。
关于 pthread_cond_wait 可以看一下这篇文章: pthread_cond_wait 详解
感觉和 Object.wait、Object.notify、Object.notifyAll 的机制很类似。
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) UnsafeWrapper("Unsafe_Unpark"); Parker* p = NULL; if (jthread != NULL) { oop java_thread = JNIHandles::resolve_non_null(jthread); if (java_thread != NULL) { jlong lp = java_lang_Thread::park_event(java_thread); if (lp != 0) { // This cast is OK even though the jlong might have been read // non-atomically on 32bit systems, since there, one word will // always be zero anyway and the value set is always the same p = (Parker*)addr_from_java(lp); } else { // Grab lock if apparently null or using older version of library MutexLocker mu(Threads_lock); java_thread = JNIHandles::resolve_non_null(jthread); if (java_thread != NULL) { JavaThread* thr = java_lang_Thread::thread(java_thread); if (thr != NULL) { p = thr->parker(); if (p != NULL) { // Bind to Java thread for next time. java_lang_Thread::set_park_event(java_thread, addr_to_java(p)); } } } } } } if (p != NULL) { #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__unpark, p); #else /* USDT2 */ HOTSPOT_THREAD_UNPARK( (uintptr_t) p); #endif /* USDT2 */ p->unpark(); } UNSAFE_END 复制代码
前面一大段代码是在给 Parker* p 赋值,最终调用的是 p 的 unpark 方法:p->unpark()。
void Parker::unpark() { int s, status ; status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; s = _counter; _counter = 1; if (s < 1) { // thread might be parked if (_cur_index != -1) { // thread is definitely parked if (WorkAroundNPTLTimedWaitHang) { status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); } else { status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant"); status = pthread_cond_signal (&_cond[_cur_index]); assert (status == 0, "invariant"); } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } } else { pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } } 复制代码
unpark 方法其实很简单,首先通过 pthread_mutex_lock 获取锁,然后将 _counter 置为 1,再判断当前是否有线程被挂起,如果有,则通过 pthread_cond_signal 唤醒被挂起的线程,然后释放锁。
就好比看到 pthread_cond_wait 的机制时,就会联想到 Object.wait,因为两者的实现有很多相像的地方,理解起来也就比较简单。
最后留一个问题,大家可以思考一下:LockSupport.park 和 Object.wait 两者有何区别?