首先,强调一点,ReentrantLock默认的构造器就是非公平锁。
public ReentrantLock() { sync = new NonfairSync(); }
final void lock() { acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
!tryAcquire(arg)的源码为
//这里写一下state++,表示一下,看的清晰 ReentrantLock lock = new ReentrantLock(); //三次加锁 state = 3 lock.lock(); lock.lock(); lock.lock(); //三次解锁 lock.unlock(); lock.unlock(); lock.unlock();
protected final boolean tryAcquire(int acquires) { //给当前线程赋值 final Thread current = Thread.currentThread(); //获得这个线程的state 这里getState()方法我也放下面了 //state也放下面了 int c = getState(); //判断下获取到的state是不是0 if (c == 0) { //如果是零 // hasQueuedPredecessors() 判断它的前面是否有节点 // 如果没有节点 则 !hasQueuedPredecessors()就是true //compareAndSetState(0, acquires))通过CAS的方式将state设为1 //如果成功将state设置为1,则compareAndSetState(0, acquires)) //返回true(这句话连着上句话)就是这个线程成功获得锁 //因为这里可能会有多个线程同时进来,所以要使用CAS //如果多个线程来枪锁,只有一个能抢到,剩下的就返回false //然后去入队了 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //保证了这是第一个获得锁的线程,并且成功让他获得了锁 //这里获得锁的意思就是将state设置为1 //将其设置为当前获得锁的线程 setExclusiveOwnerThread(current); //返回true return true; } } //以下代码判断的是是否为重入,如果是重入的话就state++ //最后返回true else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //如果既不是第一个获得锁的线程,也不是重入,返回false(需要入队) //就是等待锁被释放,之后这个线程在获得锁 return false; } } //getState() protected final int getState() { return state; } //state 注 这里务必要用 volatile修饰,为的是保证其可见性 private volatile int state;
private Node addWaiter(Node mode) { //初始化节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 这里判断加入的节点前面有没有节点 // 如果前面有节点就执行下面这段代码 if (pred != null) { node.prev = pred; //通过CAS锁的方式让tail指向node if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果是插入的第一个结点,就执行enq(node); enq(node); return node; } private Node enq(final Node node) { //for(;;)相当于while(true) for (;;) { Node t = tail; if (t == null) { // Must initialize //这里 new 了一个结点 给 head if (compareAndSetHead(new Node())) //这里让 head 和 tail指向同一个结点 //然后继续循环 //注意,此时还没有用到 传进来的node tail = head; } else { node.prev = t; //直到成功的将尾结点 tail通过CAS改为node //然后返回t //这里可能存在线程安全问题,所以用CAS if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
1. 插入之前已经有线程在排队了
2. 第一个来排队的结点
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } //**************shouldParkAfterFailedAcquire(p, node)****************** static final int SIGNAL = -1; //总体来讲,就是要把前一个结点的waitStatus设置为-1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //**************parkAndCheckInterrupt()******************** //这个方法就是通过park方法锁住当前线程 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { //释放锁成功 Node h = head; if (h != null && h.waitStatus != 0) //unpark(); unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { //先释放头head int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //释放第二个 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //释放成功 //这里释放成功 LockSupport.unpark(s.thread); }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
//*******************************公平锁****************************** protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //主要就是这不一样 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } //*******************************非公平锁****************************** final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //这里少了一个判断等待队列中有没有 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }