在很多面试场景和真实工作场景中,我们都会碰到多线程编程。多线程编程允许我们并发运行线程,其中每个线程可以处理不同的任务。因此,它可以最佳地利用系统资源,尤其当我们的计算机拥有多核 CPU 或多个 CPU 时。
有时,我们想控制多个线程同时启动。
在这篇文章里,我们首先要了解 “同时” 的含义。此外,我们将讨论如何在 Java 中同时启动两个线程。
我们的目标是:“同时启动两个线程。”
这个目标看起来很容易理解。但是,如果我们仔细思考一下,完全同时启动两个线程真的可能吗?
首先,每个线程都会消耗CPU时间来工作。因此,如果我们的应用程序运行在具有单核 CPU 的计算机上,就不可能完全同时启动两个线程。
但是如果我们的计算机有一个多核CPU或多个CPU,两个线程是可以在某个时间点同时启动的。但是,我们无法在 Java 代码层面控制。
这是因为当我们在 Java 中使用线程时,Java 的线程调度依赖于操作系统的线程调度。而不同的操作系统可能会有不同的处理方式。
此外,如果我们以更严格的方式讨论“完全相同的时间”,根据爱因斯坦的狭义相对论:
如果两个不同的事件在空间上是分开的,就不可能在绝对意义上说这两个不同的事件同时发生。——爱因斯坦
无论我们的 CPU 位于主板上或位于 CPU 中的内核有多近,它们之间总有缝隙。因此,我们不能确保两个线程完全同时启动。
但是,即使我们不能让两个线程在同一时间启动,我们仍可以使用一些同步手段让它们的启动间隔非常短暂。
当我们需要两个线程“同时”启动时,这些技术可以在大多数实际情况下帮助我们。
在本文,我们将介绍三种解决此问题的方法:
使用 CountDownLatch 类
使用 CyclicBarrier 类
使用 Phaser 类
这几个方法都遵循相同的思路:我们不会真正同时启动两个线程。相反,我们在线程启动后立即阻塞线程并尝试同时恢复它们的执行。
CountDownLatch是 Java 5 中作为java.util.concurrent包的一部分引入的同步器。通常,我们使用CountDownLatch来阻塞线程,直到其他线程完成它们的任务。
简单地说,我们在CountDownLatch对象中设置一个计数,并将CountDownLatch对象与一些线程相关联。当我们启动这些线程时,它们将被阻塞,直到CountDownLatch的计数变为零。
另一方面,在其他线程中,我们可以控制在什么情况下我们减少计数,让被阻塞的线程恢复,例如,当主线程中的某些任务完成时。
工作线程
现在,让我们看看如何使用CountDownLatch类解决我们的问题。
首先,我们将创建我们的Thread类。我们称之为WorkerWithCountDownLatch:
public class WorkerWithCountDownLatch extends Thread { private CountDownLatch latch; public WorkerWithCountDownLatch(String name, CountDownLatch latch) { this.latch = latch; setName(name); } @Override public void run() { try { System.out.printf("[ %s ] created, blocked by the latch...\n", getName()); latch.await(); System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now()); // do actual work here... } catch (InterruptedException e) { // handle exception } }
我们在WorkerWithCountDownLatch 类中添加了一个CountDownLatch 对象。首先我们来了解一下CountDownLatch 对象的作用。
在run()方法中,我们调用了latch.await()方法。这意味着,如果我们启动工作线程,它将检查CountDownLatch 的计数。线程将被阻塞,直到计数为零。
这样,我们就可以在主线程中创建一个count=1的CountDownLatch(1),并将锁对象与我们想要同时启动的两个工作线程相关联。
当我们希望两个线程继续执行它们的实际工作时,我们通过在主线程中调用latch.countDown()来释放锁。
接下来我们来看看主线程是如何控制这两个工作线程的。
主线程
我们将在usingCountDownLatch()方法中实现主线程:
private static void usingCountDownLatch() throws InterruptedException { System.out.println("==============================================="); System.out.println(" >>> Using CountDownLatch <<<<"); System.out.println("==============================================="); CountDownLatch latch = new CountDownLatch(1); WorkerWithCountDownLatch worker1 = new WorkerWithCountDownLatch("Worker with latch 1", latch); WorkerWithCountDownLatch worker2 = new WorkerWithCountDownLatch("Worker with latch 2", latch); worker1.start(); worker2.start(); Thread.sleep(10);//simulation of some actual work System.out.println("-----------------------------------------------"); System.out.println(" Now release the latch:"); System.out.println("-----------------------------------------------"); latch.countDown(); }
现在,让我们从main()方法调用上面的usingCountDownLatch ()方法。当我们运行 main()方法时,我们会看到输出:
=============================================== >>> Using CountDownLatch <<<< =============================================== [ Worker with latch 1 ] created, blocked by the latch [ Worker with latch 2 ] created, blocked by the latch ----------------------------------------------- Now release the latch: ----------------------------------------------- [ Worker with latch 2 ] starts at: 2021-06-27T16:00:52.268532035Z [ Worker with latch 1 ] starts at: 2021-06-27T16:00:52.268533787Z
如上面的输出所示,两个工作线程几乎同时启动。两个开始时间之间的差异小于两微秒。
CyclicBarrier 类是Java 5 开始引入的另一个同步器,CyclicBarrier允许固定数量的线程互相等待,直到都完成了某件事情。
接下来,让我们看看如何使用CyclicBarrier类解决同时启动的问题。
工作线程
我们先来看看工作线程的实现:
public class WorkerWithCyclicBarrier extends Thread { private CyclicBarrier barrier; public WorkerWithCyclicBarrier(String name, CyclicBarrier barrier) { this.barrier = barrier; this.setName(name); } @Override public void run() { try { System.out.printf("[ %s ] created, blocked by the barrier\n", getName()); barrier.await(); System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now()); // do actual work here... } catch (InterruptedException | BrokenBarrierException e) { // handle exception } } }
实现非常简单。我们将CyclicBarrier对象与工作线程相关联。当线程启动时,我们立即调用barrier.await() 方法。
这样,工作线程就会被阻塞,等待其它都调用了 barrier.await() 后才恢复运行。
主线程
接下来我们看看主线程中如何控制两个工作线程的恢复:
private static void usingCyclicBarrier() throws BrokenBarrierException, InterruptedException { System.out.println("\n==============================================="); System.out.println(" >>> Using CyclicBarrier <<<<"); System.out.println("==============================================="); CyclicBarrier barrier = new CyclicBarrier(3); WorkerWithCyclicBarrier worker1 = new WorkerWithCyclicBarrier("Worker with barrier 1", barrier); WorkerWithCyclicBarrier worker2 = new WorkerWithCyclicBarrier("Worker with barrier 2", barrier); worker1.start(); worker2.start(); Thread.sleep(10);//simulation of some actual work System.out.println("-----------------------------------------------"); System.out.println(" Now open the barrier:"); System.out.println("-----------------------------------------------"); barrier.await(); }
我们的目标是让两个工作线程同时恢复。所以,加上主线程,我们一共有三个线程。
如上方法所示,我们在主线程中创建了一个包含计数为3的CyclicBarrier对象。接下来,我们创建并启动两个工作线程。
正如我们之前所讨论的,两个工作线程被阻塞并等待屏障打开以恢复。
在主线程中,我们可以做一些实际的工作。当我们决定打开barrier时,我们调用barrier.await() 方法让两个worker继续执行。
如果我们在 main()方法中调用usingCyclicBarrier(),我们将得到输出:
=============================================== >>> Using CyclicBarrier <<<< =============================================== [ Worker with barrier 1 ] created, blocked by the barrier [ Worker with barrier 2 ] created, blocked by the barrier ----------------------------------------------- Now open the barrier: ----------------------------------------------- [ Worker with barrier 1 ] starts at: 2021-06-27T16:00:52.311346392Z [ Worker with barrier 2 ] starts at: 2021-06-27T16:00:52.311348874Z
我们可以比较两个工作线程的开始时间。即使两个工作线程没有在完全相同的时间开始,我们也非常接近我们的目标:两个开始时间之间的差异小于三微秒。
Phaser 是Java 7开始引入的,很类似CyclicBarrier和CountDownLatch。但是,Phaser类更灵活。
例如,与CyclicBarrier和CountDownLatch不同,Phaser允许我们动态注册线程。
接下来,让我们使用Phaser解决问题。
工作线程
像之前一样,我们先看一下实现,然后了解它是如何工作的:
public class WorkerWithPhaser extends Thread { private Phaser phaser; public WorkerWithPhaser(String name, Phaser phaser) { this.phaser = phaser; phaser.register(); setName(name); } @Override public void run() { try { System.out.printf("[ %s ] created, blocked by the phaser\n", getName()); phaser.arriveAndAwaitAdvance(); System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now()); // do actual work here... } catch (IllegalStateException e) { // handle exception } } }
当工作线程被实例化时,我们通过调用phaser.register()将当前线程注册到给定的 Phaser 对象 。这样,当前的工作就变成了Phaser 的一个线程方 。
接下来,当工作线程启动时,我们立即调用phaser.arriveAndAwaitAdvance()。因此,我们告诉 phaser 当前线程已经到达,并将等待其他线程方的到达继续进行。当然,在其他线程方到来之前,当前线程是被阻塞的。
主线程
接下来,我们继续看主线程的实现:
private static void usingPhaser() throws InterruptedException { System.out.println("\n==============================================="); System.out.println(" >>> Using Phaser <<<"); System.out.println("==============================================="); Phaser phaser = new Phaser(); phaser.register(); WorkerWithPhaser worker1 = new WorkerWithPhaser("Worker with phaser 1", phaser); WorkerWithPhaser worker2 = new WorkerWithPhaser("Worker with phaser 2", phaser); worker1.start(); worker2.start(); Thread.sleep(10);//simulation of some actual work System.out.println("-----------------------------------------------"); System.out.println(" Now open the phaser barrier:"); System.out.println("-----------------------------------------------"); phaser.arriveAndAwaitAdvance(); }
在上面的代码中,我们可以看到,主线程将自己注册为Phaser对象的线程方。
在我们创建并阻塞了两个工作线程之后,主线程也调用了phaser.arriveAndAwaitAdvance()。这样我们就打开了移相器屏障,让两个工作线程可以同时恢复。
最后,让我们调用main()方法中的usingPhaser ()方法:
=============================================== >>> Using Phaser <<< =============================================== [ Worker with phaser 1 ] created, blocked by the phaser [ Worker with phaser 2 ] created, blocked by the phaser ----------------------------------------------- Now open the phaser barrier: ----------------------------------------------- [ Worker with phaser 2 ] starts at: 2021-07-18T17:39:27.063523636Z [ Worker with phaser 1 ] starts at: 2021-07-18T17:39:27.063523827Z
同样,两个工作线程几乎同时启动。两个开始时间之间的差异小于两微秒。
在本文中,我们首先讨论了目标:“同时启动两个线程”。
接下来,我们讨论了同时启动三个线程的方法:使用CountDownLatch、 CyclicBarrier和Phaser。
它们的思路很相似,阻塞两个线程并试图让它们同时恢复执行。
尽管这些方法不能保证两个线程完全同时启动,但对于现实世界中的大多数情况,结果非常接近且足够。