首先给大家分享一个github仓库,上面放了200多本经典的计算机书籍,包括C语言、C++、Java、Python、前端、数据库、操作系统、计算机网络、数据结构和算法、机器学习、编程人生等,可以star一下,下次找书直接在上面搜索,仓库持续更新中~
github地址:https://github.com/Tyson0314/java-books
如果github访问不了,可以访问gitee仓库。
gitee地址:https://gitee.com/tysondai/java-books
线程池:一个管理线程的池子。
创建新的线程需要获取全局锁,通过这种设计可以尽量避免获取全局锁,当 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),提交的大部分任务都会被放到 BlockingQueue。
为了形象描述线程池执行,打个比喻:
ThreadPoolExecutor 的通用构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
corePoolSize:当有新任务时,如果线程池中线程数没有达到线程池的基本大小,则会创建新的线程执行任务,否则将任务放入阻塞队列。当线程池中存活的线程数总是大于 corePoolSize 时,应该考虑调大 corePoolSize。
maximumPoolSize:当阻塞队列填满时,如果线程池中线程数没有超过最大线程数,则会创建新的线程运行任务。否则根据拒绝策略处理新任务。非核心线程类似于临时借来的资源,这些线程在空闲时间超过 keepAliveTime 之后,就应该退出,避免资源浪费。
BlockingQueue:存储等待运行的任务。
keepAliveTime:非核心线程空闲后,保持存活的时间,此参数只对非核心线程有效。设置为0,表示多余的空闲线程会被立即终止。
TimeUnit:时间单位
TimeUnit.DAYS TimeUnit.HOURS TimeUnit.MINUTES TimeUnit.SECONDS TimeUnit.MILLISECONDS TimeUnit.MICROSECONDS TimeUnit.NANOSECONDS
ThreadFactory:每当线程池创建一个新的线程时,都是通过线程工厂方法来完成的。在 ThreadFactory 中只定义了一个方法 newThread,每当线程池需要创建新线程就会调用它。
public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { this.poolName = poolName; } public Thread newThread(Runnable runnable) { return new MyAppThread(runnable, poolName);//将线程池名字传递给构造函数,用于区分不同线程池的线程 } }
RejectedExecutionHandler:当队列和线程池都满了的时候,根据拒绝策略处理新任务。
AbortPolicy:默认的策略,直接抛出RejectedExecutionException DiscardPolicy:不处理,直接丢弃 DiscardOldestPolicy:将等待队列队首的任务丢弃,并执行当前任务 CallerRunsPolicy:由调用线程处理该任务
如果线程池线程数量太小,当有大量请求需要处理,系统响应比较慢影响体验,甚至会出现任务队列大量堆积任务导致OOM。
如果线程池线程数量过大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换(cpu给线程分配时间片,当线程的cpu时间片用完后保存状态,以便下次继续运行),从 而增加线程的执行时间,影响了整体执行效率。
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止某些原因导致的任务暂停(线程阻塞,如io操作,等待锁,线程sleep)而带来的影响。一旦某个线程被阻塞,释放了cpu资源,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 系统会用大部分的时间来处理 I/O 操作,而线程等待 I/O 操作会被阻塞,释放 cpu资源,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法:最佳线程数 = CPU核心数 * (1/CPU利用率) = CPU核心数 * (1 + (I/O耗时/CPU耗时)),一般可设置为2N
常见的线程池有 FixedThreadPool、SingleThreadExecutor、CachedThreadPool 和 ScheduledThreadPool。这几个都是 ExecutorService (线程池)实例。
FixedThreadPool
固定线程数的线程池。任何时间点,最多只有 nThreads 个线程处于活动状态执行任务。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
使用无界队列 LinkedBlockingQueue(队列容量为 Integer.MAX_VALUE),运行中的线程池不会拒绝任务,即不会调用RejectedExecutionHandler.rejectedExecution()方法。
maxThreadPoolSize 是无效参数,故将它的值设置为与 coreThreadPoolSize 一致。
keepAliveTime 也是无效参数,设置为0L,因为此线程池里所有线程都是核心线程,核心线程不会被回收(除非设置了executor.allowCoreThreadTimeOut(true))。
适用场景:适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。需要注意的是,FixedThreadPool 不会拒绝任务,在任务比较多的时候会导致 OOM。
SingleThreadExecutor
只有一个线程的线程池。
public static ExecutionService newSingleThreadExecutor() { return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
使用无界队列 LinkedBlockingQueue。线程池只有一个运行的线程,新来的任务放入工作队列,线程处理完任务就循环从队列里获取任务执行。保证顺序的执行各个任务。
适用场景:适用于串行执行任务的场景,一个任务一个任务地执行。在任务比较多的时候也是会导致 OOM。
CachedThreadPool
根据需要创建新线程的线程池。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
如果主线程提交任务的速度高于线程处理任务的速度时,CachedThreadPool
会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。
使用没有容量的SynchronousQueue作为线程池工作队列,当线程池有空闲线程时,SynchronousQueue.offer(Runnable task)
提交的任务会被空闲线程处理,否则会创建新的线程处理任务。
适用场景:用于并发执行大量短期的小任务。CachedThreadPool
允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
ScheduledThreadPoolExecutor
在给定的延迟后运行任务,或者定期执行任务。在实际项目中基本不会被用到,因为有其他方案选择比如quartz
。
使用的任务队列 DelayQueue
封装了一个 PriorityQueue
,PriorityQueue
会对队列中的任务进行排序,时间早的任务先被执行(即ScheduledFutureTask
的 time
变量小的先执行),如果time相同则先提交的任务会被先执行(ScheduledFutureTask
的 squenceNumber
变量小的先执行)。
执行周期任务步骤:
DelayQueue
中获取已到期的 ScheduledFutureTask(DelayQueue.take())
。到期任务是指 ScheduledFutureTask
的 time 大于等于当前系统的时间;ScheduledFutureTask
;ScheduledFutureTask
的 time 变量为下次将要被执行的时间;ScheduledFutureTask
放回 DelayQueue
中(DelayQueue.add()
)。适用场景:周期性执行任务的场景,需要限制线程数量的场景。
进程是指一个内存中运行的应用程序,每个进程都有自己独立的一块内存空间,一个进程中可以启动多个线程。
线程是比进程更小的执行单位,它是在一个进程中独立的控制流,一个进程可以启动多个线程,每条线程并行执行不同的任务。
初始(NEW):线程被构建,还没有调用 start()。
运行(RUNNABLE):包括操作系统的就绪和运行两种状态。
阻塞(BLOCKED):一般是被动的,在抢占资源中得不到资源,被动的挂起在内存,等待资源释放将其唤醒。线程被阻塞会释放CPU,不释放内存。
等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。
终止(TERMINATED):表示该线程已经执行完毕。
图片来源:Java并发编程的艺术
线程中断即线程运行过程中被其他线程给打断了,它与 stop 最大的区别是:stop 是由系统强制终止线程,而线程中断则是给目标线程发送一个中断信号,如果目标线程没有接收线程中断的信号并结束线程,线程则不会终止,具体是否退出或者执行其他逻辑取决于目标线程。
线程中断三个重要的方法:
1、java.lang.Thread#interrupt
调用目标线程的interrupt()方法,给目标线程发一个中断信号,线程被打上中断标记。
2、java.lang.Thread#isInterrupted()
判断目标线程是否被中断,不会清除中断标记。
3、java.lang.Thread#interrupted
判断目标线程是否被中断,会清除中断标记。
private static void test2() { Thread thread = new Thread(() -> { while (true) { Thread.yield(); // 响应中断 if (Thread.currentThread().isInterrupted()) { System.out.println("Java技术栈线程被中断,程序退出。"); return; } } }); thread.start(); thread.interrupt(); }
继承 Thread 创建线程代码如下。run()方法是由jvm创建完操作系统级线程后回调的方法,不可以手动调用,手动调用相当于调用普通方法。
/** * @author: 程序员大彬 * @time: 2021-09-11 10:15 */ public class MyThread extends Thread { public MyThread() { } @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread() + ":" + i); } } public static void main(String[] args) { MyThread mThread1 = new MyThread(); MyThread mThread2 = new MyThread(); MyThread myThread3 = new MyThread(); mThread1.start(); mThread2.start(); myThread3.start(); } }
Runnable 创建线程代码:
/** * @author: 程序员大彬 * @time: 2021-09-11 10:04 */public class RunnableTest { public static void main(String[] args){ Runnable1 r = new Runnable1(); Thread thread = new Thread(r); thread.start(); System.out.println("主线程:["+Thread.currentThread().getName()+"]"); }}class Runnable1 implements Runnable{ @Override public void run() { System.out.println("当前线程:"+Thread.currentThread().getName()); }}
实现Runnable接口比继承Thread类所具有的优势:
Callable 创建线程代码:
/** * @author: 程序员大彬 * @time: 2021-09-11 10:21 */ public class CallableTest { public static void main(String[] args) { Callable1 c = new Callable1(); //异步计算的结果 FutureTask<Integer> result = new FutureTask<>(c); new Thread(result).start(); try { //等待任务完成,返回结果 int sum = result.get(); System.out.println(sum); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } class Callable1 implements Callable<Integer> { @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i <= 100; i++) { sum += i; } return sum; } }
使用 Executor 创建线程代码:
/** * @author: 程序员大彬 * @time: 2021-09-11 10:44 */ public class ExecutorsTest { public static void main(String[] args) { //获取ExecutorService实例,生产禁用,需要手动创建线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //提交任务 executorService.submit(new RunnableDemo()); } } class RunnableDemo implements Runnable { @Override public void run() { System.out.println("大彬"); } }
多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。
如下图所示,线程 A 持有资源 2,线程 B 持有资源 1,他们同时都想申请对方的资源,所以这两个线程就会互相等待而进入死锁状态。
下面通过例子说明线程死锁,代码来自并发编程之美。
public class DeadLockDemo { private static Object resource1 = new Object();//资源 1 private static Object resource2 = new Object();//资源 2 public static void main(String[] args) { new Thread(() -> { synchronized (resource1) { System.out.println(Thread.currentThread() + "get resource1"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "waiting get resource2"); synchronized (resource2) { System.out.println(Thread.currentThread() + "get resource2"); } } }, "线程 1").start(); new Thread(() -> { synchronized (resource2) { System.out.println(Thread.currentThread() + "get resource2"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread() + "waiting get resource1"); synchronized (resource1) { System.out.println(Thread.currentThread() + "get resource1"); } } }, "线程 2").start(); } }
代码输出如下:
Thread[线程 1,5,main]get resource1 Thread[线程 2,5,main]get resource2 Thread[线程 1,5,main]waiting get resource2 Thread[线程 2,5,main]waiting get resource1
线程 A 通过 synchronized (resource1) 获得 resource1 的监视器锁,然后通过 Thread.sleep(1000);让线程 A 休眠 1s 为的是让线程 B 得到执行然后获取到 resource2 的监视器锁。线程 A 和线程 B 休眠结束了都开始企图请求获取对方的资源,然后这两个线程就会陷入互相等待的状态,这也就产生了死锁。
死锁产生的四个必要条件:
互斥:一个资源每次只能被一个进程使用(资源独立)
请求与保持:一个进程因请求资源而阻塞时,对已获得的资源保持不放(不释放锁)
不剥夺:进程已获得的资源,在未使用之前,不能强行剥夺(抢夺资源)
循环等待:若干进程之间形成一种头尾相接的循环等待的资源关闭(死循环)
避免死锁的方法:
调用 start() 方法是用来启动线程的,轮到该线程执行时,会自动调用 run();直接调用 run() 方法,无法达到启动多线程的目的,相当于主线程线性执行 Thread 对象的 run() 方法。
一个线程对线的 start() 方法只能调用一次,多次调用会抛出 java.lang.IllegalThreadStateException 异常;run() 方法没有限制。
join
Thread.join(),在main中创建了thread线程,在main中调用了thread.join()/thread.join(long millis),main线程放弃cpu控制权,线程进入WAITING/TIMED_WAITING状态,等到thread线程执行完才继续执行main线程。
public final void join() throws InterruptedException { join(0); }
yield
Thread.yield(),一定是当前线程调用此方法,当前线程放弃获取的CPU时间片,但不释放锁资源,由运行状态变为就绪状态,让OS再次选择线程。作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。Thread.yield()不会导致阻塞。该方法与sleep()类似,只是不能由用户指定暂停多长时间。
public static native void yield(); //static方法
sleep
Thread.sleep(long millis),一定是当前线程调用此方法,当前线程进入TIMED_WAITING状态,让出cpu资源,但不释放对象锁,指定时间到后又恢复运行。作用:给其它线程执行机会的最佳方式。
public static native void sleep(long millis) throws InterruptedException;//static方法
volatile是轻量级的同步机制,volatile保证变量对所有线程的可见性,不保证原子性。
MESI(缓存一致性协议):当CPU写数据时,如果发现操作的变量是共享变量,即在其他CPU中也存在该变量的副本,会发出信号通知其他CPU将该变量的缓存行置为无效状态,因此当其他CPU需要读取这个变量时,就会从内存重新读取。
volatile关键字的两个作用:
指令重排序是JVM为了优化指令,提高程序运行效率,在不影响单线程程序执行结果的前提下,尽可能地提高并行度。Java编译器会在生成指令系列时在适当的位置会插入内存屏障
指令来禁止处理器重排序。插入一个内存屏障,相当于告诉CPU和编译器先于这个命令的必须先执行,后于这个命令的必须后执行。对一个volatile字段进行写操作,Java内存模型将在写操作后插入一个写屏障指令,这个指令会把之前的写入值都刷新到内存。
AQS,AbstractQueuedSynchronizer,抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,许多并发工具的实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch。
AQS使用一个volatile的int类型的成员变量state来表示同步状态,通过CAS修改同步状态的值。当线程调用 lock 方法时 ,如果 state=0,说明没有任何线程占有共享资源的锁,可以获得锁并将 state加1。如果 state不为0,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待。
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态(独占或共享 )构造成为一个节点(Node)并将其加入同步队列并进行自旋,当同步状态释放时,会把首节点中的后继节点对应的线程唤醒,使其再次尝试获取同步状态。
原子性:确保线程互斥的访问同步代码;
可见性:保证共享变量的修改能够及时可见,其实是通过Java内存模型中的 “对一个变量unlock 操作之前,必须要同步到主内存中;如果对一个变量进行lock操作,则将会清空工作内存中此变量的值,在执行引擎使用此变量前,需要重新从主内存中load操作或assign操作初始化变量值” 来保证的;
有序性:有效解决重排序问题,即 “一个unlock操作先行发生(happen-before)于后面对同一个锁的lock操作”。
synchronized 同步代码块的实现是通过 monitorenter 和 monitorexit 指令,其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置。当执行 monitorenter 指令时,线程试图获取锁也就是获取 monitor(monitor对象存在于每个Java对象的对象头中, synchronized 锁便是通过这种方式获取锁的,也是为什么Java中任意对象可以作为锁的原因) 的持有权。
其内部包含一个计数器,当计数器为0则可以成功获取,获取后将锁计数器设为1也就是加1。相应的在 执行 monitorexit 指令后,将锁计数器设为0
,表明锁被释放。如果获取对象锁失败,那当前线程就要阻塞等待,直到锁被另外一个线程释放为止
synchronized 修饰的方法并没有 monitorenter 指令和 monitorexit 指令,取得代之的确实是ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法,JVM 通过该 ACC_SYNCHRONIZED 访问标志来辨别一个方法是否声明为同步方法,从而执行相应的同步调用。
ReentrantLock 内部自定义了同步器 Sync,在加锁的时候通过 CAS 算法,将线程对象放到一个双向链表中,每次获取锁的时候,检查当前维护的那个线程 ID 和当前请求的线程 ID 是否 一致,如果一致,同步状态加1,表示锁被当前线程获取了多次。
相同点:
不同点:
suspend()不建议使用,suspend()方法在调用后,线程不会释放已经占有的资 源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。
假设有T1、T2、T3三个线程,你怎样保证T2在T1执行完后执行,T3在T2执行完后执行?
可以使用join方法解决这个问题。比如在线程A中,调用线程B的join方法表示的意思就是:A等待B线程执行完毕后(释放CPU执行权),在继续执行。
代码如下:
public class ThreadTest { public static void main(String[] args) { Thread spring = new Thread(new SeasonThreadTask("春天")); Thread summer = new Thread(new SeasonThreadTask("夏天")); Thread autumn = new Thread(new SeasonThreadTask("秋天")); try { //春天线程先启动 spring.start(); //主线程等待线程spring执行完,再往下执行 spring.join(); //夏天线程再启动 summer.start(); //主线程等待线程summer执行完,再往下执行 summer.join(); //秋天线程最后启动 autumn.start(); //主线程等待线程autumn执行完,再往下执行 autumn.join(); } catch (InterruptedException e) { e.printStackTrace(); } } } class SeasonThreadTask implements Runnable{ private String name; public SeasonThreadTask(String name){ this.name = name; } @Override public void run() { for (int i = 1; i <4; i++) { System.out.println(this.name + "来了: " + i + "次"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行结果:
春天来了: 1次 春天来了: 2次 春天来了: 3次 夏天来了: 1次 夏天来了: 2次 夏天来了: 3次 秋天来了: 1次 秋天来了: 2次 秋天来了: 3次
乐观锁避免了悲观锁独占对象的现象,提高了并发性能,但它也有缺点:
守护线程是运行在后台的一种特殊进程。它独立于控制终端并且周期性地执行某种任务或等待处理某些 发生的事件。在 Java 中垃圾回收线程就是特殊的守护线程。
volatile是轻量级的同步机制,volatile保证变量对所有线程的可见性,不保证原子性。
保证线程对变量访问的可见性和排他性。
wait/notify为 Object 对象的方法,调用wait/notify需要先获得对象的锁。对象调用wait之后线程释放锁,将线程放到对象的等待队列,当通知线程调用此对象的notify()方法后,等待线程并不会立即从wait返回,需要等待通知线程释放锁(通知线程执行完同步代码块),等待队列里的线程获取锁,获取锁成功才能从wait()方法返回,即从wait方法返回前提是线程获得锁。
等待通知机制依托于同步机制,目的是确保等待线程从wait方法返回时能感知到通知线程对对象的变量值的修改。
线程本地变量。当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程。
每个线程都有一个ThreadLocalMap(ThreadLocal内部类),Map中元素的键为ThreadLocal,而值对应线程的变量副本。
调用threadLocal.set()-->调用getMap(Thread)-->返回当前线程的ThreadLocalMap<ThreadLocal, value>-->map.set(this, value),this是ThreadLocal
public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } ThreadLocalMap getMap(Thread t) { return t.threadLocals; } void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
调用get()-->调用getMap(Thread)-->返回当前线程的ThreadLocalMap<ThreadLocal, value>-->map.getEntry(this),返回value
public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); }
threadLocals的类型ThreadLocalMap的键为ThreadLocal对象,因为每个线程中可有多个threadLocal变量,如longLocal和stringLocal。
public class ThreadLocalDemo { ThreadLocal<Long> longLocal = new ThreadLocal<>(); public void set() { longLocal.set(Thread.currentThread().getId()); } public Long get() { return longLocal.get(); } public static void main(String[] args) throws InterruptedException { ThreadLocalDemo threadLocalDemo = new ThreadLocalDemo(); threadLocalDemo.set(); System.out.println(threadLocalDemo.get()); Thread thread = new Thread(() -> { threadLocalDemo.set(); System.out.println(threadLocalDemo.get()); } ); thread.start(); thread.join(); System.out.println(threadLocalDemo.get()); } }
ThreadLocal 并不是用来解决共享资源的多线程访问的问题,因为每个线程中的资源只是副本,并不共享。因此ThreadLocal适合作为线程上下文变量,简化线程内传参。
每个Thread都有⼀个ThreadLocalMap的内部属性,map的key是ThreaLocal,定义为弱引用,value是强引用类型。GC的时候会⾃动回收key,而value的回收取决于Thread对象的生命周期。一般会通过线程池的方式复用Thread对象节省资源,这也就导致了Thread对象的生命周期比较长,这样便一直存在一条强引用链的关系:Thread --> ThreadLocalMap-->Entry-->Value,随着任务的执行,value就有可能越来越多且无法释放,最终导致内存泄漏。
解决⽅法:每次使⽤完ThreadLocal就调⽤它的remove()⽅法,手动将对应的键值对删除,从⽽避免内存泄漏。
currentTime.set(System.currentTimeMillis()); result = joinPoint.proceed(); Log log = new Log("INFO",System.currentTimeMillis() - currentTime.get()); currentTime.remove();
ThreadLocal 适用场景:每个线程需要有自己单独的实例,且需要在多个方法中共享实例,即同时满足实例在线程间的隔离与方法间的共享,这种情况适合使用ThreadLocal。比如Java web应用中,每个线程有自己单独的 Session 实例,就可以使用ThreadLocal来实现。
按照线程访问顺序获取对象锁。synchronized 是非公平锁, Lock 默认是非公平锁,可以设置为公平锁,公平锁会影响性能。
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
共享式与独占式的最主要区别在于:同一时刻独占式只能有一个线程获取同步状态,而共享式在同一时刻可以有多个线程获取同步状态。例如读操作可以有多个线程同时进行,而写操作同一时刻只能有一个线程进行写操作,其他操作都会被阻塞。
悲观锁,每次访问资源都会加锁,执行完同步代码释放锁,synchronized 和 ReentrantLock 属于悲观锁。
乐观锁,不会锁定资源,所有的线程都能访问并修改同一个资源,如果没有冲突就修改成功并退出,否则就会继续循环尝试。乐观锁最常见的实现就是CAS。
乐观锁一般来说有以下2种方式:
适用场景:
CAS全称 Compare And Swap,比较与交换,是乐观锁的主要实现方式。CAS 在不使用锁的情况下实现多线程之间的变量同步。ReentrantLock 内部的 AQS 和原子类内部都使用了 CAS。
CAS算法涉及到三个操作数:
只有当 V 的值等于 A 时,才会使用原子方式用新值B来更新V的值,否则会继续重试直到成功更新值。
以 AtomicInteger 为例,AtomicInteger 的 getAndIncrement()方法底层就是CAS实现,关键代码是 compareAndSwapInt(obj, offset, expect, update)
,其含义就是,如果obj
内的value
和expect
相等,就证明没有其他线程改变过这个变量,那么就更新它为update
,如果不相等,那就会继续重试直到成功更新值。
CAS 三大问题:
ABA问题。CAS需要在操作值的时候检查内存值是否发生变化,没有发生变化才会更新内存值。但是如果内存值原来是A,后来变成了B,然后又变成了A,那么CAS进行检查时会发现值没有发生变化,但是实际上是有变化的。ABA问题的解决思路就是在变量前面添加版本号,每次变量更新的时候都把版本号加一,这样变化过程就从A-B-A
变成了1A-2B-3A
。
JDK从1.5开始提供了AtomicStampedReference类来解决ABA问题,原子更新带有版本号的引用类型。
循环时间长开销大。CAS操作如果长时间不成功,会导致其一直自旋,给CPU带来非常大的开销。
只能保证一个共享变量的原子操作。对一个共享变量执行操作时,CAS能够保证原子操作,但是对多个共享变量操作时,CAS是无法保证操作的原子性的。
Java从1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。
在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段。
CountDownLatch用于某个线程等待其他线程执行完任务再执行,与thread.join()功能类似。常见的应用场景是开启多个线程同时执行某个任务,等到所有任务执行完再执行特定操作,如汇总统计结果。
public class CountDownLatchDemo { static final int N = 4; static CountDownLatch latch = new CountDownLatch(N); public static void main(String[] args) throws InterruptedException { for(int i = 0; i < N; i++) { new Thread(new Thread1()).start(); } latch.await(1000, TimeUnit.MILLISECONDS); //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行;等待timeout时间后count值还没变为0的话就会继续执行 System.out.println("task finished"); } static class Thread1 implements Runnable { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "starts working"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } } }
运行结果:
Thread-0starts working Thread-1starts working Thread-2starts working Thread-3starts working task finished
CyclicBarrier(同步屏障),用于一组线程互相等待到某个状态,然后这组线程再同时执行。
public CyclicBarrier(int parties, Runnable barrierAction) { } public CyclicBarrier(int parties) { }
参数parties指让多少个线程或者任务等待至某个状态;参数barrierAction为当这些线程都达到某个状态时会执行的内容。
public class CyclicBarrierTest { // 请求的数量 private static final int threadCount = 10; // 需要同步的线程数量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException { // 创建线程池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); try { /**等待60秒,保证子线程完全执行结束*/ cyclicBarrier.await(60, TimeUnit.SECONDS); } catch (Exception e) { System.out.println("-----CyclicBarrierException------"); } System.out.println("threadnum:" + threadnum + "is finish"); } }
运行结果如下,可以看出CyclicBarrier是可以重用的:
threadnum:0is ready threadnum:1is ready threadnum:2is ready threadnum:3is ready threadnum:4is ready threadnum:4is finish threadnum:3is finish threadnum:2is finish threadnum:1is finish threadnum:0is finish threadnum:5is ready threadnum:6is ready ...
当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable。
CyclicBarrier 和 CountDownLatch 都能够实现线程之间的等待。
CountDownLatch用于某个线程等待其他线程执行完任务再执行。CyclicBarrier用于一组线程互相等待到某个状态,然后这组线程再同时执行。
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可用于处理更为复杂的业务场景。
Semaphore类似于锁,它用于控制同时访问特定资源的线程数量,控制并发线程数。
public class SemaphoreDemo { public static void main(String[] args) { final int N = 7; Semaphore s = new Semaphore(3); for(int i = 0; i < N; i++) { new Worker(s, i).start(); } } static class Worker extends Thread { private Semaphore s; private int num; public Worker(Semaphore s, int num) { this.s = s; this.num = num; } @Override public void run() { try { s.acquire(); System.out.println("worker" + num + " using the machine"); Thread.sleep(1000); System.out.println("worker" + num + " finished the task"); s.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
运行结果如下,可以看出并非按照线程访问顺序获取资源的锁,即
worker0 using the machine worker1 using the machine worker2 using the machine worker2 finished the task worker0 finished the task worker3 using the machine worker4 using the machine worker1 finished the task worker6 using the machine worker4 finished the task worker3 finished the task worker6 finished the task worker5 using the machine worker5 finished the task
使用原子的方式更新基本类型
AtomicInteger 类常用的方法:
public final int get() //获取当前的值 public final int getAndSet(int newValue)//获取当前的值,并设置新的值 public final int getAndIncrement()//获取当前的值,并自增 public final int getAndDecrement() //获取当前的值,并自减 public final int getAndAdd(int delta) //获取当前的值,并加上预期的值 boolean compareAndSet(int expect, int update) //如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update) public final void lazySet(int newValue)//最终设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
AtomicInteger 类主要利用 CAS (compare and swap) 保证原子操作,从而避免加锁的高开销。
使用原子的方式更新数组里的某个元素
AtomicIntegerArray 类常用方法:
public final int get(int i) //获取 index=i 位置元素的值 public final int getAndSet(int i, int newValue)//返回 index=i 位置的当前的值,并将其设置为新值:newValue public final int getAndIncrement(int i)//获取 index=i 位置元素的值,并让该位置的元素自增 public final int getAndDecrement(int i) //获取 index=i 位置元素的值,并让该位置的元素自减 public final int getAndAdd(int i, int delta) //获取 index=i 位置元素的值,并加上预期的值 boolean compareAndSet(int i, int expect, int update) //如果输入的数值等于预期值,则以原子方式将 index=i 位置的元素值设置为输入值(update) public final void lazySet(int i, int newValue)//最终 将index=i 位置的元素设置为newValue,使用 lazySet 设置之后可能导致其他线程在之后的一小段时间内还是可以读到旧的值。