Java教程

Java并发之AQS原理解读(三)

本文主要是介绍Java并发之AQS原理解读(三),对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

上一篇:Java并发之AQS原理解读(二)

前言

本文从源码角度分析AQS共享锁工作原理,并介绍下使用共享锁的子类如何工作的。

共享锁工作原理

共享锁与独占锁的不同之处在于,获取锁和释放锁成功后,都会循环唤醒从表头开始的第一个阻塞结点,直到表头没有改变。

doReleaseShared方法存在无效的调用,即存在无效的线程唤醒,但为了避免程序出现问题,无伤大雅。PROPAGATE状态用于setHeadAndPropagate方法判断是否唤醒阻塞结点。

获取锁

1、先尝试 tryAcquireShared,如果获取锁成功直接返回;
2、否则,addWaiter先将线程封装成Node入队,再判断当前节点的前驱是否是head头结点,是的话尝试tryAcquireShared获取锁,如果锁资源>=0则将当前节点设置为head并循环唤醒从表头开始的第一个等待结点,直到表头节点没有改变;不是头结点的话,将当前节点的前驱的waitStatus设为SIGNAL,并阻塞当前节点。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    // 结点入队
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 设置为头结点并唤醒后继结点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // 当发生线程中断时,补偿设置中断标识
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 设置前驱结点等待状态 waitStatus 为 SIGNAL,并阻塞自己
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);
    
    /** 
      * 当锁资源大于 0 时,或者旧头结点 waitStatus 为 PROPAGATE 或者新节点 waitStatus 为  PROPAGATE
      * 并且新节点的下一节点是可共享的时,唤醒下一个结点
      * 
      * h == null 和 (h = head) == null 一定不成立,因为之前调用了 addWaiter
      */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 当队列存储等待结点时
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // 将过渡状态 0 设为可传播 PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // 如果头结点发生了改变,则继续唤醒下一个等待结点
        if (h == head)
            break;
    }
}

释放锁

1、直接尝试释放锁,如果失败则返回;
2、如果释放锁成功,则循环唤醒从表头开始的第一个等待结点,直到表头节点没有改变。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 当队列存储等待结点时
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // 将过渡状态 0 设为可传播 PROPAGATE
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        // 如果头结点发生了改变,则继续唤醒下一个等待结点
        if (h == head)
            break;
    }
}

CountDownLatch

state设置为初始化倒数,每次countDown都递减state。如果state != 0时,tryAcquireShared都返回没有资源,线程执行到await就写入队列阻塞;直到某个线程执行完state == 0tryAcquireShared返回有资源后,就会唤醒等待队列中的第一个线程往下执行。

// 设置state
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    setState(count);
}

// 递减 state
public void countDown() {
    sync.releaseShared(1);
}

protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

/**
  * 阻塞等待,直到 state == 0
  * 因为当 state == 0 时,tryAcquireShared 才返回正数,表示有锁资源
  * 从而 sync#acquireSharedInterruptibly 可以执行成功,不再阻塞
  */
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

参考:
setHeadAndPropagate源码分析

这篇关于Java并发之AQS原理解读(三)的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!