死锁是指两个或者两个以上的线程在执行的过程中,因争夺资源产生的一种互相等待现象。
假设线程 A 持有资源 1,线程 B 持有资源 2,它们同时都想申请对方的资源,那么这两个线程就会互相等待而进入死锁状态。
使用 Java 代码模拟上述死锁场景:
public class Resources { public static final Object resource1 = new Object(); // 资源 1 public static final Object resource2 = new Object(); // 资源 2 }
public class ThreadA extends Thread{ @Override public void run() { synchronized (Resources.resource1) { System.out.println(Thread.currentThread().getName() + " get Resource-1"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " waiting get Resource-2"); synchronized (Resources.resource2) { System.out.println(Thread.currentThread().getName() + " get Resource-2"); } } } }
public class ThreadB extends Thread{ @Override public void run() { synchronized (Resources.resource2) { System.out.println(Thread.currentThread().getName() + " get Resource-2"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " waiting get Resource-1"); synchronized (Resources.resource1) { System.out.println(Thread.currentThread().getName() + " get Resource-1"); } } } }
public class DeadLockDemo { public static void main(String[] args) { Thread threadA=new ThreadA(); threadA.setName("Thread A"); threadA.start(); Thread threadB=new ThreadB(); threadB.setName("Thread B"); threadB.start(); } }
输出结果:
Thread A get Resource-1 Thread B get Resource-2 Thread B waiting get Resource-1 Thread A waiting get Resource-2
线程 A 通过 synchronized (resource1)
获得 resource1
的监视器锁,然后线程 A 休眠 1s,执行线程 B 获取到 resource2 的监视器锁。线程 A 和线程 B 休眠结束了都开始企图请求获取对方的资源,然后这两个线程就会陷入互相等待的状态,这也就产生了死锁。
上面的例子符合产生死锁的四个必要条件:
在程序运行之前预防发生死锁。
我们对线程 B 的代码进行如下改造,就不会出现死锁了。
public class ThreadB extends Thread{ @Override public void run() { synchronized (Resources.resource1) { System.out.println(Thread.currentThread().getName() + " get Resource-1"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " waiting get Resource-2"); synchronized (Resources.resource1) { System.out.println(Thread.currentThread().getName() + " get Resource-2"); } } } }
输出结果:
Thread A get Resource-1 Thread A waiting get Resource-2 Thread A get Resource-2 Thread B get Resource-1 Thread B waiting get Resource-2 Thread B get Resource-2
线程 A 首先获得到 resource1 的监视器锁,这时候线程 2 就获取不到了。然后线程 1 再去获取 resource2 的监视器锁,可以获取到。线程 1 释放了对 resource1、resource2 的监视器锁的占用,线程 2 获取到就可以执行了。这样就破坏了环路等待条件,因此避免了死锁。
如果多个线程对同一个共享数据进行访问而不采取同步操作的话,那么操作的结果是不一致的。
以下代码演示了 1000 个线程同时对 cnt 执行自增操作,操作结束之后它的值有可能小于 1000。
public class ThreadUnsafeExample { private int cnt = 0; public void add() { cnt++; } public int get() { return cnt; } }
public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; ThreadUnsafeExample example = new ThreadUnsafeExample(); final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); }
输出结果:
998 //如果线程是安全的最终结果就是 1000
解决线程安全问题:
方案一:使用 AtomicInteger(实际上非阻塞同步)
public class ThreadSafeExample { // cnt 的初始值为 0。 private AtomicInteger cnt = new AtomicInteger(0); public void add() { cnt.incrementAndGet(); } public int get() { return cnt.get(); } public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; ThreadSafeExample example = new ThreadSafeExample(); final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); } }
输出结果:
1000
方案二:使用 synchronized(实际上是阻塞同步/互斥同步)
public class ThreadSafeExample2 { // cnt 的初始值为 0。 private int cnt = 0; public synchronized void add() { cnt++; } public synchronized int get() { return cnt; } public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; ThreadSafeExample2 example = new ThreadSafeExample2(); final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); } }
输出结果:
1000
方案三:使用 Reentrant(实际上是阻塞同步/互斥同步)
public class ThreadSafeExample3 { // cnt 的初始值为 0。 private int cnt = 0; private ReentrantLock lock = new ReentrantLock(); public void add() { try{ lock.lock(); cnt++; }finally { lock.unlock(); } } public int get() { try{ lock.lock(); return cnt; }finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { final int threadSize = 1000; ThreadSafeExample3 example = new ThreadSafeExample3(); final CountDownLatch countDownLatch = new CountDownLatch(threadSize); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < threadSize; i++) { executorService.execute(() -> { example.add(); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println(example.get()); } }
输出结果:
1000
所谓线程安全就是说,多个线程不管以何种方式访问某个类,并且在主调代码中不需要进行同步,都能表现正确的行为。
线程安全有以下几种实现方式:不可变、互斥同步、非阻塞同步、无同步方案(栈封闭和线程本地存储)
不可变(Immutable)的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变,来满足线程安全。
不可变的类型:
对于集合类型,可以使用 Collections.unmodifiableXXX() 方法来获取一个不可变的集合。
public class ImmutableExample { public static void main(String[] args) { Map<String, Integer> map = new HashMap<>(); Map<String, Integer> unmodifiableMap = Collections.unmodifiableMap(map); unmodifiableMap.put("a", 1); } }
Exception in thread "main" java.lang.UnsupportedOperationException at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) at ImmutableExample.main(ImmutableExample.java:9)
Collections.unmodifiableXXX() 先对原始的集合进行拷贝,需要对集合进行修改的方法都直接抛出异常。
public V put(K key, V value) { throw new UnsupportedOperationException(); }
互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。
互斥同步属于一种悲观的并发策略:总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁。
Java 中 synchronized 和 ReentrantLock 等独占锁就是悲观锁思想的实现。
乐观锁基于冲突检测的乐观并发策略:先进行操作,如果没有其他线程争用共享数据,操作成功;如果数据存在竞争,就采用补偿措施(常见的有不断重试,直到成功)。这种乐观的并发策略的许多实现是不需要将线程挂起的,因此这种同步操作称为非阻塞同步。
乐观锁需要操作和冲突检测这两个步骤具备原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。、
硬件支持的原子性操作最典型的是:CAS(Compare-and-Swap)。
当多个线程尝试使用 CAS 同时更新一个共享变量时,只有其中一个线程能够更新共享变量中的值,其他线程都失败,失败的线程不会被挂起,而是被告知在这次竞争中失败,并且可以再次尝试。
CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。
J.U.C 包里面的原子操作类的方法调用了 Unsafe 类的 CAS 操作。
要保证线程安全,并不是一定就要进行同步。如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性。
多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在虚拟机栈中,属于线程私有的。
public class StackClosedExample { public void add100() { int cnt = 0; for (int i = 0; i < 100; i++) { cnt++; } System.out.println(cnt); } }
public static void main(String[] args) { StackClosedExample example = new StackClosedExample(); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> example.add100()); executorService.execute(() -> example.add100()); executorService.shutdown(); }
100 100
如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。
符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题。
可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。如果创建一个 ThreadLocal 变量,那么访问这个变量的每个线程都会有这个变量的一个副本,在实际多线程操作的时候,操作的是自己本地内存中的变量,从而规避了线程安全问题。
public class ThreadLocalExample { public static void main(String[] args) { ThreadLocal threadLocal = new ThreadLocal(); Thread thread1 = new Thread(() -> { threadLocal.set(1); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(threadLocal.get()); threadLocal.remove(); }); Thread thread2 = new Thread(() -> { threadLocal.set(2); threadLocal.remove(); }); thread1.start(); thread2.start(); } }
1
thread1 中设置 threadLocal 为 1,而 thread2 设置 threadLocal 为 2。过了一段时间之后,thread1 读取 threadLocal 依然是 1,不受 thread2 的影响。
每个 Thread 都有一个 ThreadLocal.ThreadLocalMap 对象。
public class Thread implements Runnable { // 与此线程有关的 ThreadLocal 值,由 ThreadLocal 类维护 ThreadLocal.ThreadLocalMap threadLocals = null; }
默认情况下 threadLocals 变量值为 null,只有当前线程调用 ThreadLocal 类的 set() 或 get() 方法时才创建,实际上调用这两个方法的时候,我们调用的是ThreadLocalMap 类对应的 get()、set()方法。
当调用一个 ThreadLocal 的 set(T value) 方法时,先得到当前线程的 ThreadLocalMap 对象,然后将 <ThreadLocal 对象,value>
键值对插入到该 Map 中。变量是存放在当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 中。
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); // 获取当前线程的 ThreadLocalMap 对象 if (map != null) map.set(this, value); else createMap(t, value); } ThreadLocalMap getMap(Thread t) { return t.threadLocals; }
get() 方法类似。
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); // 获取当前线程的 ThreadLocalMap 对象 if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
所以对于以下代码:
public class ThreadLocalExample1 { public static void main(String[] args) { ThreadLocal threadLocal1 = new ThreadLocal(); ThreadLocal threadLocal2 = new ThreadLocal(); Thread thread1 = new Thread(() -> { threadLocal1.set(1); threadLocal2.set(1); }); Thread thread2 = new Thread(() -> { threadLocal1.set(2); threadLocal2.set(2); }); thread1.start(); thread2.start(); } }
其对应的底层结构图为:
由上图可以看出,ThreadLocal 从理论上讲并不是用来解决多线程并发问题的,因为根本不存在多线程竞争。
ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用。
static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } }
如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。ThreadLocalMap 中就会出现 key 为 null 的 Entry,如果我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。
ThreadLocalMap 实现中已经考虑了这种情况,在调用 set()、get()、remove() 方法的时候,会清理掉 key 为 null 的记录。应该尽可能在每次使用 ThreadLocal 后手动调用 remove(),以避免出现 ThreadLocal 经典的内存泄漏甚至是造成自身业务混乱的风险。
当多个线程可以一起工作去解决某个问题时,如果某些部分必须在其它部分之前完成,那么就需要对线程进行协调。
在线程中调用另一个线程的 join() 方法,会将当前线程挂起,而不是忙等待,直到目标线程结束。
对于以下代码,虽然 b 线程先启动,但是因为在 b 线程中调用了 a 线程的 join() 方法,b 线程会等待 a 线程结束才继续执行,因此最后能够保证 a 线程的输出先于 b 线程的输出。
public class JoinExample { private class A extends Thread { @Override public void run() { System.out.println("A"); } } private class B extends Thread { private A a; B(A a) { this.a = a; } @Override public void run() { try { a.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("B"); } } public void test() { A a = new A(); B b = new B(a); b.start(); a.start(); } } public static void main(String[] args) { JoinExample example = new JoinExample(); example.test(); }
A B
调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。
它们都属于 Object 的一部分,而不属于 Thread。
只能用在同步方法或者同步控制块中使用,否则会在运行时抛出 IllegalMonitorStateException。
使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。
public class WaitNotifyExample { public synchronized void before() { System.out.println("before"); notifyAll(); } public synchronized void after() { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("after"); } } public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); WaitNotifyExample example = new WaitNotifyExample(); executorService.execute(() -> example.after()); executorService.execute(() -> example.before()); }
before after
java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。
相比于 wait() 这种等待方式,await() 可以指定等待的条件,因此更加灵活。
使用 Lock 来获取一个 Condition 对象。
public class AwaitSignalExample { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void before() { lock.lock(); try { System.out.println("before"); condition.signalAll(); } finally { lock.unlock(); } } public void after() { lock.lock(); try { condition.await(); System.out.println("after"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); AwaitSignalExample example = new AwaitSignalExample(); executorService.execute(() -> example.after()); executorService.execute(() -> example.before()); }
before after
public class ProducerConsumer { private static class Producer implements Runnable{ private List<Integer> list; private int capacity; public Producer(List list, int capacity) { this.list = list; this.capacity = capacity; } @Override public void run() { while (true) { synchronized (list) { try { String producer = Thread.currentThread().getName(); while (list.size() == capacity) { System.out.println("生产者 " + producer + " list 已达到最大容量,进行 wait"); list.wait(); System.out.println("生产者 " + producer + " 退出 wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生产者 " + producer + " 生产数据" + i); list.add(i); list.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } private static class Consumer implements Runnable{ private List<Integer> list; public Consumer(List list) { this.list = list; } @Override public void run() { while (true) { synchronized (list) { try { String consumer = Thread.currentThread().getName(); while (list.isEmpty()) { System.out.println("消费者 " + consumer + " list 为空,进行 wait"); list.wait(); System.out.println("消费者 " + consumer + " 退出wait"); } Integer element = list.remove(0); System.out.println("消费者 " + consumer + " 消费数据:" + element); list.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static void main(String[] args) { final LinkedList linkedList = new LinkedList(); final int capacity = 5; Thread producer = new Thread(new Producer(linkedList,capacity),"producer"); Thread consumer = new Thread(new Consumer(linkedList),"consumer"); consumer.start(); producer.start(); } }
输出结果:
生产者 producer 生产数据-1652445373 生产者 producer 生产数据1234295578 生产者 producer 生产数据-1885445180 生产者 producer 生产数据864400496 生产者 producer 生产数据621858426 生产者 producer list 已达到最大容量,进行 wait 消费者 consumer 消费数据:-1652445373 消费者 consumer 消费数据:1234295578 消费者 consumer 消费数据:-1885445180 消费者 consumer 消费数据:864400496 消费者 consumer 消费数据:621858426 消费者 consumer list 为空,进行 wait 生产者 producer 退出 wait
public class ProducerConsumer { private static ReentrantLock lock = new ReentrantLock(); private static Condition full = lock.newCondition(); private static Condition empty = lock.newCondition(); private static class Producer implements Runnable{ private List<Integer> list; private int capacity; public Producer(List list, int capacity) { this.list = list; this.capacity = capacity; } @Override public void run() { while (true) { try { lock.lock(); String producer = Thread.currentThread().getName(); while (list.size() == capacity) { System.out.println("生产者 " + producer + " list 已达到最大容量,进行 wait"); full.await(); System.out.println("生产者 " + producer + " 退出 wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生产者 " + producer + " 生产数据" + i); list.add(i); empty.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } private static class Consumer implements Runnable{ private List<Integer> list; public Consumer(List list) { this.list = list; } @Override public void run() { while (true) { try { lock.lock(); String consumer = Thread.currentThread().getName(); while (list.isEmpty()) { System.out.println("消费者 " + consumer + " list 为空,进行 wait"); empty.await(); System.out.println("消费者 " + consumer + " 退出wait"); } Integer element = list.remove(0); System.out.println("消费者 " + consumer + " 消费数据:" + element); full.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } public static void main(String[] args) { final LinkedList linkedList = new LinkedList(); final int capacity = 5; Thread producer = new Thread(new Producer(linkedList,capacity),"producer"); Thread consumer = new Thread(new Consumer(linkedList),"consumer"); consumer.start(); producer.start(); } }
输出结果:
生产者 producer 生产数据-1748993481 生产者 producer 生产数据-131075825 生产者 producer 生产数据-683676621 生产者 producer 生产数据1543722525 生产者 producer 生产数据804266076 生产者 producer list 已达到最大容量,进行 wait 消费者 consumer 消费数据:-1748993481 消费者 consumer 消费数据:-131075825 消费者 consumer 消费数据:-683676621 消费者 consumer 消费数据:1543722525 消费者 consumer 消费数据:804266076 消费者 consumer list 为空,进行 wait 生产者 producer 退出 wait
public class ProducerConsumer { private static class Producer implements Runnable{ private BlockingQueue<Integer> queue; public Producer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true) { try { String producer = Thread.currentThread().getName(); Random random = new Random(); int i = random.nextInt(); System.out.println("生产者 " + producer + " 生产数据" + i); queue.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } } } private static class Consumer implements Runnable{ private BlockingQueue<Integer> queue; public Consumer(BlockingQueue<Integer> queue) { this.queue =queue; } @Override public void run() { while (true) { try { String consumer = Thread.currentThread().getName(); Integer element = queue.take(); System.out.println("消费者 " + consumer + " 消费数据:" + element); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); Thread producer = new Thread(new Producer(queue),"producer"); Thread consumer = new Thread(new Consumer(queue),"consumer"); consumer.start(); producer.start(); } }
输出结果:
生产者producer生产数据-222876564 消费者consumer正在消费数据-906876105 生产者producer生产数据-9385856 消费者consumer正在消费数据1302744938 生产者producer生产数据-177925219 生产者producer生产数据-881052378 生产者producer生产数据-841780757 生产者producer生产数据-1256703008 消费者consumer正在消费数据1900668223 消费者consumer正在消费数据2070540191 消费者consumer正在消费数据1093187 消费者consumer正在消费数据6614703 消费者consumer正在消费数据-1171326759
1114. 按序打印
参考代码:
import java.util.concurrent.CountDownLatch; class Foo { private CountDownLatch countDownLatchA; //等待 A 线程执行完 private CountDownLatch countDownLatchB; //等待 B 线程执行完 public Foo() { countDownLatchA = new CountDownLatch(1); //只等待一个线程,即 A 线程 countDownLatchB = new CountDownLatch(1); //只等待一个线程,即 B 线程 } public void first(Runnable printFirst) throws InterruptedException { // printFirst.run() outputs "first". Do not change or remove this line. printFirst.run(); countDownLatchA.countDown(); //减去 1 后,cnt =0 那些因为调用 await() 方法而在等待的线程就会被唤醒。 } public void second(Runnable printSecond) throws InterruptedException { countDownLatchA.await(); // printSecond.run() outputs "second". Do not change or remove this line. printSecond.run(); countDownLatchB.countDown(); //减去 1 后,cnt =0 那些因为调用 await() 方法而在等待的线程就会被唤醒。 } public void third(Runnable printThird) throws InterruptedException { countDownLatchB.await(); // printThird.run() outputs "third". Do not change or remove this line. printThird.run(); } }