实现线程的两种方式:继承 Thread,实现 Runnable 接口
有了 Thread 不就够了?通过继承Thread来实现线程虽然比较简单,但 Java 中每个类最多只能有一个父类,如果类已经有父类了,就不能再继承 Thread。
启动线程调 start 而不是 run,一个线程对象只能启动一次
线程的状态
除了main线程外,至少还有一个负责垃圾回收的线程,这个线程就是 daemon 线程,在 main 线程结束的时候,垃圾回收线程也会退出。
Thread 几个静态方法:
sleep 方法
用该方法会让当前线程睡眠指定的时间,单位是毫秒。睡眠期间,该线程会让出 ** CPU(CPU可以去干其他事了),睡眠期间,线程可以被中断**,如果被中断,sleep 会抛出 InterruptedException 异常。
yield 方法
调用该方法,是告诉操作系统的调度器:现在不着急占用 CPU,可以先让其他线程运行。不过,这对调度器也仅仅是建议,调度器如何处理是不一定的,它可能完全忽略该调用。
join 方法
可以让调用join的线程等待该线程结束,join 实际上就是调用了 wait 方法。
每个线程表示一条单独的执行流,有自己的程序计数器,有自己的栈,但线程之间可以共享内存,它们可以访问和操作相同的对象。当多条执行流执行相同的程序代码时,每条执行流都有单独的栈,方法中的参数和局部变量都有自己的一份。当多条执行流可以操作相同的变量时,可能会出现一些意料之外的结果,包括竞态条件和内存可见性问题。
所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终执行结果与执行时序有关,可能正确也可能不正确。
10个线程同时对一个变量counter执行加一,可能每次结果都不一样,因为counter++这个操作不是原子操作,它分为三个步骤:
如何解决这个问题:
多个线程可以共享访问和操作相同的变量,但一个线程对一个共享变量的修改,另一个线程不一定马上就能看到,甚至永远也看不到。这就是内存可见性问题。在计算机系统中,除了内存,数据还会被缓存在 CPU 的寄存器以及各级缓存中,当访问一个变量时,可能直接从寄存器或 CPU 缓存中获取,而不一定到内存中去取,当修改一个变量时,也可能是先写到缓存中,稍后才会同步更新到内存中。
怎么解决:
synchronized 可以用于修饰类的实例方法、静态方法和代码块。
方法加了 synchronized 后,方法内的代码就变成了原子操作。
synchronized 实例方法实际保护的是同一个对象的方法调用,确保同时只能有一个线程执行。
synchronized 保护的是对象而非代码,只要访问的是同一个对象的 synchronized 方法,即使是不同的代码,也会被同步顺序访问。synchronized 方法不能防止非 synchronized 方法被同时执行,所以一般在保护变量时,需要在所有访问该变量的方法上加上 synchronized 。
实例方法
synchronized 实例方法保护的是当前实例对象,即this, this 对象有一个锁和一个等待队列,锁只能被一个线程持有,其他试图获得同样锁的线程需要等待。
执行synchronized实例方法的过程大致如下:
当前线程不能获得锁的时候,它会加入等待队列等待,线程的状态会变为 BLOCKED。
静态方法
对静态方法,保护的是类对象。实际上,每个对象都有一个锁和一个等待队列,类对象也是。
代码块
synchronized 括号里面的就是保护的对象,因为任意对象都有一个锁和等待队列,或者说,任何对象都可以作为锁对象。
可重入性
对同一个执行线程,它在获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。
可重入是通过记录锁的持有线程和持有数量来实现的,当调用被 synchronized 保护的代码时,检查对象是否已被锁,如果是,再检查是否被当前线程锁定,如果是,增加持有数量,如果不是被当前线程锁定,才加入等待队列,当释放锁时,减少持有数量,当数量变为0时才释放整个锁。
内存可见性
在释放锁时,所有写入都会写回内存,而获得锁后,都会从内存中读最新数据。
死锁
应该尽量避免在持有一个锁的同时去申请另一个锁,如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。
多线程之间除了竞争访问同一个资源外,也经常需要相互协作,基本方式就是 wait/notify
。
Java 的根父类是 Object , Java 在 Object 类而非 Thread 类中定义了一些线程协作的基本方法,这些方法有两类,一类是 wait ,另一类是 notify 。
wait实际上做了什么?除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。
但调用wait时,线程会释放对象锁。
一个线程因为等待某个条件执行不下去,当这个条件改变之后就该调用 notify 方法了,notify 会从条件队列中选一个线程,将其从队列中移除并唤醒,选哪个是不确定的。而 notifyAll 会移除条件队列中所有的线程并全部唤醒。
调用notify会把在条件队列中等待的线程唤醒并从队列中移除,但它不会释放对象锁。
唤醒之后线程会重新尝试竞争获得锁:如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回,否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从 wait 调用中返回。
线程从wait调用中返回后,不代表其等待的条件就一定成立,它需要重新检查其等待的条件,这也是在条件附近看到 while 而不是 if 的原因。
停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出。
每个线程都有一个标志位,表示该线程是否被中断了。
中断相关的方法:
public void interrupt(); // 中断线程 public boolean isInterrupted(); // 线程的中断标志位是否为true public static boolean interrupted(); // 线程的中断标志位是否为true + 清空中断标志位
注意:interrupt方法不一定会真正“中断”线程,
RUNNABLE
线程在运行或具备运行条件只是在等待操作系统调度。
WAITING/TIMED_WAITING
线程在等待某个条件或超时。线程调用join/wait/sleep方法会进入WAITING或TIMED_WAITING状态。调用interrupt()会使得该线程抛出InterruptedException。需要注意的是,抛出异常后,中断标志位会被清空,而不是被设置。InterruptedException是一个受检异常,线程必须进行处理。
BLOCKED
线程在等待锁,试图进入同步块。调用interrupt()只是会设置线程的中断标志位,线程依然会处于BLOCKED状态,也就是说,interrupt()并不能使一个在等待锁的线程真正“中断”。
test方法在持有锁lock的情况下启动线程a,而线程a也去尝试获得锁lock,所以会进入锁等待队列,随后test调用线程a的interrupt方法并调用join等待线程线程a结束,线程a会结束吗?不会,interrupt方法只会设置线程的中断标志,而并不会使它从锁等待队列中出来。
public static void test() throws InterruptedException { synchronized (lock) { A a = new A(); a.start(); Thread.sleep(1000); a.interrunpt(); a.join(); } }
注意:在使用 synchronized 关键字获取锁的过程中不响应中断请求,这是 synchronized 的局限性。
NEW/TERMINATE
线程还未启动或已结束。调用 interrupt() 对它没有任何效果,中断标志位也不会被设置。
原子操作依赖一个很重要的方法:
public final boolean compareAndSet(int expect, int update)
这个方法就被成为CAS
。该方法有两个参数 expect 和 update ,以原子方式实现了如下功能:如果当前值等于 expect ,则更新为 update ,否则不更新,如果更新成功,返回 true,否则返回 false 。
以 AtomicInteger 为例,AtomicInteger 可以在程序中用作一个计数器,多个线程并发更新,也总能实现正确性。它的主要内部成员是:
public volatile int value; // 这个变量天生保证内存可见性
AtomicInteger 有个方法 incrementAndGet:
public final int incrementAndGet() { for(;;) { int current = get(); // 获取当前值value int next = current + 1; // 计算期望的值next // 调CAS方法进行更新,如果更新没有成功,说明value被别的线程改了,则再去取最新值并尝试更新直到成功为止。 if(compareAndSet(current, next)) { return next; } } }
与 synchronized 锁相比,这种原子更新方式代表一种不同的思维方式。synchronized 是悲观的,它假定更新很可能冲突,所以先获取锁,得到锁后才更新。原子变量的更新逻辑是乐观的,它假定冲突比较少,但使用 CAS 更新,也就是进行冲突检测,如果确实冲突了,那也没关系,继续尝试就好了。
AQS是一个抽象类AbstractQueuedSynchronizer。
AQS封装了一个状态,给子类提供了查询和设置状态的方法:
public volatile int state; protected final int getState(); protected final void setState(int newState); protected final boolean compareAndSetState(int expect, int update);
用于实现锁时,AQS 可以保存锁的当前持有线程,提供了方法进行查询和设置:
private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread t); protected final Thread getExclusiveOwnerThread();
AQS内部维护了一个等待队列,借助 CAS 方法实现了无阻塞算法进行更新。
使用 CAS 方式更新有一个 ABA 问题。该问题是指,假设当前值为A,如果另一个线程先将 A 修改成 B ,再修改回成 A ,当前线程的 CAS 操作无法分辨当前值发生过变化。
ABA 是不是一个问题与程序的逻辑有关,一般不是问题。而如果确实有问题,解决方法是使用 AtomicStampedReference
显式锁接口和类主要有:
Lock 接口定义为:
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; // 可以避免死锁。在持有一个锁获取另一个锁而获取不到的时候,可以释放已持有的锁,给其他线程获取锁的机会,然后重试获取所有锁。 boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
Lock接口的主要实现类是ReentrantLock,底层依赖CAS,AQS,ReentrantLock,它的基本用法lock/unlock实现了与synchronized一样的语义,包括:
ReentrantLock 和 synchronized 都是默认不保证公平。使用显式锁,一定要记得调用 unlock。
相比 synchronized , ReentrantLock 可以实现与 synchronized 相同的语义,而且支持以非阻塞方式获取锁,可以响应中断,可以限时,更为灵活。
synchronized代表一种声明式编程思维,程序员更多的是表达一种同步声明,由 Java 系统负责具体实现,程序员不知道其实现细节;显式锁代表一种命令式编程思维,程序员实现所有细节。
简单总结下,能用 synchronized 就用 synchronized,不满足要求时再考虑 ReentrantLock。
显式锁与 synchronized 相对应,而显式条件与 wait/notify 相对应。wait/notify与synchronized配合使用,显式条件与显式锁配合使用。
Condition 表示条件变量,是一个接口,其中有 await、signal、signalAll 方法。
await 对应于 Object 的 wait , signal 对应于 notify, signalAll 对应于 notifyAll,语义也是一样的。
一般的 await 相关方法都是响应中断的,如果发生了中断,会抛出 InterruptedException,但中断标志位会被清空。awaitUnInterruptibly() 方法不会响应中断,它不会由于中断结束,但当它返回时,如果等待过程中发生了中断,中断标志位会被设置。
await在进入等待队列后,会释放锁,释放CPU,当其他线程将它唤醒后,或等待超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从 await 方法中退出。
示例:
static class MyBlockQueue<E> { private Queue<E> queue = null; private int limit; private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public MyBlockQueue(int limit) { this.limit = limit; // ArrayDeque是线程不安全的 queue = new ArrayDeque<>(); } private void put(E e) throws InterruptedException { lock.lockInterruptibly(); try { // 队列满,在notFull等待,不让放 while (queue.size() == limit) { notFull.await(); } queue.add(e); // 唤醒一下,现在不空了 notEmpty.signal(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lockInterruptibly(); try { // 队列空,在notEmpty等待,不让取 while (queue.isEmpty()) { notEmpty.await(); } E e = queue.poll(); // 唤醒一下,现在不满了 notFull.signal(); return e; } finally { lock.unlock(); } } }
上述代码定义了两个等待条件:不满(notFull)、不空(notEmpty)。在put方法中,如果队列满,则在notFull上等待;在take方法中,如果队列空,则在notEmpty上等待。put操作后通知 notEmpty, take 操作后通知 notFull。这样,代码更清晰易读。
基本接口:
Runnable 没有返回结果,而 Callable 有,Runnable 不会抛出异常,而 Callable 会。
Executor 表示最简单的执行服务,可以执行一个 Runnable,没有返回结果。
ExecutorService 扩展了 Executor,其中的 submit 方法表示提交一个任务,返回值类型都是 Future,返回后,只是表示任务已提交,不代表已执行,通过Future可以查询异步任务的状态、获取最终结果、取消任务等。
Future中的 get 用于返回异步任务最终的结果,如果任务还未执行完成,会阻塞等待;cancel 用于取消异步任务,如果任务已完成、或已经取消、或由于某种原因不能取消, cancel 返回 false,否则返回true。isDone 和 isCancelled 用于查询任务状态。isCancelled 表示任务是否被取消,只要 cancel 方法返回了 true,随后的isCancelled 方法都会返回 true,即使执行任务的线程还未真正结束。isDone 表示任务是否结束,不管什么原因都算。
Future 是一个重要的概念,是实现“任务的提交”与“任务的执行”相分离的关键,任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作。
基本使用:
package AsyncTask; import java.util.Random; import java.util.concurrent.*; public class AsyncTaskDemo { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Integer> future = executorService.submit(new Task()); System.out.println("这是主线程"); Thread.sleep(100); try { System.out.println("任务结果" + future.get()); } catch (ExecutionException e) { e.printStackTrace(); } executorService.shutdown(); } static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { int sleepSeconds = new Random().nextInt(1000); System.out.println("子线程开始休眠"); Thread.sleep(sleepSeconds); System.out.println("子线程休眠结束"); return sleepSeconds; } } }
示例中的大致步骤就是:
其中 ExecutorService 有两个关闭方法:ExecutorServicshutdown 和 shutdownNow。区别是,shutdown表示不再接受新任务,shutdownNow不仅不接受新任务,而且会终止已提交但尚未执行的任务,对于正在执行的任务,一般会调用线程的interrupt方法尝试中断,不过,线程可能不响应中断,shutdownNow会返回已提交但尚未执行的任务列表。shutdown 和 shutdownNow 不会阻塞等待,它们返回后不代表所有任务都已结束,调用者可以通过awaitTermination等待所有任务结束。
ExecutorService 有两组批量提交任务的方法:invokeAll 和 invokeAny。invokeAll 等待所有任务完成,返回的 Future 列表中,每个 Future 的 isDone 方法都返回true,不过 isDone 为true不代表任务就执行成功了,可能是被取消了。而对于 invokeAny,只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务会被取消
好累啊不想写了
线程池主要由两个概念组成:一个是任务队列;另一个是工作者线程。工作者线程主体就是一个循环,循环从队列中接受任务并执行,任务队列保存待执行的任务。( JavaScript 中的异步实现也是类似的套路哦)
线程池的实现类是 ThreadPoolExecutor,它继承自 AbstractExecutorService ,实现了 ExecutorService ,基本用法与上节异步任务介绍的类似。
构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);
第二个构造方法多了两个参数 threadFactory 和 handler,这两个参数一般不需要,第一个构造方法会设置默认值。参数 corePoolSize、maximumPoolSize、keepAliveTime、unit 用于控制线程池中线程的个数,workQueue 表示任务队列,threadFactory 用于对创建的线程进行一些配置,handler表示任务拒绝策略。
线程池大小
一般情况下,有新任务到来的时候,如果当前线程个数小于 corePoolSize,就会创建一个新线程来执行该任务,需要说明的是,即使其他线程现在也是空闲的,也会创建新线程。不过,如果线程个数大于等于 corePoolSize,那就不会立即创建新线程了,它会先尝试排队,需要强调的是,它是“尝试”排队,而不是“阻塞等待”入队,如果队列满了或其他原因不能立即入队,它就不会排队,而是检查线程个数是否达到了 maximumPoolSize,如果没有,就会继续创建线程,直到线程数达到 maximumPoolSize。
keepAliveTime 的目的是为了释放多余的线程资源,它表示,当线程池中的线程个数大于 corePoolSize 时额外空闲线程的存活时间。如果该值为 0 ,则表示所有线程都不会超时终止。
队列
这里要求队列类型是阻塞队列 BlockingQueue。
注意:如果用的是无界队列,需要强调的是,线程个数最多只能达到 corePoolSize,到达 corePoolSize 后,新的任务总会排队,参数 maximumPoolSize 也就没有意义了。对于 SynchronousQueue,它没有实际存储元素的空间,当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功,否则,总是会创建新线程,直到达到 maximumPoolSize。
任务拒绝策略
如果队列有界,且 maximumPoolSize 有限,则当队列排满,线程个数也达到了 maximumPoolSize,这时,新任务会触发线程池的任务拒绝策略。
ThreadPoolExecuto r实现了4种处理方式。
拒绝策略可以在构造方法中进行指定,也可以通过 set 方法进行指定
工厂
线程池还可以接受一个参数:ThreadFactory。它是一个接口,由这个接口l来定义如何创建一个 Thread。
核心线程
线程个数小于等于 corePoolSiz e时,我们称这些线程为核心线程,默认情况下:
死锁
提交给线程池的任务之间有如果依赖,这种情况可能会导致出现死锁。这个死锁不是说共享资源竞争的死锁,而是单纯的等待,比如任务A,在它的执行过程中,它给同样的任务执行服务提交了一个任务B,但需要等待任务B结束。
解决办法:可以使用 newCachedThreadPool 创建线程池,让线程数不受限制。另一个解决方法是使用 SynchronousQueue,它可以避免死锁,怎么做到的呢?对于普通队列,入队只是把任务放到了队列中,而对于 SynchronousQueue 来说,入队成功就意味着已有线程接受处理,如果入队失败,可以创建更多线程直到 maximumPoolSize,如果达到了 maximumPoolSize,会触发拒绝机制,不管怎么样,都不会死锁。
package AsyncTask; import java.util.Timer; import java.util.TimerTask; public class TimerDemo { public static void main(String[] args) throws InterruptedException { Timer timer = new Timer(); timer.schedule(new DelayTask(), 10); // 延迟指定时间后以固定时延执行 timer.schedule(new DelayTask2(), 100, 1000); Thread.sleep(4000); timer.cancel(); } static class DelayTask extends TimerTask { @Override public void run() { System.out.println("延迟1任务执行"); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } } static class DelayTask2 extends TimerTask { @Override public void run() { System.out.println("延迟2任务执行"); } } }
创建一个 Timer 对象,先运行 DelayTask,再固定周期运行 DelayTask2,最后调用 Timer 的 cancel 方法取消所有定时任务。
这里会发现 DelayTask2 总是等 DelayTas k执行之后才开始输出,因为一个 Timer 对象只有一个 Timer 线程在执行,所以 DelayTask2 被 DelayTask 给强行延迟了。
注意:任务的延迟执行分为固定延时(fixed-delay)与固定频率(fixed-rate),二者都是重复执行,但后一次任务执行相对的时间是不一样的,对于固定延时,它是基于上次任务的“实际”执行时间来算的,如果由于某种原因,上次任务延时了,则本次任务也会延时,而固定频率会尽量补够运行次数。
Timer 内部主要由任务队列和 Timer 线程两部分组成,一个 Timer 对象只有一个 Timer 线程。任务队列是一个基于堆实现的优先级队列,按照下次执行的时间排优先级。Timer 线程主体是一个循环,从队列中获取任务,如果队列中有任务且计划执行时间小于等于当前时间,就执行它,如果队列中没有任务或第一个任务延时还没到,就睡眠。
在执行任何一个任务的 run 方法时,一旦 run 抛出异常,Timer 线程就会退出,从而所有定时任务都会被取消。
如果希望各个定时任务不互相干扰,一定要在 run 方法内捕获所有异常。
总之需要注意:
由于 Timer/TimerTask 的一些问题,Java 并发包引入了 ScheduledExecutorService。ScheduledExecutorService 的主要实现类是ScheduledThreadPoolExecutor,它是线程池 ThreadPoolExecutor 的子类,是基于线程池实现的。它的任务队列是一个无界的优先级队列,所以最大线程数对它没有作用,即使 corePoolSize 设为 0,它也会至少运行一个线程。
与 Timer 不同,它不支持以绝对时间作为首次运行的时间。另外,单个定时任务的异常不会再导致全部定时任务被取消,即使后台只有一个线程执行任务。不过,需要强调的是,任务发生异常不会在任何地方体现,也就是说在 run 方法里 throw 了之后什么也看不见。所以,与 Timer 中的任务类似,应该捕获所有异常。
package AsyncTask; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledExecutorServiceDemo { public static void main(String[] args) throws InterruptedException { ScheduledExecutorService timer = Executors.newScheduledThreadPool(10); timer.schedule(new LongRunTask(), 10, TimeUnit.MILLISECONDS); timer.scheduleWithFixedDelay(new FixedDelayTask(), 100, 1000, TimeUnit.MILLISECONDS); Thread.sleep(4000); timer.shutdown(); } static class LongRunTask implements Runnable { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("LongRunTask"); throw new RuntimeException(); } } static class FixedDelayTask implements Runnable { @Override public void run() { System.out.println("FixedDelayTask"); } } }
ScheduledThreadPoolExecutor 的实现思路与 Timer 基本是类似的,都有一个基于堆的优先级队列,保存待执行的定时任务,它的主要不同是:
synchronized 和显式锁 ReentrantLock,对于同一受保护对象的访问,无论是读还是写,它们都要求获得相同的锁。在一些场景中,这是没有必要的,多个线程的读操作完全可以并行,在读多写少的场景中,让读操作并行可以明显提高性能。
通过一个 ReadWriteLock 产生两个锁:一个读锁,一个写锁。读操作使用读锁,写操作使用写锁。需要注意的是,只有“读-读”操作是可以并行的,“读-写”和“写-写”都不可以。
内部,它们使用同一个整数变量表示锁的状态,16 位给读锁用,16 位给写锁用,使用一个变量便于进行 CAS 操作,锁的等待队列其实也只有一个。写锁的获取,就是确保当前没有其他线程持有任何锁,否则就等待。写锁释放后,也就是将等待队列中的第一个线程唤醒,唤醒的可能是等待读锁的,也可能是等待写锁的。读锁的获取不太一样,首先,只要写锁没有被持有,就可以获取到读锁,此外,在获取到读锁后,它会检查等待队列,逐个唤醒最前面的等待读锁的线程,直到第一个等待写锁的线程。如果有其他线程持有写锁,获取读锁会等待。读锁释放后,检查读锁和写锁数是否都变为了 0,如果是,唤醒等待队列中的下一个线程。
package AsyncTask; import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class MyCache { private Map<String, Object> map = new HashMap<>(); private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Lock readLock = readWriteLock.readLock(); private Lock writeLock = readWriteLock.writeLock(); public Object get(String key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } } public Object put(String key, Object value) { writeLock.lock(); try { return map.put(key, value); } finally { writeLock.unlock(); } } public void clear() { writeLock.lock(); try { map.clear(); } finally { writeLock.unlock(); } } }
有的单个资源即使可以被并发访问,但并发访问数多了可能影响性能,所以希望限制并发访问的线程数。
一般锁只能由持有锁的线程释放,而 Semaphore 表示的只是一个许可数,任意线程都可以调用其 release 方法。主要的锁实现类 ReentrantLock 是可重入的,而 Semaphore 不是,每一次的 acquire 调用都会消耗一个许可,acquire 是会阻塞的。
package AsyncTask; import java.util.concurrent.Semaphore; public class SemaphoreDemo { public static class ConcurrentLimitException extends RuntimeException { private static final long serialVersionUID = 1L; } private static final int MAX_PERMITS = 10; private Semaphore permits = new Semaphore(MAX_PERMITS); public boolean login(String name, String pwd) { if(!permits.tryAcquire()) { throw new ConcurrentLimitException(); } // TODO 校验密码 return true; } public void logout(String name) { // TODO 登出操作 permits.release(); } }
门栓的两种应用场景:一种是同时开始,另一种是主从协作。
同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为1,运动员线程调用 await,主线程调用 countDown
主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用 countDown,主线程调用 await 进行等待。
package AsyncTask; import java.util.concurrent.CountDownLatch; // 同时开始场景 public class RacerWithCountDwnLatch { static class Racer extends Thread { CountDownLatch latch; public Racer(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { latch.await(); // 没有countDown信号就会卡在这 System.out.println(getName() + "开始正式运行" + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { int num = 10; CountDownLatch latch = new CountDownLatch(1); Thread[] racers = new Thread[num]; for(int i = 0; i < 10; i++) { racers[i] = new Racer(latch); racers[i].start(); } Thread.sleep(1000); // 发信号让线程一起开始动作 latch.countDown(); } }
package AsyncTask; import java.util.concurrent.CountDownLatch; // 主从协作场景 public class MasterWorkerDemo { static class Worker extends Thread { CountDownLatch latch; public Worker(CountDownLatch latch) { this.latch = latch; } @Override public void run() { try { int sleepTime = (int) (Math.random() * 10); System.out.println(sleepTime); Thread.sleep(sleepTime); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { latch.countDown(); } } } public static void main(String[] args) throws InterruptedException { int num = 10; CountDownLatch latch = new CountDownLatch(num); Worker[] workers = new Worker[num]; for(int i = 0; i < num; i++) { workers[i] = new Worker(latch); workers[i].start(); } latch.await(); System.out.println("全部结束"); } }
CyclicBarrier 特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。
与 CountDownLatch 类似,它也有一个数字,但表示的是参与的线程个数。
它有一个构造方法,接受一个 Runnable 参数,这个参数表示栅栏动作,当所有线程到达栅栏后,在所有线程执行下一步动作前,运行参数中的动作,这个动作由最后一个到达栅栏的线程执行。
CyclicBarrier 的主要方法就是 await,await 在等待其他线程到达栅栏,调用 await 后,表示自己已经到达,如果自己是最后一个到达的,就执行可选的命令,执行后,唤醒所有等待的线程,然后重置内部的同步计数,以循环使用。
package AsyncTask; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { static class Tourist extends Thread { CyclicBarrier barrier; public Tourist(CyclicBarrier barrier) { this.barrier = barrier; } @Override public void run() { try { Thread.sleep((int) (Math.random() * 10)); // 第一次集合 barrier.await(); System.out.println(getName() + "继续" + System.currentTimeMillis()); Thread.sleep((int) (Math.random() * 10)); // 第二次集合 barrier.await(); System.out.println(getName() + "继续" + System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { int num = 3; Tourist[] tourists = new Tourist[num]; CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() { @Override public void run() { System.out.println("全部集合了" + System.currentTimeMillis() + " 最后执行者:" + Thread.currentThread().getName()); } }); for (int p = 0; p < num; p++) { tourists[p] = new Tourist(barrier); tourists[p].start(); } } }
线程本地变量是说,每个线程都有同一个变量的独有拷贝。
多个线程访问的虽然是同一个变量,但每个线程都有自己的独立的值,这就是线程本地变量的含义。
使用场景:日期处理、随机数和上下文信息。
日期处理
package Threads.ThreadLocal; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; /** * 每个线程使用自己的DateFormat,就不存在安全问题了,在线程的整个使用过程中,只需要创建一次,又避免了频繁创建的开销 */ public class ThreadLocalDateFormat { static ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() { @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat("yyyy-MM-dd"); } }; public static String date2String(Date date) { return sdf.get().format(date); } public static Date string2Date(String str) throws ParseException { return sdf.get().parse(str); } }
随机数
即使对象是线程安全的,使用 ThreadLocal 也可以减少竞争,它是 Random 的子类,利用了 ThreadLocal,它没有 public 的构造方法,通过静态方法current 获取对象,这个对象就是个就是一个 ThreadLocal 变量。
上下文信息
package Threads.ThreadLocal; public class ReqContext { public static class Req {}; private static ThreadLocal<String> localUserId = new ThreadLocal<>(); private static ThreadLocal<Req> localReq = new ThreadLocal<>(); public static String getCurrentUserId() { return localUserId.get(); } public static void setCurrentUserId(String userId) { localUserId.set(userId); } public static Req getCurrentReq() { return localReq.get(); } public static void setCurrentReq(Req req) { localReq.set(req); } }
在一个 Web 服务器中,一个线程执行用户的请求,在执行过程中,很多代码都会访问一些共同的信息,比如请求信息、用户身份信息,它们是线程执行过程中的全局信息,在首次获取到信息时,调用 set 方法如 setCurrentRequest/setCurrentUserId 进行设置,然后就可以在代码的任意其他地方调用 get 相关方法进行获取了。
每个线程都有一个 Map,类型为 ThreadLocalMap ,调用set实际上是在线程自己的Map里设置了一个条目,键为当前的 ThreadLocal 对象,值为 value。
每个线程都有一个 Map,对于每个 ThreadLocal 对象,调用其get/set实际上就是以 ThreadLocal 对象为键读写当前线程的 Map,这样,就实现了每个线程都有自己的独立副本的效果。
本章介绍了 Java 一些同步协作工具: