线程简介
线程就是独立的执行路径;
在程序运行时,即使没有自己创建线程,后台也会有多个线程,如主线程,gc线程;
main()称之为主线程,为系统的入口,用于执行整个程序;
在一个进程中,如果开辟多个线程,线程的运行由调度器安排调度,调度器是与操作系统紧密相关的,先后顺序是不能人为干预的。
对同一份资源操作时,会存在资源抢夺的问题,需要加入并发控制;
线程会带来额外的开销,如cpu调度时间,并发控制开销。
每个线程在自己的工作内存交互,内存控制不当会造成数据不一致
继承Thread类
实现Runnable接口
注意:extends 与 implements 区别
其实质属于函数式编程的概念
a-> System.out.println("i like lambda-->"+a); new Thread(()->System.out.println(“多线程学习...”)).start();
理解Functional Interface(函数式接口)是学习Java8 lambda表达式的关键所在
函数式接口:任何接口,如果只包含唯一一个抽象方法,那么它就是一个函数式接口
对于函数式接口,我们可以通过lambda表达式来创建该接口的对象
public class LambdaTest { // 2. 静态内部类 static class Love2 implements ILove{ @Override public void love(int a) { System.out.println("I love you --> " + a); } } public static void main(String[] args) { ILove lv = new Love(); lv.love(1); lv = new Love2(); lv.love(2); // 3. 局部内部类 class Love3 implements ILove{ @Override public void love(int a) { System.out.println("I love you --> " + a); } } lv = new Love3(); lv.love(3); // 4. 匿名内部类 lv = new ILove() { @Override public void love(int a) { System.out.println("I love you --> " + a); } }; lv.love(4); // 5-8. lambda 表达式 lv = (int a) -> { System.out.println("I love you --> " + a); }; lv.love(5); // 简化参数类型 lv = a -> { System.out.println("I love you --> " + a); }; // 多个参数也可简化,但不可简化括号 // lv = (a,b) -> { // System.out.println("I love you --> "+ a + b); // }; lv.love(6); // 简化括号(仅适用于一行代码) lv = a -> System.out.println("I love you --> " + a); lv.love(7); // 特殊情况(只输出参数的情况) ILove lv2 = System.out::println; lv2.love(8); } } // 0. 定义一个函数式接口 interface ILove{ void love(int a); } // 1. 实现类 class Love implements ILove{ @Override public void love(int a){ System.out.println("I love you --> " + a); } }
函数式接口 | 非函数式接口 |
---|---|
package functionalInterface; public class Demo01Test { public static void main(String[] args) { // 1. 调用show方法,方法的参数是一个接口,所以可以传递接口的实现类对象 show(new Demo01MyFunctionalInterfaceImpl()); // 2. 调用show方法,方法的参数是一个接口,所以我们可以传递接口的匿名内部类 show(new Demo01MyFunctionalInterface() { @Override public void method() { System.out.println("使用匿名内部类实现函数式接口"); } }); // 3. 使用Lambda 表达式 show(() -> System.out.println("使用Lambda实现函数式接口")); } public static void show(Demo01MyFunctionalInterface myInterface) { myInterface.method(); } }
package functionalInterface.demo02; public class Demo02Logger { public static void showLog(int level, String message) { if (level == 1) { System.out.println(message); } } public static void main(String[] args) { String msg1 = "Hello"; String msg2 = "World"; String msg3 = "Java"; showLog(1, msg1 + msg2 + msg3); // 无论level 是不是 1 , 字符串都会被拼接 } }
使用Lambda表达式作为参数传递,仅仅是把参数传递到showLog方法中
只有满足条件(日志的等级是1级),才会调用接口MessageBuilder中的方法builderMessage才会进行字符串的拼接
如果条件不满足,,那么MessageBuilder接口中的方法builderMessage也不会执行所以拼接字符串的代码也不会执行所以不会存在性能的浪费
package functionalInterface.demo02; public class Demo02lambda { // 定义一个显示日志的方法,方法的参数传递日志的等级和 MessageBuilder 接口 public static void showLog(int level, MessageBuilder mb) { if (level == 1) { System.out.println(mb.builderMessage()); } } public static void main(String[] args) { String msg1 = "Hello"; String msg2 = "World"; String msg3 = "Java"; showLog(2,() -> { return msg1 + msg2 + msg3;}); } } // ------------------------------------------------ // package functionalInterface.demo02; @FunctionalInterface public interface MessageBuilder { public abstract String builderMessage(); }
package functionalInterface.demo02; public class Demo03Runnable { public static void startThread(Runnable run) { // 开启多线程 new Thread(run).start(); } public static void main(String[] args) { startThread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "线程启动了"); } }); // lambda 表达式 startThread(() -> {System.out.println(Thread.currentThread().getName() + "线程启动了");}); startThread(() -> System.out.println(Thread.currentThread().getName() + "线程启动了")); } }
package functionalInterface.demo02; import java.util.Arrays; import java.util.Comparator; public class Demo04Comparator { public static Comparator<String> getComparator() { // 1. 方法的返回值类型是一个接口,那么可以返回这个接口的匿名内部类 // return new Comparator<String>() { // @Override // public int compare(String o1, String o2) { // return o2.length() - o1.length(); // } // }; // 2. 方法的返回类型是函数式接口,可以返回lambda 表达式 return (o1, o2) -> o2.length() - o1.length(); } public static void main(String[] args) { String[] arr = {"aaa", "b", "cccccccc", "ddddddddddddd"}; System.out.println(Arrays.toString(arr)); Arrays.sort(arr, getComparator()); System.out.println(Arrays.toString(arr)); } }
java.uti1.function.Supplier<T>
接口仅包含一个无参的方法:T get()
。用来获取一个泛型参数指定类型的对象数据。由于这是一个函数式接口,这也就意味着对应的Lambda表达式需要"对外提供"一个符合泛型类型的对象数据。
package functionalInterface.demo03; import java.util.function.Supplier; public class Demo01Supplier { public static String getString(Supplier<String>sup) { return sup.get(); } public static void main(String[] args) { // 函数式接口可以使用 lambda 表达式 String s = getString(() -> { return "Kite"; }); String s1 = getString(() -> "Lee"); System.out.println(s); System.out.println(s1); } }
package functionalInterface.demo03; import java.util.function.Supplier; public class Demo02Supplier { public static int getMax(Supplier<Integer> sup) { return sup.get(); } public static void main(String[] args) { int[] arr = {1, 23, 43, 54, 64, 10, 0}; int MAX = getMax(() -> { int max = arr[0]; for (int i : arr) { if (i > max) { max = i; } } return max; }); System.out.println(MAX); } }
package functionalInterface.demo04consumer; import java.util.function.Consumer; public class Demo01Consumer { public static void method(String name, Consumer<String> con) { con.accept(name); } public static void main(String[] args) { method("Kite", (name) -> { String reName = new StringBuffer(name).reverse().toString(); System.out.println(reName); }); } }
package functionalInterface.demo04consumer; import java.util.function.Consumer; public class Demo02ConsumerAndThen { public static void method(String s, Consumer<String> con1, Consumer<String> con2) { // con1.accept(s); // con2.accept(s); con1.andThen(con2).accept(s); } public static void main(String[] args) { method("abc", (s) -> { s+="ABC"; System.out.println(s); }, (s) -> { s+="123"; System.out.println(s); }); } } // abcABC // abc123
package functionalInterface.demo04consumer; import java.util.function.Consumer; public class Demo03ConsumerTest { public static void printInfo(String[] arr, Consumer<String> con1, Consumer<String> con2) { for (String message : arr) { con1.andThen(con2).accept(message); } } public static void main(String[] args) { String[] arr = {"胡歌, 男", "刘亦菲, 女", "彭于晏, 男"}; printInfo(arr, (msg) -> { String name = msg.split(", ")[0]; System.out.print("姓名: " + name + "\t"); }, (msg) -> { String gender = msg.split(", ")[1]; System.out.println(", 性别: " + gender); }); } } /* 姓名: 胡歌 , 性别: 男 姓名: 刘亦菲 , 性别: 女 姓名: 彭于晏 , 性别: 男 */
package functionalInterface.demo05predicate; import java.util.function.Predicate; public class Demo01Predicate { public static boolean checkString(String s, Predicate<String> pre) { return pre.test(s); } public static void main(String[] args) { String s = "abcdef"; boolean b = checkString(s, (str) -> str.length() > 5); System.out.println(b); } }
package functionalInterface.demo05predicate; import java.util.function.Predicate; public class Demo02Predicate_and { public static boolean checkString(String s, Predicate<String> pre1, Predicate<String> pre2) { // return pre1.test(s) && pre2.test(s); return pre1.and(pre2).test(s); } public static void main(String[] args) { String str = "abcdef"; boolean b = checkString(str, (s) -> s.length() > 5, (s) -> s.contains("a") ); System.out.println(b); } }
package functionalInterface.demo05predicate; import java.util.function.Predicate; public class Demo03Predicate_or { public static boolean checkString(String s, Predicate<String> pre1, Predicate<String> pre2) { // return pre1.test(s) || pre2.test(s); return pre1.or(pre2).test(s); } public static void main(String[] args) { String str = "abcdef"; boolean b = checkString(str, (s) -> s.contains("g"), (s) -> s.contains("a") ); System.out.println(b); } }
package functionalInterface.demo05predicate; import java.util.function.Predicate; public class Demo04Predicate_negate { public static boolean checkString(String s, Predicate<String> pre) { // return !pre.test(s); return pre.negate().test(s); } public static void main(String[] args) { String str = "abcdef"; boolean b = checkString(str,(s) -> s.length() > 5); System.out.println(b); } }
package functionalInterface.demo05predicate; import java.util.ArrayList; import java.util.function.Predicate; public class Demo05PredicateTest { public static ArrayList<String> filter(String[] arr, Predicate<String> pre1, Predicate<String> pre2) { ArrayList<String> list = new ArrayList<String>(); for (String s : arr) { boolean b = pre1.and(pre2).test(s); if (b) { list.add(s); } } return list; } public static void main(String[] args) { String[] arr = {"胡歌, 男", "刘亦菲, 女", "彭于晏, 男", "赵丽颖, 女", "邓紫棋, 女", "王菲, 女"}; ArrayList<String> newList = filter(arr, (str) -> str.split(", ")[0].length() > 2, (str) -> str.split(", ")[1].equals("女") ); System.out.println(newList); } }
// 自己写的方法明显要差一些 package functionalInterface.demo05predicate; import java.util.ArrayList; import java.util.function.Predicate; public class Demo05PredicateTest2 { public static boolean checkInfo(String s, Predicate<String> pre1, Predicate<String> pre2) { return pre1.and(pre2).test(s); } public static void main(String[] args) { ArrayList<String> list = new ArrayList<>(); String[] arr = {"胡歌, 男", "刘亦菲, 女", "彭于晏, 男"}; for (String s : arr) { boolean b = checkInfo(s, (str) -> { String name = str.split(", ")[0]; return name.length() > 2; }, (str) -> { String gender = str.split(", ")[1]; return gender.equals("女"); }); if (b) { list.add(s); } } System.out.println(list); } }
package functionalInterface.demo06function; import java.util.function.Function; public class Demo01Function { public static void change(String s, Function<String, Integer> fun) { int in = fun.apply(s); System.out.println(in); } public static void main(String[] args) { String s = "1234"; //change(s, (String str) -> Integer.parseInt(str)); change(s, Integer::parseInt); } }
package functionalInterface.demo06function; import java.util.function.Function; public class Demo02Function_andthen { public static void change(String s, Function<String, Integer> fun1, Function<Integer, String> fun2) { String ss = fun1.andThen(fun2).apply(s); System.out.println(ss); } public static void main(String[] args) { String s = "123"; change(s, (str) -> Integer.parseInt(str) + 10, (str) -> str + ""); } }
package functionalInterface.demo06function; import java.util.function.Function; public class Demo03FunctionTest { public static int change(String s, Function<String, String> fun1, Function<String, Integer> fun2, Function<Integer, Integer> fun3) { return fun1.andThen(fun2).andThen(fun3).apply(s); } public static void main(String[] args) { String str = "赵丽颖, 20"; int num = change(str, (s) -> s.split(", ")[1], (s) -> Integer.parseInt(s), (s) -> s + 100 ); System.out.println(num); } }
package functionalInterface.demo06function; import java.util.function.Function; public class Demo04FunctionTest2 { public static String method(String s, Function<String,String> fun1, Function<String,String> fun2) { return fun1.andThen(fun2).apply(s); } public static void main(String[] args) { String str = "abc"; String newStr = method(str, (s) -> s + "ABC", (s) -> s + "123"); System.out.println(newStr); } }
@FunctionalInterface public interface Printable { void print(String str); }
public class MethodRerObject { public void printUpperCaseString(String str) { System.out.println(str.toUpperCase()); } }
public class Demo01ObjectMethodReference { public static void printString(Printable p) { p.print("hello"); } public static void main(String[] args) { printString((s) -> { MethodRerObject obj = new MethodRerObject(); obj.printUpperCaseString(s); }); } }
Math类是存在的, abs计算绝对值的静态方法也是已经存在的,所以我们可以直接通过类名引用静态方法
@FunctionalInterface public interface Calcable { int calsAbs(int numbers); }
public class Demo02StaticMethodReference { public static int method(int number, Calcable c) { return c.calsAbs(number); } public static void main(String[] args) { int number = method(-10, (n) -> Math.abs(n)); System.out.println(number); int number2 = method(-5, Math::abs); System.out.println(number2); } }
@FunctionalInterface public interface Greetable { void greet(); }
public class Human { public void sayHi() { System.out.println("Hello, 我是Human"); } }
public class Man extends Human { @Override public void sayHi() { System.out.println("Hello, 我是Man"); } public void method(Greetable g) { g.greet(); } public void show() { // method(() -> { // Human h = new Human(); // h.sayHi(); // }); // 因为有子父类关系,所以存在的一个关键字super,代表父类,所以我们可以直接使用super调用父类的成员方法 // method(() -> super.sayHi()); method(super::sayHi); } public void method2() { super.sayHi(); } public static void main(String[] args) { new Man().show(); // 子类方法调用 new Man().method2(); // 子类直接调用 } }
public interface Richable { void buy(); }
public class Husband { public void buyHouse() { System.out.println("北京二环内买一套四合院"); } public void merry(Richable r) { r.buy(); } public void soHappy() { merry(this::buyHouse); // merry(() -> { // this.buyHouse(); // }); } public static void main(String[] args) { new Husband().soHappy(); } }
构造方法new Person(String name)已知
创建对象已知
new就可以使用person引用new创建对象
@FunctionalInterface public interface PersonBuilder { Person builderPerson(String name); }
public class Person { private String name; public Person() { } public Person(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
public class Demo { public static void printName(String name, PersonBuilder pb) { Person person = pb.builderPerson(name); System.out.println(person.getName()); } public static void main(String[] args) { // printName("Kite", (name) -> new Person(name)); printName("Kite", Person::new); } }
@FunctionalInterface public interface ArrayBuilder { int[] builderArray(int length); }
public class Demo { public static int[] createArray(int length, ArrayBuilder ab) { return ab.builderArray(length); } public static void main(String[] args) { // int[] arr1 = createArray(10, (len) -> new int[10]); int[] arr1 = createArray(10,int[]:: new); System.out.println(arr1.length); } }
方法 | 说明 |
---|---|
setPriority(int newPriority) | 更改线程的优先级 |
static void sleep(long millis) | 在指定的毫秒数内让当前正在执行的线程休眠 |
void join() | 等待该线程终止 |
static void yield() | 暂停当前正在执行的线程对象,并执行其他线程 |
void interrupt() | 中断线程,别用这个方式 |
boolean isAlive() | 测试线程是否处于活动状态 |
sleep(时间)指定当前线程阻塞的毫秒数;
sleep存在异常InterruptedException;
sleep时间达到后线程进入就绪状态;
sleep可以模拟网络延时,倒计时等;
每一个对象都有一个锁,sleep不会释放锁;
礼让不一定成功,看 cpu 分配
此线程执行完后再执行其他线程,其他线程阻塞
Java提供一个线程调度器来监控程序中启动后进入就绪状态的所有线程,线程调度器按照优先级决定应该调度哪个线程来执行。
线程的优先级用数字表示,范围从1~10。
使用以下方式改变或获取优先级
处理多线程问题时,多个线程访问同一个对象,并且某些线程还想修改这个对象.这时候我们就需要线程同步。线程同步其实就是一种等待机制,多个需要同时访问此对象的线程进入这个对象的等待池形成队列,等待前面线程使用完毕,下一个线程再使用。
由于同一进程的多个线程共享同一块存储空间,在带来方便的同时,也带来了访问冲突问题,为了保证数据在方法中被访问时的正确性,在访问时加入锁机制synchronized,当一个线程获得对象的排它锁,独占资源,其他线程必须等待,使用后释放锁即可.存在以下问题:
由于我们可以通过private关键字来保证数据对象只能被方法访问,所以我们只需要针对方法提出一套机制,这套机制就是synchronized关键字,它包括两种用法:
同步方法:public synchronized void method(int args){ }
synchronized方法控制对“对象”的访问,每个对象对应一把锁,每个synchronized方法都必须获得调用该方法的对象的锁才能执行,否则线程会阻塞,方法一旦执行,就独占该锁,直到该方法返回才释放锁,后面被阻塞的线程才能获得这个锁,继续执行
缺陷:若将一个大的方法申明为synchronized将会影响效率
方法里面需要修改的内容才需要锁,锁的太多,浪费资源
同步块:synchronized(Obj){ }
obj称之为同步监视器
同步监视器的执行过程
产生死锁的四个必要条件
class A{ private final ReentrantLock lock = new ReenTrantLock(); public void m(){ lock.lock(); // 加锁 try{ //保证线程安全的代码; } finally { lock.unlock(); //如果同步代码有异常,要将unlock()写入finally语句块 } } }
应用场景:生产者和消费者问题
这是一个线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件.
方法名 | 作用 |
---|---|
wait() | 表示线程一直等待,直到其他线程通知,与sleep不同,会释放锁 |
wait(long timeout) | 指定等待的毫秒数 |
notify() | 唤醒一个处于等待状态的线程 |
notifyAll() | 唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先调度 |
注意:均是object类的方法,都只能在同步方法或者同步代码块中使用,否则会抛出异常 IllegalMonitorStateException
生产者将生产好的数据放入缓冲区,消费者从缓冲区拿出数据
背景:
经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。
思路:
提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁创建销毁、实现重复利用。类似生活中的公共交通工具。
好处:
JDK 5.0起提供了线程池相关API:ExecutorService和Executors
java.util.con current
java.util.con current.atomic
java.util.con current.locks
进程:每个进程都有独立的代码和数据空间(进程上下文),进程间的切换会有较大的开销,一个进程包含1-n个线程。(进程是资源分配的最小单位)
线程:同一类线程共享代码和数据空间,每个线程有独立的运行栈和程序计数器(PC),线程切换开销小。(线程是cpu调度的最小单位)
程序是指令和数据的有序集合,其本身没有任何运行的含义,是一个静态的概念。
一个进程可以有多个线程,如视频中同时听声音,看图像,看弹幕,等等
进程是执行程序的一次执行过程,它是一个动态的概念。
注意:很多多线程是模拟出来的,真正的多线程是指有多个cpu,即多核,如服务器。如果是模拟出来的多线程,即在一个cpu的情况下,在同一个时间点,cpu只能执行一个代码,因为切换的很快,所以就有同时执行的错觉。
线程就是独立的执行路径;
在程序运行时,即使没有自己创建线程,后台也会有多个线程,如主线程,gc线程;
main()称之为主线程,为系统的入口,用于执行整个程序;
在一个进程中,开辟多个线程,线程的运行由调度器安排调度,调度器与操作系统紧密相关,先后顺序不能人为干预。
对同一份资源操作时,会存在资源抢夺的问题,需要加入并发控制;
线程会带来额外的开销,如cpu调度时间,并发控制开销。
每个线程在自己的工作内存交互,内存控制不当会造成数据不一致。
java 并不能开启线程
public synchronized void start() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } } // 本地方法,底层的 C++,java 无法直接操作硬件 private native void start0();
并发是指多个任务,这些任务在重叠的时间段内以无特定顺序启动,运行和完成。
并行是指多个任务或唯一任务的多个部分在逻辑上同时运行。
当我们谈论至少两个或更多任务时,并发这个定义是适用的。当一个应用程序实际上可以同时执行两个任务时,我们将其称为并发应用程序。尽管这里的任务看起来像是同时运行的,但实际上它们可能不一样。它们利用操作系统的CPU时间分片功能,其中每个任务运行其任务的一部分,然后进入等待状态。当第一个任务处于等待状态时,会将CPU分配给第二个任务以完成其一部分任务。
并行不需要两个任务存在。通过为每个任务或子任务分配一个内核,它实际上使用多核CPU基础结构同时运行部分任务或多个任务。
并行性本质上要求具有多个处理单元的硬件。在单核CPU中,您可能会获得并发性,但不能获得并行性。
并发与并行之间的区别现在,让我们列出并发与并行之间的显着区别。并发是两个任务可以在重叠的时间段内启动,运行和完成的时间。并行是指任务实际上在同一时间运行。
package com.lee.demo01; public class Demo01 { public static void main(String[] args) { // 获取 CPU 核数 // CPU 密集型, IO 密集型 System.out.println(Runtime.getRuntime().availableProcessors()); } }
并发编程的本质:充分利用 CPU 的资源
线程的六个状态:
public enum State { /** * Thread state for a thread which has not yet started. */ // 新生 NEW, /** * Thread state for a runnable thread. A thread in the runnable * state is executing in the Java virtual machine but it may * be waiting for other resources from the operating system * such as processor. */ // 运行 RUNNABLE, /** * Thread state for a thread blocked waiting for a monitor lock. * A thread in the blocked state is waiting for a monitor lock * to enter a synchronized block/method or * reenter a synchronized block/method after calling * {@link Object#wait() Object.wait}. */ // 阻塞 BLOCKED, /** * Thread state for a waiting thread. * A thread is in the waiting state due to calling one of the * following methods: * <ul> * <li>{@link Object#wait() Object.wait} with no timeout</li> * <li>{@link #join() Thread.join} with no timeout</li> * <li>{@link LockSupport#park() LockSupport.park}</li> * </ul> * * <p>A thread in the waiting state is waiting for another thread to * perform a particular action. * * For example, a thread that has called <tt>Object.wait()</tt> * on an object is waiting for another thread to call * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on * that object. A thread that has called <tt>Thread.join()</tt> * is waiting for a specified thread to terminate. */ // 等待(一直等) WAITING, /** * Thread state for a waiting thread with a specified waiting time. * A thread is in the timed waiting state due to calling one of * the following methods with a specified positive waiting time: * <ul> * <li>{@link #sleep Thread.sleep}</li> * <li>{@link Object#wait(long) Object.wait} with timeout</li> * <li>{@link #join(long) Thread.join} with timeout</li> * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li> * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li> * </ul> */ // 超时等待 TIMED_WAITING, /** * Thread state for a terminated thread. * The thread has completed execution. */ // 终止 TERMINATED; }
wait 与 sleep 的区别
- 来自不同的类
- wait => Object
- sleep => Thread
- 关于锁的释放
- wait 会释放锁
- sleep 不会释放锁
- 使用范围
- wait 只能在同步代码块中使用
- sleep 可以任意使用
- 是否需要捕获异常
- wait 需要捕获异常
- sleep 需要捕获异常
lock
公平锁:十分公平,可以先来后到
非公平锁:十分不公平,可以插队
代码中比较了不加锁(顺序混乱),使用synchronized 与 Lock 锁
package com.lee.demo02_lock; // 基本的卖票例子 import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static java.lang.Thread.sleep; /** * 真正的多线程开发,公司中的开发,降低耦合性 * 线程就是一个单独的资源类,没有任何的附属操作 * 1. 属性、方法 */ public class SaleTicketDemo01 { public static void main(String[] args) throws InterruptedException { // 并发:多线程操作同一个资源类 Ticket ticket1 = new Ticket(); new Thread( ()-> { for (int i = 0; i < 40; i++) { ticket1.sale(); } },"A").start(); new Thread( ()-> { for (int i = 0; i < 40; i++) { ticket1.sale(); } },"B").start(); new Thread( ()-> { for (int i = 0; i < 40; i++) { ticket1.sale(); } },"C").start(); sleep(1000); System.out.println("----------- synchronized 同步以后 -----------"); Ticket ticketSyn = new Ticket(); // 简化了 for 循环 new Thread( ()-> {for (int i = 0; i < 40; i++) ticketSyn.saleSyn();},"O").start(); new Thread( ()-> {for (int i = 0; i < 40; i++) ticketSyn.saleSyn();},"P").start(); new Thread( ()-> {for (int i = 0; i < 40; i++) ticketSyn.saleSyn();},"Q").start(); sleep(1000); System.out.println("----------- Lock 同步以后 -----------"); Ticket ticketLock = new Ticket(); // 简化了 for 循环 new Thread( ()-> {for (int i = 0; i < 40; i++) ticketLock.saleLock();},"X").start(); new Thread( ()-> {for (int i = 0; i < 40; i++) ticketLock.saleLock();},"Y").start(); new Thread( ()-> {for (int i = 0; i < 40; i++) ticketLock.saleLock();},"Z").start(); } } /* // 资源类 OOP class Ticket implements Runnable { // implements Runnable --> 耦合性太高 @Override public void run() { } */ // 资源类 OOP class Ticket { // 属性、方法 private int number = 40; // 卖票的方式 public void sale() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票,剩余: " + number); } } /************************************* synchronized *************************************/ public synchronized void saleSyn() { if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票,剩余: " + number); } } /************************************* Lock *************************************/ Lock lock = new ReentrantLock(); // 第一步 : new // 卖票的方式 public void saleLock() { lock.lock(); // 第二步 : 加锁 try { if (number > 0) { System.out.println(Thread.currentThread().getName() + "卖出了" + (number--) + "票,剩余: " + number); } } catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); // 第三步 : 解锁 } } }
- Synchronized 是内置的Java关键字,Lock是一个Java类
- Synchronized 无法判断获取锁的状态,Lock可以判断是否获取到了锁
- Synchronized 会自动释放锁,lock必须要手动释放锁!如果不释放锁,死锁
- Synchronized 线程1(获得锁,阻塞)、线程2(等待,傻傻的等);Lock锁就不一定会等待下去(lock.trylock());
- Synchronized 可重入锁,不可以中断的,非公平;Lock,可重入锁,可以判断锁,默认非公平(可以自己设置);
- Synchronized 适合锁少量的代码同步问题,Lock 适合锁大量的同步代码!
使用synchronized
package com.lee.demo03; /** * 线程之间的通信问题:生产者消费者问题 (通知和等待唤醒) * 线程交替执行 A B 操作同一变量 num = 0 * A num + 1 * B num - 1 * 只要是并发一定要有锁 * */ public class PC { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } data.increment(); } },"A" ).start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } data.decrement(); } },"B" ).start(); } } // 判断 业务 通知 class Data { // 数字 资源类 private int number = 0; public synchronized void increment() { if (number != 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number++; System.out.println(Thread.currentThread().getName() + "-->" + number); // 通知其他线程 this.notifyAll(); } public synchronized void decrement() { if (number == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number--; System.out.println(Thread.currentThread().getName() + "-->" + number); // 通知其他线程 this.notifyAll(); } }
当同时运行四个线程时,会出现虚假唤醒
解决方法就是使用 while 判断代替 if 判断
package com.lee.demo03; public class PC { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } data.increment(); } },"A" ).start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } data.decrement(); } },"B" ).start(); // 如果只运行两个线程则是安全的 // 下面增加两个线程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } data.increment(); } },"C" ).start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } data.decrement(); } },"D" ).start(); } } // 判断 业务 通知 class Data { // 数字 资源类 private int number = 0; public synchronized void increment() { // if (number != 0) { while (number != 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number++; System.out.println(Thread.currentThread().getName() + "-->" + number); // 通知其他线程 this.notifyAll(); } public synchronized void decrement() { // if (number == 0) { while (number == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number--; System.out.println(Thread.currentThread().getName() + "-->" + number); // 通知其他线程 this.notifyAll(); } }
使用 Lock
package com.lee.demo03; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class PC_lock { public static void main(String[] args) { Data data = new Data(); new Thread(() -> {for (int i = 0; i < 10; i++) { data.increment(); }},"A" ).start(); new Thread(() -> {for (int i = 0; i < 10; i++) { data.decrement(); }},"B" ).start(); // 如果只运行两个线程则是安全的 // 下面增加两个线程 new Thread(() -> {for (int i = 0; i < 10; i++) { data.increment(); }},"C" ).start(); new Thread(() -> {for (int i = 0; i < 10; i++) { data.decrement(); }},"D" ).start(); } } // 判断 业务 通知 class Data2 { // 数字 资源类 private int number = 0; Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); public synchronized void increment() { lock.lock(); try { while (number != 0) { condition.await(); } number++; System.out.println(Thread.currentThread().getName() + "-->" + number); // 通知其他线程 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public synchronized void decrement() { lock.lock(); try { while (number != 0) { condition.await(); } number--; System.out.println(Thread.currentThread().getName() + "-->" + number); // 通知其他线程 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
condition 的优势--可以精准的唤醒
package com.lee.demo03; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionTest { public static void main(String[] args) { Data3 data3 = new Data3(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printA(); } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printB(); } },"B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printC(); } },"C").start(); // new Thread(()->{},"D").start(); } } class Data3 { // 资源类 Lock private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private int num = 1; //num = 1 —> A 执行, 2 -> B执行, 3 -> C 执行 public void printA() { lock.lock(); try { // 业务 判断 —> 执行 —> 通知 while (num != 1) { condition1.await(); } System.out.println(Thread.currentThread().getName() + "-> AAA"); num = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { // 业务 判断 —> 执行 —> 通知 while (num != 2) { condition2.await(); } System.out.println(Thread.currentThread().getName() + "-> BBB"); num = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { // 业务 判断 —> 执行 —> 通知 while (num != 3) { condition3.await(); } System.out.println(Thread.currentThread().getName() + "-> CCC"); num = 1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
补充生产者消费者问题lock锁代码(学习Thread部分时)
package com.lee.demo03; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Chicken_Test { public static void main(String[] args) { SynContainer container = new SynContainer(); new Productor(container).start(); new Consumer(container).start(); } } // 生产者 class Productor extends Thread { SynContainer container; public Productor(SynContainer container) { this.container = container; } // 生产 @Override public void run() { for (int i = 1; i <= 35; i++) { System.out.println("生产了 -->" + i + "只鸡 " + (int)(container.count + 1)); container.push(new Chicken(i)); } } } // 消费者 class Consumer extends Thread { SynContainer container; public Consumer(SynContainer container) { this.container = container; } // 消费 @Override public void run() { for (int i = 1; i <= 30; i++) { System.out.println("消费了-->" + container.pop().id +"只鸡 " + (int)(container.count + 1)); } } } // 产品 class Chicken { int id; public Chicken(int id) { this.id = id; } } // 缓冲区 class SynContainer { // 需要一个容器大小 Chicken[] chickens = new Chicken[10]; Lock lock = new ReentrantLock(); Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); // 容器计数器 int count = 0; // 生产者放入产品 public void push(Chicken chicken) { // 如果容器满了,就需要等待消费者消费 lock.lock(); try { while (count == chickens.length - 1) { // 生产等待 condition1.await(); } // 如果没有满,我们就需要丢入产品 count++; chickens[count] = chicken; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } // 消费者消费产品 public Chicken pop() { // 判断能否消费 lock.lock(); try { while (count <= 2) { // 生产等待 condition2.await(); } // 如果没有满,我们就需要丢入产品 Chicken chicken2 = chickens[count]; condition1.signal(); count--; return chicken2; } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } return null; } }
8锁问题:new 锁的就是一个对象,实体;有 static 锁的就是类
// 问题 1:标准情况下,A B 哪个先执行? // A 因为使用了 synchronized ,所以先锁住一个以后,就会执行完才解锁 public class test01 { public static void main(String[] args) { Phone phone = new Phone(); new Thread(phone::sendSms,"A").start(); new Thread(phone::call,"B").start(); } } class Phone { public synchronized void sendSms() { System.out.println("send..."); } public synchronized void call() { System.out.println("call..."); } } // 问题 2: A 延迟1秒的情况下,A B 哪个先执行? // A 因为使用了 synchronized ,所以先锁住一个以后,就会执行完才解锁 import java.util.concurrent.TimeUnit; public class test02 { public static void main(String[] args) { Phone2 phone2 = new Phone2(); new Thread(phone2::sendSms,"A").start(); new Thread(phone2::call,"B").start(); } } class Phone2 { public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("send..."); } public synchronized void call() { System.out.println("call..."); } } // 问题 3:synchronized A 延迟4秒,普通方法 B 延迟1秒,哪个先执行? // B 因为B没有使用 synchronized 所以谁的延迟时间段,谁就先执行。 import java.util.concurrent.TimeUnit; public class test03 { public static void main(String[] args) { Phone3 phone3 = new Phone3(); new Thread(phone3::sendSms,"A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(phone3::hello,"B").start(); } } class Phone3 { public synchronized void sendSms() { System.out.println("send..."); } public synchronized void call() { System.out.println("call..."); } public void hello() { System.out.println("Hello..."); } } // 问题 4:对象A synchronized 方法延迟1秒,对象 B synchronized 方法无延迟,哪个先执行? // B 因为AB是两个对象,不是同一把锁,没有延迟先执行。 import java.util.concurrent.TimeUnit; public class test04 { public static void main(String[] args) { Phone4 phone41 = new Phone4(); Phone4 phone42 = new Phone4(); new Thread(phone41::sendSms,"A").start(); new Thread(phone42::call,"B").start(); } } class Phone4 { public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("send..."); } public synchronized void call() { System.out.println("call..."); } } // 上述问题,锁的对象是都是方法的调用者。
// 问题 5:两个方法都使用 static 和 synchronized 修饰,一个对象,哪个先执行? // A 因为 static 修饰以后为静态方法,synchronized 锁的是对应的类。 import java.util.concurrent.TimeUnit; public class test05 { public static void main(String[] args) { Phone5 phone5 = new Phone5(); new Thread(()-> phone5.sendSms(),"A").start(); new Thread(()-> phone5.call(),"B").start(); } } class Phone5 { public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("send..."); } public static synchronized void call() { System.out.println("call..."); } } // 问题 6:两个方法都使用 static 和 synchronized 修饰,两个对象,哪个先执行? // A 因为 static 修饰以后为静态方法,synchronized 锁的是对应的类,AB同属一个类。 import java.util.concurrent.TimeUnit; public class Test06 { public static void main(String[] args) { Phone6 phone61 = new Phone6(); Phone6 phone62 = new Phone6(); new Thread(()-> phone61.sendSms(),"A").start(); new Thread(()-> phone62.call(),"B").start(); } } class Phone6 { public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("send..."); } public static synchronized void call() { System.out.println("call..."); } } // 问题 7:1个方法都使用 static 和 synchronized 修饰,1个synchronized 修饰,一个对象,哪个先执行? // B 因为 static 修饰以后为静态方法,synchronized 锁的是对应的类,第二个 synchronizedAB 锁的是方法的调用者,延迟少的优先。 import java.util.concurrent.TimeUnit; public class Test07 { public static void main(String[] args) { Phone7 phone7 = new Phone7(); new Thread(()-> phone7.sendSms(),"A").start(); new Thread(()-> phone7.call(),"B").start(); } } class Phone7 { // 锁的是类 public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("send..."); } // 锁的是 方法的调用者 public synchronized void call() { System.out.println("call..."); } } // 问题 8:1个方法都使用 static 和 synchronized 修饰,1个synchronized 修饰,两个对象,哪个先执行? // B 因为 static 修饰以后为静态方法,synchronized 锁的是对应的类,第二个 synchronizedAB 锁的是方法的调用者,延迟少的优先。 import java.util.concurrent.TimeUnit; public class Test08 { public static void main(String[] args) { Phone8 phone81 = new Phone8(); Phone8 phone82 = new Phone8(); new Thread(()-> phone81.sendSms(),"A").start(); new Thread(()-> phone82.call(),"B").start(); } } class Phone8 { // 锁的是类 public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("send..."); } // 锁的是 方法的调用者 public synchronized void call() { System.out.println("call..."); } }
并发下 ArrayList 不安全
解决方案1 :
List<String> list = new Vector<>();
解决方案2 :
List<String> list = Collections.synchronizedList(new ArrayList<>());
解决方案3 :
List<String> list = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList 使用的是 lock 锁, Vector 使用的是 synchronized, CopyOnWriteArrayList 更好一些
import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; public class ListTest { public static void main(String[] args) { // List<String> list = Arrays.asList("1", "2", "3"); // list.forEach(System.out::println); // java.util.ConcurrentModificationException 并发修改异常 List<String> list2 = new ArrayList<>(); // 在并发下,ArrayList 是不安全的 // for (int i = 1; i <= 10; i++) { // new Thread(() -> { // list2.add(UUID.randomUUID().toString().substring(0,5)); // System.out.println(list2); // }).start(); // } // 解决方案 1 // List<String> list3 = new Vector<>(); // for (int i = 1; i <= 10; i++) { // new Thread(() -> { // list3.add(UUID.randomUUID().toString().substring(0,5)); // System.out.println(list3); // }).start(); // 解决方案 2 List<String> list4 = Collections.synchronizedList(new ArrayList<>()); // 解决方案 3 // CopyOnWrite 写入时复制 COW List<String> list5 = new CopyOnWriteArrayList<>(); for(int i = 1; i <= 10; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { list5.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(list5); }).start(); } } } /* java.util.ConcurrentModificationException 并发修改异常 OOM 内存溢出 StackOverflow 栈溢出 */
Map 不安全
Map
了解一下 HashMap 加载因子、和初始化容量
import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; public class MapTest { public static void main(String[] args) { // java.util.ConcurrentModificationException Map<String, String> map1 = new HashMap<>(); for (int i = 1; i <= 30; i++) { new Thread(() -> { map1.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5)); System.out.println(map1); }, String.valueOf(i)).start(); } // map 是这样用的吗? 不是,工作中不用HashMap // 默认等价于什么? new HashMap<>(16,0.75); Map<String, String> map2 = new ConcurrentHashMap<>(); for (int i = 1; i <= 30; i++) { new Thread(() -> { map2.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5)); System.out.println(map2); }, String.valueOf(i)).start(); } } }
Set 不安全
Set
import java.util.HashSet; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; public class SetTest { public static void main(String[] args) { // java.util.ConcurrentModificationException Set<String> set = new HashSet<String>(); // for (int i = 1; i < 20; i++) { // new Thread(()-> { // set.add(UUID.randomUUID().toString().substring(0, 5)); // System.out.println(set); // }).start(); // } Set<String> set2 = new CopyOnWriteArraySet<>(); for (int i = 1; i < 20; i++) { new Thread(()-> { set2.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(set2); }).start(); } } }
可以有返回值
可以抛出异常
方法不同,run()/call()
new Thread(new Runnable()).start();
new Thread(new FutureTask
new Thread(new FutureTAsk
Runnable
接口的实现类FutureTask
的构造方法FutureTask(Callable<V> callable)
:创建一个FutureTask
,一旦运行就执行给定的 Callable
。代码测试
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; public class CallableTest { public static void main(String[] args) throws ExecutionException, InterruptedException { // new Thread(new Runnable()).start(); // new Thread(new FutureTask<V>()).start(); // new Thread(new FutureTAsk<V>(Callable)).start(); new Thread().start(); MyThread thread = new MyThread(); FutureTask futureTask = new FutureTask(thread); new Thread(futureTask,"A").start(); Integer o = (Integer) futureTask.get(); System.out.println(o); } } class MyThread implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("call..."); return 1; } } /* 1. 有缓存 2. 结果可能需要等待,会阻塞。 */
import java.util.concurrent.CountDownLatch; // 减法计数器 public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { // 总数是 6, 必须要执行任务的时候再使用 CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 1; i <= 6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName() + "Go out"); countDownLatch.countDown(); // 数量 -1 },String.valueOf(i)).start(); } countDownLatch.await();// 等待计数器归零,然后才向下执行 System.out.println("Close door"); } } /* countDownLatch.countDown(); // 数量 -1 countDownLatch.await();// 等待计数器归零,然后才向下执行 每次有线程调用 countDown() 数量 -1, 当计数器变为0时, countDownLatch.await()就会被唤醒继续向下执行 如果线程数不能满足要求程序会一直等待 如果线程数超过要求,满足要求后会继续向下执行,且后续线程也会执行完毕。 */
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; // 加法计数器 public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{ System.out.println("Success"); }); for (int i = 1; i <= 8; i++) { final int temp = i; // 不加 final lambda 表达式拿不到 i try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(()-> { System.out.println(Thread.currentThread().getName() + "是第" + temp + "条线程"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } } /* 1. 如果线程数不能满足要求程序会一直等待 2. 如果线程数超过要求,满足要求后会继续向下执行,且后续线程也会执行完毕。 */
原理:
semaphore.acquire();
获得,假设如果已经满了,等待,等待被释放为止!
semaphore.release();
释放,会将当前的信号量释放+1,然后唤醒等待的线程!作用:多个共享资源互斥的使用!并发限流,控制最大的线程数!
import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; // 计数信号量 public class SemaphoreDemo { public static void main(String[] args) { // 线程数量:停车位 Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 6; i++) { new Thread(() -> { try { semaphore.acquire(); // 得到 System.out.println(Thread.currentThread().getName() + "抢到车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + "离开车位"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); // 释放 } }, String.valueOf(i)).start(); } } }
ReadWriteLock
读-读: 可以共存
读-写: 不能共存
写-写: 不能共存
独占锁(写锁): 一次只能被一个线程占有
共享锁(读锁): 多个线程可以同时占有
import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockDemo { public static void main(String[] args) { // 不加锁 MyCache myCache = new MyCache(); for (int i = 1; i <= 5; i++) { final int tmp = i; new Thread(()->{ myCache.put(tmp + " ", tmp + " "); },String.valueOf(tmp)).start(); } for (int i = 1; i <= 5; i++) { final int tmp = i; new Thread(()->{ myCache.get(tmp + " "); },String.valueOf(tmp)).start(); } // 休眠 1 秒 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("========================="); // 读写锁 MyCacheLock myCacheLock = new MyCacheLock(); for (int i = 1; i <= 5; i++) { final int tmp = i; new Thread(()->{ myCacheLock.put(tmp + " ", tmp + " "); },String.valueOf(tmp)).start(); } for (int i = 1; i <= 5; i++) { final int tmp = i; new Thread(()->{ myCacheLock.get(tmp + " "); },String.valueOf(tmp)).start(); } } } class MyCache { private volatile Map<String, Object> map = new HashMap<>(); public void put(String key, Object value) { System.out.println(Thread.currentThread().getName() + "写入" + key); map.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完成" + key); } public void get(String key) { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = map.get(key); System.out.println(Thread.currentThread().getName() + "读取完成" + key); } } class MyCacheLock { private volatile Map<String,Object> map2 = new HashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void put(String key, Object value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "写入" + key); map2.put(key, value); System.out.println(Thread.currentThread().getName() + "写入完成" + key); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } public void get(String key) { readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "读取" + key); Object o = map2.get(key); System.out.println(Thread.currentThread().getName() + "读取完成" + key); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } }
写入:如果队列满了,就必须阻塞等待
取:如果是队列是空的,必须阻塞等待生产
方式 | 抛出异常 | 有返回值 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add("a") | offer("a") | put("a") | offer("a",2,TimeUnit.SECONDS) |
移除 | remove() | poll() | take() | poll(2,TimeUnit.SECONDS) |
检测队列首尾 | element() | peek() | - | - |
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; public class BlockQueueTest { public static void main(String[] args) throws InterruptedException { System.out.println("############### test01() #####################"); test01(); System.out.println("############### test02() #####################"); test02(); System.out.println("############### test03() #####################"); test03(); System.out.println("############### test04() #####################"); test04(); } /* 会报出异常: 1. // Queue full 2. // java.util.NoSuchElementException */ public static void test01() { ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); // true System.out.println(blockingQueue.add("b")); // true System.out.println(blockingQueue.add("c")); // true // System.out.println(blockingQueue.add("d")); // Queue full // 队首 System.out.println(blockingQueue.element()); // a System.out.println("==========================="); System.out.println(blockingQueue.remove()); // a System.out.println(blockingQueue.remove()); // b System.out.println(blockingQueue.remove()); // c // System.out.println(blockingQueue.remove()); // java.util.NoSuchElementException } /* 有返回值,没有异常 */ public static void test02() { ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a")); // true System.out.println(blockingQueue.offer("b")); // true System.out.println(blockingQueue.offer("c")); // true System.out.println(blockingQueue.offer("d")); // false System.out.println(blockingQueue.peek()); // a System.out.println("==========================="); System.out.println(blockingQueue.poll()); // a System.out.println(blockingQueue.poll()); // b System.out.println(blockingQueue.poll()); // c System.out.println(blockingQueue.poll()); // null } /* 阻塞等待(一直阻塞) */ public static void test03() throws InterruptedException { ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("a"); // void blockingQueue.put("b"); // void blockingQueue.put("c"); // void // blockingQueue.put("d"); // 一直阻塞,没有异常 System.out.println("==========================="); System.out.println(blockingQueue.take()); // a System.out.println(blockingQueue.take()); // b System.out.println(blockingQueue.take()); // c // System.out.println(blockingQueue.take()); // 一直阻塞,没有异常 } /* 等待2秒,超时结束等待,有返回值 */ public static void test04() throws InterruptedException { ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.offer("a")); // true System.out.println(blockingQueue.offer("b")); // true System.out.println(blockingQueue.offer("c")); // true System.out.println(blockingQueue.offer("d",2, TimeUnit.SECONDS)); // false System.out.println("==========================="); System.out.println(blockingQueue.poll()); // a System.out.println(blockingQueue.poll()); // b System.out.println(blockingQueue.poll()); // c System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS)); // null } }
StnchronousQueue 同步队列
没有容量,进去一个元素,必须等取出来之后,才能再往里放一个元素
/* 同步队列 和其他BlockingQueue 不一样,SynchronousQueue 不存储元素 put 了一个元素,必须从里面先 take 出来,否则不能再 put 进去值。 */ import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class SynchronousQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName() + " put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName() + " put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName() + " put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"T1").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " --> " + blockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " --> " + blockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " --> " + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"T2").start(); } }
池化技术是一种很常见的编程技巧,在请求量大时能明显优化应用性能,降低系统频繁建连的资源开销。我们日常工作中常见的有数据库连接池、线程池、对象池等,它们的特点都是将 “昂贵的”、“费时的” 的资源维护在一个特定的 “池子” 中,规定其最小连接数、最大连接数、阻塞队列等配置,方便进行统一管理和复用,通常还会附带一些探活机制、强制回收、监控一类的配套功能。
线程池:JDK1.5之后提供的
java.util.concurrent.Executors: 线程池的工厂类,用来生成线程池
线程池的使用步骤:
package com.lee.demo12_pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; public class Demo01ThreadPool { public static void main(String[] args) throws InterruptedException { // 创建线程池的三大方法 ExecutorService threadPool1 = Executors.newSingleThreadExecutor(); // 单个线程 ExecutorService threadPool2 = Executors.newFixedThreadPool(5);// 创建一个固定大小的线程池 ExecutorService threadPool3 = Executors.newCachedThreadPool();// 可变的 // 周期性线程池 ScheduledExecutorService threadPool4 = Executors.newScheduledThreadPool(4); try { for (int i = 0; i < 10; i++) { // 使用线程池来创建线程 threadPool1.execute(()->{ System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭线程池 threadPool1.shutdown(); } Thread.sleep(1000); System.out.println("========================"); try { for (int i = 0; i < 10; i++) { // 使用线程池来创建线程 threadPool2.execute(()->{ System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭线程池 threadPool2.shutdown(); } Thread.sleep(1000); System.out.println("========================"); try { for (int i = 0; i < 10; i++) { // 使用线程池来创建线程 threadPool3.execute(()->{ System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭线程池 threadPool3.shutdown(); } Thread.sleep(1000); System.out.println("========================"); try { for (int i = 0; i < 10; i++) { // 使用线程池来创建线程 threadPool4.execute(()->{ System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭线程池 threadPool4.shutdown(); } } }
// 三大方法 // newSingleThreadExecutor public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // newFixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // newCachedThreadPool public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // 都源于 ThreadPoolExecutor
【阿里开发手册】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象的弊端如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } // this 即为下方内容 /** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- int corePoolSize ——核心线程大小
- int maximumPoolSize —— 最大核心线程池大小
- long keepAliveTime —— 超时不被调用就会释放
- TimeUnit unit —— 超时单位
- BlockingQueue
workQueue —— 阻塞队列 - ThreadFactory threadFactory —— 线程工厂,创建线程的,一般不动
- RejectedExecutionHandler handler) —— 拒绝策略
package com.lee.demo12_pool; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Demo02ThreadPool { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 2, // corePoolSize 5, // maximumPoolSize 3, // keepAliveTime TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); // 银行满了,还有人进来,不处理这个人的,抛出异常 /* 银行有2个长时间开启的窗口, 总共有5个窗口, 窗口超时等待的时间 3 单位为 秒 排队等待的队列长上限 3 默认的线程工厂(一般不修改)... (拒绝策略)银行满了,还有人进来,不处理这个人的,抛出异常 */ try { for (int i = 0; i < 9; i++) { // 使用线程池来创建线程 threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + "ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭线程池 threadPool.shutdown(); } } }
CallerRunsPolicy() // 队列满了,哪来的回哪去(主线程调用的话,就让主线程去执行) AbortPolicy() // 队列满了,拒绝处理, 抛出异常 DiscardPolicy() // 队列满了,丢掉任务不会抛出异常 DiscardOldestPolicy() // 队列满了,去和最早进来的去竞争,不会抛出异常
最大线程的定义
CPU 密集型 : 等于电脑核数
// 获取 CPU 核数 Runtime.getRuntime().availableProcessors(); // 12
IO 密集型 : 判断十分消耗IO的线程,设置为其倍数(2倍)
函数式接口
// Runnable @FunctionalInterface public interface Runnable { public abstract void run(); } // 编程中会有很多的 Functional Interface // 可以简化编程模型,在新版本框架底层大量应用 // foreach (消费者类的函数式接口)
public class Demo01 { public static void main(String[] args) { // Function<String, String> f1 = new Function<String, String>() { // @Override // public String apply(String str) { // return str; // } // }; Function<String, String> f1 = str -> str; System.out.println(f1.apply("asd")); } }
/* 断定型接口, 有一个输入参数,返回值只能是 布尔值 */ public class Demo02 { public static void main(String[] args) { // 判断字符串是否为空 // Predicate<String> predicate = new Predicate<String>() { // @Override // public boolean test(String str) { // return str.isEmpty(); // } // }; // Predicate<String> predicate = ((str) -> str.isEmpty()); Predicate<String> predicate = (String::isEmpty); System.out.println(predicate.test("abc")); // false } }
/* Consumer 消费性接口 只有输入没有返回值 */ public class Demo03 { public static void main(String[] args) { // Consumer<String> consumer = new Consumer<String>() { // @Override // public void accept(String str) { // System.out.println(str); // } // }; Consumer<String> consumer = System.out::println; consumer.accept("what"); } }
/* Supplier 供给型接口没有参数,只有返回值 */ public class Demo04 { public static void main(String[] args) { // Supplier<String> supplier = new Supplier<String>() { // @Override // public String get() { // return "Hello Supplier"; // } // }; Supplier<String> supplier = () -> "Hello Supplier"; System.out.println(supplier.get()); } }
/* 链式编程 1. ID必须是偶数 2. 年龄必须大于23岁 3. 用户名转为大写字母 4. 用户名字母倒着排序 5. 只输出一个用户! */ public class Test { public static void main(String[] args) { User user1 = new User(1, "abc", 21); User user2 = new User(2, "b", 22); User user3 = new User(3, "b", 23); User user4 = new User(4, "d", 24); User user5 = new User(5, "e", 25); User user6 = new User(6, "fgh", 26); // 集合就是存储 List<User> list = Arrays.asList(user1, user2, user3, user4, user5, user6); // 计算交给 Stream 流 // 1. ID必须是偶数 System.out.println("-------- 1. ID必须是偶数"); list.stream().filter(user -> user.getId()%2 == 0).forEach(System.out::println); // 2. 年龄必须大于23岁 System.out.println("-------- 2. 年龄必须大于23岁"); list.stream().filter(user -> user.getAge() > 23).forEach(System.out::println); // 3. 用户名转为大写字母 System.out.println("-------- 3. 用户名转为大写字母"); list.stream().map(user -> user.getName().toUpperCase()).forEach(System.out::println); // 4. 用户名字母倒着排序 System.out.println("-------- 4. 用户名字母倒着排序"); list.stream().sorted((u1,u2) -> (u2.getName().compareTo(u1.getName()))).forEach(System.out::println); // 5. 只输出一个用户! System.out.println("-------- 5. 只输出一个用户!"); list.stream().limit(1).forEach(System.out::println); // 五个要求同时满足 list.stream() .filter(user -> user.getId()%2 == 0) .filter(user -> user.getAge() > 23) .map(user -> user.getName().toUpperCase()) .sorted((u1,u2) -> (u2.compareTo(u1))) // .sorted(Comparator.reverseOrder()) .limit(1) .forEach(System.out::println); } }
- RecursiveAction : 递归事件,没有返回值
- RecursiveTask : 递归任务,有返回值
package com.lee.demo15_forkjoin; import java.util.concurrent.RecursiveTask; public class ForkJoinDemo extends RecursiveTask<Long> { private long start; private long end; // 临界值 private long temp = 100000L; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { if ((end - start) < temp) { long sum = 0L; for (long i = start; i <= end; i++) { sum += i; } return sum; } else { long mid = start + (end - start >> 1); ForkJoinDemo task1 = new ForkJoinDemo(start, mid); task1.fork(); // 拆分任务,把任务压入线程队列 ForkJoinDemo task2 = new ForkJoinDemo(mid + 1, end); task2.fork(); // 拆分任务,把任务压入线程队列 return task1.join() + task2.join(); } } }
比较三种方法(直接加和、forkjoin、Stream 流)各自运行10次 1到10亿的加和所耗费的时间:
package com.lee.demo15_forkjoin; import java.awt.event.FocusEvent; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; // 求 1到 10亿的加和 public class BillionSumTest { public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("使用三种方法各自运行10次 1到10亿的加和:"); test1(); test2(); test3(); } // 直接加和 (运行10遍) public static void test1() { long startTime = System.currentTimeMillis(); for (int j = 0; j < 10; j++) { long sum = 0L; for (long i = 0L; i <= 10_0000_0000L; i++) { sum += i; } if (j == 9) { System.out.println("sum1 = " + sum); } } long endTime = System.currentTimeMillis(); System.out.println("直接加和运行时间为 : " + (endTime - startTime)); } // forkjoin (运行10遍) public static void test2() throws ExecutionException, InterruptedException { long startTime = System.currentTimeMillis(); for (int j = 0; j < 10; j++) { ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task); long sum = submit.get(); if (j == 9) { System.out.println("sum2 = " + sum); } } long endTime = System.currentTimeMillis(); System.out.println("forkjoin运行时间为 : " + (endTime - startTime)); } // Stream (运行10遍) public static void test3() { long startTime = System.currentTimeMillis(); for (int j = 0; j < 10; j++) { long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum); if (j == 9) { System.out.println("sum3 = " + sum); } } long endTime = System.currentTimeMillis(); System.out.println("Stream 流计算运行时间为 : " + (endTime - startTime)); } }
结果如下:
使用三种方法各自运行10次 1到10亿的加和: sum1 = 500000000500000000 直接加和运行时间为 : 5184 sum2 = 500000000500000000 forkjoin运行时间为 : 2070 sum3 = 500000000500000000 Stream 流计算运行时间为 : 1200
forkjoin特点: 工作窃取(使用双端队列维护)
没有返回值的 runAsync 异步回调
一般线程任务 ForkJoinPool.commonPool-worker-9runAsync --> Void
有返回值的 supplyAsync 异步回调
正常执行
ForkJoinPool.commonPool-worker-9CompletableFuture.supplyAsync t --> 1024 u --> null 1024
有异常
ForkJoinPool.commonPool-worker-9CompletableFuture.supplyAsync t --> null u --> java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero 2333
package com.lee.demo16_completablefuture; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; public class Demo01RunAsync { public static void main(String[] args) throws ExecutionException, InterruptedException { runAsyncTest(); System.out.println("------------------------"); supplyAsyncTest(); } // 没有返回值的 runAsync 异步回调 public static void runAsyncTest() throws ExecutionException, InterruptedException { // 注意 <Void> CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "runAsync --> Void"); }); System.out.println("一般线程任务"); completableFuture.get(); } // 有返回值的 supplyAsync 异步回调 public static void supplyAsyncTest() throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + "CompletableFuture.supplyAsync"); int i = 1/0; return 1024; }); System.out.println(completableFuture.whenComplete((t, u) -> { System.out.println("t --> " + t); // t --> 1024 System.out.println("u --> " + u); // u --> null }).exceptionally((e) -> { System.out.println(e.getMessage()); return 2333; }).get()); /* 正常执行: ForkJoinPool.commonPool-worker-9CompletableFuture.supplyAsync t --> 1024 u --> null 1024 加入 int i = 1/0; ForkJoinPool.commonPool-worker-9CompletableFuture.supplyAsync t --> null u --> java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero java.lang.ArithmeticException: / by zero 2333 */ } }
由于主存与 CPU 处理器的运算能力之间有数量级的差距,所以在传统计算机内存架构中会引入高速缓存来作为主存和处理器之间的缓冲,CPU 将常用的数据放在高速缓存中,运算结束后 CPU 再讲运算结果同步到主存中。
使用高速缓存解决了 CPU 和主存速率不匹配的问题,但同时又引入另外一个新问题:缓存一致性问题
在多CPU的系统中(或者单CPU多核的系统),每个CPU内核都有自己的高速缓存,它们共享同一主内存(Main Memory)。当多个CPU的运算任务都涉及同一块主内存区域时,CPU 会将数据读取到缓存中进行运算,这可能会导致各自的缓存数据不一致。因此需要每个 CPU 访问缓存时遵循一定的协议,在读写数据时根据协议进行操作,共同来维护缓存的一致性。这类协议有 MSI、MESI、MOSI、和 Dragon Protocol 等。
为了使处理器内部的运算单元能够最大化被充分利用,处理器会对输入代码进行乱序执行处理,这就是处理器优化。
除了处理器会对代码进行优化处理,很多现代编程语言的编译器也会做类似的优化,比如像 Java 的即时编译器(JIT)会做指令重排序。
处理器优化其实也是重排序的一种类型,这里总结一下,重排序可以分为三种类型:
关于JMM的一些同步的约定:
JMM的三个特征:
如果从更深层次看这三个问题,其实就是『缓存一致性』、『处理器优化』、『指令重排序』造成的。
lock:锁定。作用于主内存的变量,把一个变量标识为一条线程独占状态。
unlock:解锁。作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
read:读取。作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
load:载入。作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
use:使用。作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
assign:赋值。作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
store:存储。作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
write:写入。作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。
改变j的值;
read -- load
use -- assign
write -- store
lock -- unlock
如上图所示,本地内存A和B有主内存中共享变量x的副本。假设初始时,这三个内存中的x值都为0。线程A在执行时,把更新后的x值(假设值为1)临时存放在自己的本地内存A中。当线程A和线程B需要通信时,线程A首先会把自己本地内存中修改后的x值刷新到主内存中,此时主内存中的x值变为了1。随后,线程B到主内存中去读取线程A更新后的x值,此时线程B的本地内存的x值也变为了1。
从整体来看,这两个步骤实质上是线程A在向线程B发送消息,而且这个通信过程必须要经过主内存。JMM通过控制主内存与每个线程的本地内存之间的交互,来为java程序员提供内存可见性保证
Volatile是Java虚拟机提供轻量级的同步机制
保证可见性
public class Demo01Volatile { private volatile static int num = 0; // 不加 volatile 就会陷入死循环 public static void main(String[] args) { new Thread(() -> { while (num == 0) { } }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } num = 1; } }
不保证原子性
原子性:ACID不可分割
public class Demo02 { private volatile static int num = 0; // 加上 volatile 依然不能保证原子性 public /*synchronized*/ static void add() { // lock 和 synchronized 都可以保证原子性 num++; } public static void main(String[] args) { // 理论上结果为 200000 for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { add(); } }).start(); } while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println(Thread.currentThread().getName() + num); } }
num++
不是原子性的操作,所以不安全
不使用 lock 和 synchronized 如何保证原子性
package com.lee.demo17_jmm; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class Demo03 { private /*volatile*/ static AtomicInteger num = new AtomicInteger(); public /*synchronized*/ static void add() { // lock 和 synchronized 都可以保证原子性 num.getAndIncrement(); // AtomicInteger + 1 方法 CAS } public static void main(String[] args) { for (int i = 0; i < 20; i++) { new Thread(() -> { for (int j = 0; j < 1000; j++) { add(); } }).start(); } while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println(Thread.currentThread().getName() + num); } }
volatile 可以避免指令重排(内存屏障)
源代码->编译器优化的重排->指令并行也可能会重排->内存系统也会重排…>执行
(1)类加载检查:
虚拟机遇到一条 new 指令时,首先将去检查这个指令的参数是否能在常量池中定位到这个类的符号引用,并且检查这个符号引用代表的类型是否已被加载过、解析和初始化过。如果没有,那必须先执行相应地类加载过程
(2)分配内存:
在类加载检查通过后,接下来虚拟机将为新生对象分配内存。对象所需的内存大小在类加载完成后便可确定,为对象分配空间的任务等同于把一块确定大小的内存从 Java 堆中划分出来。分配方式有 “指针碰撞” 和 “空闲列表” 两种,选择那种分配方式由 Java 堆是否规整决定,而 Java 堆是否规整又有所采用垃圾收集器是否带有压缩整理功能决定的。
(3)初始化零值:
内存分配完成后,虚拟机需要将分配到的内存空间都初始化为零值(不包括对象头),这一步操作保证了对象的实例字段在 Java 代码中可以不赋初始值就直接使用,程序能访问到这些字段的数据类型所对应的零值。
(4)设置对象头:
初始化零值完成之后,虚拟机要对对象进行必要的设置,例如这个对象是那个类的实例、如何才能找到类的元数据信息、对象的哈希码、对象的 GC 分代年龄等信息。 这些信息存放在对象头中。 另外,根据虚拟机当前运行状态的不同,如是否启用偏向锁等,对象头会有不同的设置方式
(5)执行init()方法:
在上面工作都完成之后,从虚拟机的视角来看,一个新的对象已经产生了,但从Java 程序的视角来看,对象创建才刚开始, 方法还没有执行,所有的字段都还为零。所以一般来说,执行 new指令之后会接着执行 方法,把对象按照程序员的意愿进行初始化,这样一个真正可用的对象才算完全产生出来。
synchronized的特点
一个线程执行互斥代码过程如下:
所以,synchronized既保证了多线程的并发有序性,又保证了多线程的内存可见性。
volatile是第二种Java多线程同步的手段,根据JLS的说法,一个变量可以被volatile修饰,在这种情况下内存模型确保所有线程可以看到一致的变量值
class Test { static volatile int i = 0, j = 0; static void one() { i++; j++; } static void two() { System.out.println("i=" + i + " j=" + j); } }
加上volatile可以将共享变量i和j的改变直接响应到主内存中,这样保证了i和j的值可以保持一致,然而我们不能保证执行two方法的线程是在i和j执行到什么程度获取到的,所以volatile可以保证内存可见性,不能保证并发有序性(不具有原子性)。
如果没有volatile,则代码执行过程如下:
- CAS compareAndSet : 比较并交换
- unsafe 类
// CAS public class CASDemo { // CAS compareAndSet : 比较并交换 public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(2); System.out.println(atomicInteger.compareAndSet(2, 3)); // true System.out.println(atomicInteger.get()); // 3 System.out.println(atomicInteger.compareAndSet(3, 4)); // true System.out.println(atomicInteger.get()); // 4 } }
CAS:比较当前工作内存中的值和主内存中的值,如果这个值是期望的,那么则执行操作!如果不是就一直循环!
缺点:
package com.lee.demo18_aba; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicStampedReference; // ABA public class ABADemo { static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(2,1); public static void main(String[] args) { // test1(); test2(); } public static void test1() { AtomicInteger atomicInteger = new AtomicInteger(2); System.out.println(atomicInteger.compareAndSet(2, 3)); // true System.out.println(atomicInteger.get()); // 3 System.out.println(atomicInteger.compareAndSet(3, 2)); // true System.out.println(atomicInteger.get()); // 2 System.out.println(atomicInteger.compareAndSet(2, 4)); // true System.out.println(atomicInteger.get()); // 4 } public static void test2() { new Thread(() -> { // 获得版本号 int stamp = atomicStampedReference.getStamp(); System.out.println("a1 - - > " + stamp); try { TimeUnit.SECONDS.sleep(1); // 设为 2 结果不同 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(atomicStampedReference.compareAndSet(2, 3, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)); System.out.println("a2 - - > " + atomicStampedReference.getStamp()); System.out.println(atomicStampedReference.compareAndSet(3, 2, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1)); System.out.println("a3 - - > " + atomicStampedReference.getStamp()); }, "a").start(); new Thread(() -> { // 获得版本号 int stamp = atomicStampedReference.getStamp(); System.out.println("b1 - - > " + stamp); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(atomicStampedReference.compareAndSet(2, 6, stamp, stamp + 1)); System.out.println("b2 - - > " + atomicStampedReference.getStamp()); }, "b").start(); } } /* a1 - - > 1 b1 - - > 1 true a2 - - > 2 true a3 - - > 3 false b2 - - > 3 */ // a 线程 TimeUnit.SECONDS.sleep(2) 时; // a b 各自占用了1次 /* b 抢占 a1 - - > 1 b1 - - > 1 true false a2 - - > 2 false a3 - - > 2 b2 - - > 2 或者 a 抢占 a1 - - > 1 b1 - - > 1 false true a2 - - > 2 true a3 - - > 3 b2 - - > 2 */
synchronized
package com.lee.demo19_lock.reentry; public class Demo01Syn { public static void main(String[] args) { Phone phone = new Phone(); // phone.send(); new Thread(phone::send,"A").start(); new Thread(phone::send,"B").start(); } } class Phone { public synchronized void send() { System.out.println(Thread.currentThread().getName() + " send"); call(); } public synchronized void call() { System.out.println(Thread.currentThread().getName() + " call"); } }
lock
package com.lee.demo19_lock.reentry; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Demo02Lock { public static void main(String[] args) throws InterruptedException { Phone2 phone2 = new Phone2(); new Thread(phone2::send, "C").start(); TimeUnit.SECONDS.sleep(2); new Thread(phone2::send, "D").start(); } } class Phone2 { public void send() { Lock lock = new ReentrantLock(); lock.lock(); lock.lock(); try { System.out.println(Thread.currentThread().getName() + " send"); call(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); lock.unlock(); } } public void call() { Lock lock = new ReentrantLock(); lock.lock(); try { System.out.println(Thread.currentThread().getName() + " call"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
package com.lee.demo19_lock.spinlock; /* 自旋锁 */ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; public class Demo01 { public static void main(String[] args) { // ReentrantLock reentrantLock = new ReentrantLock(); // reentrantLock.lock(); // reentrantLock.unlock(); SpinLock lock = new SpinLock(); new Thread(() -> { lock.myLock(); try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.myUnlock(); } }, "A").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { lock.myLock(); try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.myUnlock(); } }, "B").start(); } } class SpinLock { AtomicReference<Thread> atomicReference = new AtomicReference<>(); // 加锁 public void myLock() { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + " --> myLock"); // 自旋锁 while (!atomicReference.compareAndSet(null, thread)) { } } // 解锁 public void myUnlock() { Thread thread = Thread.currentThread(); System.out.println(Thread.currentThread().getName() + " --> myUnlock"); atomicReference.compareAndSet(thread, null); } }
A --> myLock (00:00:00)
B --> myLock (00:00:01)
A --> myUnlock(00:00:04)
B --> myUnlock(00:00:08)(00:00:01)~(00:00:01)B锁进入自旋
package com.lee.demo19_lock.deadlock; import java.util.concurrent.TimeUnit; public class DeadLock { public static void main(String[] args) { String lockA = "lockA"; String lockB = "lockB"; new Thread(new MyThread(lockA, lockB), "T1").start(); new Thread(new MyThread(lockB, lockA), "T2").start(); } } class MyThread implements Runnable { private String lockA; private String lockB; public MyThread(String lockA, String lockB) { this.lockA = lockA; this.lockB = lockB; } @Override public void run() { synchronized (lockA) { System.out.println(Thread.currentThread().getName() + "lock: " + lockA + " - - > get " + lockB); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lockB) { System.out.println(Thread.currentThread().getName() + "lock: " + lockB + " - - > get " + lockA); } } } }
打开终端:
jps -l
jstack XXXXX