/** * callable方式创建线程 * 返回值 * * 线程间的通讯(wait/notify) */ public class Test1 { public static void main(String[] args) { // //开启一个子线程 // //继承Thread类 // new Thread(){ // @Override // public void run() { // // } // }.start(); // // //继承Runable接口 // new Thread(() -> { // // }).start(); System.out.println("主线程开始执行"); //开启子线程 FutureTask<String> futureTask = new FutureTask<>(new MyCallable()); new Thread(futureTask).start(); //主线程进行其他的业务处理 try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } //获取子线程的返回值 try { String result = futureTask.get(); System.out.println("获得子线程的返回值:" + result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } class MyCallable implements Callable<String> { @Override public String call() throws Exception { System.out.println("子线程开始执行" + Thread.currentThread().getName()); Thread.sleep(5000); System.out.println("子线程执行结束" + Thread.currentThread().getName()); return "Hello"; } }
新建状态 -> 就绪状态 -> 运行状态 <-> 阻塞状态 -> 死亡状态
在多线程环境下,当n个线程同时的操作(增删改)一份共享资源时,因为线程调度的不确定性,可能引起资源状态前后的不一致,这种问题就是线程安全问题
要能够明确 竞态资源是否存在,是什么资源
引起线程不安全的原因在于:
1、可见性
2、原子性
3、有序性
任何一个业务,如果没有保证以上3个特性中的任意一个特性,就有线程安全的问题
一个线程对一份资源的修改,对其他线程必须立即可见
可见性的案例
public class Test2 { public static boolean flag = true; public static void main(String[] args) { System.out.println("主线程开始执行!"); //开启子线程 new Thread(){ @Override public void run() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } flag = false; System.out.println("子线程重新设置flag变量:" + flag); } }.start(); while(flag){ } System.out.println("主线程执行结束!"); } }
一个操作是一个整体不可分割的
比如:
i = 10;//原子性
i++; //非原子性 x=i+1; | i=x;
j = 10.0;//在32位的系统上非原子,在64位的系统上是原子性注意:两个原子性的操作放在一起,就不是原子性的了
public class Test3 { private static int i = 0; public static void main(String[] args) { for (int j = 0; j < 1000; j++) { new Thread(() -> { i++; }).start(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("i的结果:" + i); } }
指令重排:cpu为了优化考虑,可能会打乱代码的执行顺序。
指令重拍的保证:cpu可以保证代码在单线程情况下,指令重拍后,不影响原程序的执行结果。
public class Test3 { private static boolean flag = true; private static Object obj; public static void main(String[] args) { //线程1 while(flag){ } int i = obj.hashCode(); //线程2 obj = new Object(); flag = false; } }
volatile关键字可以保证变量的可见性,以及局部有序性。
volatile不能保证原子性。使用volatile关键字修饰的变量,一旦被某个线程修改,其他线程是立刻可见的。
AtomicXxxx类里面的方法都是原子操作
ublic class Test3 { private static int i = 0; private static volatile AtomicInteger atomicInteger = new AtomicInteger(i); public static void main(String[] args) { for (int j = 0; j < 1000; j++) { new Thread(() -> { atomicInteger.getAndIncrement();//i++ }).start(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("i的结果:" + atomicInteger.get()); } }
CAS(compare and swap)本质上就是乐观锁操作
/** * flag.compareAndSet(0, 1) * 判断flag的值是否为0,如果为0就修改成1,并且返回true,如果不为0就不修改,并且返回false * 这个过程是一个原子不可分割的操作 */ public class Test4 { private static String name;//姓名 private static int age;//年龄 private static AtomicInteger flag = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { new Thread(() -> { boolean result = flag.compareAndSet(0, 1); if (result) { name = "小红"; try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } age = 18; } }).start(); new Thread(() -> { boolean result = flag.compareAndSet(0, 1); if (result) { name = "小明"; try { Thread.sleep(499); } catch (InterruptedException e) { e.printStackTrace(); } age = 17; } }).start(); new Thread(() -> { boolean result = flag.compareAndSet(0, 1); if (result) { name = "小刚"; try { Thread.sleep(498); } catch (InterruptedException e) { e.printStackTrace(); } age = 19; } }).start(); Thread.sleep(1000); System.out.println(name + " " + age); } }
synchronized关键字,可以保证可见性、原子性、有序性
可见性:解锁时,会强制的将所有变量刷新到主存中
原子性:加锁后,其他线程无法获得锁,也就不能打断线程中的程序执行
有序性:原子性保证了,有序性就保证了锁什么东西?- 重要
1、尽可能选择细粒度更小的对象上锁
2、synchronized修饰普通方法时,默认锁this对象
3、synchronized修饰静态方法时,默认锁当前类的class对象
JDK1.4提供的新的加锁方式,JDK1.5之后,对synchronized关键字做了一个优化,性能和Lock就差不多了。所以现在还是有很多程序员喜欢使用synchronized关键词
重入锁
//重入锁 Lock lock = new ReentrantLock(); new Thread(() -> { lock.lock(); System.out.println("线程1执行"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程1执行结束"); lock.unlock(); }).start(); new Thread(() -> { lock.lock(); System.out.println("线程2执行"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程2执行结束"); lock.unlock(); }).start();
读写锁
package com.qf.demo14; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Test7 { public static void main(String[] args) { //读写锁 ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); //读锁 ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock(); //写锁 ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock(); new Thread(() -> { readLock.lock(); System.out.println("线程1执行"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程1执行结束"); readLock.unlock(); }).start(); new Thread(() -> { writeLock.lock(); System.out.println("线程2执行"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程2执行结束"); writeLock.unlock(); }).start(); } }
读写锁:
读锁 兼容 读锁
读锁 不兼容 写锁
写锁 不兼容 写锁
…
CAS版
public class Test8 { private Test8(){} private static AtomicReference<Test8> reference = new AtomicReference<>(); public static Test8 getInstance(){ reference.compareAndSet(null, new Test8()); return reference.get(); } }
双重锁判定
public class Test8 { private Test8(){} private volatile static Test8 test8; public static Test8 getInstance(){ if(test8 == null){ synchronized (Test8.class) { if(test8 == null){ //线程1 test8 = new Test8(); //初始化对象 //1、申请堆内存空间 //2、初始化申请的堆内存空间 //3、将变量test8指向堆内存空间 } } } return test8; } }
Object obj = new Object(); Object obj2 = new Object(); new Thread(() -> { System.out.println("线程1执行。。。。"); synchronized (obj) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (obj2){ } } System.out.println("线程1结束。。。。"); }).start(); new Thread(() -> { System.out.println("线程2执行。。。。"); synchronized (obj2) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (obj){ } } System.out.println("线程2结束。。。。"); }).start();
这些基础集合,添加元素时,因为是多线程,同时没有任何锁机制,所以很大的概率发生一些元素覆盖或者丢失的情况。多线程同时读写也会造成一定的问题,比如读到写线程的中间状态,造成业务判定问题等等
在多线程环境下,可以使用一个线程安全的集合,比如Vector、Hashtable。但是Vector和Hashtable的锁的细粒度太大了,对于高并发的读写性能损耗太高,实际开发过程中,并不推荐使用。建议采用JDK1.4之后推出的JUC包中提供的一些线程安全的集合,比如ConcurrentHashMap等,这些集合在保证线程安全的同时,也尽可能的提高了并发能力。
思考:
1、是不是实际开发过程中就一定不能用ArrayList这种线程不安全的集合?- 不是,具体问题具体分析
2、使用了线程安全的集合,是否就意味着不会发生线程安全问题呢?- 所谓的线程安全集合,只是指里面的每个单独的方法是线程安全的,但是将这些方法组合起来形成的业务,并不能保证线程安全
CopyOnWriteArrayList 采用重入锁 + 写入时复制的手段,保证集合的线程安全。添加的时候加锁,写入时无需加锁(写入时复制的方式),以此来提高集合的读取的效率。
CopyOnWriteArrayList特别适合读多写少的场景,并发读取是没有任何锁机制,但是写入的成本会将对较高,每次写入都需要拷贝一新的数组。并且这种方式,可能使得读取的数据有一定概率是旧数据,所以如果程序允许这种短时间内的不一致性(最终一致性),CopyOnWriteArrayList是非常合适的,但是如果程序必须要求数据的绝对一致性,这时应该采用List list = Collections.synchronizedList(new ArrayList<>());这种方式获得线程安全的集合
写入时复制
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } }
底层实现
JDK1.7之前,采用分段锁的方式保证线程安全
Jdk1.8之后,采用CAS + synchronized 来保证线程安全(比JDK1.7锁的细粒度更小)
源码分析
添加元素的方法(put)
final V putVal(K key, V value, boolean onlyIfAbsent) { //key和value不能为null if (key == null || value == null) throw new NullPointerException(); //调用哈希函数,计算key的哈希值 int hash = spread(key.hashCode()); //1、标志位 //2、链表的长度 int binCount = 0; //循环 - 自旋 //tab指向底层的哈希表 for (Node<K,V>[] tab = table;;) { //f - 添加元素对应的哈希桶的第一个元素 //n - 哈希表的容量 //i - 添加元素计算出来的下标(对应的哈希桶的位置) //fh - 标志位 Node<K,V> f; int n, i, fh; //判断哈希表是否初始化 if (tab == null || (n = tab.length) == 0) //哈希表还未初始化,进行初始化的操作 //initTable方法是线程安全的,可以保证只有一个线程能够初始化哈希表 tab = initTable(); //i = (n - 1) & hash 计算新增元素的哈希桶的位置,赋值给i变量 //tabAt(tab, i = (n - 1) & hash) 等价于 tab[i = (n - 1) & hash] //因为哈希表本身是保证可见性的,但是哈希表中的元素并不保证可见性, //tabAt(tab, i = (n - 1) & hash)该方法是通过内存地址直接去内存中获取元素(保证可见性) //其实就是从哈希表中获得桶i的第一个元素赋值给f,判断是否为null else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //没有发生哈希碰撞 //开始讲新的值封装成Node节点,放入该桶的位置 //通过CAS的方式判断桶i的位置是否有元素,如果没有就赋值(原子性),如果有的话,就停止操作,进入下一轮for循环,继续自旋 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //获取桶i位置的第一个元素f哈希值,赋值给fh //如果fh==-1,则表示该桶i正在进行扩容的迁移 else if ((fh = f.hash) == MOVED) //当前线程保证扩容的线程,迁移桶i的元素 //迁移完成后,返回最新的哈希表 tab = helpTransfer(tab, f); else { //说明当前桶i有元素,并且没有在迁移 //新的元素就可以放入桶i的位置,但是需要解决哈希碰撞的问题 V oldVal = null; //进行加锁,锁桶i的第一个元素f //一个桶一个锁,最大化的降低了锁的细粒度 synchronized (f) { //双重锁判定,加锁之后再判断一次条件 if (tabAt(tab, i) == f) { //fh>0,说明当前桶i是一个链表 if (fh >= 0) { //因为是链表,binCount设置为1 //在链表循环的过程中,binCount代表着链表的长度 binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //判断key是否重复 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { //将新的元素放入链表的尾部 //尾插 pred.next = new Node<K,V>(hash, key, value, null); break; } } } //判断当前桶i是否为一颗红黑树 else if (f instanceof TreeBin) { //走红黑树的逻辑 Node<K,V> p; //如果是红黑树,binCount会被设置为2 binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //binCount = 1 链表 //binCount = 2 红黑树 if (binCount >= TREEIFY_THRESHOLD) //判断是否需要转成红黑树 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //哈希表的扩容 addCount(1L, binCount); return null; }
初始化哈希表
//初始化哈希表 //可能有n个线程同时执行该方法 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //自旋 //如果哈希表为空,就走循环体,如果哈希表不为空,就直接返回哈希表对象 while ((tab = table) == null || tab.length == 0) { //如果sc<0 说明当前已经有线程在初始化哈希表了,当前线程进入让步状态 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //通过cas的方式,设置sizeCtl为-1表示已经有一个线程进行初始化哈希表了 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //双重判定 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //初始化哈希表 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //将哈希表赋值给全局变量 table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
参数介绍
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数一:corePoolSize , 核心线程数,线程池的最小线程数量
参数二:maximumPoolSize,最大线程数,如果线程池的线程不够用时,会创建新的线程,但是不会超过这个数量
参数三:keepAliveTime,线程最大的空闲时间,超过核心线程数的线程,超过空闲时间后,就会被自动回收
参数四:unit,空闲时间的单位
参数五(重要):workQueue,线程池的阻塞队列对象(类型)
参数六:threadFactory,线程创建的工厂对象(决定了线程的创建方式)
参数七:handler,多余的任务(Runnable)如果无法放入阻塞队列,该用什么方式拒绝该任务(4种方式)
因为多线程环境下,线程调度有确定性,但是有时候的业务,需要一个线程基于另外一个线程的结果才能继续进行,这时就需要考虑线程间通讯的问题了
1、wait/notify/notifyAll
2、阻塞队列
1、wait和notify必须写在同步代码块(同步方法)中
2、必须有同步锁对象调用wait和notify方法问题:
1、wait和sleep的区别?- wait会释放锁资源,sleep不会释放
add - 如果队列满了,添加元素会报错
put - 如果队列满了,添加元素会阻塞
poll - 如果没有元素,直接返回null
take - 如果没有元素,会阻塞当前的线程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vJm1sTSn-1627987591497)(img/image-20200804160457464.png)]
**面试题:**有ABCD4个线程,A线程只能写A,B线程只能写B,以此类推。请编写一个程序,通过ABCD4个线程输出4个文件1,2,3,4。第一个文件中只有ABCDABCD, 第二个文件中输出BCDABCDA,第三个文件中输出CDABCDAB,第四个文件中输出DABCDABC