Java教程

Java中的并发计数器LongAdder

本文主要是介绍Java中的并发计数器LongAdder,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

并发计数器LongAdder

  • 基本概念
  • Striped64
    • Striped64重要成员变量
      • cells
      • cellsBusy
      • base
    • 内部类Cell
      • CPU缓存架构
      • 缓存行cache line
      • 伪共享
    • Striped64重要方法
      • longAccumulate
  • LongAdder
    • LongAdder重要方法
      • add
  • 总结
    • hash的值生成
    • threadLocalRandomProbe初始化
    • threadLocalRandomProbe重新生成

基本概念

  • 位于java.util.concurrent.atomic包.是高并发下计数功能最好的数据结构,低并发下效率也非常高
  • LongAdderDoubleAdder数据结构类似

在这里插入图片描述

  • LongAdder继承Striped64类,实现累加功能

Striped64

  • Striped64是一个高并发累加的工具类
  • Striped64设计的核心思路: 通过内部的分散计算来实现避免竞争
    • Striped64中包含一个base和一个cells数组
    • 在没有竞争的情况下,累加的数通过CAS累加到base
    • 在竞争的情况下,累加的数累加到数组cells中的某个cell元素上
    • 这样整个Striped64的值为: s u m = b a s e + ∑ 0 n c e l l sum=base+\sum_0^ncell sum=base+∑0n​cell

在这里插入图片描述

Striped64重要成员变量

  • Striped64中有三个重要的成员变量
	/**
     * 存放元素cell的Hash表,大小为2的整数次幂
     */
    transient volatile Cell[] cells;

    /**
     * 基础值:
     * - 在没有竞争的情况下,累加的数通过CAS累加到base上
     * - 在数组cells初始化过程时,数组中的元素cell不可用。此时累加的数会尝试通过CAS累加到base上
     */
    transient volatile long base;

    /**
     * 自旋锁,通过CAS加锁
     * 用于创建和扩容数组cells的Hash表
     */
    transient volatile int cellsBusy;

cells

  • 数组cellsLongAdder实现高性能的关键
  • AtomicLong中只有一个计数的值value, 所有线程的累加操作都要通过CAS锁竞争变量value来实现累加,处于高并发情况下,线程的竞争非常高
  • LongAdder中有两个计数的值:
    • base:
      • 在没有竞争或者初始化的情况下使用
      • 作用和AtomicLong中的value变量类似
    • cells:
      • 在没有竞争的情况下,不会使用cells数组,值为null
      • 在竞争的情况下,就使用到cells数组:
        • 初始化时长度为2
        • 每次扩容长度都增加1倍,长度变为原来的2
        • 直到数组cells的长度大于等于当前服务器CPU的数量就不再扩容
        • 每个线程通过对cells[threadLocalRandomProbe%cells.length] 位置上的cell元素中的value值做累加,这样相当于将线程绑定到数组cells中的某个cell元素对象上

cellsBusy

  • cellsBusy是自旋锁,通过CAS加锁.用于创建和扩展数组cellsHash
  • cellsBusy的值包括无锁0和加锁1, 作用是修改数组cells时进行加锁,防止多线程同时修改cells数组
  • cellsBusy加锁的情况有三种:
    • 数组cells初始化
    • 数组cells扩容
    • 数组cells中某个位置的为null, 在这个位置创建新的Cell对象

base

  • base主要有两个作用:
    • 在没有竞争的情况下,将累加的数通过CAS累加到base
    • 在数组cells初始化过程时,数组中的元素cell不可用.此时累加的数会尝试通过CAS累加到base

内部类Cell

  • 内部类Cell的类的注解是 @jdk.internal.vm.annotation.Contended
  • 内部类Cell是一个静态final类型的内部类,内部有一个value值,使用CAS来更新value的值

CPU缓存架构

在这里插入图片描述

  • 缓存越靠近CPU, 缓存运行速度就越快,容量也越小:
    • 一级缓存L1紧靠着CPU内核,只能被一个单独的CPU内核使用. 尽管容量很小,但是运行速度很快
    • 二级缓存L2容量大一些运行速度也慢一些,只能被一个单独的CPU内核使用
    • 三级缓存L3容量更大一些运行速度也更慢一些,可以被单个插槽上的所有CPU内核共享
    • 主内存中保存着应用运行的所有数据,也是容量最大,运行速度最慢的,可以被全部插槽上的所有CPU内核共享
  • CPU执行运算时,会首先从一级缓存L1中寻找数据,如果没有再去二级缓存L2中寻找数据,如果没有再去三级缓存L3中寻找数据.如果这些缓存中都没有数据,就去主内存中寻找数据
  • 因为CPU执行运算,寻找数据走得越远,执行耗费的时间就越长.所以对于一些需要频繁运算的数据,需要确保在一级缓存L1

缓存行cache line

  • CPU的缓存系统是以缓存行cache line为单位存储的:
    • 缓存行cache line2的整数幂次方个连续字节.通常情况下为32~256个字节
    • 常见的缓存行cache line的大小为64个字节
    • 缓存行cache line是缓存cache和内存memory之间数据传输的最小单元
  • Java中一个long类型的长度是8个字节,通常一个缓存行中可以存放8long类型的变量

在这里插入图片描述

  • 程序运行时,缓存每次更新都会从主内存中加载连续的64个字节.如果访问的是一个long类型的数组时,当数组中一个值被加载到缓存中时,同时另外7个元素的值也会被加载到缓存中
    • 如果数据结构中各项的值在内存中不是连续相邻的,比如链表.就无法获得这种同时加载数据到缓存中的这种好处
    • 这种同时加载存在缺点:
      • 如果一个long类型的变量A, 不是数组的一部分,而是一个单独的变量.并且有另外一个long类型的变量B紧挨着这个变量,那么当这个变量A被加载到缓存中时,另一个变量B也会被加载到缓存中
      • 此时如果一个CPU线程正在对这个变量A进行修改,另外一个CPU线程正在对另一个变量B进行读取
      • 对这个变量A修改的CPU线程修改变量A时,会将变量A和变量B同时加载到缓存行中,修改变量A后,因为其余缓存行中的变量A已经不是最新值,所以其余所有包含变量A的缓存行都将失效
      • 对另一个变量B读取的CPU线程读取变量B时,发现这个缓存行已经失效,就需要从主内存中重新加载数据到缓存行中
  • CPU中的缓存都是以缓存行作为一个最小单位进行数据处理的,只要缓存行中有一个值失效,那么整个缓存行都会失效

在这里插入图片描述

  • 由此会导致伪共享问题: 变量B和变量B两个完全不相干的变量,每次变量B都要因为变量A的更新重主内存中重新读取变量B, 这样变量B就会因为变量A造成被缓存未命中而拖慢运行速度

伪共享

  • 伪共享: 当多线程中修改相互独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能
    • CPU中使用了L1 cacheL2 cache:
      • L1 cache中数据处理方式是直写模式write through
        • write-through: 直写模式. 数据更新时,同时写入缓存Cache和后端存储
          • 优点: 操作简单
          • 缺点: 数据修改后同时需要写入缓存和存储,写入的速度较慢
      • L2 cache中数据处理方式是回写模式write back
        • write-back: 回写模式. 数据更新时只写入缓存,只有在数据被替换出缓存时,修改的数据才会被写到后端存储中
          • 优点: 因为不需要写存储,所以数据的写入速度很快
          • 缺点: 如果更新后的数据没有被及时写入到存储中,此时系统出现异常,会导致修改后的数据无法找回
      • L1 cache中的数据会及时写入缓存和存储中,而L2 cache中的数据只会写入缓存,而不会立即写回到内存存储中,这样就导致cachememory中的数据不一致的情况
      • 对于多处理器multiprocessors的情况,由于cacheCPU内核私有的,不同的CPU内核的cache内容也会存在不一致的问题
      • 因此,在很多多处理器multiprocessor的计算机架构中,比如ccnumasmp中,都实现了cache coherence机制,即不同的CPU内核的cache一致性机制
      • cache coherence: 缓存一致性. 通过cache-snooping协议,每个CPU内核通过对总线bus的窥探snoop实现对CPU内核读写缓存cache的监控
        • 某个CPU内核写缓存cache时:
          • 其余的CPU会检查自身的缓存cahce中的缓存行cache line
          • 如果状态为dirty(CPU更新数据,写入缓存没有写入存储), 就将数据回写到存储中,并且将写缓存cacheCPU的相关cache line刷新
          • 如果状态不是dirty(CPU更新数据,写入缓存没有写入存储),就将cache line使用invalidate使得缓存行失效
        • 某个CPU内核读缓存cache时:
          • 其余的CPU会将自身的缓存cache中的缓存行cache line中标记为dirty(CPU更新数据,写入缓存没有写入存储)的部分的数据回写到存储中,并且将读缓存cacheCPU的相关cache line刷新
      • 提高CPU的缓存命中率cache hit rate, 减少cachememory之间的数据传输,将会提高系统性能
    • 在程序和二进制对象的内存分配中保持缓存行对齐cache line aligned就十分重要. 否则会出现多个CPU内核中并行运行的进程或者线程同时读写同一个缓存行cache line的情况.这时候就会导致CPU内核的cachememory之间反复出现write backrefresh的情况,这种情况称作cache thrashing
    • 避免cache thrashing有以下两种途径:
      • 对于heap的分配,在malloc调用中实现了强制的对齐alignment
      • 对于stack的分配,编译器提供了stack aligned的选项
        • 注意: 对于在编译器中指定stack aligned, 程序的会变得臃肿,会占用更多的内存,一定要做好取舍
  • 伪共享的原理: 一个缓存行通常使用64个字节,缓存每次更新时都会从主内存中加载连续的64个字节的数据,如果访问的是一个8个字节的long类型的数组时,当数组中一个值被加载到缓存中,同时另外7个相邻的long类型的数组也会被加载到缓存中,进而引发伪共享的问题
  • 避免伪共享的几种方式:
    • 在两个原本相邻的不相干的long类型的变量之间添加7个额外的long类型的变量
    • 自定义long类型而不是使用Java自带的long类型
      • 使得自定义的long类型数据可以占满64个字节
      • 通常也就是单独的long类型变量中添加7个额外的long类型的变量
    • 使用@Contended注解. 推荐使用这种方式
      • 如果使用这个注解无效的话,需要配置JVM启动参数后重新运行
      -XX:-RestrictContended
      
  • Java中避免伪共享的示例:
    • ConcurrentHashMap
      • ConcurrentHashMap中的size() 方法是使用分段的思想来构造的
      • 每个段使用的类是CounterCell
      • CounterCell上就使用了 @Contended的注解
    • LongAdder

Striped64重要方法

longAccumulate

/**
 * 处理涉及初始化,扩容,创建新的Cell对象和竞争的更新情况
 *  
 * @param x 需要累加的值
 * @param fn 更新函数,可以为null。用于累加操作
 * @param wasUncontended 如果CAS操作在调用前执行失败则返回false。表明调用方法之前的add()方法是否没有发生争用
 */
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended);
  • longAccumulate()方法源码解析==>[如果点进来的话, 顺手给个follow哈! 大佬的鼓励支持是小可持续更新的动力!!!]

LongAdder

LongAdder重要方法

add

/**
 * 对指定的值累加
 *  
 * @param x 需要累加的值
 */
public void add(long x);
  • add()方法源码解析==>[如果点进来的话, 顺手给个follow哈! 大佬的鼓励支持是小可持续更新的动力!!!]

总结

hash的值生成

  • hash的值: LongAdder中用来定位当前线程应该将值累加到数组cells的哪一个索引位置上
  • Thread类中的成员变量threadLocalRandomProbe:
	@jdk.internal.vm.annotation.Contended("tlr")
    int threadLocalRandomProbe;
  • threadLocalRandomProbe变量的值就是LongAdder用来确定hash的值来定位数组cells中的位置的 .threadLocalRandomProbe的初始值为0
  • LongAdder的父类Striped64中,通过getProbe() 方法获取当前线程的threadLocalRandomProbe的值:
	/**
     * 返回当前线程的probe值
     * 由于包装的限制,从ThreadLocalRandom中复制
     */
	static final int getProbe() {
        return (int) THREAD_PROBE.get(Thread.currentThread());
    }
  • probe值是threadLocalRandomProbe变量在Thread类中的偏移量

threadLocalRandomProbe初始化

		if ((h = getProbe()) == 0) {
            // ThreadLocalRandom类强制初始化
            ThreadLocalRandom.current(); // force initialization
            // 设置h的值为0x9e3779b9
            h = getProbe();
            // 将没有争用的标识设置为true
            wasUncontended = true;
        }
  • 当前线程使用LongAdder中的add() 方法执行累加操作,在没有进入longAccumulate() 方法之前 ,threadLocalRandomProbe的值一直为0
  • 当执行累加操作发生争用后进入longAccumulate() 方法,第一次进入longAccumulate() 方法就判断threadLocalRandomProbe的值是否为0, 如果值为0, 则将hash的值设置为0x9e3779b9
  • ThreadLocalRandom.current() 方法:
	/**
	 * 返回当前线程的ThreadLocalRandom
	 */
	public static ThreadLocalRandom current() {
        if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
            localInit();
        return instance;
    }
  • current() 方法中,判断probe的值是否为0, 如果probe的值为0, 就执行localInit() 方法,将当前的probe设置为非0的值:
	/**
	 * 为当前线程初始化线程字段
	 */
	static final void localInit() {
        int p = probeGenerator.addAndGet(PROBE_INCREMENT);
        // 跳过p为0的值
        int probe = (p == 0) ? 1 : p; // skip 0
        long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
        // 获取当前线程
        Thread t = Thread.currentThread();
        U.putLong(t, SEED, seed);
        // 将probe的值更新为probeGenerator的值
        U.putInt(t, PROBE, probe);
    }
  • probeGeneratorstatic类型的AtomicInteger类的对象,每执行一次localInit() 方法,都会将probeGenerator累加一次值0x9e3779b9
  • 0x9e3779b9是由232 除以黄金比例常数1.6180339887确定的
  • 如果probeGenerator值为0, 就将当前线程的threadLocalRandomProbe的值设置为1. 否则就将当前线程的threadLocalRandomProbe的值设置为probeGenerator的值

threadLocalRandomProbe重新生成

  • probe的值进行3次左右移位和异或操作:
	static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        THREAD_PROBE.set(Thread.currentThread(), probe);
        return probe;
    }
这篇关于Java中的并发计数器LongAdder的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!