本文主要总结下工作中遇到的forkjoin线程池的使用场景,对大量小文件的磁盘IO优化
公司大数据部服务器存有海量非结构化数据,文件格式是xml,量级千万级,需要编写java程序解析xml,单个文件只有几百KB到几MB ,j使用传统的递归逻辑遍历磁盘目录时文件处理速度非常慢,后选择使用forkjoin进行了速度上的优化。
关于forkjoin线程池的概念,此处不再一一赘述,其他博客已经写的很清楚了,主要是任务的拆分,即大的任务会被划分成小的任务,还有是工作窃取。
现在分别用递归和forkjoin对目录 C:/Windows遍历,文件操作为打印路径,对比两种方法的结果。
import java.io.File; import java.util.concurrent.atomic.AtomicLong; public class DiguiTest { private static AtomicLong counter = new AtomicLong(0L); public static void main(String[] args) { long start = System.currentTimeMillis(); digui(new File("C:/Windows")); System.out.println("花费:"+(System.currentTimeMillis()-start)/1000.0+"秒"); } private static void digui(File file) { if (file != null) { if (file.isDirectory()) { File[] files = file.listFiles(); if (files!=null) for (File son :files) { digui(son); } } else { System.out.println(counter.incrementAndGet()+"==>"+file.getAbsolutePath()); } } } }
import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class Test { private static AtomicLong counter = new AtomicLong(0L); public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkjoinPool = new ForkJoinPool(6); File file = new File("C:/Windows"); MyTask task = new MyTask(file); forkjoinPool.invoke(task); // 停止接收任务 forkjoinPool.shutdown(); //等待任务执行结束 forkjoinPool.awaitTermination(2, TimeUnit.SECONDS); System.out.println("花费:" + (System.currentTimeMillis() - start) / 1000.0 + "秒"); } public static class MyTask extends RecursiveAction { private File path; // 当前要搜索的目录 public MyTask(File path) { this.path = path; } @Override protected void compute() { try { // 定义一个文件目录集合 List<MyTask> subTasks = new ArrayList<>(); // 根据当前要搜索目录的,找到所有的文件 File[] files = path.listFiles(); // 判断是否为空,不为空则继续往下搜索 if (null != files) { for (File file : files) { // 判断是否是目录 if (file.isDirectory()) { subTasks.add(new MyTask(file)); // System.out.println(" 目录: " + file.getAbsolutePath()); } else { // 判断不是目录,则是文件 // todo 处理文件 此处打印 System.out.println(counter.incrementAndGet() + "==>" + file.getAbsolutePath()); } } // 判断集合是否为空 if (null != subTasks && subTasks.size() > 0) { for (MyTask subTask : invokeAll(subTasks)) { // 等待子任务完成 subTask.join(); } } } } catch (Exception e) { e.printStackTrace(); } } } }
可见forkjoin线程池在处理大量小文件时,遍历目录比传统递归要快很多.
注意: forkjoin线程池的线程数并不是越大速度越快。
上述的forkjoin示例是无返回,只需要打印下文件路径,并不需要子目录对上层目录传递计算结果。
新的需求: 遍历目录 C:/Windows, 将所有文件路径保存到mysql的一张表中,为了提高写入效率,每1000条路径作为一个批次,写入mysql。
RecursiveTask后的泛型是返回值类型
import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class Main { private static AtomicLong counter = new AtomicLong(0L); public static void main(String[] args) throws InterruptedException { ForkJoinPool forkjoinPool = new ForkJoinPool(8); File file = new File("C:/Windows"); Task task = new Task(file); List<String> list = forkjoinPool.invoke(task); forkjoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束 // 关闭线程池 forkjoinPool.shutdown(); // todo 缓存中不足1000条的剩余数据处理, 此处写入mysql逻辑省略 System.out.println(counter.addAndGet(list.size())); } private static class Task extends RecursiveTask<List<String>> { private File file; public Task(File file) { this.file = file; } @Override protected List<String> compute() { List<String> buffer = new ArrayList<>(); if (file != null) { if (file.isDirectory()) { File[] sons = file.listFiles(); if (sons != null && sons.length > 0) { for (File son : sons) { Task task = new Task(son); invokeAll(task); List<String> join = task.join(); buffer.addAll(join); if (buffer.size() > 1000) { // todo 缓存批处理, 写入mysql逻辑省略 System.out.println(counter.addAndGet(buffer.size())); buffer.clear(); } } } } else { buffer.add(file.getAbsolutePath()); // counter.incrementAndGet(); } } return buffer; } } }
可见,ForkJoin有返回值的批处理和无返回值的处理数据条数相同。