悲观锁
乐观锁
自旋锁
public class Test { public static void main(String[] args){ synchronized(Test.class){ System.out.println("hello"); } } }
截取部分字节码,如下
4: monitorenter 5: getstatic #9 // Field java/lang/System.out:Ljava/io/PrintStream; 8: ldc #15 // String hello 10: invokevirtual #17 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 13: aload_1 14: monitorexit
字节码出现了4: monitorenter和14: monitorexit两个指令;字面理解就是监视进入,监视退出。可以理解为代码块执行前的加锁,和退出同步时的解锁
objectMonitor.cpp ObjectMonitor() { _header = NULL; _count = 0; \\用来记录该线程获取锁的次数 _waiters = 0, _recursions = 0; \\锁的重入次数 _object = NULL; _owner = NULL; \\当前持有ObjectMonitor的线程 _WaitSet = NULL; \\wait()方法调用后的线程等待队列 _WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; \\阻塞等待队列 FreeNext = NULL ; _EntryList = NULL ; \\synchronized 进来线程的排队队列 _SpinFreq = 0 ; _SpinClock = 0 ; \\自旋计算 OwnerIsThread = 0 ; }
void ATTR ObjectMonitor::enter(TRAPS) { ... //获取锁:cmpxchg_ptr原子操作,尝试将_owner替换为自己,并返回旧值 cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; ... // 重复获取锁,次数加1,返回 if (cur == Self) { _recursions ++ ; return ; } //首次获取锁情况处理 if (Self->is_lock_owned ((address)cur)) { assert (_recursions == 0, "internal state error"); _recursions = 1 ; _owner = Self ; OwnerIsThread = 1 ; return ; } ... //尝试自旋获取锁 if (Knob_SpinEarly && TrySpin (Self) > 0) { ...
void ATTR ObjectMonitor::exit(TRAPS)...
代码太长,就不贴了。主要是recursions减1、count减少1或者如果线程不再持有owner(非重入加锁)则设置owner为null,退锁的持有状态,并唤醒Cxq队列的线程总结
public synchronized void lock(){ System.out.println("world"); } .... public synchronized void lock(); descriptor: ()V flags: (0x0029) ACC_PUBLIC, ACC_SYNCHRONIZED Code: stack=2, locals=0, args_size=0 0: getstatic #20 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #26 // String world 5: invokevirtual #28 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
class ObjectWaiter : public StackObj { public: enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ; enum Sorted { PREPEND, APPEND, SORTED } ; ObjectWaiter * volatile _next; ObjectWaiter * volatile _prev; Thread* _thread; ParkEvent * _event; volatile int _notified ; volatile TStates TState ; Sorted _Sorted ; // List placement disposition bool _active ; // Contention monitoring is enabled public: ObjectWaiter(Thread* thread); void wait_reenter_begin(ObjectMonitor *mon); void wait_reenter_end(ObjectMonitor *mon); };
调用对象锁的wait()方法时,线程会被封装成ObjectWaiter,最后使用park方法挂起
//objectMonitor.cpp void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS){ ... //线程封装成 ObjectWaiter对象 ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT ; ... //一系列判断操作,当线程确实加入WaitSet时,则使用park方法挂起 if (node._notified == 0) { if (millis <= 0) { Self->_ParkEvent->park () ; } else { ret = Self->_ParkEvent->park (millis) ; } }
而当对象锁使用notify()时
Atomic::cmpxchg_ptr
指令自旋操作加入cxq队列或者直接unpark唤醒void ObjectMonitor::notify(TRAPS){ CHECK_OWNER(); //waitSet为空,则直接返回 if (_WaitSet == NULL) { TEVENT (Empty-Notify) ; return ; } ... //通过DequeueWaiter获取_WaitSet列表中的第一个ObjectWaiter Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ; ObjectWaiter * iterator = DequeueWaiter() ; if (iterator != NULL) { .... if (Policy == 2) { // prepend to cxq // prepend to cxq if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } }
voidObjectMonitor::notifyAll(TRAPS)
,流程和notify类似。不过会通过for循环取出WaitSet的ObjectWaiter节点,再依次唤醒所有线程偏向锁
轻量级锁
重量级锁
自旋锁
锁粗化
Test.class //编译器会考虑将两次加锁合并 public void test(){ synchronized(this){ System.out.println("hello"); } synchronized(this){ System.out.println("world"); } }
锁消除
-server -XX:+DoEscapeAnalysis -XX:+EliminateLocks
//StringBuffer的append操作会加上synchronized, //但是变量buf不加锁也安全的,编译器会把锁消除 public void test() { StringBuffer buf = new StringBuffer(); buf.append("hello").append("world"); }
其他锁优化方法
volatile int i = 0; i++
中,volatile类型的读写是原子同步的,但是i++却不能保证同步性,我们该怎么呢?int expectedValue = 1; public boolean compareAndSet(int newValue) { if(expectedValue == 1){ expectedValue = newValue; return ture; } return false; }
在jdk是有提供同步版的CAS解决方案,其中使用了UnSafe.java的底层方法
//UnSafe.java @HotSpotIntrinsicCandidate public final native boolean compareAndSetInt(Object o, long offset, int expected, int x) .. @HotSpotIntrinsicCandidate public final native int compareAndExchangeInt(Object o, long offset, int expected, int x)...
我们再来看看本地方法,Unsafe.cpp中的compareAndSwapInt
//unsafe.cpp UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); oop p = JNIHandles::resolve(obj); jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END
在Linux的x86,Atomic::cmpxchg方法的实现如下
/** 1 __asm__表示汇编的开始; 2 volatile表示禁止编译器优化;//禁止指令重排 3 LOCK_IF_MP是个内联函数, 根据当前系统是否为多核处理器, 决定是否为cmpxchg指令添加lock前缀 //内存屏障 */ inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { int mp = os::is_MP(); __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" : "=a" (exchange_value) : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) : "cc", "memory"); return exchange_value; }
到这一步,可以总结到:jdk提供的CAS机制,在汇编层级,会禁止变量两侧的指令优化,然后使用cmpxchg指令比较并更新变量值(原子性),如果是多核则使用lock锁定(缓存锁、MESI)
ABA问题
只能保证一个共享变量的原子操作
先说说实现锁的要素
要素1:可以利用CAS的原子性来实现,任意时刻只有一个线程能成功操作变量
要素2:使用volatile修饰状态变量,禁止指令重排
要素3:还是用volatile,volatile变量写指令前后会插入内存屏障
//伪代码 volatile state = 0 ; // 0-无锁 1-加锁;volatile禁止指令重排,加入内存屏障 ... if(cas(state, 0 , 1)){ // 1 加锁成功,只有一个线程能成功加锁 ... // 2 同步代码块 cas(state, 1, 0); // 3 解锁时2的操作具有可见性 }
JavaThread* thread=JavaThread::thread_from_jni_environment(env); ... thread->parker()->park(isAbsolute != 0, time);
class PlatformParker : public CHeapObj { protected: //互斥变量类型 pthread_mutex_t _mutex [1] ; //条件变量类型 pthread_cond_t _cond [1] ; ... } class Parker : public os::PlatformParker { private: volatile int _counter ; ... public: void park(bool isAbsolute, jlong time); void unpark(); ... }
unpark和park执行顺序不同时,counter和cond的状态变化如下
//AbstractQueuedSynchronizer.java public class AbstractQueuedSynchronizer{ //线程节点 static final class Node { ... volatile Node prev; volatile Node next; volatile Thread thread; ... } .... //head 等待队列头尾节点 private transient volatile Node head; private transient volatile Node tail; // The synchronization state. 同步状态 private volatile int state; ... //提供CAS操作,状态具体的修改由子类实现 protected final boolean compareAndSetState(int expect, int update) { return STATE.compareAndSet(this, expect, update); } }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
在AQS还存一个ConditionObject的内部类,它的使用机制和Object.wait、notify类似
//AbstractQueuedSynchronizer.java public class ConditionObject implements Condition, java.io.Serializable { //条件队列;Node 复用了AQS中定义的Node private transient Node firstWaiter; private transient Node lastWaiter; ...
//类似Object.wait public final void await() throws InterruptedException{ ... Node node = addConditionWaiter(); //构造Node,加入条件队列 int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //挂起线程 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //notify唤醒线程后,加入同步队列继续竞争锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
//类似Object.notify private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
protected boolean tryAcquire(int arg);//尝试独占性加锁 protected boolean tryRelease(int arg);//对应tryAcquire释放锁 protected int tryAcquireShared(int arg);//尝试共享性加锁 protected boolean tryReleaseShared(int arg);//对应tryAcquireShared释放锁 protected boolean isHeldExclusively();//该线程是否正在独占资源,只有用到condition才需要取实现它
abstract static class Sync extends AbstractQueuedSynchronizer{ .... final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //直接CAS状态加锁,非公平操作 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ... //重写了tryRelease protected final boolean tryRelease(int releases) { c = state - releases; //改变同步状态 ... //修改volatile 修饰的状态变量 setState(c); return free; } }
static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } .... static final class FairSync extends Sync { protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ....
public class TwinsLock implements Lock { private final Sync sync = new Sync(2); @Override public void lockInterruptibly() throws InterruptedException { throw new RuntimeException(""); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {throw new RuntimeException("");} @Override public Condition newCondition() { return sync.newCondition(); } @Override public void lock() { sync.acquireShared(1); } @Override public void unlock() { sync.releaseShared(1); } } @Override public boolean tryLock() { return sync.tryAcquireShared(1) > -1; } }
再来看看Sync的代码
class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero"); } setState(count); } @Override public int tryAcquireShared(int reduceCount) { for (; ; ) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } @Override public boolean tryReleaseShared(int returnCount) { for (; ; ) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } public Condition newCondition() { return new AbstractQueuedSynchronizer.ConditionObject(); } }