package pers.aishuang.flink.streaming.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/**
 * Small HDFS utility: concatenates the text content of every file listed under
 * {@code /test/} (recursive listing) into the single HDFS path
 * {@code /fileSystemData02/}, one line at a time, using UTF-8 for both reading
 * and writing.
 *
 * <p>The cluster address ({@code hdfs://node01:8020}), the user ({@code root})
 * and both paths are hard-coded.
 */
public class FileSystemTest {

    /**
     * Connects to HDFS, lists the input files, and copies them line by line into
     * the output file.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the HDFS URI is malformed, the connection fails, or any
     *                   read/write on HDFS fails
     */
    public static void main(String[] args) throws Exception {
        // Alternative way to obtain the FileSystem (kept from the original as a note):
        // set "fs.defaultFS" to "hdfs://node01:8020/" on a Configuration and call
        // FileSystem.get(conf) instead of passing the URI and user explicitly.
        try (FileSystem fileSystem =
                     FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root")) {

            // List the inputs BEFORE creating the output so that a missing /test/
            // directory fails without leaving an output file behind.
            RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(new Path("/test/"), true);

            Path outPath = new Path("/fileSystemData02/");

            // A single writer is shared by all input files; closing it flushes the
            // buffer and closes the underlying HDFS output stream, even on failure.
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(fileSystem.create(outPath), StandardCharsets.UTF_8))) {

                while (itr.hasNext()) {
                    Path path = itr.next().getPath();

                    // Closing the reader closes the underlying HDFS input stream.
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(fileSystem.open(path), StandardCharsets.UTF_8))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            writer.write(line);
                            // Every line is re-terminated with the platform line separator,
                            // so a file whose last line lacks a newline still ends cleanly
                            // before the next file's content starts.
                            writer.newLine();
                        }
                    }
                }
            }
        }
    }
}
结果: