Java教程

Java线程安全

本文主要是介绍Java线程安全,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录

线程安全:

线程安全特性:

原子性:

原子类型划分

AtomicReference:

AtomicIntegerFieldUpdater:

AtomicBoolean:

ABA问题:

锁实现原子性:

原子性对比:

可见性:

synchronized关键字:

volatile关键字:

有序性:


线程安全:

        当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程如何交替执行,并且在主调代码中,不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

线程安全特性:

原子性

同一时刻只能有一个线程对它进行操作。

可见性

一个线程对主内存的修改,可以及时的被其他线程观察到。

有序性

一个线程观察其他线程的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

原子性:

Atomic类:

        Atomic包采用CAS算法:

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}

        var1为操作的对象,var2为该对象当前值,var4为该对象增加值,var5为底层当前值。

CAS(Compare And Swap)实现的核心原理

用当前对象的值(工作内存)与底层(主内存)的值进行对比,如果当前值等于底层值,则执行对应的操作,如果不相等,则继续循环取值,直到相等(死循环),不断进行循环尝试。

CAS的操作过程

CAS比较的过程可以通俗的理解为CAS(V,O,N),包含三个值分别为:V内存地址实际存放的值;O预期的值(旧值),N更新的新值,当V和O相同时,也就是说明旧值和内存中实际的值相同,表明该值没有被其他线程修改过,即该旧值就是目前来说最新的值了,自然可以将新值N赋值给V,反之V和O不相同,表明该值已经被其他线程修改过,则该旧值O不是最新的版本,所以不能将该值N赋值给V,返回V即可,当多个线程使用CAS操作一个变量时,只有一一个线程会成功,并成功更新,其余会失败,失败的线程会重新尝试,当然也可以选择挂起线程。

        AtomicLong与LongAdder:

        AtomicLong的原理是依靠底层的cas来保障原子性的更新数据(在死循环内不断尝试修改目标值,直至修改成功),在要添加或者减少的时候,会使用自循(CLH)方式不断地cas到特定的值,从而达到更新数据的目的。然而在线程竞争激烈的情况下,自循往往浪费很多计算资源才能达成预期效果。

        对于普通类型的变量long,double类型的变量,JVM允许将64位的读操作或写操作拆成两个32位的操作。

        LongAdder将热点数据分离(将AtomicLong的内部核心数据value分离成一个数组,每个线程访问时,通过hash等算法映射到其中一个数字进行计数,最终的技术结果则为这个数组的求和累加,其中热点数据的value被分离成多个单元的cell,每个cell独自维护内部的值,当前对象的实际值由多个cell累计合成,这样热点就进行了有效的分离,提高了并行度,在AtomicLong的基础上,将单点的更新压力分散到各个节点上,在低并发的情况下,通过对base的直接更新,可以保障和AtomicLong的性能基本一致,而在高并发的情况下则通过分散提高了性能)。

        LongAdder的缺点:如果在统计的时候有并发更新,则可能会导致统计的数据存在误差。

原子类型划分

为了方面对这些类逐级掌握,我将这些原子类型分为以下几类:

普通原子类型:提供对boolean、int、long和对象的原子性操作

AtomicBoolean

AtomicInteger

AtomicLong

AtomicReference

原子类型数组:提供对数组元素的原子性操作

AtomicLongArray

AtomicIntegerArray

AtomicReferenceArray

原子类型字段更新器:提供对指定对象的指定字段进行原子性操作

AtomicLongFieldUpdater

AtomicIntegerFieldUpdater

AtomicReferenceFieldUpdater

带版本号的原子引用类型:以版本戳的方式解决原子类型的ABA问题

AtomicStampedReference

AtomicMarkableReference

原子累加器(JDK1.8):AtomicLong和AtomicDouble的升级类型,专门用于数据统计,性能更高

DoubleAccumulator

DoubleAdder

LongAccumulator

LongAdder

AtomicInteger:

//ThreadSafe
public class AtomicIntegerTest {
    public static int clientTotal = 5000;// 请求总数
    public static int threadTotal = 200;// 同时并发执行的线程数
    public static AtomicInteger count = new AtomicInteger(0);
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("count:" + count.get());
    }
    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}

        执行结果:

count:5000

AtomicLong:

//ThreadSafe
public class AtomicLongTest {
    public static int clientTotal = 5000;// 请求总数
    public static int threadTotal = 200;// 同时并发执行的线程数
    public static AtomicLong count = new AtomicLong(0);
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("count:" + count.get());
    }
    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}

        执行结果:

 count:5000

LongAdder:

//ThreadSafe
public class LongAdderTest{
    public static int clientTotal = 5000;// 请求总数
    public static int threadTotal = 200;// 同时并发执行的线程数
    public static LongAdder count = new LongAdder();
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("count:" + count);
    }
    private static void add() {
        count.increment();
    }
}

        执行结果:

count:5000

AtomicReference:

//ThreadSafe
public class AtomicReferenceTest {
    private static AtomicReference<Integer> count = new AtomicReference<>(0);
    public static void main(String[] args) {
        count.compareAndSet(0, 2); // 2
        count.compareAndSet(0, 1); // no
        count.compareAndSet(1, 3); // no
        count.compareAndSet(2, 4); // 4
        count.compareAndSet(3, 5); // no
        System.out.println("result: " + count.get());
    }
}

        执行结果:

result: 4

AtomicIntegerFieldUpdater:

//ThreadSafe
@Data
public class AtomicIntegerFieldUpdaterTest {
    private static AtomicIntegerFieldUpdater<AtomicIntegerFieldUpdaterTest> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFieldUpdaterTest.class, "count");
    @Getter
    public volatile int count = 100;
    public static void main(String[] args) {
        AtomicIntegerFieldUpdaterTest example = new AtomicIntegerFieldUpdaterTest();
        if (updater.compareAndSet(example, 100, 120)) {
            System.out.println("update success 1 : " +  example.getCount());
        }
        if (updater.compareAndSet(example, 100, 120)) {
            System.out.println("update success 2 : " + example.getCount());
        } else {
            System.out.println("update failed : " + example.getCount());
        }
    }
}

        执行结果:

update success 1 : 120
update failed : 120

AtomicBoolean:

//ThreadSafe
public class AtomicBooleanTest {
    private static AtomicBoolean isHappened = new AtomicBoolean(false);
    public static int clientTotal = 5000;// 请求总数
    public static int threadTotal = 200;// 同时并发执行的线程数
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    method();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println("isHappened: " + isHappened.get());
    }
    private static void method() {
        if (isHappened.compareAndSet(false, true)) {
            System.out.println("executing...");
        }
    }
}

        执行结果:

 executing...
isHappened: true

ABA问题:

        CAS机制的原理由CPU支持的原子操作,其原子性是在硬件层面进行保证的。

        而CAS机制可能会出现ABA问题,即T1读取内存变量为A,T2修改内存变量为B,T2修改内存变量为A,这时T1再CAS操作A时是可行的。但实际上在T1第二次操作A时,已经被其他线程修改过了。

        解决方案:添加版本号。

锁实现原子性:

synchronized

依赖JVM。在类的继承过程中synchronized不具有传递性。synchronized不属于方法声明的一部分。

lock

依赖特殊的CPU指令,代码实现,如ReenTrantLock;

原子性对比:

Atomic

竞争激烈时能维持常态,比lock性能好,只能同步一个值。

synchronized

不可中断锁,适合竞争不激烈,可读性好。

Lock

可中断锁,多样化同步,竞争激烈时能维持常态。

可见性:

        导致共享变量在线程不可见的原因:

线程交叉执行。

重排序结合线程交叉执行。

共享变量更新后的值没有在工作内存与主存之间及时更新。

synchronized关键字:

        JMM关于synchronized的两条规定:

线程解锁前必须把共享变量的值刷新到主内存

线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值,注意(加锁与解锁是同一把锁);

volatile关键字:

     根据JMM,Java中有一块主内存,不同的线程有自己的工作内存,同一个变量值在主内存中有一份,如果线程用到了这个变量的话,自己的工作内存中有一份一模一样的拷贝。每次进入线程从主内存中拿到变量值,每次执行完线程将变量从工作内存同步回主内存中。

     volatile解决的是变量在多个线程之间的可见性,但是无法保证原子性。

通过加入内存屏障和禁止重排序来实现。

对volatile进行写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存。

对volatile进行读操作时,会在写操作后加入一条load屏障指令,从主内存中读取共享变量。

可见性-volatile写示意图:

 可见性-volatile读示意图:

                 注意:volatile不能保证原子性;

对变量的写操作不依赖于当前值。

对于该变量的值没有包含在具有其他变量的不变的式子在中。

        可见性--volatile的使用:(作为状态标识量)

volatile boolean inited = false;
//线程1
context = loadContext();
inited = true;
//线程2
while(!inited){
    sleep;
}
doSomethingWithConfig(context);

有序性:

Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程的执行,但却会影响到多线程并发执行的正确性。

通过 volatile、synchronized、lock保证有序性,synchronized、lock保证同一时刻只有一个线程执行代码,从而保证有序性。

        先天有序性-- happens-before原则:

程序次序规则

一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作。

锁定规则

一个unlock操作先行发生于后面对同一个锁的lock操作。

volatile变量规则

对一个变量的写操作先行发生于后面对这个变量的读操作。

传递规则

如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C。

线程启动规则

Thread对象的start()方法先行发生于此线程的每一个动作。

线程中断规则

对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生。

线程中断规则

线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、Thread.isAlive()的返回值手段检测到线程已经终止执行。

对象终结规则

一个对象的初始化完成先行发生于他的finalize()方法开始。

这篇关于Java线程安全的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!