C/C++教程

源码分析Lock实现原理 - ReentrantLock

本文主要是介绍源码分析Lock实现原理 - ReentrantLock,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

从jdk1.5出现的Lock接口,与synchronized相比,提供了更广泛的操作,可以有不同的属性,支持多个关联对象

目录

前提条件

lock加锁实现

unlock释放锁实现


前提条件

先看它的一个实现ReentrantLock两个继承关系图:

          

ReentrantLock有一个抽象静态内部类Sync,FairSync和NonfairSync分别是Sync公平和非公平锁的两个实现

知道了这些前提,使用Lock锁先要初始化ReentrantLock实例,ReentrantLock有两个构造方法:

/** Synchronizer providing all implementation mechanics  同步器提供所有实现 */
private final Sync sync;
public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

Sync是抽象的要初始化ReentrantLock给sync赋值就要找它的实现类,无参构造给了非公平实现,有参构造传入true 的时候才是公平锁实现

lock加锁实现

    public void lock() {
        sync.lock();
    }

这里实现交给了sync,初始化的时候是NonfairSync,那就是非公平锁的lock,如下:

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

 非公平锁不遵循等待,是会竞争抢夺锁资源,抢夺的方式就是cas无锁操作,这里先通过compareAndSetState(0,1)判断是否操作成功,这两个参数第一个0是希望当前线程是第一个抢占资源的,初始state值会是0,第二个参数是希望设置成1,内部使用unsafe的cas操作判断state是否是0,是就把state设置成1并返回true,说名抢锁成功随即设置当前线程为资源独占线程,由于state值不是0了,其它线程在这里执行cas都会返回false,else分支执行下面的代码(公平锁的lock实现执行的代码也是下面这块,只是实现不同)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire的非公平锁实现:

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

先获取state同步状态值

如果是0,说明当前线程读取state之前没有任何线程获得锁,然后通过cas操作修改state,修改成功即抢锁成功,设置当前线程独占之后返回true,否则返回false抢锁失败

如果不是0,有线程已经抢占,判断抢占的线程是否是当前线程,如果是,当前加锁是重入锁,state值加1,表示加锁次数,已经是当前线程无需设置独占,返回true,如果不是当前线程返回false抢锁失败

如果tryAcquire返回了true,就没有后面的事了,只有抢锁失败返回false才会执行后面方法,到这里就可以看出,非公平锁只是在首次抢夺的时候直接通过cas方式抢锁,如果抢锁失败且当前线程不是占用锁的线程,就会乖乖的去排队,和公平锁执行一样了。再看addWaiter内容:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // unsafe操作
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // unsafe操作
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // unsafe操作
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

获取tail尾部节点,判断是否为空

不为空,新添加的节点的pre设置成原尾部节点,cas操作把新节点设置成尾节点,如果设置成功,把原尾节点的next设置成新节点,返回新节点,如果设置失败继续往下执行

上面执行完成,进入一个死循环,目的是一定要把node加到链表的尾部,还是先获取尾节点tail,如果是空就初始化一个放到head,tail也指向head首尾一样,保证下次循环tail不为空,再次循环进入else(一种是上次循环new的放到了head,一种是原来链表就有tail),cas操作把新node设置到tail,如果成功tail的next指向新节点,返回旧tail,如果不成功会一直循环下去,直到cas设置成功

enq执行完成并没有使用返回值,addWaiter是把新node返回,返回执行acquireQueued方法:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这里也是一个死循环,predecessor是prev节点,判断prev是否是head节点,如果是就尝试获取锁,这里tryAcquire还是非公平锁实现,可能在try的过程中head释放了锁就能抢成功,

如果成功当前node设置成head,prev的next设置成null和当前node断开引用帮助gc回收,返回true抢锁成功。这里failed是在抢锁成功之后设置成false,这样finally里的cancelAcquire就不会执行,如果抢锁过程出现异常(predecessor中可能有NullPointerException,tryAcquire中有Error),会执行cancelAcquire,这里会把waitStatus设置成1,并且跳过自己,把pre的next指向自己的next(奇怪的是并没有把自己的next中的pre指向自己的prev,因为找pre的时候使用while循环找到那个没有waitStatus不大于0的pre),然后唤醒next节点中的thread,当前被跳过的node为了gc回收,让next指向了自己,但是prev依然是前面可用的引用,这点有点没想明白。前面处理的是前后都有节点的逻辑,如果自己的尾节点,直接cas把pre的next设置成空。

如果失败执行shouldParkAfterFailedAcquire

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 第一次进来waitStatus是0
        if (ws == Node.SIGNAL) // signal是-1
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
                这里会把waitStatus设置成-1,目地是下一次循环走上面的if返回true
                从signal也可以看出这个线程需要一个信号,要信号干什么?要一个信号继续往下
                执行,也就是当前线程要阻塞在这里
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }


    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);  // 阻塞,等待锁被释放
        return Thread.interrupted();
    }

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

首次循环shouldParkAfterFailedAcquire这个方法返回false,waitStatus被改成-1,parkAndCheckInterrupt方法不会执行,下次循环shouldParkAfterFailedAcquire返回true,执行parkAndCheckInterrupt当前线程阻塞在this对象这里,等待别的线程唤醒它。

下面被唤醒之后首先执行了一下interrupted方法重置了打断状态

如果之前没有打断过就没有影响,这里返回false,到acquiteQueued中加锁成功后interrupted也是false直接返回,直到返回acquire方法中什么也不用执行

如果之前被打断过会返回true,到acquireQueued方法中会把interrupted设置成true,在下次抢锁成功会返回这个interrupted,回到acquire方法中会执行selfInterrupt再次打断,这种情况是因为等待加锁的线程在等待期间,线程被调用interrupt方法,如果这个线程没有被唤醒,按照前面分析会重置打断状态,抢到了锁之后又重新设置了打断,应该是为了前后保持状态统一,这个也是可以模拟出来的(准备中)

unlock释放锁实现

    public void unlock() {
        sync.release(1);
    }

 释放锁实现交给了sync的release方法,因为有锁重入,所以每次释放state值减一,直到减到0完全释放,释放锁不区分公平与非公平,都是用的一个方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

 tryRelease方法让state值减1,感觉它这个应该先判断当前线程是不是独占线程,是的话再让state减1,避免异常导致前面state计算浪费时间,只要state没有减到0,都不算释放锁成功只更新state值,state到0之后置空当前独占线程,返回true

下面判断head节点如果是空,说明后面没有排队抢锁线程,直接返回true,如果不为空并且waitStatus不等于0,前面加锁过程分析可以看到大于0的时候是因为线程抢锁异常取消抢锁,小于0的时候是排队阻塞的线程等待被唤醒。

小于0的node,首先通过cas重置0,如果有next排队线程,就unpark唤醒。

前面分析取消锁的线程不会正常调用unlock,如果调用就会进入因为waitStatus大于0寻找有效的next,并unpark唤醒它。

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 这里释放锁之后只通知next节点一个线程
            LockSupport.unpark(s.thread);
    }

有些没有想明白的地方,或者有误的地方,希望大家指点。

这篇关于源码分析Lock实现原理 - ReentrantLock的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!