当多个线程并发修改同一个非线程安全的集合时,不同线程可能写入同一个下标位置,后写入的数据会覆盖先写入的数据,导致元素丢失。
示例代码:
// Scenario: one shared List, ten tasks, each task appends 1000 elements to that List.
// The resulting size is frequently NOT the expected 10000.
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates that a plain {@link ArrayList} is not safe for concurrent writers.
 *
 * <p>Ten tasks are submitted to a {@link ForkJoinPool}; each one appends 1000 random
 * integers to the same unsynchronized list. Because nothing coordinates the writers,
 * the size printed at the end is usually smaller than 10000.
 *
 * @author Zhangmingda
 */
public class ForkJoinPoolArrayListNotSafe {

    /**
     * Runs the demo and prints the final list size.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     *                              for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        // Shared, deliberately unsynchronized collection that all tasks write to.
        List<Integer> nums = new ArrayList<>();
        // Source of the values that get appended.
        Random random = new Random();
        // Pool that runs the writer tasks in parallel.
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // Submit ten tasks; each appends 1000 elements.
        for (int j = 0; j < 10; j++) {
            forkJoinPool.submit(new RecursiveAction() {
                @Override
                protected void compute() {
                    for (int i = 0; i < 1000; i++) {
                        nums.add(random.nextInt());
                    }
                }
            });
            System.out.println((j + 1) + "千次提交");
        }

        // Stop accepting new tasks FIRST. awaitTermination can only succeed after a
        // shutdown request; calling it beforehand just sleeps for the whole timeout.
        forkJoinPool.shutdown();

        // Wait for the already submitted tasks; 2 seconds is only an upper bound.
        if (!forkJoinPool.awaitTermination(2, TimeUnit.SECONDS)) {
            System.err.println("Timed out waiting for tasks to finish");
        }

        // Report how many elements actually ended up in the list.
        System.out.println("计算结果:num.size():" + nums.size());
    }
}
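将非线程安全集合转为线程安全集合(底层实现逻辑:每个方法都用 synchronized 加锁,并发访问的效果变为串行)。Collections 提供了如下几个静态方法:synchronizedCollection、synchronizedList、synchronizedMap、synchronizedSet、synchronizedSortedMap、synchronizedSortedSet。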
如上示例代码包装后:
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * Same experiment as the unsafe ArrayList demo, but the list is wrapped with
 * {@link Collections#synchronizedList(List)} so that concurrent {@code add} calls
 * are serialized and no element is lost.
 *
 * @author Zhangmingda
 */
public class ForkJoinPoolArrayListSynchornized {

    /**
     * Submits 10000 single-element append tasks and prints the final list size.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     *                              for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        // Wrap the non-thread-safe ArrayList in a synchronized view.
        List<Integer> nums = Collections.synchronizedList(new ArrayList<>());
        Random random = new Random();
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // Submit 10 x 1000 tasks; each task appends exactly one element.
        for (int j = 0; j < 10; j++) {
            for (int i = 0; i < 1000; i++) {
                forkJoinPool.submit(new RecursiveAction() {
                    @Override
                    protected void compute() {
                        nums.add(random.nextInt());
                    }
                });
            }
        }

        // Request shutdown before awaiting: a pool that was never shut down cannot
        // terminate, so awaiting first would always sleep for the full timeout.
        forkJoinPool.shutdown();

        // The timeout is only an upper bound; the call returns as soon as all tasks finish.
        if (!forkJoinPool.awaitTermination(10, TimeUnit.SECONDS)) {
            System.err.println("Timed out waiting for tasks to finish");
        }

        System.out.println("num.size():" + nums.size());
    }
}
如上示例用CopyOnWriteArrayList代替
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * Same experiment as the synchronized-list demo, but using
 * {@link CopyOnWriteArrayList} as the shared, thread-safe list.
 *
 * @author Zhangmingda
 */
public class ForkJoinPoolCopyOnWriteArrayList {

    /**
     * Submits 10000 single-element append tasks and prints the final list size.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     *                              for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        // Concurrent list implementation; no wrapper around ArrayList is involved here.
        List<Integer> nums = new CopyOnWriteArrayList<>();
        Random random = new Random();
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // Submit 10 x 1000 tasks; each task appends exactly one element.
        for (int j = 0; j < 10; j++) {
            for (int i = 0; i < 1000; i++) {
                forkJoinPool.submit(new RecursiveAction() {
                    @Override
                    protected void compute() {
                        nums.add(random.nextInt());
                    }
                });
            }
        }

        // Request shutdown before awaiting: a pool that was never shut down cannot
        // terminate, so awaiting first would always sleep for the full timeout.
        forkJoinPool.shutdown();

        // The timeout is only an upper bound; the call returns as soon as all tasks finish.
        if (!forkJoinPool.awaitTermination(10, TimeUnit.SECONDS)) {
            System.err.println("Timed out waiting for tasks to finish");
        }

        System.out.println("num.size():" + nums.size());
    }
}
测试ConcurrentHashMap:
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

/**
 * Fills a {@link ConcurrentHashMap} from many concurrently running tasks, then
 * prints every entry followed by the map size.
 *
 * <p>Keys have the form {@code "random:" + random.nextInt()}, so two tasks can
 * occasionally produce the same key; the final size may therefore be slightly
 * below the number of submitted tasks.
 *
 * @author Zhangmingda
 */
public class ForkJoinPoolConcurrentHashMapTest {

    /**
     * Submits 10000 single-put tasks, waits for them, and prints the map.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     *                              for the pool to terminate
     */
    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> persons = new ConcurrentHashMap<>();
        ForkJoinPool pool = new ForkJoinPool();
        Random random = new Random();

        // Submit 10 x 1000 tasks; each task inserts one random key/value pair.
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 1000; j++) {
                pool.submit(new RecursiveAction() {
                    @Override
                    protected void compute() {
                        persons.put("random:" + random.nextInt(), random.nextInt());
                    }
                });
            }
        }

        // Shut down first, then wait until every submitted task has run. Awaiting
        // for a few milliseconds before shutdown does not guarantee completion, so
        // the map could still be changing while it is printed.
        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            System.err.println("Timed out waiting for tasks to finish");
        }

        // All writers are done: the entries and the size now describe the same state.
        persons.forEach((k, v) -> System.out.println(k + "=" + v));
        System.out.println("persons.size:" + persons.size());
    }
}