import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Micro-benchmark comparing {@link AtomicLong} and {@link LongAdder} when several threads
 * increment one shared counter concurrently. Each round prints the final count and the
 * elapsed wall-clock time in milliseconds.
 */
public class Test01 {

    /** Number of worker threads used by {@link #main}. */
    private static final int THREAD_COUNT = 4;

    /** Number of increments each worker performs in {@link #main}. */
    private static final int INCREMENTS_PER_THREAD = 500_000;

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            demo(() -> new AtomicLong(0), AtomicLong::incrementAndGet);
        }
        for (int i = 0; i < 10; i++) {
            demo(LongAdder::new, LongAdder::increment);
        }
    }

    /**
     * Runs one benchmark round with {@value #THREAD_COUNT} threads, each performing
     * {@value #INCREMENTS_PER_THREAD} increments, and prints "count cost: millis".
     */
    private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
        demo(adderSupplier, action, THREAD_COUNT, INCREMENTS_PER_THREAD);
    }

    /**
     * Runs one benchmark round: {@code threadCount} threads each apply {@code action} to a
     * shared counter {@code incrementsPerThread} times. Prints the counter (via its
     * {@code toString()}) and the elapsed milliseconds.
     *
     * @param adderSupplier       creates the shared counter for this round
     * @param action              one increment of the counter
     * @param threadCount         number of worker threads (must be >= 0)
     * @param incrementsPerThread increments per worker (must be >= 0)
     * @return the counter after every worker has finished
     * @throws IllegalArgumentException if either count is negative
     */
    static <T> T demo(Supplier<T> adderSupplier, Consumer<T> action,
                      int threadCount, int incrementsPerThread) {
        if (threadCount < 0 || incrementsPerThread < 0) {
            throw new IllegalArgumentException("threadCount=" + threadCount
                    + ", incrementsPerThread=" + incrementsPerThread);
        }
        T adder = adderSupplier.get();
        List<Thread> ts = new ArrayList<>(threadCount);
        for (int i = 0; i < threadCount; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    action.accept(adder);
                }
            }));
        }
        long start = System.nanoTime();
        ts.forEach(Thread::start);
        joinAll(ts);
        long end = System.nanoTime();
        System.out.println(adder + " cost: " + (end - start) / 1_000_000);
        return adder;
    }

    /**
     * Waits for every thread to terminate, even if the calling thread is interrupted.
     * This guarantees that the measured count and time cover all the work. If an
     * interrupt was received while waiting, the interrupt status is restored before
     * returning instead of being swallowed.
     */
    private static void joinAll(List<Thread> threads) {
        boolean interrupted = false;
        try {
            for (Thread t : threads) {
                while (true) {
                    try {
                        t.join();
                        break;
                    } catch (InterruptedException e) {
                        // join() cleared the flag; remember it and keep waiting for t.
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
// Results of several runs (first 10 lines: AtomicLong, last 10 lines: LongAdder):
// 2000000 cost: 53
// 2000000 cost: 50
// 2000000 cost: 45
// 2000000 cost: 46
// 2000000 cost: 46
// 2000000 cost: 46
// 2000000 cost: 46
// 2000000 cost: 46
// 2000000 cost: 46
// 2000000 cost: 46
// 2000000 cost: 12
// 2000000 cost: 9
// 2000000 cost: 8
// 2000000 cost: 6
// 2000000 cost: 6
// 2000000 cost: 6
// 2000000 cost: 5
// 2000000 cost: 6
// 2000000 cost: 5
// 2000000 cost: 5
可以发现原子累加器性能明显提升,为什么呢?
在发生竞争时,原子累加器设置了多个累加单元,Thread-0累加Cell[0],Thread-1累加Cell[1],…,最后将结果汇总。这样它们在累加时操作的是不同的累加变量,因此减少了CAS失败重试,从而提高了性能。
要分析原理呢,我们先来看下源码。
LongAdder类(字段定义在其父类Striped64中)的几个关键字段
/**
 * Table of cells. When non-null, size is a power of 2.
 *
 * Each non-null Cell holds one partial count; the logical total is {@code base}
 * plus the value of every non-null cell.
 */
transient volatile Cell[] cells;

/**
 * Base value, used mainly when there is no contention, but also as
 * a fallback during table initialization races. Updated via CAS.
 */
transient volatile long base;

/**
 * Spinlock (locked via CAS) used when resizing and/or creating Cells.
 * 0 means unlocked; the holder writes 0 back to release it.
 */
transient volatile int cellsBusy;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Minimal spin lock built on a single CAS'd flag (0 = unlocked, 1 = locked).
 *
 * Caveats: the lock is not reentrant (a thread that calls {@link #lock()} twice spins
 * forever), and {@link #unlock()} does not check that the caller holds the lock.
 */
@Slf4j(topic = "c.LockCas")
public class LockCas {

    /** Lock flag: 0 = unlocked, 1 = locked. */
    private final AtomicInteger state = new AtomicInteger(0);

    /**
     * Spins until the lock is acquired.
     *
     * Uses test-and-test-and-set. A CAS needs exclusive ownership of the cache line even
     * when it fails, so CAS-ing in a tight loop makes that line bounce between the waiting
     * cores. Waiters instead spin on a plain volatile read, which can be served from a
     * shared cached copy, and only attempt the CAS once the lock looks free.
     */
    public void lock() {
        while (true) {
            if (state.get() == 0 && state.compareAndSet(0, 1)) {
                break;
            }
        }
    }

    /** Releases the lock by resetting the flag to 0. */
    public void unlock() {
        log.debug("unlock...");
        state.set(0);
    }

    /** Two threads compete for the lock; each holds it for one second. */
    public static void main(String[] args) {
        LockCas lock = new LockCas();
        Runnable task = () -> {
            log.debug("begin...");
            lock.lock();
            try {
                log.debug("lock...");
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Restore the interrupt status instead of swallowing it.
                Thread.currentThread().interrupt();
                log.warn("interrupted while holding the lock", e);
            } finally {
                lock.unlock();
            }
        };
        new Thread(task, "t1").start();
        new Thread(task, "t2").start();
    }
}
// Test results:
// 2021-09-29 10:28:45.960 DEBUG [t2] c.LockCas - begin...
// 2021-09-29 10:28:45.960 DEBUG [t1] c.LockCas - begin...
// 2021-09-29 10:28:45.962 DEBUG [t1] c.LockCas - lock...
// 2021-09-29 10:28:46.962 DEBUG [t1] c.LockCas - unlock...
// 2021-09-29 10:28:46.962 DEBUG [t2] c.LockCas - lock...
// 2021-09-29 10:28:47.962 DEBUG [t2] c.LockCas - unlock...
一旦某个线程加锁,其他线程就会一直循环等待,直到持有锁的线程释放锁。
Cell类
/**
 * A single counter slot of the striped counter.
 *
 * {@code @sun.misc.Contended} requests padding around instances so that two Cells are
 * not placed in the same cache line. Cores updating different cells then do not
 * invalidate each other's cache lines (false sharing).
 */
@sun.misc.Contended static final class Cell {
    // Partial count held by this slot; updated via cas().
    volatile long value;
    Cell(long x) { value = x; }

    /**
     * Atomically sets {@code value} to {@code val} if it currently equals {@code cmp}.
     *
     * @param cmp expected current value
     * @param val new value
     * @return true if the swap happened, false if {@code value} was not {@code cmp}
     */
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    // Offset of the "value" field within a Cell object, resolved once and used by cas().
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            // Any failure resolving the offset is unrecoverable; rethrow as an Error.
            throw new Error(e);
        }
    }
}
CPU与主存速度比较
从CPU到 | 需要的时钟周期 |
---|---|
寄存器 | 1 cycle |
L1 | 3~4 cycle |
L2 | 10~20 cycle |
L3 | 40~45 cycle |
内存 | 120~240 cycle |
因为CPU与内存速度的差异很大,需要靠预读取数据至缓存来提升效率
而缓存以缓存行为单位,每个缓存行对应一块内存,一般是64 byte
缓存的加入会造成数据副本的产生,即内存的同一份数据会缓存在不同核心(多核CPU)的缓存行中
CPU要保证数据的一致性,如果某个CPU核心更改了数据,其他CPU核心对应的缓存行会失效
因为cells是数组类型,在内存中是连续存储的,一个Cell为24字节(16字节对象头和8字节的value),因此一个缓存行(64字节)可以存下2个Cell对象。这样问题就来了:假设Core-0要修改Cell[0],Core-1要修改Cell[1],而这两个Cell恰好位于同一个缓存行中,
那么无论谁修改成功,都会导致对方Core的缓存行失效(这就是所谓的伪共享,false sharing)。
@sun.misc.Contended注解用来解决这个问题,它的原理就是在使用了此注解的对象或者字段前后各加128字节大小的padding,从而让CPU将对象预读至缓存时占用不同的缓存行,这样就不会造成对方缓存行的失效。(注意:在JDK 8中,该注解默认只对JDK内部类生效,用户自己的类需要加上JVM参数-XX:-RestrictContended才会生效。)
/**
 * Adds the given value.
 *
 * Fast path 1: if no cell table exists, try a single CAS on {@code base}.
 * Fast path 2: otherwise (the table exists or the base CAS failed), try a CAS on the
 * cell at index {@code getProbe() & (length - 1)}.
 * If the table is missing or empty, the chosen slot is null, or the cell CAS fails,
 * fall back to the slow path {@code longAccumulate}.
 *
 * @param x the value to add
 */
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // Read cells once; base is only CAS'd when there is no table. The || short-circuits,
    // so casBase is skipped entirely once the table exists.
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // Stays true unless the cell CAS below is actually attempted and fails; it is
        // passed on so the slow path knows whether a CAS on a cell already failed.
        boolean uncontended = true;
        // Each assignment inside the condition only runs if the previous test passed.
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
此处x为1(即调用increment()时的情形)
longAccumulate方法
/**
 * Slow path of add(): retries until the update lands in a cell or in {@code base},
 * creating, initializing or growing the cell table as needed.
 *
 * Each loop iteration takes one of three branches:
 * 1. The cell table exists and is non-empty: update (or create) the cell selected by
 *    the probe {@code h}, growing the table on repeated collisions.
 * 2. The table is null or empty and the {@code cellsBusy} lock is free: initialize a
 *    2-slot table containing a new Cell(x).
 * 3. Otherwise (another thread holds the lock): fall back to a CAS on {@code base}.
 *
 * {@code cellsBusy} is acquired with {@code cellsBusy == 0 && casCellsBusy()} and always
 * released by writing {@code cellsBusy = 0} in a finally block.
 *
 * @param x              the value to accumulate
 * @param fn             update function, or null for plain addition
 * @param wasUncontended false if the caller's CAS on a cell already failed
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // Branch 1: cell table exists and is non-empty.
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // No growth when n >= NCPU (presumably the CPU count) or the table was replaced.
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            // First collision only sets the flag; a second consecutive one expands the table.
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        // Double the size; existing cells are carried over by reference.
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            // Move this thread to a different probe value before retrying.
            h = advanceProbe(h);
        }
        // Branch 2: no table yet; try to create a 2-slot table holding Cell(x).
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // Branch 3: another thread holds cellsBusy; try accumulating into base instead.
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
cells数组创建且不为空
cells未创建或者为空
符合上述条件
sum方法获取最终结果
/**
 * Returns the current total: {@code base} plus the value of every non-null cell.
 *
 * The fields are read without any lock or CAS. Updates made by other threads while the
 * sum is being computed may or may not be included, so the result is not an atomic
 * snapshot.
 *
 * @return the sum of base and all cell values
 */
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            // Slots that were never populated are null and contribute nothing.
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
QQ:806797785
仓库地址:https://gitee.com/gaogzhen/concurrent