Java教程

Semaphore

本文主要是介绍Semaphore,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Semaphore介绍

Semaphore(信号量)是通过计数器来控制线程数量,如果计数器大于0,则允许访问。 如果为0,则拒绝访问。 底层还是使用的AbstractQueuedSynchronizer那一套控制。

Semaphore初试

定义一个线程数量为8的线程池,使用for循环创建10个任务,任务中使用信号量控制。可以看出,尽管最大线程数量是8,但是实际执行过程中因为获取Semaphore资源,而Semaphore设置为4,所以同时执行的线程数量仍然是4。

  private static ExecutorService executorService = Executors.newFixedThreadPool(8);
    public static void main(String[] args) {

        Semaphore semaphore = new Semaphore(4);
        long beginTime = System.currentTimeMillis();
        System.out.println(beginTime);

        for(int i = 0 ; i < 10; i++){
            int finalI = i;
            executorService.execute( () -> {
                try {
                    semaphore.acquire();
                    System.out.println("get semaphore " + finalI + " time : "+ ((System.currentTimeMillis() - beginTime ) /1000  ));
                    Thread.sleep(3000);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            });
        }
    }

get semaphore 0 time : 0
get semaphore 2 time : 0
get semaphore 1 time : 0
get semaphore 3 time : 0
get semaphore 8 time : 3
get semaphore 4 time : 3
get semaphore 9 time : 3
get semaphore 5 time : 3
get semaphore 6 time : 6
get semaphore 7 time : 6

使用场景

一般可以用来作为线程隔离使用。比如断路器Hystrix的舱壁模式中,其中一个是基于线程池的隔离,每个微服务调用创建一个线程池,服务之间的异常互不影响,但是维护线程池会带来额外的开销;另一种就是基于信号量的控制,通过控制每一个服务的信号量,来控制每个服务使用的线程数量。

源码分析

  • 创建一个信号量
        Semaphore semaphore = new Semaphore(4);
    //底层实际创建一个sync 
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
//sync 也是调用父类Sync的构造函数
  NonfairSync(int permits) {
            super(permits);
        }

//最终把这个参数传递给了AbstractQueuedSynchronizer的state参数,用来控制信号数量
 abstract static class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits);
        }
}

  • 获取信号
                    semaphore.acquire();
//实际调用了sync的acquireSharedInterruptibly方法,参数就是控制state自减的参数
  public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

//acquireSharedInterruptibly 方法尝试去获取信号,即调用tryAcquireShared(arg)方法,如果state自减之后小于0,表示无信号可用,当前线程调用doAcquireSharedInterruptibly挂起
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

// tryAcquireShared方法调用了semaphore静态内部类Sync的nonfairTryAcquireShared方法。
//nonfairTryAcquireShared主要使用一个自循环,对state字段做自减操作,当下列任意条件满足时退出:
// 1、当前可用信号量小于0  2、自减操作成功  返回当前可用信号量
  final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

//上一步的if对返回值进行判断,当小于0时,表示无信号可用,当前线程调用doAcquireSharedInterruptibly挂起,底层调用的还是AbstractQueuedSynchronizer的doAcquireSharedInterruptibly方法

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
//新增一个node节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
//返回该节点的上一个节点
                final Node p = node.predecessor();
//如果是头节点,尝试去获取信号
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
//获取信号成功,释放p节点,即新增节点的上一个节点,并且将新增节点置为头节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
//判断节点状态,去除一些非正常状态的节点,然后将线程LockSupport.park(this)挂起,等待唤起的时候,调用一下Thread.interrupted()方法判断线程是否异常。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  • 释放信号
                    semaphore.release();
  public void release() {
//这里调用的还是AbstractQueuedSynchronizer的releaseShared方法
        sync.releaseShared(1);
    }

   public final boolean releaseShared(int arg) {
//这里使用自循环对state做相反的自加操作,直到操作成功返回
        if (tryReleaseShared(arg)) {
//然后唤醒头节点
            doReleaseShared();
            return true;
        }
        return false;
    }


private void doReleaseShared() {
//这里也是使用了一个自循环来剔除异常节点,超时这些,直到获取到一个正常的Node.SIGNAL状态节点,修改他的状态为正常执行,并且unparkSuccessor唤醒该线程
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
这篇关于Semaphore的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!