Before uploading, start HDFS and then create a directory named tmp on HDFS:
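For example, from the Linux shell (a minimal sketch; the chmod 777 is just one permissive choice for a test environment and should be adjusted to your own policy):

    hdfs dfs -mkdir -p /tmp
    hdfs dfs -chmod 777 /tmp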
Check in the web UI whether the directory was created (and the permissions granted) successfully:
Open IDEA:
package com.njbdqn.myhdfs.services;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        //cfg.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        // connect through the Hadoop client environment on Windows; independent of the Linux-side environment
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), cfg);
        /*System.out.println(fs);*/
        // upload a file
        // local source path (including the file name)
        Path src = new Path("E:/mylog/log_20200102.log");
        // destination path on HDFS
        Path dst = new Path("/tmp");
        // perform the upload
        fs.copyFromLocalFile(src, dst);
        fs.close();
    }
}
Run it:
Check the upload location:
A JSON file (semi-structured data) is hard to turn directly into a table (structured data).
We can use Java to convert the JSON file into a format that is easy to load as a table, and so turn it into a table indirectly: a format conversion.
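As a minimal sketch of the idea (the JSON record below is a made-up example, not a real log line; it uses fastjson just like the code that follows), one JSON line can be parsed and re-emitted as a single comma-separated row:

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;

    // a minimal sketch of the format conversion; the record is hypothetical
    public class JsonToCsvSketch {
        public static void main(String[] args) {
            String jsonLine = "{\"goodid\":\"g001\",\"machine\":{\"memory\":\"8G\"}}"; // hypothetical record
            JSONObject obj = JSON.parseObject(jsonLine);                  // semi-structured -> JSON object
            String csvLine = obj.getString("goodid") + ","
                    + obj.getJSONObject("machine").getString("memory");   // -> one comma-separated row
            System.out.println(csvLine);                                   // prints: g001,8G
        }
    }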
Preparation:
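On the project side, the main preparation is adding the Hadoop client and fastjson to the dependencies. A sketch of the Maven dependencies (the version numbers are assumptions; match them to your own Hadoop cluster and fastjson release):

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>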
package com.njbdqn.myhdfs.services;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.FileReader;
import java.net.URI;

public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        //cfg.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        // connect through the Hadoop client environment on Windows; independent of the Linux-side environment
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), cfg);
        /*System.out.println(fs);*/
        // file conversion: read the local JSON log line by line and parse each line
        FileReader fis = new FileReader("E:/mylog/log_20200102.log");
        BufferedReader bis = new BufferedReader(fis);
        String line = "";
        while ((line = bis.readLine()) != null) {
            Info info = JSON.parseObject(line, Info.class);
            System.out.println(info.getGoodid() + "," + info.getMachine().getMemory());
        }
        bis.close();
        fis.close();
    }
}
Run result (the information read from E:/mylog/log_20200102.log):
Create a few classes:
package com.njbdqn.myhdfs.services;

public class Info {
    private Machine machine;
    private String actTime;
    private String actType;
    private String goodid;
    private String page;
    private String userid;
    private Browse browse;

    @Override
    public String toString() {
        return "Info{" +
                "machine=" + machine +
                ", actTime='" + actTime + '\'' +
                ", actType='" + actType + '\'' +
                ", goodid='" + goodid + '\'' +
                ", page='" + page + '\'' +
                ", userid='" + userid + '\'' +
                ", browse='" + browse + '\'' +
                '}';
    }

    public Machine getMachine() { return machine; }
    public void setMachine(Machine machine) { this.machine = machine; }
    public String getActTime() { return actTime; }
    public void setActTime(String actTime) { this.actTime = actTime; }
    public String getActType() { return actType; }
    public void setActType(String actType) { this.actType = actType; }
    public String getGoodid() { return goodid; }
    public void setGoodid(String goodid) { this.goodid = goodid; }
    public String getPage() { return page; }
    public void setPage(String page) { this.page = page; }
    public String getUserid() { return userid; }
    public void setUserid(String userid) { this.userid = userid; }
    public Browse getBrowse() { return browse; }
    public void setBrowse(Browse browse) { this.browse = browse; }
}
package com.njbdqn.myhdfs.services;

public class Browse {
    private String browseType;
    private String browseVersion;

    public String getBrowseType() { return browseType; }
    public void setBrowseType(String browseType) { this.browseType = browseType; }
    public String getBrowseVersion() { return browseVersion; }
    public void setBrowseVersion(String browseVersion) { this.browseVersion = browseVersion; }
}
package com.njbdqn.myhdfs.services;

public class Machine {
    private String cpuType;
    private String memory;
    private String cpuSeed;

    public String getCpuType() { return cpuType; }
    public void setCpuType(String cpuType) { this.cpuType = cpuType; }
    public String getMemory() { return memory; }
    public void setMemory(String memory) { this.memory = memory; }
    public String getCpuSeed() { return cpuSeed; }
    public void setCpuSeed(String cpuSeed) { this.cpuSeed = cpuSeed; }
}
Run the following class:
package com.njbdqn.myhdfs.services;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.FileReader;
import java.net.URI;

public class UploadFileToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        //cfg.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        // connect through the Hadoop client environment on Windows; independent of the Linux-side environment
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), cfg);

        // file conversion: read the local JSON log line by line
        FileReader fis = new FileReader("E:/mylog/log_20200102.log");
        BufferedReader bis = new BufferedReader(fis);
        // create a file (not a directory) on HDFS
        FSDataOutputStream fos = fs.create(new Path("/tmp/lg_20200102.log"));
        String line = "";
        while ((line = bis.readLine()) != null) {
            Info info = JSON.parseObject(line, Info.class);
            // String template: flatten one JSON record into one comma-separated row
            String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
                    info.getMachine().getCpuType(),
                    info.getMachine().getMemory(),
                    info.getMachine().getCpuSeed(),
                    info.getActTime(),
                    info.getActType(),
                    info.getGoodid(),
                    info.getPage(),
                    info.getUserid(),
                    info.getBrowse().getBrowseType(),
                    info.getBrowse().getBrowseVersion());
            fos.write(ctx.getBytes());
        }
        fos.flush();
        fos.close();
        bis.close();
        fis.close();
    }
}
The result in the web UI:
The JSON format is converted into the comma-separated format shown in the figure below (one row per record, with the columns cpuType, memory, cpuSeed, actTime, actType, goodid, page, userid, browseType, browseVersion):
Hadoop cannot use multiple threads on a single task: one file cannot be written by several threads at once, but several files can each be written by their own thread. The code below is multithreaded precisely because there are multiple files, with one thread per file.
package com.njbdqn.myhdfs.services;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UploadFileToHDFS {

    /* check that listing the local files works
    public static void main(String[] args) {
        File file = new File("e:/mylog");
        String[] fst = file.list();
        for (String f : fst) {
            System.out.println(f);
        }
    }
    */

    public void writeFileToHDFS(String path, String fileName) {
        FileSystem fs = null;
        FileReader fis = null;
        BufferedReader bis = null;
        FSDataOutputStream fos = null;
        try {
            fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), new Configuration());
            fis = new FileReader(path + "/" + fileName);
            bis = new BufferedReader(fis);
            // create a file on HDFS
            fos = fs.create(new Path("/logs/" + fileName));
            String line = "";
            while ((line = bis.readLine()) != null) {
                Info info = JSON.parseObject(line, Info.class);
                // String template: flatten one JSON record into one comma-separated row
                String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
                        info.getMachine().getCpuType(),
                        info.getMachine().getMemory(),
                        info.getMachine().getCpuSeed(),
                        info.getActTime(),
                        info.getActType(),
                        info.getGoodid(),
                        info.getPage(),
                        info.getUserid(),
                        info.getBrowse().getBrowseType(),
                        info.getBrowse().getBrowseVersion());
                fos.write(ctx.getBytes());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } finally {
            try {
                fos.close();
                bis.close();
                fis.close();
                // fs.close(); is not always needed here: FileSystem.get returns a cached, shared
                // instance, so closing it in one thread could affect other threads that are still writing
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(30);
        final UploadFileToHDFS ufh = new UploadFileToHDFS();
        final String filePath = "e:/mylog";
        // loop over all files in the directory and upload each one on its own thread
        File file = new File(filePath);
        String[] fs = file.list();
        for (final String fileName : fs) {
            es.execute(new Runnable() {
                @Override
                public void run() {
                    ufh.writeFileToHDFS(filePath, fileName);
                }
            });
        }
        es.shutdown();
    }
}
The screenshot below shows that the upload succeeded:
In the screenshot above, the Block Size column shows 128 MB while the Size column shows only 22.11 MB, so each file fills only a small fraction of a block. Many small files like this are wasteful (every file and block costs NameNode metadata), so small files should be merged into larger ones.
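One straightforward way to do the merge (a sketch in the same style as the code above; the /logs source directory and the /merged/logs_all.log target path are just example names) is to open a single output file on HDFS and append every small file into it:

    package com.njbdqn.myhdfs.services;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    import java.net.URI;

    // sketch: merge many small HDFS files into one larger file
    public class MergeSmallFiles {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.200:9000"), new Configuration());
            // one big target file
            FSDataOutputStream out = fs.create(new Path("/merged/logs_all.log"));
            // append every small file under /logs into the target
            for (FileStatus status : fs.listStatus(new Path("/logs"))) {
                if (status.isFile()) {
                    FSDataInputStream in = fs.open(status.getPath());
                    IOUtils.copyBytes(in, out, 4096, false); // copy one file, keep the output stream open
                    in.close();
                }
            }
            out.close();
            fs.close();
        }
    }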