使用synchronized关键字将会隐式地获取锁,但是它将锁的获取和释放固化了,也就是先获取再释放。虽然这种方式简化了同步的管理,但是扩展性没有显示的锁获取和释放来的好。
lock.lock(); try{ } finally{ lock.unlock(); }
在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。
不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。
可重入锁,顾名思义,就是支持重进入的锁。它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。
而synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁。
ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。
ReentrantLock提供了一个构造函数,能够控制锁是否是公平的:
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock是通过组合自定义同步器(AQS)来实现锁的获取与释放。
已获得锁的线程重新获取锁
以非公平性(默认的)实现为例:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。
锁的最终释放
成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时减少同步状态值:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)
方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
公平方式获取锁:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
他们获取锁的方式都是:首先判断state
是否为0,如果为0,说明这个锁还没有被线程占有,那么就CAS将state
的值加一(compareAndSetState(0, acquires)
),然后将该线程设置为具有这把锁的访问权限的线程(setExclusiveOwnerThread(current)
);如果state
不等于0,说明这把锁已经被某个线程对象持有了,接下来判断想要获取这把锁的线程是不是已经拥有这把锁的线程(判断重入),如果是的话,就将state
的值用CAS的方式加一,并且返回true,表示重入获取锁成功。否则直接返回false,表示获取锁失败。
公平方式获取锁tryAcquire(int acquires)
与非公平方式获取锁nonfairTryAcquire(int acquires)
的唯一的不同点就是当state
为0即这个锁还没有被线程占有时,会先调用hasQueuedPredecessors()
方法判断加入了同步队列中当前节点是否有前驱节点,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。
ReentrantLock和synchronized都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
ReentrantReadWriteLock
实现了ReadWriteLock
接口,ReadWriteLock
仅定义了获取读锁和写锁的两个方法,即readLock()
方法和writeLock()
方法;而ReentrantReadWriteLock
除了接口方法之外,还提供了一些便于外界监控其
内部工作状态的方法:
ReentrantReadWriteLock
有五个内部类,Sync继承自AQS、NonfairSync和FairSync继承自Sync类(通过构造函数传入的布尔值决定要构造哪一种Sync实例);ReadLock和WriteLock实现了Lock接口:
读锁ReadLock
是共享锁:
public void lock() { sync.acquireShared(1); }
写锁WriteLock
是独占锁:
public void lock() { sync.acquire(1); }
但是一个AQS对象只有一个state
,想要表示两种不同的状态,就需要使用“按位切割”的方法,将整型的state
的二进制位数切分为两部分,高16位表示读,低16位表示写(比如这张图就说明一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁):
假设当前同步状态值为c
获取写状态(获取写锁数量exclusiveCount
): c & ((1 << 16) - 1) (0x0000ffff
/65535
)(将高16位全部抹去)
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
获取读状态(获取读锁数量sharedCount
): c >>> 16(无符号右移16位)
static final int SHARED_SHIFT = 16; static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
写状态加一: c + 1
compareAndSetState(c, c + 1)
读状态加一: c +(1<<16)
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); compareAndSetState(c, c + SHARED_UNIT)
c不等于0时,当写状态等于0时,则读状态一定大于0,即读锁已被获取。
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。
写锁的获取由tryAcquire()
实现:
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); //写线程数量(即获取独占锁的重入数) int w = exclusiveCount(c); //当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁 if (c != 0) { //总锁数量不为0,但写锁数量为0,说明读锁数量不为0,返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
该方法除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取。原因在于:读写锁要确保写锁的操作对读锁可见。如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取。而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。
写锁的释放与ReentrantLock
的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
protected final boolean tryRelease(int releases) { //若锁的持有者不是当前线程,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //写锁的新线程数 int nextc = getState() - releases; //如果独占模式重入数为0了,说明独占模式被释放 boolean free = exclusiveCount(nextc) == 0; if (free) //若写锁的新线程数为0,则将锁的持有者设置为null setExclusiveOwnerThread(null); //设置写锁的新线程数 //不管独占模式是否被释放,更新独占重入数 setState(nextc); return free; }
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。
protected final int tryAcquireShared (int unused){ for (;;) { int c = getState(); int nextc = c + (1 << 16); if (nextc < c) throw new Error("Maximum lock count exceeded"); if (exclusiveCount(c) != 0 && owner != Thread.currentThread()) return -1; if (compareAndSetState(c, nextc)) return 1; } }
如果其他线程已经获取了写锁(exclusiveCount(c) != 0
),则当前线程获取读锁失败,进入等待状态(返回-1)。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁(返回1)。
读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是(1<<16)。
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; }
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指在已获取一把写锁的前提下,再获取到读锁,随后释放先前拥有的写锁的过程。
LockSupport
是一个编程工具类,主要是为了阻塞和唤醒线程用的。
它的内部其实两类主要的方法:park
(停车阻塞线程)和unpark
(启动唤醒线程)。他们也都是调用的UnSafe
类的park
和unpark
方法。
释放锁方法unpark()
可以优先于获取锁park()
方法调用:
释放锁unpark()
可以多次调用
LockSupport
锁是不可重入的,他不像synchronized
那样可重入是通过对ObjectWaiter
的_count
进行加减操作,也不像ReentrantLock
那样可重入是通过对AQS
的state
进行加减操作,他只有一个“许可证”,这个许可证如果被一个线程占有了,不但其他线程拿不到,即便是已拥有“许可证”的线程想要重新获取都是不可以的,只有等该线程释放掉“许可证”后才可以重新获取“许可证”。
写一段例子代码,线程A执行一段业务逻辑后调用wait阻塞住自己。主线程调用notify方法唤醒线程A,线程A然后打印自己执行的结果。
public class Main { public static void main(String[] args) { final Object lock = new Object(); Thread A = new Thread(() -> { int sum = 0; for (int i = 0; i < 10; i++) { sum += i; } try { lock.wait(); } catch (Exception e) { e.printStackTrace(); } System.out.println(sum); }); A.start(); Thread.sleep(1000); lock.notify(); } }
这段代码会报非法监视器异常IllegalMonitorStateException
:
因为wait()、notify()/notifyAll()方法只能在同步代码块中使用:
public class Main { public static void main(String[] args) { final Object lock = new Object(); Thread A = new Thread(() -> { int sum = 0; for (int i = 0; i < 10; i++) { sum += i; } try { synchronized (lock) { lock.wait(); } } catch (Exception e) { e.printStackTrace(); } System.out.println(sum); }); A.start(); Thread.sleep(1000); synchronized (lock) { lock.notify(); } } }
使用synchronized给lock对象上锁后再调用wait()/notify()方法就不会报错了。
使用LockSupport
也可以实现:
public class Main { public static void main(String[] args) { Thread A = new Thread(() -> { int sum = 0; for (int i = 0; i < 10; i++) { sum += i; } LockSupport.park(); System.out.println(sum); }); A.start(); Thread.sleep(1000); LockSupport.unpark(A); } }
对于wait()/notify()而言,线程A启动后需要调用Thread.sleep()
方法确保线程A执行完毕进入wait状态,如果没有这句代码,很有可能导致线程A还没有执行完累加操作,就已经调用了外面的notify()
方法,即在wait()
方法之前调用notify()
方法,导致线程永远不会被唤醒:
public class Main { public static void main(String[] args) { final Object lock = new Object(); Thread A = new Thread(() -> { int sum = 0; for (int i = 0; i < 10; i++) { sum += i; } try { synchronized (lock) { lock.wait(); System.out.println("wait"); } } catch (Exception e) { e.printStackTrace(); } System.out.println(sum); }); A.start(); //Thread.sleep(1000); synchronized (lock) { lock.notify(); System.out.println("notify"); } } }
而使用LockSupport
则永远不会产生类似问题:
public class Main { public static void main(String[] args){ Thread A = new Thread(() -> { int sum = 0; for (int i = 0; i < 10; i++) { sum += i; } LockSupport.park(); System.out.println(sum); }); A.start(); //Thread.sleep(1000); LockSupport.unpark(A); } }
任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法:
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition();
ConditionObject
是同步器AbstractQueuedSynchronizer
的内部类,实现了Condition
接口。每个Condition
对象都包含着一个队列(以下称为等待队列),该队列是Condition
对象实现等待/通知功能的关键。
每个ConditionObject对象都包含一个等待队列,ConditionObject对象拥有首节点(firstWaiter)和尾节点(lastWaiter)。等待队列是一个FIFO的队列,在队列中的每个节点(AbstractQueuedSynchronizer.Node)都包含了一个线程引用,该线程就是在ConditionObject对象上等待的线程,如果一个线程调用了它的await()
方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态(addConditionWaiter()
):
ConditionObject对象拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的:
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock对象(更确切地说是同步器)拥有一个同步队列和多个等待队列(一个Lock对象可以调用多次newCondition()
方法获得多个Condition对象):
调用ConditionObject对象的await()
方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()
方法返回时,当前线程一定获取了ConditionObject对象相关联的锁。
如果从队列(同步队列和等待队列)的角度看await()
方法,调用await()
方法时相当于将同步队列的首节点(获取了锁的节点)移动到等待队列中(添加为等待队列的尾结点):
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //当前线程加入等待队列 Node node = addConditionWaiter(); //释放同步状态(释放锁) int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
调用await()
的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用signal()
方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException
。
调用await()
方法时相当于将等待队列的首节点(获取了锁的节点)移动到同步队列中(添加为同步队列的尾结点)并唤醒:
public final void signal() { //当前线程必须是获取了锁的线程 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //将等待队列的头结点添加到同步队列(作为同步队列的尾结点) Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //唤醒刚刚从等待队列加入到同步队列的结点 LockSupport.unpark(node.thread); return true; }
signalAll()
方法相当于对等待队列中的每个节点均执行一次signal()
方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。