并发编程已完结,章节如下:
Java 并发编程上篇 -(Synchronized 原理、LockSupport 原理、ReentrantLock 原理)
Java 并发编程中篇 -(JMM、CAS 原理、Volatile 原理)
Java 并发编程下篇 -(线程池)
Java 并发编程下篇 -(JUC、AQS 源码、ReentrantLock 源码)
原子性 - 保证指令不会受到线程上下文切换的影响
可见性 - 保证指令不会受 cpu 缓存的影响
有序性 - 保证指令不会受 cpu指令并行优化的影响
首先看一段代码:
public static boolean run = true; public static void main(String[] args) { Thread t1 = new Thread(() -> { while(run) { } }, "t1"); t1.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("t1 Stop"); run = false; }
public static volatile boolean run = true; // 保证内存的可见性
因为 printIn() 方法使用了 synchronized 同步代码块,可以保证原子性与可见性,它是 PrintStream 类的方法。
public class Code_02_Test { public static void main(String[] args) throws InterruptedException { Monitor monitor = new Monitor(); monitor.start(); Thread.sleep(3500); monitor.stop(); } } class Monitor { Thread monitor; // 设置标记,用于判断是否被终止了 private volatile boolean stop = false; /** * 启动监控器线程 */ public void start() { // 设置线控器线程,用于监控线程状态 monitor = new Thread() { @Override public void run() { // 开始不停的监控 while (true) { if(stop) { System.out.println("处理后续任务"); break; } System.out.println("监控器运行中..."); try { // 线程休眠 Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("被打断了"); } } } }; monitor.start(); } /** * 用于停止监控器线程 */ public void stop() { // 修改标记 stop = true; // 打断线程 monitor.interrupt(); } }
public class Code_03_Test { public static void main(String[] args) throws InterruptedException { Monitor monitor = new Monitor(); monitor.start(); monitor.start(); Thread.sleep(3500); monitor.stop(); } } class Monitor { Thread monitor; // 设置标记,用于判断是否被终止了 private volatile boolean stop = false; // 设置标记,用于判断是否已经启动过了 private boolean starting = false; /** * 启动监控器线程 */ public void start() { // 上锁,避免多线程运行时出现线程安全问题 synchronized (this) { if (starting) { // 已被启动,直接返回 return; } // 启动监视器,改变标记 starting = true; } // 设置线控器线程,用于监控线程状态 monitor = new Thread() { @Override public void run() { // 开始不停的监控 while (true) { if(stop) { System.out.println("处理后续任务"); break; } System.out.println("监控器运行中..."); try { // 线程休眠 Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("被打断了"); } } } }; monitor.start(); } /** * 用于停止监控器线程 */ public void stop() { // 打断线程 monitor.interrupt(); stop = true; } }
// 可以重排的例子 int a = 10; int b = 20; System.out.println( a + b ); // 不能重排的例子 int a = 10; int b = a - 5;
int num = 0; // volatile 修饰的变量,可以禁用指令重排 volatile boolean ready = false; 可以防止变量之前的代码被重排序 boolean ready = false; // 线程1 执行此方法 public void actor1(I_Result r) { if(ready) { r.r1 = num + num; } else { r.r1 = 1; } } // 线程2 执行此方法 public void actor2(I_Result r) { num = 2; ready = true; }
第一种:线程 2 先执行,然后线程 1 后执行,r1 的结果为 4
第二种:线程 1 先执行,然后线程 2 后执行,r1 的结果为 1
第三种:线程 2 先执行,但是发送了指令重排,num = 2 与 ready = true 这两行代码语序发生装换,
ready = true; // 前 num = 2; // 后
对 volatile 变量的写指令后会加入写屏障
对 volatile 变量的读指令前会加入读屏障
public void actor2(I_Result r) { num = 2; ready = true; // ready 是被 volatile 修饰的,赋值带写屏障 // 写屏障 }
public void actor1(I_Result r) { // 读屏障 // ready是被 volatile 修饰的,读取值带读屏障 if(ready) { r.r1 = num + num; } else { r.r1 = 1; } }
分析如图:
public void actor2(I_Result r) { num = 2; ready = true; // ready 是被 volatile 修饰的,赋值带写屏障 // 写屏障 }
public void actor1(I_Result r) { // 读屏障 // ready 是被 volatile 修饰的,读取值带读屏障 if(ready) { r.r1 = num + num; } else { r.r1 = 1; } }
// 最开始的单例模式是这样的 public final class Singleton { private Singleton() { } private static Singleton INSTANCE = null; public static Singleton getInstance() { // 首次访问会同步,而之后的使用不用进入synchronized synchronized(Singleton.class) { if (INSTANCE == null) { // t1 INSTANCE = new Singleton(); } } return INSTANCE; } }
// 但是上面的代码块的效率是有问题的,因为即使已经产生了单实例之后,之后调用了getInstance()方法之后还是会加锁,这会严重影响性能!因此就有了模式如下double-checked lockin: public final class Singleton { private Singleton() { } private static Singleton INSTANCE = null; public static Singleton getInstance() { if(INSTANCE == null) { // t2 // 首次访问会同步,而之后的使用没有 synchronized synchronized(Singleton.class) { if (INSTANCE == null) { // t1 INSTANCE = new Singleton(); } } } return INSTANCE; } } //但是上面的if(INSTANCE == null)判断代码没有在同步代码块synchronized中,不能享有synchronized保证的原子性,可见性。
懒惰实例化
首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
有隐含的,但很关键的一点:第一个 if 使用了 INSTANCE 变量,是在同步块之外,但在多线程环境下,上面的代码是有问题的
0: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton; 3: ifnonnull 37 // ldc是获得类对象 6: ldc #3 // class cn/itcast/n5/Singleton // 复制操作数栈栈顶的值放入栈顶, 将类对象的引用地址复制了一份 8: dup // 操作数栈栈顶的值弹出,即将对象的引用地址存到局部变量表中 // 将类对象的引用地址存储了一份,是为了将来解锁用 9: astore_0 10: monitorenter 11: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton; 14: ifnonnull 27 // 新建一个实例 17: new #3 // class cn/itcast/n5/Singleton // 复制了一个实例的引用 20: dup // 通过这个复制的引用调用它的构造方法 21: invokespecial #4 // Method "<init>":()V // 最开始的这个引用用来进行赋值操作 24: putstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton; 27: aload_0 28: monitorexit 29: goto 37 32: astore_1 33: aload_0 34: monitorexit 35: aload_1 36: athrow 37: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton; 40: areturn
其中
也许 jvm 会优化为:先执行 24,再执行 21。如果两个线程 t1,t2 按如下时间序列执行:
public final class Singleton { private Singleton() { } private static volatile Singleton INSTANCE = null; public static Singleton getInstance() { // 实例没创建,才会进入内部的 synchronized代码块 if (INSTANCE == null) { synchronized (Singleton.class) { // t2 // 也许有其它线程已经创建实例,所以再判断一次 if (INSTANCE == null) { // t1 INSTANCE = new Singleton(); } } } return INSTANCE; } }
static int x; static Object m = new Object(); new Thread(()->{ synchronized(m) { x = 10; } },"t1").start(); new Thread(()->{ synchronized(m) { System.out.println(x); } },"t2").start();
volatile static int x; new Thread(()->{ x = 10; },"t1").start(); new Thread(()->{ System.out.println(x); },"t2").start();
static int x; x = 10; new Thread(()->{ System.out.println(x); },"t2").start();
static int x; Thread t1 = new Thread(()->{ x = 10; },"t1"); t1.start(); t1.join(); System.out.println(x);
static int x; public static void main(String[] args) { Thread t2 = new Thread(()->{ while(true) { if(Thread.currentThread().isInterrupted()) { System.out.println(x); break; } } },"t2"); t2.start(); new Thread(()->{ sleep(1); x = 10; t2.interrupt(); },"t1").start(); while(!t2.isInterrupted()) { Thread.yield(); } System.out.println(x); }
volatile static int x; static int y; new Thread(() -> { y = 10; x = 20; },"t1").start(); new Thread(() -> { // x=20 对 t2 可见, 同时 y=10 也对 t2 可见 System.out.println(x); },"t2").start();
public class TestVolatile { volatile boolean initialized = false; void init() { if (initialized) { return; } doInit(); initialized = true; } private void doInit() { } }
// 问题1:为什么加 final? 防止子类继承后更改,破坏单例 // 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例? 如果进行反序列化的时候会生成新的对象,这样跟单例模式生成的对象是不同的。要解决直接加上readResolve()方法就行了,如下所示 public final class Singleton implements Serializable { // 问题3:为什么设置为私有? 放弃其它类中使用new生成新的实例,是否能防止反射创建新的实例? 不能。 private Singleton() {} // 问题4:这样初始化是否能保证单例对象创建时的线程安全? 可以保证线程安全,因为设置成为静态变量,是jvm在类加载阶段就进行了初始化,jvm保证了此操作的线程安全性 private static final Singleton INSTANCE = new Singleton(); // 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由。 //1.提供更好的封装性;2.提供范型的支持 public static Singleton getInstance() { return INSTANCE; } public Object readResolve() { return INSTANCE; } }
// 问题1:枚举单例是如何限制实例个数的? 创建枚举类的时候就已经定义好了,每个枚举常量其实就是枚举类的一个静态成员变量 // 问题2:枚举单例在创建时是否有并发问题? 没有,这是静态成员变量,线程安全性在类加载阶段完成的 // 问题3:枚举单例能否被反射破坏单例? 不能 // 问题4:枚举单例能否被反序列化破坏单例? 枚举类默认实现了序列化接口,枚举类已经考虑到此问题,无需担心破坏单例 // 问题5:枚举单例属于懒汉式还是饿汉式? 饿汉式 // 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做? 加构造方法就行了 enum Singleton { INSTANCE; }
public final class Singleton { private Singleton() { } private static Singleton INSTANCE = null; // 分析这里的线程安全, 并说明有什么缺点:synchronized加载静态方法上,可以保证线程安全。缺点就是锁的范围过大,每次访问都会加锁,性能比较低。 public static synchronized Singleton getInstance() { if( INSTANCE != null ){ return INSTANCE; } INSTANCE = new Singleton(); return INSTANCE; } }
public final class Singleton { private Singleton() { } // 问题1:解释为什么要加 volatile? 为了防止重排序问题 private static volatile Singleton INSTANCE = null; // 问题2:对比懒汉式实现, 说出这样做的意义? 提高了效率,不需要每次都等待锁,进入同步代码块之后,再判断是否已经实例化了 public static Singleton getInstance() { if (INSTANCE != null) { return INSTANCE; } synchronized (Singleton.class) { // 问题3:为什么还要在这里加为空判断, 之前不是判断过了吗? 这是为了防止首次创建INSTANCE时的并发问题。 if (INSTANCE != null) { return INSTANCE; } INSTANCE = new Singleton(); return INSTANCE; } } }
public final class Singleton { private Singleton() { } // 问题1:属于懒汉式还是饿汉式:懒汉式,这是一个静态内部类。类加载本身就是懒惰的,在没有调用getInstance方法时是没有执行LazyHolder内部类的类加载操作的。 private static class LazyHolder { static final Singleton INSTANCE = new Singleton(); } // 问题2:在创建时是否有并发问题,这是线程安全的,类加载时,jvm保证类加载操作的线程安全 public static Singleton getInstance() { return LazyHolder.INSTANCE; } }
public class Code_04_UnsafeTest { public static void main(String[] args) { Account acount = new AccountUnsafe(10000); Account.demo(acount); } } class AccountUnsafe implements Account { private Integer balance; public AccountUnsafe(Integer balance) { this.balance = balance; } @Override public Integer getBalance() { return this.balance; } @Override public void withdraw(Integer amount) { synchronized (this) { // 加锁。 this.balance -= amount; } } } interface Account { // 获取金额的方法 Integer getBalance(); // 取款的方法 void withdraw(Integer amount); static void demo(Account account) { List<Thread> list = new ArrayList<>(); long start = System.nanoTime(); for(int i = 0; i < 1000; i++) { list.add(new Thread(() -> { account.withdraw(10); })); } list.forEach(Thread::start); list.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost: " + (end-start)/1000_000 + " ms"); } }
class AccountSafe implements Account{ AtomicInteger atomicInteger ; public AccountSafe(Integer balance){ this.atomicInteger = new AtomicInteger(balance); } @Override public Integer getBalance() { return atomicInteger.get(); } @Override public void withdraw(Integer amount) { // 核心代码 while (true){ int pre = getBalance(); int next = pre - amount; if (atomicInteger.compareAndSet(pre,next)){ break; } } } }
AtomicInteger:整型原子类
AtomicLong:长整型原子类
AtomicBoolean :布尔型原子类
public static void main(String[] args) { AtomicInteger i = new AtomicInteger(0); // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++ System.out.println(i.getAndIncrement()); // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i System.out.println(i.incrementAndGet()); // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i System.out.println(i.decrementAndGet()); // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i-- System.out.println(i.getAndDecrement()); // 获取并加值(i = 0, 结果 i = 5, 返回 0) System.out.println(i.getAndAdd(5)); // 加值并获取(i = 5, 结果 i = 0, 返回 0) System.out.println(i.addAndGet(-5)); // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0) // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.getAndUpdate(p -> p - 2)); // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0) // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.updateAndGet(p -> p + 2)); // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0) // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用 // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的 // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final System.out.println(i.getAndAccumulate(10, (p, x) -> p + x)); // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0) // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x)); }
- AtomicReference:引用类型原子类
- AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用CAS 进行原子更新时可能出现的 ABA 问题。
- AtomicMarkableReference :原子更新带有标记的引用类型。该类将boolean 标记与引用关联起。
class DecimalAccountUnsafe implements DecimalAccount { BigDecimal balance; public DecimalAccountUnsafe(BigDecimal balance) { this.balance = balance; } @Override public BigDecimal getBalance() { return balance; } // 取款任务 @Override public void withdraw(BigDecimal amount) { BigDecimal balance = this.getBalance(); this.balance = balance.subtract(amount); } }
class DecimalAccountCas implements DecimalAccount { private AtomicReference<BigDecimal> balance; public DecimalAccountCas(BigDecimal balance) { this.balance = new AtomicReference<>(balance); } @Override public BigDecimal getBalance() { return balance.get(); } @Override public void withdraw(BigDecimal amount) { while (true) { BigDecimal preVal = balance.get(); BigDecimal nextVal = preVal.subtract(amount); if(balance.compareAndSet(preVal, nextVal)) { break; } } } }
public static AtomicReference<String> ref = new AtomicReference<>("A"); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); String preVal = ref.get(); other(); TimeUnit.SECONDS.sleep(1); log.debug("change A->C {}", ref.compareAndSet(preVal, "C")); } private static void other() throws InterruptedException { new Thread(() -> { log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B")); }, "t1").start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A")); }, "t2").start(); }
// 两个参数,第一个:变量的值 第二个:版本号初始值 public static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); String preVal = ref.getReference(); int stamp = ref.getStamp(); log.info("main 拿到的版本号 {}",stamp); other(); TimeUnit.SECONDS.sleep(1); log.info("修改后的版本号 {}",ref.getStamp()); log.info("change A->C:{}", ref.compareAndSet(preVal, "C", stamp, stamp + 1)); } private static void other() throws InterruptedException { new Thread(() -> { int stamp = ref.getStamp(); log.info("{}",stamp); log.info("change A->B:{}", ref.compareAndSet(ref.getReference(), "B", stamp, stamp + 1)); }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { int stamp = ref.getStamp(); log.info("{}",stamp); log.debug("change B->A:{}", ref.compareAndSet(ref.getReference(), "A",stamp,stamp + 1)); }).start(); }
AtomicIntegerArray:整形数组原子类
AtomicLongArray:长整形数组原子类
AtomicReferenceArray :引用类型数组原子类
public class Code_10_AtomicArrayTest { public static void main(String[] args) throws InterruptedException { /** 参数1,提供数组、可以是线程不安全数组或线程安全数组 参数2,获取数组长度的方法 参数3,自增方法,回传 array, index 参数4,打印数组的方法 */ // supplier 提供者 无中生有 ()->结果 // function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果 // consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)-> demo( () -> new int[10], (array) -> array.length, (array, index) -> array[index]++, (array) -> System.out.println(Arrays.toString(array)) ); TimeUnit.SECONDS.sleep(1); demo( () -> new AtomicIntegerArray(10), (array) -> array.length(), (array, index) -> array.getAndIncrement(index), (array) -> System.out.println(array) ); } private static <T> void demo( Supplier<T> arraySupplier, Function<T, Integer> lengthFun, BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer) { ArrayList<Thread> ts = new ArrayList<>(); // 创建集合 T array = arraySupplier.get(); // 获取数组 int length = lengthFun.apply(array); // 获取数组的长度 for(int i = 0; i < length; i++) { ts.add(new Thread(() -> { for (int j = 0; j < 10000; j++) { putConsumer.accept(array, j % length); } })); } ts.forEach(Thread::start); ts.forEach((thread) -> { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); printConsumer.accept(array); } }
AtomicReferenceFieldUpdater // 域 字段
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
Exception in thread “main” java.lang.IllegalArgumentException: Must be volatile type
public class Code_11_AtomicReferenceFieldUpdaterTest { public static AtomicReferenceFieldUpdater ref = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name"); public static void main(String[] args) throws InterruptedException { Student student = new Student(); new Thread(() -> { System.out.println(ref.compareAndSet(student, null, "list")); }).start(); System.out.println(ref.compareAndSet(student, null, "张三")); System.out.println(student); } } class Student { public volatile String name; @Override public String toString() { return "Student{" + "name='" + name + '\'' + '}'; } }
public static void main(String[] args) { for(int i = 0; i < 5; i++) { demo(() -> new AtomicLong(0), (ref) -> ref.getAndIncrement()); } for(int i = 0; i < 5; i++) { demo(() -> new LongAdder(), (ref) -> ref.increment()); } } private static <T> void demo(Supplier<T> supplier, Consumer<T> consumer) { ArrayList<Thread> list = new ArrayList<>(); T adder = supplier.get(); // 4 个线程,每人累加 50 万 for (int i = 0; i < 4; i++) { list.add(new Thread(() -> { for (int j = 0; j < 500000; j++) { consumer.accept(adder); } })); } long start = System.nanoTime(); list.forEach(t -> t.start()); list.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(adder + " cost:" + (end - start)/1000_000); }
public class LongAdder extends Striped64 implements Serializable {}
// 累加单元数组, 懒惰初始化 transient volatile Cell[] cells; // 基础值, 如果没有竞争, 则用 cas 累加这个域 transient volatile long base; // 在 cells 创建或扩容时, 置为 1, 表示加锁 transient volatile int cellsBusy;
public class Code_13_LockCas { // 如果 state 值为 0 表示没上锁, 1 表示上锁 public AtomicInteger state = new AtomicInteger(0); public void lock() { while (true) { if(state.compareAndSet(0, 1)) { break; } } } public void unlock() { log.debug("unlock..."); state.set(0); } public static void main(String[] args) { Code_13_LockCas lock = new Code_13_LockCas(); new Thread(() -> { log.info("begin..."); lock.lock(); try { log.info("上锁成功"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }, "t1").start(); new Thread(() -> { log.info("begin..."); lock.lock(); try { log.info("上锁成功"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }, "t2").start(); } }
// 缓存行伪共享 @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值 final boolean cas(long prev, long next) { return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next); } // 省略不重要代码 }
public void increment() { add(1L); }
public void add(long x) { // as 为累加单元数组, b 为基础值, x 为累加值 Cell[] as; long b, v; int m; Cell a; // 进入 if 的两个条件 // 1. as 有值, 表示已经发生过竞争, 进入 if // 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if // 3. 如果 as 没有创建, 然后 cas 累加成功就返回,累加到 base 中 不存在线程竞争的时候用到。 if ((as = cells) != null || !casBase(b = base, b + x)) { // uncontended 表示 cell 是否有竞争,这里赋值为 true 表示有竞争 boolean uncontended = true; if ( // as 还没有创建 as == null || (m = as.length - 1) < 0 || // 当前线程对应的 cell 还没有被创建,a为当线程的cell (a = as[getProbe() & m]) == null || // 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell ) !(uncontended = a.cas(v = a.value, v + x)) ) { // 当 cells 为空时,累加操作失败会调用方法, // 当 cells 不为空,当前线程的 cell 创建了但是累加失败了会调用方法, // 当 cells 不为空,当前线程 cell 没创建会调用这个方法 // 进入 cell 数组创建、cell 创建的流程 longAccumulate(x, null, uncontended); } } }
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; // 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cell if ((h = getProbe()) == 0) { // 初始化 probe ThreadLocalRandom.current(); // h 对应新的 probe 值, 用来对应 cell h = getProbe(); wasUncontended = true; } // collide 为 true 表示需要扩容 boolean collide = false; for (;;) { Cell[] as; Cell a; int n; long v; // 已经有了 cells if ((as = cells) != null && (n = as.length) > 0) { // 但是还没有当前线程对应的 cell if ((a = as[(n - 1) & h]) == null) { // 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x // 成功则 break, 否则继续 continue 循环 if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && // 判断槽位确实是空的 rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } // 有竞争, 改变线程对应的 cell 来重试 cas else if (!wasUncontended) wasUncontended = true; // cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 null else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 cas else if (n >= NCPU || cells != as) collide = false; // 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了 else if (!collide) collide = true; // 加锁 else if (cellsBusy == 0 && casCellsBusy()) { // 加锁成功, 扩容 continue; } // 改变线程对应的 cell h = advanceProbe(h); } // 还没有 cells, cells==as是指没有其它线程修改cells,as和cells引用相同的对象,使用casCellsBusy()尝试给 cellsBusy 加锁 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell // 成功则 break; boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } // 上两种情况失败, 尝试给 base 使用casBase累加 else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; } }
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException { // Unsafe 使用了单例模式,unsafe 对象是类中的一个私有的变量 Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); Unsafe unsafe = (Unsafe)theUnsafe.get(null); }
public class UnsaferCAS { public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException { // Unsafe 对象的获取 Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); Unsafe unsafe = (Unsafe)theUnsafe.get(null); //1.获取域的偏移量 long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id")); long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name")); Teacher t = new Teacher(); //2.执行cas操作 unsafe.compareAndSwapInt(t, idOffset, 0, 1); unsafe.compareAndSwapObject(t, nameOffset, null, "张三"); System.out.println(t); } } @Data class Teacher { volatile int id; volatile String name; }
public class UnsaferAtomicInteger implements Account { public static void main(String[] args) { Account.demo(new UnsaferAtomicInteger(10000)); } private volatile int value; private static final long valueOffset; private static final Unsafe UNSAFE; static { UNSAFE = UnsafeAccessor.getUnsafe(); try { valueOffset = UNSAFE.objectFieldOffset(UnsaferAtomicInteger.class.getDeclaredField("value")); } catch (NoSuchFieldException e) { e.printStackTrace(); throw new RuntimeException(e); } } public UnsaferAtomicInteger(Integer value) { this.value = value; } public Integer get() { return value; } @Override public Integer getBalance() { return get(); } @Override public void withdraw(Integer amount) { System.out.println(49); while (true) { Integer preVal = this.value; Integer nextVal = preVal - amount; if(UNSAFE.compareAndSwapInt(this, valueOffset, preVal, nextVal)) { break; } } System.out.println(57); } } interface Account { // 获取余额 Integer getBalance(); // 取款 void withdraw(Integer amount); /* * 方法内会启动 1000 个线程,每个线程做 -10 元 的操作 * 如果初始余额为 10000 那么正确的结果应当是 0 */ static void demo(Account account) { List<Thread> ts = new ArrayList<>(); System.out.println(72); for (int i = 0; i < 10; i++) { ts.add(new Thread(() -> { account.withdraw(10); })); } System.out.println(78); long startTime = System.nanoTime(); ts.forEach(Thread::start); System.out.println(81); for (Thread t : ts) { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(89); long end = System.nanoTime(); System.out.println(account.getBalance() + " cost: " + (end-startTime)/1000_000 + " ms"); } } //Unsafe 对象的获取 class UnsafeAccessor { private static final Unsafe unsafe; static { try { Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); unsafe = (Unsafe) theUnsafe.get(null); } catch (NoSuchFieldException | IllegalAccessException e) { throw new Error(e); } } public static Unsafe getUnsafe() { return unsafe; } }
本章重点讲解
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); for (int i = 0; i < 10; i++) { new Thread(() -> { try { log.debug("{}", sdf.parse("1951-04-21")); } catch (Exception e) { log.error("{}", e); } }).start(); }
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd"); for (int i = 0; i < 10; i++) { new Thread(() -> { LocalDate date = dtf.parse("2018-10-01", LocalDate::from); log.debug("{}", date); }).start(); }
public final class String implements java.io.Serializable, Comparable<String>, CharSequence { /** The value is used for character storage. */ private final char value[]; /** Cache the hash code for the string */ private int hash; // Default to 0 // ... }
public String substring(int beginIndex, int endIndex) { if (beginIndex < 0) { throw new StringIndexOutOfBoundsException(beginIndex); } if (endIndex > value.length) { throw new StringIndexOutOfBoundsException(endIndex); } int subLen = endIndex - beginIndex; if (subLen < 0) { throw new StringIndexOutOfBoundsException(subLen); } // 上面是一些校验,下面才是真正的创建新的String对象 return ((beginIndex == 0) && (endIndex == value.length)) ? this : new String(value, beginIndex, subLen); }
public String(char value[], int offset, int count) { if (offset < 0) { throw new StringIndexOutOfBoundsException(offset); } if (count <= 0) { if (count < 0) { throw new StringIndexOutOfBoundsException(count); } if (offset <= value.length) { this.value = "".value; return; } } // Note: offset or count might be near -1>>>1. if (offset > value.length - count) { throw new StringIndexOutOfBoundsException(offset + count); } // 上面是一些安全性的校验,下面是给String对象的value赋值,新创建了一个数组来保存String对象的值 this.value = Arrays.copyOfRange(value, offset, offset+count); }
包装类
public static Long valueOf(long l) { final int offset = 128; if (l >= -128 && l <= 127) { // will cache return LongCache.cache[(int)l + offset]; } return new Long(l); }
Byte, Short, Long 缓存的范围都是 -128~127
Character 缓存的范围是 0~127
Integer 的默认范围是 -128~127,最小值不能变,但最大值可以通过调整虚拟机参数 "-Djava.lang.Integer.IntegerCache.high "来改变
Boolean 缓存了 TRUE 和 FALSE
String 池
参考文章:JDK1.8关于运行时常量池, 字符串常量池的要点
BigDecimal、BigInteger
public class Code_17_DatabaseConnectionPoolTest { public static void main(String[] args) { Pool pool = new Pool(2); for(int i = 0; i < 5; i++) { new Thread(() -> { Connection connection = pool.borrow(); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } pool.free(connection); }).start(); } } } @Slf4j(topic = "c.Pool") class Pool { // 连接池的大小, 因为没有实现连接池大小的扩容, 用 final 表示池的大小是一个固定值。 private final int poolSize; // 连接池 private Connection[] connections; // 表示连接状态, 如果是 0 表示没连接, 1 表示有连接 private AtomicIntegerArray status; // 初始化连接池 public Pool(int poolSize) { this.poolSize = poolSize; status = new AtomicIntegerArray(new int[poolSize]); connections = new Connection[poolSize]; for(int i = 0; i < poolSize; i++) { connections[i] = new MockConnection("连接" + (i + 1)); } } // 从连接池中获取连接 public Connection borrow() { while (true) { for(int i = 0; i < poolSize; i++) { if(0 == status.get(i)) { if(status.compareAndSet(i,0, 1)) { log.info("获取连接:{}", connections[i]); return connections[i]; } } } synchronized (this) { try { log.info("wait ..."); wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } // 从连接池中释放指定的连接 public void free(Connection connection) { for (int i = 0; i < poolSize; i++) { if(connections[i] == connection) { status.set(i, 0); log.info("释放连接:{}", connections[i]); synchronized (this) { notifyAll(); } } } } } class MockConnection implements Connection { private String name; public MockConnection(String name) { this.name = name; } @Override public String toString() { return "MockConnection{" + "name='" + name + '\'' + '}'; } }
连接的动态增长与收缩
连接保活(可用性检测)
等待超时处理
分布式 hash
public class TestFinal { final int a = 20; }
0: aload_0 1: invokespecial #1 // Method java/lang/Object."<init>":()V 4: aload_0 5: bipush 20 7: putfield #2 // Field a:I <-- 写屏障 10: return