Java教程

AQS源码,你也可以

本文主要是介绍AQS源码,你也可以,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

需要有一定的JUC基础来进行观看。

文章目录

    • AQS源码
      • Lock.lock
        • acquire开始
        • tryAcquire(arg)(一)
        • acquireQueued(addWaiter(Node.EXCLUSIVE), arg))(二)
        • acquireQueued方法(三)
        • acquire结束
      • Lock.unlock
        • release
        • tryRelease
        • unparkSuccessor

AQS源码

图解大概描述了过程,使用state,类似于操作系统的信号量的操作。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E000uBCQ-1645527324567)(C:\Users\wang\AppData\Roaming\Typora\typora-user-images\1645521733726.png)]

初始化lock对象。这里默认是非公平锁,也就是true。以下讲解都会以非公平锁讲解,后面会讲其区别

public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

一段同步代码是从lock()方法开始。

Lock.lock

public void lock() {
        sync.acquire(1);
    }

compareAndSetState(0, 1)指用CAS的方法将state由0变为1,底层是Unsafe.compareAndSwapInt()(后面章节会讲述)。

acquire开始

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Java实现的中断(调用Thread.interrupt()方法),实际就是从线程外界,修改线程内部的一个标志变量,或者让线程中的一些阻塞方法,抛出InterruptedException。以此”通知“线程去做一些事情, 至于做什么,做不做,实际完全是由线程内的业务代码自己决定的。不过一般都是释放资源并结束线程。

这里补充interrupt方法:如果线程处于被阻塞状态,那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常。仅此而已。如果线程处于正常活动状态,那么会将该线程的中断标志设置为 true,仅此而已。被设置中断标志的线程将继续正常运行,不受影响。

interrupt() 并不能真正的中断线程,需要被调用的线程自己进行配合才行。

Thread thread = new Thread(() -> {
    while (!Thread.interrupted()) {
        // do more work.
    }
});
thread.start();
// 一段时间以后
thread.interrupt();

例如,这里在执行了thread.interrupt()方法后,while循环将退出。

acquire方法中if语句里需要分为三个部分来看

tryAcquire(arg)(一)

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
//尝试抢占锁
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
// 重入锁的情况
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
// false 表示抢占失败
    return false;
}

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))(二)

抢占锁失败后,进入队列。

addWaiter方法

private Node addWaiter(Node mode) {
    //mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);

    //尝试快速方式直接放到队尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    //失败则通过enq入队。
    enq(node);
    return node;
}

enq方法

private Node enq(final Node node) {
    //自旋,直到成功加入队尾
    for (;;) {
        Node t = tail;
        // 队列为空,则创建一个空的标志结点作为head结点,并将tail也指向它。
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued方法(三)

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//是否成功拿到资源
    try {
        boolean interrupted = false;//等待过程中是否被中断过

        //自旋
        for (;;) {
            final Node p = node.predecessor();//拿到前驱
            //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是第一个非head节点释放完资源唤醒自己,也可能被interrupt)。
            if (p == head && tryAcquire(arg)) {
                setHead(node);//拿到资源后,将head指向该结点。
                p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
                failed = false; // 成功获取资源
                return interrupted;//返回等待过程中是否被中断过。下面会对其进行记录。
            }

            //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
        }
    } finally {
        if (failed) // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
            cancelAcquire(node);
    }
}

acquireQueued中的shouldParkAfterFailedAcquire()

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驱的状态
    if (ws == Node.SIGNAL)
        //如果已经告诉前驱获取资源后后通知自己,那就可以安心挂起
        return true;
    if (ws > 0) {
        //如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
        // 循环向前查找取消节点,把取消节点从队列中剔除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //设置前任节点等待状态为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

acquireQueued中的parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
    //挂起自己
    LockSupport.park(this);
    //返回当前线程的中断状态,并清除标记
    return Thread.interrupted();
}

acquire结束

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

selfInterrupt();中

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

至此加锁过程完成

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VcGEGr8f-1645527324568)(C:\Users\wang\AppData\Roaming\Typora\typora-user-images\1645526165642.png)]

总结:

1.调用tryAcquire()尝试直接去获取资源,如果成功则直接返回;
2.没成功,则addWaiter()将该线程加入等待队列的尾部(正常节点的后面,并清除不正常的被取消的节点),并标记为独占模式;
3.acquireQueued()是一个循环,线程如果可以拿到资源则返回,否则在等待队列中休息,当被千亿节点唤醒(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4.如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。但并不会对其运行产生影响。具体可看上面介绍。

Lock.unlock

public void unlock() {
        sync.release(1);
    }

release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        //头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

tryRelease

// 方法返回当前锁是不是没有被线程持有
protected final boolean tryRelease(int releases) {
	// 减少可重入次数
	int c = getState() - releases;
	// 当前线程不是持有锁的线程,抛出异常
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	// 如果持有线程全部释放,将当前独占锁所有线程设置为null,并更新state
	if (c == 0) {
		free = true;
		setExclusiveOwnerThread(null);
	}
	setState(c);
	return free;
}

unparkSuccessor

private void unparkSuccessor(Node node) {
    //node为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个需要唤醒的结点s
     如果下个节点是null或者下个节点被cancelled,就找到队列最开始的非cancelled的节点
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

至此就全部结束

下篇会讲解公平和非公平,可重入和不可重入,共享和排他锁的细节。

这篇关于AQS源码,你也可以的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!