目录
线程安全:
线程安全特性:
原子性:
原子类型划分
AtomicReference:
AtomicIntegerFieldUpdater:
AtomicBoolean:
ABA问题:
锁实现原子性:
原子性对比:
可见性:
synchronized关键字:
volatile关键字:
有序性:
当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程如何交替执行,并且在主调代码中,不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。
原子性 | 同一时刻只能有一个线程对它进行操作。 |
可见性 | 一个线程对主内存的修改,可以及时的被其他线程观察到。 |
有序性 | 一个线程观察其他线程的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序 |
Atomic类:
Atomic包采用CAS算法:
public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; }
var1为操作的对象,var2为该对象当前值,var4为该对象增加值,var5为底层当前值。
CAS(Compare And Swap)实现的核心原理 | 用当前对象的值(工作内存)与底层(主内存)的值进行对比,如果当前值等于底层值,则执行对应的操作,如果不相等,则继续循环取值,直到相等(死循环),不断进行循环尝试。 |
CAS的操作过程 | CAS比较的过程可以通俗的理解为CAS(V,O,N),包含三个值分别为:V内存地址实际存放的值;O预期的值(旧值),N更新的新值,当V和O相同时,也就是说明旧值和内存中实际的值相同,表明该值没有被其他线程修改过,即该旧值就是目前来说最新的值了,自然可以将新值N赋值给V,反之V和O不相同,表明该值已经被其他线程修改过,则该旧值O不是最新的版本,所以不能将该值N赋值给V,返回V即可,当多个线程使用CAS操作一个变量时,只有一一个线程会成功,并成功更新,其余会失败,失败的线程会重新尝试,当然也可以选择挂起线程。 |
AtomicLong与LongAdder:
AtomicLong的原理是依靠底层的cas来保障原子性的更新数据(在死循环内不断尝试修改目标值,直至修改成功),在要添加或者减少的时候,会使用自循(CLH)方式不断地cas到特定的值,从而达到更新数据的目的。然而在线程竞争激烈的情况下,自循往往浪费很多计算资源才能达成预期效果。
对于普通类型的变量long,double类型的变量,JVM允许将64位的读操作或写操作拆成两个32位的操作。
LongAdder将热点数据分离(将AtomicLong的内部核心数据value分离成一个数组,每个线程访问时,通过hash等算法映射到其中一个数字进行计数,最终的技术结果则为这个数组的求和累加,其中热点数据的value被分离成多个单元的cell,每个cell独自维护内部的值,当前对象的实际值由多个cell累计合成,这样热点就进行了有效的分离,提高了并行度,在AtomicLong的基础上,将单点的更新压力分散到各个节点上,在低并发的情况下,通过对base的直接更新,可以保障和AtomicLong的性能基本一致,而在高并发的情况下则通过分散提高了性能)。
LongAdder的缺点:如果在统计的时候有并发更新,则可能会导致统计的数据存在误差。
为了方面对这些类逐级掌握,我将这些原子类型分为以下几类:
普通原子类型:提供对boolean、int、long和对象的原子性操作 | AtomicBoolean AtomicInteger AtomicLong AtomicReference |
原子类型数组:提供对数组元素的原子性操作 | AtomicLongArray AtomicIntegerArray AtomicReferenceArray |
原子类型字段更新器:提供对指定对象的指定字段进行原子性操作 | AtomicLongFieldUpdater AtomicIntegerFieldUpdater AtomicReferenceFieldUpdater |
带版本号的原子引用类型:以版本戳的方式解决原子类型的ABA问题 | AtomicStampedReference AtomicMarkableReference |
原子累加器(JDK1.8):AtomicLong和AtomicDouble的升级类型,专门用于数据统计,性能更高 | DoubleAccumulator DoubleAdder LongAccumulator LongAdder |
AtomicInteger:
//ThreadSafe public class AtomicIntegerTest { public static int clientTotal = 5000;// 请求总数 public static int threadTotal = 200;// 同时并发执行的线程数 public static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println("count:" + count.get()); } private static void add() { count.incrementAndGet(); // count.getAndIncrement(); } }
执行结果:
count:5000
AtomicLong:
//ThreadSafe public class AtomicLongTest { public static int clientTotal = 5000;// 请求总数 public static int threadTotal = 200;// 同时并发执行的线程数 public static AtomicLong count = new AtomicLong(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println("count:" + count.get()); } private static void add() { count.incrementAndGet(); // count.getAndIncrement(); } }
执行结果:
count:5000
LongAdder:
//ThreadSafe public class LongAdderTest{ public static int clientTotal = 5000;// 请求总数 public static int threadTotal = 200;// 同时并发执行的线程数 public static LongAdder count = new LongAdder(); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println("count:" + count); } private static void add() { count.increment(); } }
执行结果:
count:5000
//ThreadSafe public class AtomicReferenceTest { private static AtomicReference<Integer> count = new AtomicReference<>(0); public static void main(String[] args) { count.compareAndSet(0, 2); // 2 count.compareAndSet(0, 1); // no count.compareAndSet(1, 3); // no count.compareAndSet(2, 4); // 4 count.compareAndSet(3, 5); // no System.out.println("result: " + count.get()); } }
执行结果:
result: 4
//ThreadSafe @Data public class AtomicIntegerFieldUpdaterTest { private static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterTest> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterTest.class, "count"); @Getter public volatile int count = 100; public static void main(String[] args) { AtomicIntegerFieldUpdaterTest example = new AtomicIntegerFieldUpdaterTest(); if (updater.compareAndSet(example, 100, 120)) { System.out.println("update success 1 : " + example.getCount()); } if (updater.compareAndSet(example, 100, 120)) { System.out.println("update success 2 : " + example.getCount()); } else { System.out.println("update failed : " + example.getCount()); } } }
执行结果:
update success 1 : 120
update failed : 120
//ThreadSafe public class AtomicBooleanTest { private static AtomicBoolean isHappened = new AtomicBoolean(false); public static int clientTotal = 5000;// 请求总数 public static int threadTotal = 200;// 同时并发执行的线程数 public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); method(); semaphore.release(); } catch (Exception e) { e.printStackTrace(); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); System.out.println("isHappened: " + isHappened.get()); } private static void method() { if (isHappened.compareAndSet(false, true)) { System.out.println("executing..."); } } }
执行结果:
executing...
isHappened: true
CAS机制的原理由CPU支持的原子操作,其原子性是在硬件层面进行保证的。
而CAS机制可能会出现ABA问题,即T1读取内存变量为A,T2修改内存变量为B,T2修改内存变量为A,这时T1再CAS操作A时是可行的。但实际上在T1第二次操作A时,已经被其他线程修改过了。
解决方案:添加版本号。
synchronized | 依赖JVM。在类的继承过程中synchronized不具有传递性。synchronized不属于方法声明的一部分。 |
lock | 依赖特殊的CPU指令,代码实现,如ReenTrantLock; |
Atomic | 竞争激烈时能维持常态,比lock性能好,只能同步一个值。 |
synchronized | 不可中断锁,适合竞争不激烈,可读性好。 |
Lock | 可中断锁,多样化同步,竞争激烈时能维持常态。 |
导致共享变量在线程不可见的原因:
线程交叉执行。 |
重排序结合线程交叉执行。 |
共享变量更新后的值没有在工作内存与主存之间及时更新。 |
JMM关于synchronized的两条规定:
线程解锁前必须把共享变量的值刷新到主内存 |
线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值,注意(加锁与解锁是同一把锁); |
根据JMM,Java中有一块主内存,不同的线程有自己的工作内存,同一个变量值在主内存中有一份,如果线程用到了这个变量的话,自己的工作内存中有一份一模一样的拷贝。每次进入线程从主内存中拿到变量值,每次执行完线程将变量从工作内存同步回主内存中。
volatile解决的是变量在多个线程之间的可见性,但是无法保证原子性。
通过加入内存屏障和禁止重排序来实现。
对volatile进行写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存。 |
对volatile进行读操作时,会在写操作后加入一条load屏障指令,从主内存中读取共享变量。 |
可见性-volatile写示意图:
可见性-volatile读示意图:
注意:volatile不能保证原子性;
对变量的写操作不依赖于当前值。 |
对于该变量的值没有包含在具有其他变量的不变的式子在中。 |
可见性--volatile的使用:(作为状态标识量)
volatile boolean inited = false; //线程1 context = loadContext(); inited = true; //线程2 while(!inited){ sleep; } doSomethingWithConfig(context);
Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程的执行,但却会影响到多线程并发执行的正确性。 |
通过 volatile、synchronized、lock保证有序性,synchronized、lock保证同一时刻只有一个线程执行代码,从而保证有序性。 |
先天有序性-- happens-before原则:
程序次序规则 | 一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作。 |
锁定规则 | 一个unlock操作先行发生于后面对同一个锁的lock操作。 |
volatile变量规则 | 对一个变量的写操作先行发生于后面对这个变量的读操作。 |
传递规则 | 如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C。 |
线程启动规则 | Thread对象的start()方法先行发生于此线程的每一个动作。 |
线程中断规则 | 对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生。 |
线程中断规则 | 线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行。 |
对象终结规则 | 一个对象的初始化完成先行发生于他的finalize()方法开始。 |