Java教程

深入了解 Java 多线程

本文主要是介绍深入了解 Java 多线程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1、多线程

1.1.多线程的概述

多线程,顾名思义,即采用多个线程来处理应用程序的请求。其主要优势在于充分利用了CPU的空闲时间片,可以用尽可能少的时间来对用户的要求做出响应。同一进程的所有线程是共享同一内存,便于不同线程之前资源共享。
Java 程序的进程里面至少包含两个线程,主线程(main()方法线程)和垃圾回收机制线程。

1.2.进程和线程

进程是指一个在内存中运行的应用程序,每个进程都有一个独立的内存空间。而线程是进程内部的一个独立执行单元,堆空间是共享的,栈空间是独立的,线程消耗的资源比进程小的多。

1.3. 线程的状态

  • NEW(新建)

    线程刚被创建,但是并未启动

  • RUNNABLE(可运行)

    线程可以在Java虚拟机中运行的状态,可能在运行,也可能没有

  • BLOCKED(锁阻塞)

    当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入Blocked状态;当该线程持有锁时,该线程将变成Runnable状态

  • WAITING(无限等待)

    一个线程在等待另一个线程执行一个(唤醒)动作时,该线程进入WAITING状态。进入这个状态后是不能自动唤醒的,必须等待另一个线程调用notify或者notifyAll方法才能够唤醒。

  • TIMED_WAITING(计时等待)

    与WAITING不同的是,等待了指定时间后会被唤醒。

  • TERMINATED(被终止)

    线程因为run方法正常退出而死亡,或者因为没有捕获的异常终止了run方法而死亡

线程的流程图:

线程的状态可以通过Object类的wait方法、notify方法、notifyAll方法,以及Thread类的yield方法、join方法和sleep方法来控制。
  • wait

    使持有该对象的线程把该对象的控制权交出去,然后处于等待状态

  • notify

    通知某个正在等待这个对象的控制权的线程继续运行

  • notifyAll

    会通知所有正在等待这个对象的控制权的线程继续运行

  • yield

    让当前正在运行的线程回到可运行状态,与其他具有相同优先级的线程重新争夺CPU的执行权力

  • join

    让当前线程变为等待状态,先执行指定的线程(调用join方法的线程),再执行当前线程

  • sleep

    让当前线程等待指定的时间,指定时间后会被自动唤醒

wait与sleep区别:

  • sleep方法导致程序暂停执行指定的时间,让出CPU给其他线程,但是它的监控状态依然保持着,当指定的时间到了又会自动恢复运行状态
  • wait方法是把控制权交出去,然后进入等待此对象的等待锁定池中,处于等待状态,只有针对此对象调用notify方法后,该线程才进入对象锁定池,准备获取对象锁,进入运行状态。
  • 在调用sleep方法的过程中,线程不会释放对象锁。而当调用wait方法的时候,线程会放弃对象锁。

在Java中,可以调用Thread实例的setPriority方法来控制线程的优先级,范围为1-10,其中10最高优先级,默认值为5。优先级的默认值并不是绝对的,子线程会继承父线程的优先级。

停止一个线程,可以通过Thread实例的interrupt方法来中断线程,但需要配合Thread实例的isInterrupted方法一起使用。

public static void main(String[] args) throws InterruptedException {
    Thread t = new Thread(new Runnable() {
        public void run() {
            while (true) {
                try {
                    System.out.println("线程执行!");
                    //判断线程的中断标志来退出循环
                    if (Thread.currentThread().isInterrupted()) {
                        break;
                    }
                    Thread.sleep(100l);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    //线程处于阻塞状态,当调用线程的interrupt()方法时,
                    //会抛出InterruptException异常,跳出循环
                    break;
                }
            }
        }
    });
    t.start();
    Thread.sleep(5000l);
    //中断线程
    t.interrupt();
    System.out.println("线程中断了");
}

或者使用标志的方式,使线程正常退出

public static boolean exit = true;

public static void main(String[] args) throws InterruptedException {
    Thread t = new Thread(new Runnable() {
        public void run() {
            while (exit) {
                try {
                    System.out.println("线程执行!");
                    Thread.sleep(100l);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    });
    t.start();
    Thread.sleep(1000l);
    exit = false;
    System.out.println("退出标识位设置成功");
}

但不用使用stop方法,来强行终止线程,因为它是极为不安全的。

1.4. 多线程并发的特征

  • 原子性

    一个操作或者多个操作要么全部执行,要么全部不执行

  • 可见性

    当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值

  • 有序性

    程序执行的顺序按照代码的先后顺序执行

2、Synchronized

2.1. Synchronized的概述

Synchronized通过加锁的方式,可以保证方法或者代码块在运行时,同一时刻只有一个线程能执行Synchronized声明的代码块。由于同一时刻只有一个线程执行,代码块或方法内语句的重排序也不会影响其执行结果,保证了线程并发时的有序性。

在线程加锁时,将清空本地内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值。线程解锁前,必须把自己工作内存中共享变量的最新值刷新到主内存中。

由于同一时刻只有一个线程执行,并且加锁前从内存中读取了最新的值,解锁前又把修改后的值写入到内存中,因此synchronized也可以保证线程并发时的可见性。

因此Synchronized可以保证线程并发的原子性、可见性和有序性。

2.2. 锁对象

在使用synchronized来保证多线程并发时,需要指定一个锁对象,在Java中任何对象都可以成为锁对象。

在同步代码块中,需要显示的指定锁对象

//创建锁
Object lock = new Object(); 

synchronized(lock){
    //需要同步操作的代码
}

在普通同步方法中,锁对象为this

public synchronized void method(){
   //可能会产生线程安全问题的代码 
}

在静态同步方法中,锁对象是当前类的class对象

public static synchronized void method(){
   //可能会产生线程安全问题的代码 
}

2.3. 线程同步的原理

synchronized的同步操作主要是monitorenter和monitorexit这两个JVM指令实现的,而这两个JVM指令实现锁的使用,主要是基于 Mark Word和monitor。

首先编写一段简单的代码:

public class SynchronizedDemo {
    public void test() {
        synchronized (this) {
        }
    }
}

编译Java文件

javac SynchronizedDemo.java

使用javap命令查看编译后的文件

javap -c SynchronizedDemo.class

其结果是:

 public com.wulianpu.thread.SynchronizedDemo();
    Code:
       0: aload_0
       1: invokespecial #1                  // Method java/lang/Object."<init>":()V
       4: return

  public void test();
    Code:
       0: aload_0
       1: dup
       2: astore_1
       3: monitorenter // 获取锁
       4: aload_1
       5: monitorexit // 释放锁
       6: goto          14
       9: astore_2
      10: aload_1
      11: monitorexit
      12: aload_2
      13: athrow
      14: return
    Exception table:
       from    to  target type
           4     6     9   any
           9    12     9   any

2.4. 锁优化

synchronized是重量级锁,效率不是很高,因此在JDK 1.6中对锁的实现引入了大量的优化,如自旋锁、适应性自旋锁、锁消除、锁粗化、偏向锁、轻量级锁等技术来减少锁操作的开销。

锁主要存在四中状态,依次是:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态,他们会随着竞争的激烈而逐渐升级,锁可以升级但是不可降级,这种策略是为了提高获得锁和释放锁的效率。

2.4.1. 锁消除

为了保证数据的完整性,进行操作时需要对这部分操作进行同步控制,但是在有些情况下,JVM检测到这部分操作不可能存在共享数据竞争,这时JVM会对这些同步锁进行锁消除,锁消除的依据是逃逸分析的数据支持。

锁消除主要是针对与JDK内置的API,如StringBuffer、Vector、HashTable等,它们通过加锁的方式来保证数据在多线程的环境下线程安全。

2.4.2. 锁粗化

连续的加锁、解锁操作,可能会导致不必要的性能损耗,因此,JVM会将程序中多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁

3、CAS

3.1. CAS的概述

CAS,Compare And Swap,即比较并交换,指的是将预期值与当前变量的值比较(compare),如果相等则使用新值替换(swap)当前变量,否则不作操作。在 Java 的同步组件中大量使用了CAS技术来实现了多线程的并发操作,比如AQS同步组件、Atomic原子类等等都是以CAS实现的,甚至ConcurrentHashMap在 JDK 1.8的版本中也调整为了CAS+Synchronized来实现多线程的并发操作

3.2. CAS的原理

CAS的思想是将预期值(线程本地缓存的值)与当前变量的值(主内存的值)比较,如果相等则使用新值(修改后的值)替换当前变量,并返回true;否则不作操作,并返回false,如果CAS操作失败,会通过自旋的方式等待并再次尝试,直到成功。

CAS在先比较后修改这个过程中,根本没有获取锁和释放锁的操作,是硬件层面的原子操作,对比synchronized的同步,少了很多的逻辑步骤,使得性能大为提高。

以AtomicInteger类为例,可以查看它的addAndGet方法:

public class AtomicInteger {
    private static final Unsafe U = Unsafe.getUnsafe();
    
    private volatile int value;
    
    public final boolean compareAndSet(int expectedValue, int newValue) {
        return U.compareAndSetInt(this, VALUE, expectedValue, newValue);
    }
}

其底层调用的是Unsafe类的compareAndSetInt方法,继续查看其源码:

public final class Unsafe {
    public final native boolean compareAndSetInt(Object o, 
                                                 long offset,
                                                 int expected,
                                                 int x);
}

该方法为native方法,有四个参数,分别代表:对象、当前值的内存偏移地址、预期值、修改值。

Unsafe 是CAS的核心类,它提供了硬件级别的原子操作,同时也是一个比较危险的类,主要是用于执行低级别、不安全的方法集合。尽管这个类和所有的方法都是public的,但是这个类的使用仍然受限,你无法在自己的Java程序中直接使用该类,因为只有授信的代码才能获得该类的实例。

3.3. 多CPU的CAS处理

CPU的读写速度远远大于主内存的读写速度,为了解决速度差异,在它们之间通常都会架设多级缓存,如L1级缓存、L2级缓存、L3级缓存,其速度L1最快、L2次快、L3最慢。

线程在读写数据时,会先在最快的L1级缓存中找,L1级缓存中找不到再去找次快的L2级缓存,如果是找不到,再去找L3级缓存找。

对一个CPU来说,可以把L3缓存看做内存的一部分,线程通过CAS操作数据时,会先看本地缓存(核心内的缓存)的数据和内存(L3缓存)的数据是否相等,如果相等,则表示数据没有被改变过,直接更新数据。如果不相等,表示数据被修改过,重新从内存读取数据到本地缓存,再次进行CAS操作。

但是对于有多个核心的CPU来说,就不能简单的把L3缓存看做内存的一部分了,每个处理器内部都有一个L3缓存,当多线程并发读写时,就会出现缓存数据不一致的情况。

因此,CPU提供了两种方法来实现多处理器的原子操作:

  • 总线加锁

    当一个CPU要操作数据时,在 BUS 总线上发出一个 Lock 信号,这个处理器就可以独享内存了,但缺点也很明显,在锁定期间,其他处理器都不能读取内存中的数据,请求将被阻塞增,加系统的开销。

  • 缓存加锁

    缓存加锁就是修改数据的内存地址,利用缓存一致性协议(MESI)来保证原子性。缓存一致性机制可以保证同一个内存区域的数据仅能被一个处理器修改,当不同的CPU对同一数据进行缓存后,其中一个CPU要对这个数据进行修改,这时这个CPU会出一个信号,通知其他CPU过期这个数据,重新加载。

3.4. CAS的不足

  • 自旋时间太长

    当CAS操作失败时,会通过自旋的方式等待再次尝试,如果CAS一直失败,则会给CPU带来非常大的开销,在JUC中有些地方就限制了CAS自旋的次数,例如BlockingQueue的SynchronousQueue。

  • 只能保证一个数据是原子操作

    CAS操作的是一个数据,所以只能保证一个数据的操作是原子操作。如果需要保证多个数据都是原子的,只能通过加锁的方式实现了。

  • ABA问题

    在CAS操作的过程中,会检测操作的数据有没有发生改变,如果没有发生改变则更新。当两个线程同时读取一个数据(A)后,第一个线程对数据进行改变(B),然后又改回来了(A),第二给线程对数据进行改变,然后发现内存中的数据与本地缓存中的数据一致,然后进行了修改,虽然CAS检查的时候会发现没有改变,但是它实质上已经发生了改变。对于ABA问题的解决方案就是加上版本号,每次修改数据都要检测版本号是否一致。

4、Volatile

4.1. Volatile的概述

Volatile是Java中的一个关键字,用于修饰变量,当一个变量被修饰为Volatile时,如果某个线程对这个变量进行更新,那么其他线程可以立马看到这个更新,也就是解决了线程并发放的可见性问题。

Volatile实现内存可见性原理:

线程进行写操作时,通过在写操作指令后加入一条store屏障指令,让本地内存中变量的值能够刷新到主内存中

线程进行读操作时,通过在读操作指令前加入一条load屏障指令,及时读取到变量在主内存的值

内存屏障(Memory Barrier)是一种CPU指令,用于控制特定条件下的重排序和内存可见性问题,Java编译器也会根据内存屏障的规则禁止重排序,所以也解决了线程并发的有序性。

使用的内存屏障:

  • StoreStore屏障可以保证在volatile写之前,其前面的所有普通写操作都已经刷新到主内存中

  • StoreLoad屏障的作用是避免volatile写与后面可能有的volatile读/写操作重排序

  • LoadLoad屏障用来禁止处理器把上面的volatile读与下面的普通读重排序

  • LoadStore屏障用来禁止处理器把上面的volatile读与下面的普通写重排序

4.2. Volatile原子性问题

使用Volatile关键字修饰的变量在进行读操作前,会通过加入load屏障指令,能及时读取到变量在主内存的值,但是在多线程的环境下,线程先后读取到主内存最新的值,然后线程先后对变量进行修改,即使修改后能刷新到主内存中,但仍会出现修改覆盖的情况。

public static void main(String[] args) throws InterruptedException {
    VolatileDemo demo = new VolatileDemo();
    for (int i = 0; i < 5; i++) {
        Thread t = new Thread(demo);
        t.start();
    }

    Thread.sleep(1000);
    System.out.println(demo.count);
}

static class VolatileDemo implements Runnable {
    public volatile int count;

    public void run() {
        addCount();
    }

    public void addCount() {
        for (int i = 0; i < 10000; i++) {
            count++;
        }
    }
}

程序执行完后,按照正常逻辑,count的值应该是50000,但结果却不是50000,也就是存在着原子性问题。

解决方案:

  • 使用synchronized
  • 使用ReentrantLock
  • 使用Atomic包下的原子变量类,如AtomicInteger

前面两种都是通过加锁的方式来解决问题,而AtomicInteger则是通过CAS的方式来解决问题,相比之下,AtomicInteger的性能更好。

public static void main(String[] args) throws InterruptedException {
    VolatileDemo demo = new VolatileDemo();
    for (int i = 0; i < 5; i++) {
        Thread t = new Thread(demo);
        t.start();
    }

    Thread.sleep(1000);
    System.out.println(demo.count);
}

static class VolatileDemo implements Runnable {
    public volatile AtomicInteger count = new AtomicInteger(0);

    public void run() {
        addCount();
    }

    public void addCount() {
        for (int i = 0; i < 10000; i++) {
            count.incrementAndGet();
        }
    }
}

4.3. Volatile和Synchronized比较

在变量真正独立于其他变量和自己以前的值,在单独使用的时候,适合用Volatile。

使用Volatile可以保证多线程并发的可见性和有序性,但无法保证原子性,而Synchronized既能保证可见性、有序性,又能保证原子性。但Volatile不需要加锁,比Synchronized更轻便,不会阻塞线程,性能更好。

5、 Atomic

Atomic是指在java.util.concurrent.atomic包的所有原子操作的类,Atomic包下的原子操作的类,用于方便程序员在多线程环境下,无锁的进行原子操作,核心操作是 CAS 原子操作。

5.1. 基本类型

使用原子的方式更新基本类型的类,主要又AtomicInteger(整型原子操作类)、AtomicLong(长整型原子操作类)、AtomicBoolean(布尔型原子操作类)。

AtomicInteger,主要API如下:

  • get()

    直接返回值

  • getAndAdd(int)

    增加指定的数据,返回变化前的数据

  • getAndDecrement()

    减少1,返回减少前的数据

  • getAndIncrement()

    增加1,返回增加前的数据

  • getAndSet(int)

    设置指定的数据,返回设置前的数据

  • addAndGet(int)

    增加指定的数据后返回增加后的数据

  • decrementAndGet()

    减少1,返回减少后的值

  • incrementAndGet()

    增加1,返回增加后的值

  • lazySet(int)

    仅仅当get时才会set

  • compareAndSet(int, int)

    参数1为期待值,参数2为修改后的值,将期待的值与当前值对比,若相同,则修改为指定的值并返回true,否则返回false

5.2. 引用类型

引用类型的原子操作类有:AtomicReference(引用类型原子操作类)、 AtomicStampedRefrence(带计数器功能引用类型原子操作类)、AtomicMarkableReference (带有标记位的引用类型原子操作类)

AtomicReference引用类型和基本类型的作用基本一样,AtomicStampedReference则是在AtomicReference的基础上增加了一个计数器,作用类似于版本号。

AtomicMarkableReference同样是在AtomicReference的基础上增加了一个布尔类型的标记位,只有true和false两种状态,相当于未修改和已修改。

通过AtomicStampedRefrence和AtomicMarkableReference可以有效避免CAS操作的ABA问题。

5.3. 数组类型

数组类型的原子操作类有:AtomicIntegerArray(整形数组原子操作类)、AtomicLongArray(长整形数组原子操作类)、 AtomicReferenceArray(引用类型数组原子操作类)

AtomicIntegerArray,主要API如下:

  • addAndGet(int, int)

    执行加法,参数1为数组的下标,参数2为增加的数量,返回增加后的结果

  • compareAndSet(int, int, int)

    对比修改,参数1为数组下标,参数2为期待的值,参数3为修改目标值,成功返回true,否则false

  • decrementAndGet(int)

    参数为数组下标,将数组对应数字减少1,返回减少后的数据

  • incrementAndGet(int)

    参数为数组下标,将数组对应数字增加1,返回增加后的数据

  • getAndAdd(int, int)

    和addAndGet类似,区别是返回值是变化前的数据

  • getAndDecrement(int)

    和decrementAndGet类似,区别是返回变化前的数据

  • getAndIncrement(int)

    和incrementAndGet类似,区别是返回变化前的数据

  • getAndSet(int, int)

    将对应下标的数字设置为指定值,第二个参数为设置的值,返回是变化前的数据

5.4. 对象属性更新器

对象属性的更新器类型有:AtomicIntegerFieldUpdater(原子更新整形字段的更新器)、 AtomicLongFieldUpdater(原子更新长整形字段的更新器 )、AtomicReferenceFieldUpdater (原子更新引用类形字段的更新器)

使用更新器有以下几个限制:

  • 操作的目标不能是static类型
  • 操作的目标不能是final类型的
  • 操作的目标必须是volatile类型的数据
  • 操作的目标必须对当前的Updater所在的区域是可见的

其实现方式是通过反射对属性进行操作

5.5. JDK1.8新增的类

JDK1.8新增的类有:LongAdder(长整型原子操作类)、 DoubleAdder(双浮点型原子操作类)、 LongAccumulator:(长整型原子操作类)、 DoubleAccumulator:(双浮点型原子操作类)

LongAccumulator与LongAdder的区别是,LongAccumulator要传入一个函数式接口。

LongAdder是JDK1.8提供的累加器,基于Striped64实现,所提供的API基本上可以替换原先的AtomicLong。

因为在非常高的并发请求下,AtomicLong虽然使用CAS来操作,但是CAS失败后还是通过无限循环的自旋锁,来不断尝试,这太浪费CPU资源了。

AtomicLong由于过多线程同时去竞争一个变量的更新而造成性能降低的,LongAdder则是将一个变量分解为多个变量,让同样多的线程去竞争这多个资源。
image
假如,有5个线程需要更新变量的值,需要更新的变量值为10

线程一,分解得到的值为1,增加了1,结果为2

线程二,分解得到的值为2,增长了2,结果为4

线程三,分解得到的值为3,增长了3,结果为6

线程四,分解得到的值为4,增长了4,结果为8

线程五,分解得到的值为0,增长了5,结果为5

然后将结果累加,结果为25

LongAdder和AtomicLong的性能对比测试结果如下:
image
从上结果图可以看出,在并发比较低的时候,LongAdder和AtomicLong的效果非常接近,但是当并发较高时,两者的差距会越来越大。

6、AQS

6.1. AQS的概述

AQS(AbstractQueuedSynchronizer),即队列同步器。它是JUC并发包中的核心基础组件,是构建锁或者其他同步组件的基础框架,如ReentrantLock、ReentrantReadWriteLock、Semaphore等。

AQS解决了实现同步器时涉及到的大量细节问题,例如获取同步状态、FIFO同步队列等。

6.2. state

AQS内部维护了一个volatile修饰的int类型的变量state,state用于表示当前线程的同步状态,当state>0时,表示已经获取了锁,当state = 0时,表示释放了锁。

AQS提供了三个方法来对state进行操作:

  • getState()

    返回同步状态的当前值

  • setState()

    设置当前的同步状态

  • compareAndSetState()

    使用CAS的方式设置当前状态

6.3. 资源共享方式

AQS定义了两种资源共享方式:

  • Exclusive

    独占,只有一个线程能执行,如ReentrantLock

  • Share

    共享,多个线程可同时执行,如Semaphore、CountDownLatch

不同的自定义同步器争用共享资源的方式也不同,自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

实现自定义同步器时主要实现以下几种方法:

  • isHeldExclusively()

    当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占,只有用到condition才需要去实现它。

  • tryAcquire(int)

    独占方式,尝试获取同步状态,成功则返回true,失败则返回false。

  • tryRelease(int)

    独占方式,尝试释放同步状态,成功则返回true,失败则返回false。

  • tryAcquireShared(int)

    共享方式,尝试获取同步状态。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

  • tryReleaseShared(int)

    共享方式,尝试释放同步状态,如果释放后,允许唤醒后续的等待结点,返回true,否则返回false。

6.4. CLH同步队列

CLH同步队列是一个FIFO的双向队列,AQS依赖它来完成同步状态的管理。

image

当前线程如果获取同步状态失败时,AQS则会将当前线程的信息构造成一个节点(Node),并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

CLH队列入列非常简单,就是将同步器的tail属性指向新节点,新节点的prev属性指向最后的一个节点,最后的一个节点的next属性指向新节点。
image

查看AQS(AbstractQueuedSynchronizer)的addWaiter方法

private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            // 通过CAS的方式设置新节点为尾节点
            if (compareAndSetTail(oldTail, node)) {
                // 将AQS的tail属性指向新节点
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

在addWaiter方法中,AQS通过死循环的方式来保证节点可以正确添加,只有成功添加后,当前线程才会从该方法返回,否则会一直执行下去。

首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。
image

7、锁的类型

7.1. 自旋锁

线程的阻塞和唤醒需要CPU从用户态转为核心态,频繁的阻塞和唤醒对CPU来说是一件负担很重的工作,势必会给系统的并发性能带来很大的压力。同时现实在许多应用中,对象锁的锁状态只会持续很短一段时间,为了这一段很短的时间而频繁地阻塞和唤醒线程是非常不值得的,所以引入自旋锁。

自旋锁就是让线程等待一段时间,不会被立即挂起,看持有锁的线程是否会很快释放锁,底层就是执行一段无意义的循环。AbstractQueuedSynchronizer就在其内部大量使用了自旋锁。

但是自旋等待并不能替代阻塞,虽然它可以避免线程切换带来的开销,但是它占用了处理器的时间。如果持有锁的线程很快就释放了锁,那么自旋的效率就非常好,反之,自旋的线程就会白白消耗掉处理的资源,它不会做任何有意义的工作,这样反而会带来性能上的浪费。

所以自旋等待的时间(自旋的次数)必须要有一个限度,如果自旋超过了定义的时间仍然没有获取到锁,则应该被挂起。自旋的默认次数为10次,可以通过参数-XX:PreBlockSpin来调整

但是通过参数设置自旋的次数,也不是很有效,因为调整的自旋次数是针对于整个应用程序的。

7.2. 适应自旋锁

使用自旋锁与自旋锁不同的地方在于,适应自旋锁自旋的次数不再是固定的,它是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。

如果线程自旋成功了,那么下次自旋的次数会更加多,因为虚拟机认为既然上次成功了,那么此次自旋也很有可能会再次成功,那么它就会允许自旋等待持续的次数更多。反之,如果对于某个锁,很少有自旋能够成功的,那么在以后要或者这个锁的时候自旋的次数会减少甚至省略掉自旋过程,以免浪费处理器资源。

7.3. 偏向锁

轻量级锁的加锁、解锁操作是需要依赖多次CAS原子指令的,而偏向锁只需要检查是否为偏向锁、锁标识为以及ThreadID即可,可以减少不必要的CAS操作。

7.4. 轻量级锁

轻量级锁主要是使用CAS进行原子操作,对于轻量级锁来说,其性能提升的依据是对于绝大部分的锁,在整个生命周期内都是不会存在竞争的,如果不满足这个条件,轻量级锁比重量级锁更慢,因为轻量级锁除了互斥的开销外,还有额外的CAS操作。

7.5. 重量锁

重量级锁通过对象内部的监视器(monitor)实现,其中monitor的本质是依赖于底层操作系统的Mutex Lock(互斥锁)实现,操作系统实现线程之间的切换需要从用户态到内核态的切换,切换成本非常高。

7.6 自旋锁

每个对象都相对于一个互斥锁的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。

7.7. 阻塞锁

阻塞锁就是让线程进入阻塞状态进行等待,当获得相应的信号(唤醒,时间) 时,才可以进入线程的准备就绪状态,准备就绪状态的所有线程,通过竞争,进入运行状态

7.8 读写锁

读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。

一个读写锁同时只能有一个写者或多个读者(与CPU数相关),但不能同时既有读者又有写者,最大可能的读者数为实际的逻辑CPU数,写的优先级是高于读的。

ReentrantReadWriteLock是读写锁的一个实现,其主要特征:

  • 公平性:支持公平锁和非公平锁
  • 重入性:支持重入,读写锁最多支持65535个递归写入锁和65535个递归读取锁
  • 锁降级:写锁能够降级成为读锁,但读锁不能升级为写锁。

7.9. 公平锁

加锁前检查是否有排队等待的线程,优先排队等待的线程,先来先得

7.10. 非公平锁

加锁时不考虑排队等待问题,直接尝试获取锁,获取不到自动到队尾等待,非公平锁性能比公平锁高,因为公平锁需要在多核的情况下维护一个队列。

7.11. 可重入锁

ReentrantLock(可重入锁),是一种递归无阻塞的同步机制。它提供了比synchronized更强大、灵活的锁机制,可以减少死锁发生的概率。

简单来说,可重入锁就是在加锁和释放锁之间还能继续加锁和释放锁。

ReentrantLock还提供了公平锁和非公平锁的选择,构造方法接受一个可选的公平参数(默认非公平锁),当设置为true时,表示公平锁,否则为非公平锁。公平锁的效率往往没有非公平锁的效率高,在许多线程访问的情况下,公平锁表现出较低的吞吐量。

public ReentrantLock() {
    //非公平锁
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    //公平锁
    sync = fair ? new FairSync() : new NonfairSync();
}

Sync为ReentrantLock里面的一个内部类,它继承AQS(AbstractQueuedSynchronizer),它有两个子类:FairSync(公平)和NonfairSync(非公平锁)。

使用ReentrantLock获取锁,其底层使用的是AQS同步队列

ReentrantLock lock = new ReentrantLock();
lock.lock();

使用ReentrantLock释放锁

public void unlock() {
    sync.release(1);
}

ReentrantLock与synchronized的区别:

  • ReentrantLock提供了更多,更加全面的功能,具备更强的扩展性,如时间锁等候,可中断锁等候,锁投票等等,除此之外,还提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合。
  • ReentrantLock提供了可轮询的锁请求,它会尝试着去获取锁,如果成功,则继续,否则可以等到下次运行时处理,而synchronized则一旦去请求获取锁,要么成功,要么阻塞,所以相比synchronized而言,ReentrantLock会不容易产生死锁些。
  • ReentrantLock支持更加灵活的同步代码块,但是使用synchronized时,只能在同一个synchronized块结构中获取和释放(ReentrantLock的锁释放一定要在finally中处理)。
  • ReentrantLock支持中断处理,且性能较synchronized会好些

8、Condition

8.1 Condition的概述

在没有Lock之前,我们使用Synchronized来控制同步,配合Object的wait、notify等方法可以实现等待/通知模式。在JDK5后,Java提供了Lock接口,相对于Synchronized而言,Lock提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活。

Condition是一种广义上的条件队列(等待队列),它为线程提供了一种更为灵活的等待/通知模式。

Condition必须要配合锁一起使用,因此Condition一般都是作为Lock的内部实现。

8.2 Condition的实现

Condition为一个接口,其下仅有一个实现类ConditionObject,ConditionObject为AQS的内部类,AQS是同步锁的实现基础。

Condition可以通过Lock实例的newCondition()方法来获取,返回的结果是绑定到此 Lock 实例的新 Condition 实例。

Condition提供了一系列的方法来对阻塞和唤醒线程:

  • await()

    当前线程在接到信号或被中断之前一直处于等待状态

  • await(long time, TimeUnit unit)
    当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。

  • awaitNanos(long nanosTimeout)
    当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。

  • awaitUninterruptibly()
    当前线程在接到信号之前一直处于等待状态,该方法对中断不敏感。

  • awaitUntil(Date deadline)
    当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。

  • signal()
    唤醒一个等待线程,该线程从等待方法返回前必须获得与Condition相关的锁

  • signal()All
    唤醒所有等待线程,能够从等待方法返回的线程必须获得与Condition相关的锁

8.3. 等待队列

每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程。

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    //头节点
    private transient Node firstWaiter;
    //尾节点
    private transient Node lastWaiter;
    
    public ConditionObject() {
    }
    /** 省略方法 **/
}

ConditionObject内部拥有首节点(firstWaiter)、尾节点(lastWaiter),当前线程调用await方法,将会以当前线程构造成一个节点(Node),并将节点加入到该队列的尾部。

Node里面包含了当前线程的引用,ConditionObject中的Node与AQS的CLH同步队列的Node使用的都是AbstractQueuedSynchronized.Node静态内部类。

调用Condition的await方法,首先用当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。同时不断检测该节点代表的线程是否出现在CLH同步队列中,如果不存在则一直挂起,否则参与竞争同步状态

调用Condition的signal方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中。

9、并发工具类

9.1. CyclicBarrier

CyclicBarrier也叫同步屏障,是在JDK1.5被引入的一个同步辅助类。CyclicBarrier的作用就好比一扇门,默认情况下关闭状态,堵住了线程执行的道路,直到所有线程都就位,门才打开,让所有线程一起通过。

CyclicBarrier的内部是使用ReentrantLock和Condition,其构造方法如下:

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

创建一个 CyclicBarrier,它将在给定数量(parties)的参与者(线程)处于等待状态时启动,并在启动屏障时执行给定的屏障操作(barrierAction),该操作由最后一个进入屏障的线程执行。

在CyclicBarrier中最重要的方法莫过于await()方法,每个线程调用await方法告诉CyclicBarrier已经到达屏障位置,线程被阻塞。

public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
    List<Thread> threadList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        Thread t = new Thread(new Athlete(cyclicBarrier, "运动员" + i));
        threadList.add(t);
    }
    for (Thread t : threadList) {
        t.start();
    }
}

static class Athlete implements Runnable {
    private CyclicBarrier cyclicBarrier;
    private String name;

    public Athlete(CyclicBarrier cyclicBarrier, String name) {
        this.cyclicBarrier = cyclicBarrier;
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(name + "就位");
        try {
            cyclicBarrier.await();
            System.out.println(name + "跑到终点。");
        } catch (Exception e) {
        }
    }
}

9.2. CountDownLatch

用给定的数量初始化 CountDownLatch,在当前计数到达零之前,await 方法会一直受阻塞。作用与CyclicBarrier有点相似,但是无法被重置。

CountDownLatch是一个或者多个线程,等待其他多个线程完成某件事情之后才能执行,而CyclicBarrier是多个线程互相等待,直到到达同一个同步点,再继续一起执行。

CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后就可以恢复等待的线程,继续执行了。

image

CountDownLatch内部依赖Sync实现,而Sync继承AQS。

CountDownLatch中最常用的方法:

  • await()

    用于使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。内部使用AQS的getState方法获取计数器,如果计数器值不等于0,则会以自旋方式会尝试一直去获取同步状态。

  • countDown()

    用于递减锁存器的计数,如果计数到达零,则释放所有等待的线程。内部调用AQS的releaseShared方法来释放共享锁同步状态

public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

    List<Thread> threadList = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        //起点运动员
        Thread t1 = new Thread(new Athlete(cyclicBarrier, countDownLatch, "起点运动员" + i));

        //接力运动员
        Thread t2 = new Thread(new Athlete(countDownLatch, "接力运动员" + i));

        threadList.add(t1);
        threadList.add(t2);
    }

    for (Thread t : threadList) {
        t.start();
    }
}

static class Athlete implements Runnable {

    private CyclicBarrier cyclicBarrier;
    private String name;

    CountDownLatch countDownLatch;

    //起点运动员
    public Athlete(CyclicBarrier cyclicBarrier, CountDownLatch countDownLatch, String name) {
        this.cyclicBarrier = cyclicBarrier;
        this.countDownLatch = countDownLatch;
        this.name = name;
    }

    //接力运动员
    public Athlete(CountDownLatch countDownLatch, String name) {
        this.countDownLatch = countDownLatch;
        this.name = name;
    }

    @Override
    public void run() {
        //判断是否是起点运动员
        if (cyclicBarrier != null) {

            System.out.println(name + "就位");
            try {
                cyclicBarrier.await();
                System.out.println(name + "到达交接点。");

                //已经到达交接点
                countDownLatch.countDown();
            } catch (Exception e) {
            }
        }

        //判断是否是接力运动员
        if (cyclicBarrier == null) {
            System.out.println(name + "就位");
            try {
                countDownLatch.await();
                System.out.println(name + "到达终点。");
            } catch (Exception e) {
            }
        }
    }
}

9.2. Semaphore

Semaphore是一个控制访问多个共享资源的计数器,和CountDownLatch一样,其本质上是一个“共享锁”。

Semaphore维护了一个信号量许可集,当信号量中有可用的许可时,线程可以获取该许可,有许可的线程可以继续执行,否则线程必须等待,直到有可用的许可为止。 线程可以释放它所持有的信号量许可,被释放的许可归还到许可集中,可以被其他线程再次获取。

当信号量初始化为 1 时,可以当作互斥锁使用,因为它只有两个状态:有一个许可能使用,或没有许可能使用。当以这种方式使用时,“锁”可以被其他线程控制和释放,而不是主线程控制释放。

Semaphore内部包含了公平锁(FairSync)和非公平锁(NonfairSync),其构造方法定义如下:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

使用具有给定的许可数(permits)和给定是否公平(fair)来创建Semaphore

Semaphore常用的方法:

  • acquire()

    用于来获取一个许可

  • release()

    用于释放许可

public static void main(String[] args) {
    Parking parking = new Parking(3);
    for (int i = 0; i < 5; i++) {
        new Car(parking).start();
    }
}

static class Parking {
    //信号量
    private Semaphore semaphore;

    Parking(int count) {
        semaphore = new Semaphore(count);
    }

    public void park() {
        try {
            //获取信号量
            semaphore.acquire();
            long time = (long) (Math.random() * 10);
            System.out.println(Thread.currentThread().getName() + "进入停车场,停车" + time + "秒...");
            Thread.sleep(time);
            System.out.println(Thread.currentThread().getName() + "开出停车场...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //释放信号量
            semaphore.release();
        }
    }
}

static class Car extends Thread {
    Parking parking;

    Car(Parking parking) {
        this.parking = parking;
    }

    @Override
    public void run() {
        //进入停车场
        parking.park();
    }
}

10、并发容器

10.1. 并发容器的概述

HashMap是我们常用的集合,但它是线程不安全的,为了解决该问题,JDK提供了Hashtable和Collections.synchronizedMap(hashMap)两种解决方案,但是这两种方案都是对读写加锁,一个线程在读数据时,其他线程必须等待,吞吐量较低,性能较为低下,所以又提供了ConcurrentHashMap来解决该问题。

ConcurrentHashMap是高性能并且线程安全HashMap,在JDK 1.8版本以前,ConcurrentHashMap采用分段锁的概念,使锁更加细化,来保证并发更新的安全。但是在JDK 1.8中改变了这种思路,采用了CAS+Synchronized来保证并发更新的安全,其底层当采用数组+链表+红黑树的存储结构。

10.2. JDK7 HashMap

在JDK7中HashMap底层采用的是数组+链表的结构,不支持并发操作。
image

HashMap里面存储的每个元素都是Entry的实例,Entry 包含key、value、 hash和用于指向下一个元素的next。

HashMap的构造方法:

public HashMap(int initialCapacity, float loadFactor) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException("Illegal initial capacity: " +
                                           initialCapacity);
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    if (loadFactor <= 0 || Float.isNaN(loadFactor))
        throw new IllegalArgumentException("Illegal load factor: " +
                                           loadFactor);
    this.loadFactor = loadFactor;
    this.threshold = tableSizeFor(initialCapacity);
}

initialCapacity为当前数组容量,始终保持 2^n,默认值为16;loadFactor为负载因子,默认为 0.75; threshold为扩容的阈值,其值等于 capacity * loadFactor。

put 过程:

  • 数组初始化/;在第一个元素插入 HashMap 的时候做一次数组的初始化,先确定初始的数组大小,并计算数组扩容的阈值。
  • 计算具体数组位置:使用key进行hash值计算,根据hash值计算应该放在哪个数组中。
  • 找到数组下标后,会先判断 key 是否重复,如果没有重复,就准备将新值放入到链表的表头
  • 数组扩容:在插入新值的时候,如果当前的 size 已经达到了阈值,并且要插入的数组位置上已经有元素,那么就会触发扩容,扩容后,数组大小为原来的 2 倍。扩容就是用一个新的大数组替换原来的小数组,并将原来数组中的值迁移到新的数组中。

get过程:

  • 根据 key 计算 hash 值
  • 根据hash值找到相应的数组下标
  • 遍历该数组位置处的链表,直到找到相等的 key

10.3. JDK8 HashMap

JDK8 对 HashMap 进行了一些修改,最大的不同就是利用了红黑树,其结构由 数组+链表+红黑树 组成。

在JDK7中的HashMap,进行查找的时候,根据 hash 值我们能够快速定位到数组的具体下标,但是之后的话,需要顺着链表一个个比较下去才能找到我们需要的,时间复杂度取决于链表的长度。

为了降低这部分的开销,在 JDK8 中,当链表中的元素超过了 8 个以后,会将链表转换为红黑树,以降低在查找的时候可以降低时间复杂度。
image
在 JDK7 中使用 Entry 来代表每个 HashMap 中的数据节点,而在 JDK8 中则使用 Node 来代替数据节点,基本没有区别,都是 key、value、hash 和 next 这四个属性,不过,Node 只能用于链表的情况,红黑树的情况需要使用 TreeNode。

10.4. JDK7 ConcurrentHashMap

ConcurrentHashMap 由一个个 Segment 组成,Segment 代表部分或一段的意思,所以很多人都会将其描述为分段锁。简单的说,ConcurrentHashMap 是一个 Segment 数组,每个Segment内部都是一个数组+链表的结构,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的。
image
ConcurrentHashMap的构造方法:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

initialCapacity为当前Segment数组的长度,默认值为16;loadFactor为负载因子,是给每个 Segment 内部使用的,默认为 0.75; concurrencyLevel为并发数,默认值为16。

put过程

  • 根据 hash 值找到相应的 Segment,之后就是 Segment 内部的 put 操作了。
  • Segment 数组不能扩容,rehash方法扩容的是 segment 数组某个位置的内部数组 HashEntry[] 进行扩容,扩容后,容量为原来的 2 倍。

get过程

  • 计算 hash 值,找到 segment 数组中的具体位置
  • segment 数组中也是数组,再根据 hash 找到数组中具体值的位置
  • 然后顺着链表进行查找即可

10.5. JDK8 ConcurrentHashMap

在JDK 8版本以前,ConcurrentHashMap采用分段锁的概念,使锁更加细化,但是JDK 1.8中已经改变了这种思路,而是利用CAS+Synchronized来保证并发更新的安全,底层采用数组+链表+红黑树的存储结构。
image
ConcurrentHashMap线程安全的,允许一边更新一边遍历,也就是说在对象遍历的时候,也可以进行remove、put操作,且遍历的数据会随着remove、put操作产出变化。

10.6. ConcurrentSkipListMap

ConcurrentSkipListMap底层使用的数据结构是SkipList,SkipList 称之为跳表,它是一种可以替代平衡树的数据结构,其数据元素默认按照key值升序,天然有序。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过空间来换取时间的一个算法,在每个节点中增加了向前的指针,在插入、删除、查找时可以忽略一些不可能涉及到的结点,从而提高了效率。

image

SkipListd的查找:

image
SkipList的插入:

插入时,需要查找合适的位置,在确认新节点要占据的层次K时,采用丢硬币的方式,完全随机,如果占据的层次K大于链表的层次,则重新申请新的层,否则插入到指定层次。

image

SkipList的删除:

image

11、队列

11.1. 队列的概述

线程安全的队列可以分为阻塞队列和非阻塞队列,常用的线程安全对象有:

队列名称 阻塞与否 是否有界 线程安全保障 适用场景 注意事项
ConcurrentLinkedQueue 非阻塞 无界 CAS 对全局的集合进行操作的场景 size() 是要遍历一遍集合,慎用
ArrayBlockingQueue 阻塞 有界 一把全局锁 生产消费模型,平衡两边处理速度 --
LinkedBlockingQueue 阻塞 可配置 存取采用2把锁 生产消费模型,平衡两边处理速度 无界的时候注意内存溢出问题
PriorityBlockingQueue 阻塞 无界 一把全局锁 支持优先级排序
SynchronousQueue 阻塞 无界 CAS 不存储元素的阻塞队列

11.2. ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链表结构的无边界的线程安全队列,它是非阻塞的,遵循队列的FIFO原则,采用CAS算法来实现线程安全。

常用方法:

  • add

    向队列添加元素

  • poll

    将首个元素从队列中弹出,如果队列是空的,就返回null

  • peek

    查看首个元素,不会移除首个元素,如果队列是空的就返回null

  • element

    查看首个元素,不会移除首个元素,如果队列是空的就抛出异常NoSuchElementException

使用ConcurrentLinkedQueue的size方法需要遍历一遍集合的,速度很慢,所以尽量要避免用size方法

public static void main(String[] args) throws Exception {
    Queue<String> queue = new ConcurrentLinkedQueue<String>();
    for (int i = 0; i < 10000; i++) {
        //队列中添加元素
        queue.add(String.valueOf(i));
    }

    QueueDemo1 demo1 = new QueueDemo1(queue);

    for (int i = 0; i < 10; i++) {
        Thread t = new Thread(demo1);
        t.start();
    }
}

class QueueDemo1 implements Runnable {
    Queue<String> queue;

    public QueueDemo1(Queue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            long start = new Date().getTime();
            //检索并移除此队列的头,如果此队列为空,则返回 null
            while (queue.poll() != null) {
                //if (queue.size() == 0) {
                //}

                if (queue.isEmpty()) {
                }
            }
            System.out.println(System.currentTimeMillis() - start);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

11.3. ArrayBlockingQueue

ArrayBlockingQueue是一个由数组实现的有界阻塞队列。该队列采用FIFO的原则对元素进行排序添加的,其大小在构造时由构造函数来决定,确认之后就不能再改变了。

ArrayBlockingQueue支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下是不保证线程公平的访问。

ArrayBlockingQueue内部使用可重入锁ReentrantLock + Condition来完成多线程环境的并发操作,在ArrayBlockingQueue中,生产和消费用的是同一个锁。

//最大容量为5的数组阻塞队列
private static ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5, true);
//private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(5);

public static void main(String[] args) {

    Thread t1 = new Thread(new ProducerTask());
    Thread t2 = new Thread(new ConsumerTask());

    //启动线程  
    t1.start();
    t2.start();

}

//生产者
static class ProducerTask implements Runnable {
    private Random rnd = new Random();

    @Override
    public void run() {
        try {
            while (true) {
                int value = rnd.nextInt(100);
                //如果queue容量已满,则当前线程会堵塞,直到有空间再继续
                queue.put(value);

                System.out.println("生产者:" + value);

                TimeUnit.MILLISECONDS.sleep(100); //线程休眠
            }
        } catch (Exception e) {
        }
    }
}

//消费者
static class ConsumerTask implements Runnable {
    @Override
    public void run() {
        try {
            while (true) {
                //如果queue为空,则当前线程会堵塞,直到有新数据加入
                Integer value = queue.take();

                System.out.println("消费者:" + value);

                TimeUnit.MILLISECONDS.sleep(15); //线程休眠
            }
        } catch (Exception e) {
        }
    }
}

11.4. LinkedBlockingQueue

LinkedBlockingQueue是一个基于链表的有界阻塞队列,在LinkedBlockingQueue中的生产和消费用的不是同一个锁。

11.5. PriorityBlockingQueue

PriorityBlockingQueue是一个优先级队列,它在PriorityQueue的基础上提供了可阻塞并且线程安全的读取操作。它是无界的,但可能会导致内存溢出而失败。PriorityBlockingQueue类似于ArrayBlockingQueue,其内部使用一个独占锁来控制,同时只有一个线程可以进行入队和出队。

PriorityBlockingQueue始终保证出队的元素是优先级最高的元素,并且可以定制优先级的规则,内部使用二叉堆,通过使用一个二叉树最小堆算法来维护内部数组,这个数组是可扩容的,在当前元素个数>=最大容量时候会通过算法扩容,为了避免在扩容操作时候其他线程不能进行出队操作,实现上使用了先释放锁,然后通过CAS保证同时只有一个线程可以扩容成功。

优先队列不允许空值,而且不支持non-comparable(不可比较)的对象,比如用户自定义的类。优先队列要求使用Java Comparable和Comparator接口给对象排序,并且在排序时会按照优先级处理其中的元素

public static void main(String[] args) throws InterruptedException {
    PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<User>();

    PriorityDemo demo = new PriorityDemo(queue);

    for (int i = 0; i < 5; i++) {
        new Thread(demo).start();
    }

    Thread.sleep(100);

    User u = queue.poll();
    while (u != null) {
        System.out.println("优先级是:" + u.getPriority() + "," + u.getUsername());
        u = queue.poll();
    }
}

static class PriorityDemo implements Runnable {

    PriorityBlockingQueue queue;
    Random r = new Random();

    public PriorityDemo(PriorityBlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            User user = new User();
            user.setPriority(r.nextInt(100));
            user.setUsername("张三" + i);

            queue.add(user);
        }
    }
}

static class User implements Comparable<User> {

    private Integer priority;
    private String username;

    @Override
    public int compareTo(User user) {
        //System.out.println("比较结果"+this.priority.compareTo(user.getPriority()));
        return this.priority.compareTo(user.getPriority());
    }

    public Integer getPriority() {
        return priority;
    }

    public void setPriority(Integer priority) {
        this.priority = priority;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

11.6. SynchronousQueue

SynchronousQueue,实际上它不是一个真正的队列,因为它不存储元素,因此它需要put操作和take操作同时进行才不会阻塞,否则会一直阻塞,直到有另一个线程进行put操作或者take操作,每个 put 必须等待一个 take。

SynchronousQueue对于正在等待的生产者和使用者线程而言,默认是非公平排序,也可以选择公平排序策略。

常用方法:

  • put()
    往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走
  • offer()
    往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false
  • offer(2000, TimeUnit.SECONDS)
    往queue里放一个element但等待时间后才返回,和offer()方法一样
  • take()
    取出并且remove掉queue里的element,取不到东西他会一直等
  • poll()
    取出并且remove掉queue里的element,方法立即能取到东西返回。否则立即返回null
  • poll(2000, TimeUnit.SECONDS)
    等待时间后再取,并且remove掉queue里的element
public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

    new Thread(new Product(queue)).start();
    new Thread(new Customer(queue)).start();
}

static class Product implements Runnable {
    SynchronousQueue<Integer> queue;
    Random r = new Random();

    public Product(SynchronousQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            int number = r.nextInt(1000);
            System.out.println("等待1秒后运送" + number);
            try {
                TimeUnit.SECONDS.sleep(1);

                queue.put(number);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

static class Customer implements Runnable {
    SynchronousQueue<Integer> queue;

    public Customer(SynchronousQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                System.out.println("收到了:" + queue.take());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

12、线程池

12.1. 线程池的概述

线程的创建和切换都是代价比较大的操作,因此复用已有的线程可以降低资源的消耗,提高响应速度,线程池就是线程的一个容器,可以用来管理和维护线程。

Java的线程池主要通过ThreadPoolExecutor来实现,常用的ExecutorService的各种线程池策略都是基于ThreadPoolExecutor实现的。

12.2. 线程池的状态

在ThreadPoolExecutor内部定义了一个名为ctl的AtomicInteger类型的变量,用于记录了线程池中的任务数量和线程池的状态两个信息,共32位,其中高3位表示线程池状态,低29位表示线程池中的任务数量。

线程池的状态:

  • RUNNING
    处于RUNNING状态的线程池能够接受新任务,以及对新添加的任务进行处理
  • SHUTDOWN
    处于SHUTDOWN状态的线程池不可以接受新任务,但是可以对已添加的任务进行处理
  • STOP
    处于STOP状态的线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
  • TIDYING
    当所有的任务已终止,ctl记录的任务数量为0,线程池会变为TIDYING状态,当线程池变为TIDYING状态时,会执行钩子函数terminated(),terminated()在ThreadPoolExecutor类中是空实现,若想在线程池变为TIDYING时,进行相应的处理,可以通过重载terminated()函数来实现
  • TERMINATED
    线程池彻底终止的状态

image

12.3. ThreadPoolExecutor

Java的线程池主要通过ThreadPoolExecutor来实现,其构造方法为:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize为线程池中核心线程的数量,可以调用线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

maximumPoolSize为线程池中允许的最大线程数。

keepAliveTime为线程空闲的时间,线程执行完任务后不会立即销毁,而是继续存活一段时间。默认情况下,该参数只有在线程数大于corePoolSize时才会生效。

unit为keepAliveTime的单位。

workQueue为用来保存等待执行的任务的BlockQueue阻塞队列,等待的任务必须实现Runnable接口。

threadFactory为用于设置创建线程的工厂,hreadFactory就是用于创建线程的工厂,通过newThread()方法提供创建线程的功能,newThread()方法创建的线程都是非守护线程而且线程优先级都是默认优先级。

handler为RejectedExecutionHandler,线程池的拒绝策略,所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务。

12.4. FixedThreadPool

FixedThreadPool是复用固定数量的线程来处理一个共享的无边界队列,其工作队列使用的是使用 LinkedBlockingQueue。在初始化时是不会创建线程,只有使用的时候才会创建,并只有到达最大线程数量的时候才会进行复用线程。

12.5.SingleThreadExecutor

SingleThreadExecutor只会使用单个工作线程来执行一个无边界的队列,其工作队列使用的是使用 LinkedBlockingQueue 。在初始化时是不会创建线程,只有使用的时候才会创建,并只有到达最大线程数量的时候才会进行复用线程。

12.6. CachedThreadPool

CachedThreadPool只有在用到线程的时候再创建线程,最大线程数量是Integer.MAX_VALUE,空闲线程等待新任务的最长时间为60秒,其工作队列使用的是使用 SynchronousQueue。

12.7. ScheduledThreadPool

ScheduledThreadPoolExecutor相当于提供了延迟和周期执行功能的ThreadPoolExecutor,其工作队列使用的是使用 DelayedWorkQueue。在初始化时是不会创建线程,只有使用的时候才会创建,并只有到达最大线程数量的时候才会进行复用线程。

DelayedWorkQueue,类似于延时队列和优先级队列。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面,这样就可以保证每次出队的任务都是当前队列中执行时间最靠前的。

12.8. 线程池的拒绝策略

线程池的拒绝策略:

  • AbortPolicy
    直接抛出异常,默认策略
  • CallerRunsPolicy
    用调用者所在的线程来执行任务
  • DiscardOldestPolicy
    丢弃阻塞队列中靠最前的任务,并执行当前任务
  • DiscardPolicy
    直接丢弃任务

自定义拒绝策略,可以实现RejectedExecutionHandler接口

这篇关于深入了解 Java 多线程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!