文件IO中,常用的方法如下方代码中的readMethod1~8方法所示。
测试了2.5M读100次、100M读3次、250M读1次三种情况,耗时(单位:毫秒)如下:
方法 | 2.5M读100次 | 2.5M读100次 | 100M读3次 | 100M读3次 | 250M读1次 | 250M读1次 |
---|---|---|---|---|---|---|
文件系统 | 普通 | HDFS | 普通 | HDFS | 普通 | HDFS |
method1,一次性全部读取 | 635 | 1604 | 976 | 965 | 1270 | 482 |
method2,ByteArrayOutputStream+byte[] | 616 | 5759 | 669 | 5135 | 843 | 4375 |
method3,InputStreamReader+char[8192]+StringWriter | 1236 | 5097 | 1454 | 4370 | 1167 | 3976 |
method4,InputStreamReader+BufferedReader(char[8192]) | 1565 | 4556 | 1986 | 4763 | 1608 | 3230 |
method5,bufferedReader+stream | 1414 | 4167 | 62546 | 140485 | - | - |
method6,bufferedReader+stream+parallel | 1941 | 4526 | OOM | OOM | OOM | OOM |
method7,Deque<byte[8196]> | 628 | 5331 | 761 | 4456 | 669 | 3321 |
method8,ByteBuffer(2048)+LineBuffer | 1910 | 5325 | 2310 | 4426 | 2300 | 3575 |
个人思考:
1、普通文件系统下,使用char[]作为中间缓冲的方法(method3~6、method8)速度都比较慢,因为Java 9+的String底层是byte[],字节先被解码成char[],又转回byte[],会消耗多余的时间。
2、method6使用parallel并不能提升性能,因为底层的InputStreamReader是加锁的,IO本身无法并行。
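3、HDFS不会用,使用的是最朴素的连接方式,肯定是哪里有问题,才会导致IO速度这么慢。但是好像一次性全部读取HDFS的速度,会随着文件的增大而相对更快。注意:method1只调用了一次read(),而InputStream并不保证一次read()就能读满缓冲区(例如HDFS可能只读到当前block的末尾),因此这里HDFS的耗时可能对应的是不完整的读取结果,需要核实。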
4、最后method7是google guava库中的一种读取全部字符串的方法,脑洞大开,性能都还不错。
5、method4、5、6、8都是一行一行读取的模式,适用于需要对每一行进行后续处理的情况(method3虽然也经过char[]转换,但它是按块复制,并不是按行读取)。
6、谨慎对读取全部字符串这种批作业使用流处理方式,速度很慢,parallel的甚至直接OOM。(注意:测试代码中method5的循环次数被写死为100次,而不是参数n,因此表中method5在100M一栏的耗时是读取100次的结果,不能与其他方法直接比较。)
7、总结下来,如果是读取文件中全部字符串,method2 和 method7都是比较不错的方式;如果是需要一行一行处理,则可能还是method4的BufferedReader性能更好。
package net.yury; import com.google.common.io.ByteStreams; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.file.tfile.ByteArray; import java.io.*; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class Test1 { public static FileSystem fileSystem; static { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://myubuntu1:8020"); try { fileSystem = FileSystem.get(configuration); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { System.out.println("测试普通文件系统:"); testReadMethod(new InputStreamBuilder("NORMAL", "C:/Users/yury/Desktop/size100M.txt"), 3); System.out.println("====================================="); System.out.println("测试HDFS文件系统:"); testReadMethod(new InputStreamBuilder("HDFS", "/test1/size100M.txt"), 3); } public static class InputStreamBuilder { private String type; private String fileName; public InputStreamBuilder(String type, String fileName) { this.type = type; this.fileName = fileName; } public InputStream getInputStream () throws Exception { switch (type) { case "NORMAL": return new FileInputStream(fileName); case "HDFS": return fileSystem.open(new Path(fileName)); default: throw new Exception("unkonw file system"); } } } public static void testReadMethod(InputStreamBuilder builder, int n) throws Exception { long time1 = System.currentTimeMillis(); for (int i = 0; i < n; i++) { readMethod1(builder.getInputStream()); } long time2 = System.currentTimeMillis(); System.out.println("method1,耗时:" + (time2 - time1) + " 直接读取"); long time3 = System.currentTimeMillis(); for (int i = 0; i < n; i++) { 
readMethod2(builder.getInputStream()); } long time4 = System.currentTimeMillis(); System.out.println("method2,耗时:" + (time4 - time3) + " ByteArrayOutputStream+byte[]"); long time5 = System.currentTimeMillis(); for (int i = 0; i < n; i++) { readMethod3(builder.getInputStream()); } long time6 = System.currentTimeMillis(); System.out.println("method3,耗时:" + (time6 - time5) + " InputStreamReader+char[8192]+StringWriter"); long time7 = System.currentTimeMillis(); for (int i = 0; i < n; i++) { readMethod4(builder.getInputStream()); } long time8 = System.currentTimeMillis(); System.out.println("method4,耗时:" + (time8 - time7) + " InputStreamReader+BufferedReader(char[8192])"); long time9 = System.currentTimeMillis(); for (int i = 0; i < 100; i++) { readMethod5(builder.getInputStream()); } long time10 = System.currentTimeMillis(); System.out.println("method5,耗时:" + (time10 - time9) + " bufferedReader+stream"); long time11 = System.currentTimeMillis(); for (int i = 0; i < n; i++) { readMethod6(builder.getInputStream()); } long time12 = System.currentTimeMillis(); System.out.println("method6,耗时:" + (time12 - time11) + " bufferedReader+stream+parallel"); long time13 = System.currentTimeMillis(); for (int i = 0; i < n; i++) { readMethod7(builder.getInputStream()); } long time14 = System.currentTimeMillis(); System.out.println("method7,耗时:" + (time14 - time13) + " Deque<byte[8196]>"); long time15 = System.currentTimeMillis(); for (int i = 0; i < n; i++) { readMethod8(builder.getInputStream()); } long time16 = System.currentTimeMillis(); System.out.println("method8,耗时:" + (time16 - time15) + " ByteBuffer(2048)+LineBuffer"); } /** * 一次性全部读取 * 不建议使用 */ public static String readMethod1(InputStream inputStream) throws Exception { byte[] bytes = new byte[inputStream.available()]; int size = inputStream.read(bytes); String s = new String(bytes, 0, size, StandardCharsets.UTF_8); // System.out.println(s.length()); inputStream.close(); return s; } /** * 
使用ByteArrayOutputStream+自定义缓冲区,缓冲区大小可以依据文件大小而定 * 本质:ByteArrayOutputStream在write数据时,会检测容量是否满足需求,若不满足需求则会扩容,直到InputStream读取完毕 * 最佳实践:可以使用new ByteArrayOutputStream(inputStream.available()); 这样可以避免扩容时产生的时间损耗;同时按照大小调整缓冲区大小。 */ public static String readMethod2(InputStream inputStream)throws Exception { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(inputStream.available()); byte[] buffer = new byte[1024 * 1024]; int len = 0; while ((len = inputStream.read(buffer)) > 0) { byteArrayOutputStream.write(buffer, 0, len); } String s = byteArrayOutputStream.toString(StandardCharsets.UTF_8); // System.out.println(s.length()); byteArrayOutputStream.close(); inputStream.close(); return s; } /** * 使用StringWriter+org.apache.commons.io.IOUtils.copy * 本质:该copy方法使用的InputStreamReader,每次读取char[8192]作为缓冲区,然后while循环写入StringBuffer * InputStreamReader是将字节流按照编码转换为字符流,read方法是按编码来读取字符,而不是读取字节。 * StringWriter底层是StringBuffer,StringBuffer底层还是byte[],若超过初始设定的长度,则进行扩容 * 关键代码:AbstractStringBuilder的683行append(char str[], int offset, int len)方法 */ public static String readMethod3(InputStream inputStream) throws Exception { StringWriter writer = new StringWriter(); IOUtils.copy(inputStream, writer, StandardCharsets.UTF_8); String s = writer.toString(); // System.out.println(s.length()); writer.close(); inputStream.close(); return s; } /** * 使用BufferedReader+while * 本质:BufferedReader是建立再InputStreamReader之上,读取char[8192]作为缓冲区 * readLine()方法则是将缓冲区上的字符按换行符处理成一行字符串后返回,若缓冲区读完了还没有换行符则继续读取下一批char[8192] * BufferedReader.readLine()适用于一行一行,并有后续操作的需求,而不是读取整个文件到字符串中 */ public static String readMethod4(InputStream inputStream) throws Exception { InputStreamReader reader = new InputStreamReader(inputStream); BufferedReader bufferedReader = new BufferedReader(reader); String s; StringBuilder sb = new StringBuilder(); while ((s = bufferedReader.readLine()) != null) { sb.append(s).append("\n"); } s = sb.toString(); // System.out.println(s.length()); bufferedReader.close(); reader.close(); 
inputStream.close(); return s; } /** * 使用bufferedReader+stream * 本质:lines()方法返回一个Stream,该流的数据由迭代器生成,迭代器方法还是readList() */ public static String readMethod5(InputStream inputStream) throws Exception { InputStreamReader reader = new InputStreamReader(inputStream); BufferedReader bufferedReader = new BufferedReader(reader); String s = bufferedReader.lines().collect(Collectors.joining(System.lineSeparator())); // System.out.println(s.length()); bufferedReader.close(); reader.close(); inputStream.close(); return s; } /** * 使用bufferedReader+stream+parallel * 同上,只是使用parallel并行计算 */ public static String readMethod6(InputStream inputStream) throws Exception { InputStreamReader reader = new InputStreamReader(inputStream); BufferedReader bufferedReader = new BufferedReader(reader); String s = bufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator())); // System.out.println(s.length()); bufferedReader.close(); reader.close(); inputStream.close(); return s; } /** * 使用google的guava * 本质:独树一帜,不使用缓冲区,而是使用Deque<byte[8196]>作为接收byte的数据区,等全部接收完毕后,再整合成一个完整的byte[] * 注意guava 27.0版本的该方法还是beta方法,可能会存在潜在风险 */ public static String readMethod7(InputStream inputStream) throws Exception { String s = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8); return s; } /** * 使用google的guava的CharStreams.readLines()方法 * 本质:以ByteBuffer(2048)为缓冲区读取字符流,并使用LineBuffer作为行缓冲,底层是StringBuilder */ public static String readMethod8(InputStream inputStream) throws Exception { InputStreamReader reader = new InputStreamReader(inputStream); List<String> stringList = CharStreams.readLines(reader); StringBuilder sb = new StringBuilder(); for (String s : stringList) { sb.append(s).append("\n"); } String s = sb.toString(); // System.out.println(s.length()); reader.close(); inputStream.close(); return s; }