The collaborative filtering algorithm:
Item-based collaborative filtering has two main steps:
1. Compute the similarity between items; it can be derived from item co-occurrence counts, the cosine of the angle between item vectors, or the Euclidean distance.
2. Generate a recommendation list for each user from the item similarities and the user's purchase history.
Which items end up being recommended is decided by their recommendation score.
Core idea: count, for every pair of products, how many times the two were bought together;
then, when a user has bought one product of such a pair, recommend the other product of the pair to that user (a minimal in-memory sketch follows).
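Before walking through the MapReduce pipeline, here is a minimal in-memory sketch of the same idea in Java. The class name and the tiny sample data are made up purely for illustration; the real pipeline below works on the full data set in HDFS.

import java.util.*;

public class CooccurrenceSketch {
    public static void main(String[] args) {
        // user -> set of purchased product ids (tiny made-up sample)
        Map<String, Set<String>> purchases = new HashMap<>();
        purchases.put("10001", new HashSet<>(Arrays.asList("20001", "20002", "20005")));
        purchases.put("10004", new HashSet<>(Arrays.asList("20001", "20002")));

        // count how often each pair of products was bought together (the co-occurrence "matrix")
        Map<String, Map<String, Integer>> cooc = new HashMap<>();
        for (Set<String> items : purchases.values()) {
            for (String a : items) {
                for (String b : items) {
                    cooc.computeIfAbsent(a, k -> new HashMap<>())
                        .merge(b, 1, Integer::sum);
                }
            }
        }

        // recommendation score for (user, candidate) = sum of co-occurrences between
        // the candidate product and every product the user already bought
        String user = "10004";
        Map<String, Integer> scores = new HashMap<>();
        for (String bought : purchases.get(user)) {
            for (Map.Entry<String, Integer> e : cooc.get(bought).entrySet()) {
                scores.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        // drop products the user already owns; the highest remaining scores are recommended
        scores.keySet().removeAll(purchases.get(user));
        System.out.println(scores); // for this sample: {20005=2}
    }
}

Steps 1 through 8 below reproduce exactly this computation as a chain of MapReduce jobs over the data stored in MySQL/HDFS.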
-- create the user table
create table s_user(
  id int primary key auto_increment,
  name varchar(20),
  age int,
  phone varchar(20)
);
insert into s_user values
(10001,'jake',20,'15023453003'),
(10002,'rose',22,'18923452343'),
(10003,'tom',21,'15113453001'),
(10004,'briup',22,'18823452456'),
(10005,'kevin',24,'15925671003'),
(10006,'patel',28,'15983432459');

-- create the product table
create table s_product(
  id int primary key auto_increment,
  name varchar(20),
  price double,
  description varchar(100),
  kc double
);
insert into s_product values
(20001,'hadoop',89,'bigdata',1000),
(20002,'hbase',36,'bigdata',110),
(20003,'mysql',58,'bigdata',190),
(20004,'sqoop',28,'bigdata',70),
(20005,'flume',34,'bigdata',109),
(20006,'kafka',29,'bigdata',78),
(20007,'hive',31,'bigdata',83);

-- create the order table
create table s_order(
  id int primary key auto_increment,
  name varchar(20),
  order_date timestamp default current_timestamp on update current_timestamp,
  user_id int references s_user(id)
);
insert into s_order(id,name,user_id) values
(1,'briup_store',10001),
(2,'briup_store',10002),
(3,'briup_store',10003),
(4,'briup_store',10004),
(5,'briup_store',10005),
(6,'briup_store',10006),
(7,'briup_store',10007);

-- create the bridge table between the order table and the product table
create table order_line(
  order_id int references s_order(id),
  product_id int references s_product(id),
  num double,
  primary key(order_id,product_id)
);
insert into order_line values
(1,20001,1),(1,20002,1),(1,20005,1),(1,20006,1),(1,20007,1),
(2,20003,1),(2,20004,1),(2,20006,1),
(3,20002,1),(3,20007,1),
(4,20001,1),(4,20002,1),(4,20005,1),(4,20006,1),
(5,20001,1),
(6,20004,1),(6,20007,1);

-- create the table that will hold the final recommendation results
create table recommend(
  uid int references s_user(id),
  gid int references s_product(id),
  nums double,
  primary key(uid,gid)
);
Import the purchase records (user id, product id, quantity) from MySQL into HDFS with Sqoop:

sqoop import \
  --connect jdbc:mysql://192.168.43.158:3306/briup \
  --username root --password root \
  --delete-target-dir \
  --target-dir /user/hdfs/recommend \
  --query 'select s.user_id,d.product_id,d.num from s_order s,order_line d where s.id=d.order_id and $CONDITIONS' \
  --m 1
Original data (user id, product id, purchase count):
10001 20001 1
10001 20002 1
10001 20005 1
10001 20006 1
10001 20007 1
10002 20003 1
10002 20004 1
10002 20006 1
10003 20002 1
10003 20007 1
10004 20001 1
10004 20002 1
10004 20005 1
10004 20006 1
10005 20001 1
10006 20004 1
10006 20007 1
Step 1. Data source: the original data imported above.
package com.briup.mr.one;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
// Step 1: compute the list of products bought by each user.
// Expected result: 10001 20001,20005,20006,20007,20002
public class UserBuyGoodsList {

    // Input line: 10001	20001	1
    public static class UserBuyGoodsListMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            // output key: user id, output value: product id
            outK.set(line[0]);
            outV.set(line[1]);
            context.write(outK, outV);
        }
    }

    // Result: 10001 20001,20005,20006,20007,20002
    public static class UserBuyGoodsListReducer extends Reducer<Text, Text, Text, Text> {
        private Text outV = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text value : values) {
                // concatenate the product ids
                sb.append(value.toString()).append(",");
            }
            // drop the trailing ','
            sb.setLength(sb.length() - 1);
            outV.set(sb.toString());
            context.write(key, outV);
            outV.clear();
        }
    }
}
Result data:
10001 20001,20005,20006,20007,20002
10002 20006,20003,20004
10003 20002,20007
10004 20001,20002,20005,20006
10005 20001
10006 20004,20007
Step 2. Data source: the result of step 1.
package com.briup.mr.two;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
// Step 2: compute the product co-occurrence list, i.e. every pair of products bought
// by the same user. Expected output records such as: 20001 20001, 20001 20002, ...
// No reducer is needed.
public class GoodsCooccurrenceList {

    // The input is read with SequenceFileInputFormat, so it is already split into
    // key = user id and value = the product list from step 1.
    public static class GoodsCooccurrenceListMapper extends Mapper<Text, Text, Text, NullWritable> {
        private StringBuffer sb = new StringBuffer();
        private Text outK = new Text();

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split(",");
            // emit every ordered pair of product ids (self-pairs included)
            for (String s : line) {
                for (String s1 : line) {
                    sb.append(s).append("\t").append(s1);
                    outK.set(sb.toString());
                    context.write(outK, NullWritable.get());
                    sb.setLength(0);
                    outK.clear();
                }
            }
        }
    }
}
Computed result:
20001 20001
20001 20002
20001 20005
20001 20006
20001 20007
20001 20001
20001 20006
20001 20005
20001 20002
20002 20007
20002 20001
20002 20005
20002 20006
20002 20007
20002 20002
20002 20006
20002 20005
... ...
Step 3. Data source: the result of step 2.
package com.briup.mr.three;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
// Step 3: count the co-occurrences, i.e. build the product co-occurrence matrix.
public class GoodsCooccurrenceMatrix {

    // Input records (the pair is the key, the value is NullWritable):
    // 20001	20001
    // 20001	20002
    // 20001	20005
    public static class GoodsCooccurrenceMatrixMapper extends Mapper<Text, NullWritable, Text, Text> {
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(Text key, NullWritable value, Context context)
                throws IOException, InterruptedException {
            String[] line = key.toString().split("\t");
            outK.set(line[0]);
            outV.set(line[1]);
            context.write(outK, outV);
        }
    }

    public static class GoodsCooccurrenceMatrixReducer extends Reducer<Text, Text, Text, Text> {
        // map that counts how often each product co-occurs with the key product
        private Map<String, Integer> map = new HashMap<String, Integer>();
        private StringBuffer sb = new StringBuffer();
        private Text outV = new Text();

        // Input: 20001 [20001, 20002, 20005, ...]
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                if (map.containsKey(val.toString())) {
                    // the product is already present in the map: increment its count
                    map.put(val.toString(), map.get(val.toString()) + 1);
                } else {
                    map.put(val.toString(), 1);
                }
            }
            // build the output string: productId:count,productId:count,...
            for (Map.Entry<String, Integer> en : map.entrySet()) {
                sb.append(en.getKey()).append(":").append(en.getValue()).append(",");
            }
            // drop the trailing ','
            sb.setLength(sb.length() - 1);
            outV.set(sb.toString());
            context.write(key, outV);
            sb.setLength(0);
            map.clear();
            outV.clear();
        }
    }
}
Computed result:
20001 20001:3,20002:2,20005:2,20006:2,20007:1
20002 20001:2,20002:3,20005:2,20006:2,20007:2
20003 20003:1,20004:1,20006:1
20004 20003:1,20004:2,20006:1,20007:1
20005 20001:2,20002:2,20005:2,20006:2,20007:1
20006 20001:2,20002:2,20003:1,20004:1,20005:2,20006:3,20007:1
20007 20001:1,20002:2,20004:1,20005:1,20006:1,20007:3
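For example, the entry 20002:2 in the row for 20001 means that products 20001 and 20002 were bought together by two users (10001 and 10004 in the original data), and the diagonal entry 20001:3 simply counts how many users bought 20001 at all.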
Step 4. Data source: the result of step 1, or the original data (the code below reads the original data).
package com.briup.mr.four;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
// Step 4: compute the user purchase vector of every product.
public class UserBuyGoodsVector {

    // Input line (read from the original source data): 10001	20001	1
    public static class UserBuyGoodsVectorMapper extends Mapper<LongWritable, Text, Text, Text> {
        // output key: product id, output value: user id
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            outK.set(line[1]);
            outV.set(line[0]);
            context.write(outK, outV);
        }
    }

    public static class UserBuyGoodsVectorReducer extends Reducer<Text, Text, Text, Text> {
        // Input: 20001 [10001, 10002, ...]
        private Text outV = new Text();
        private Map<String, Integer> map = new HashMap<>();
        private StringBuffer sb = new StringBuffer();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // The map output key is the product id, the values are the user ids.
            // Expected result: 20001 10001:1,10004:1,10005:1
            for (Text value : values) {
                if (map.containsKey(value.toString())) {
                    map.put(value.toString(), map.get(value.toString()) + 1);
                } else {
                    map.put(value.toString(), 1);
                }
            }
            for (Map.Entry<String, Integer> en : map.entrySet()) {
                sb.append(en.getKey()).append(":").append(en.getValue()).append(",");
            }
            sb.setLength(sb.length() - 1);
            outV.set(sb.toString());
            context.write(key, outV);
            // reset the per-call state
            sb.setLength(0);
            map.clear();
            outV.clear();
        }
    }
}
Result data:
20001 10001:1,10004:1,10005:1
20002 10001:1,10003:1,10004:1
20003 10002:1
20004 10002:1,10006:1
20005 10001:1,10004:1
20006 10001:1,10002:1,10004:1
20007 10001:1,10003:1,10006:1
Step 5. Data source: the results of steps 3 and 4.
Note on the input: the data comes from two files. The first is the result of step 3 (the product co-occurrence matrix), the second is the result of step 4 (the user purchase vectors). A single MR job therefore needs two custom Mappers, one per input file, plus one custom Reducer that processes the intermediate output of both Mappers (see the wiring sketch after this paragraph).
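The complete wiring is done in the Step 9 driver below; as a focused illustration of this two-Mapper/one-Reducer layout, a stripped-down driver could look roughly like the following sketch. The class name Step5DriverSketch and the HDFS paths are placeholders for this example only; the mapper, reducer, partitioner and grouping classes are the ones defined just below.

package com.briup.mr.five;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Sketch only: one job, two Mappers (one per input file), one Reducer joining them.
public class Step5DriverSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "step5 sketch");
        job.setJarByClass(Step5DriverSketch.class);

        // step 3 output: the product co-occurrence matrix (placeholder path)
        MultipleInputs.addInputPath(job, new Path("/tmp/step3-output"),
                SequenceFileInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
        // step 4 output: the user purchase vectors (placeholder path)
        MultipleInputs.addInputPath(job, new Path("/tmp/step4-output"),
                SequenceFileInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorSecondMapper.class);

        // both Mappers emit (GoodsBean, Text); a single Reducer joins the two sources
        job.setMapOutputKeyClass(GoodsBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(GoodsPartitioner.class);
        job.setGroupingComparatorClass(GoodsGroup.class);
        job.setReducerClass(MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path("/tmp/step5-output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}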
The GoodsBean class:
package com.briup.mr.five;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
public class GoodsBean implements WritableComparable<GoodsBean> {
    private String g_id; // product id
    // flag == 1: the record comes from the product co-occurrence matrix (step 3 result)
    // flag == 0: the record comes from the user purchase vectors (step 4 result)
    private int flag;

    public GoodsBean() {
    }

    public GoodsBean(String g_id, int flag) {
        this.g_id = g_id;
        this.flag = flag;
    }

    public String getG_id() {
        return g_id;
    }

    public void setG_id(String g_id) {
        this.g_id = g_id;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    @Override
    public int compareTo(GoodsBean o) {
        int n = this.g_id.compareTo(o.g_id);
        if (n != 0) {
            return n;
        } else {
            // sort the co-occurrence record (flag == 1) before the purchase vectors
            return -(this.flag - o.flag);
        }
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(g_id);
        dataOutput.writeInt(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.g_id = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}
The MapReduce partitioner class GoodsPartitioner:
package com.briup.mr.five;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/28 20:07
 */
public class GoodsPartitioner extends Partitioner<GoodsBean, Text> {
    @Override
    public int getPartition(GoodsBean goodsBean, Text text, int numPartitions) {
        // partition by product id so that all records of one product meet in the same reducer
        return Math.abs(Integer.parseInt(goodsBean.getG_id()) * 127) % numPartitions;
    }
}
The MapReduce grouping comparator class GoodsGroup:
package com.briup.mr.five;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/28 20:14
 */
public class GoodsGroup extends WritableComparator {
    public GoodsGroup() {
        super(GoodsBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // group by product id: records with the same id go into the same reduce call
        GoodsBean o = (GoodsBean) a;
        GoodsBean o1 = (GoodsBean) b;
        return o.getG_id().compareTo(o1.getG_id());
    }
}
The MapReduce class:
package com.briup.mr.five;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
// Step 5: multiply the product co-occurrence matrix by the user purchase vectors,
// producing the unsummed (partial) recommendation results.
public class MultiplyGoodsMatrixAndUserVector {

    // Input: step 3 result, the product co-occurrence matrix, e.g.
    // 20001 20005:2,20002:2,20001:3,20007:1,20006:2
    public static class MultiplyGoodsMatrixAndUserVectorFirstMapper extends Mapper<Text, Text, GoodsBean, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // output: GoodsBean(20001, 1)  20005:2,20002:2,20001:3,20007:1,20006:2
            context.write(new GoodsBean(key.toString(), 1), value);
        }
    }

    // Input: step 4 result, the user purchase vectors, e.g.
    // 20001 10001:1,10004:1,10005:1
    public static class MultiplyGoodsMatrixAndUserVectorSecondMapper extends Mapper<Text, Text, GoodsBean, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // output: GoodsBean(20001, 0)  10001:1,10004:1,10005:1
            context.write(new GoodsBean(key.toString(), 0), value);
        }
    }

    // Expected output records such as: 10001,20001 2
    public static class MultiplyGoodsMatrixAndUserVectorReducer extends Reducer<GoodsBean, Text, Text, DoubleWritable> {
        // One reduce call receives, for a single product id (thanks to GoodsPartitioner and GoodsGroup),
        // first the co-occurrence row (flag == 1 sorts first), then the purchase vector(s):
        // 20005:2,20002:2,20001:3,20007:1,20006:2 || 10001:1,10004:1,10005:1
        @Override
        protected void reduce(GoodsBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();
            // the first value is the co-occurrence row: 20005:2,20002:2,20001:3,20007:1,20006:2
            String[] goods = iter.next().toString().split(",");
            while (iter.hasNext()) {
                // the remaining values are purchase vectors: 10001:1,10004:1,10005:1
                String[] users = iter.next().toString().split(",");
                for (String user : users) {
                    String[] uid_nums = user.split(":");
                    for (String good : goods) {
                        String[] gid_nums = good.split(":");
                        // the output key is "userId,productId"
                        StringBuffer sb = new StringBuffer();
                        sb.append(uid_nums[0]).append(",").append(gid_nums[0]);
                        context.write(new Text(sb.toString()),
                                new DoubleWritable(Double.parseDouble(uid_nums[1]) * Double.parseDouble(gid_nums[1])));
                        sb.setLength(0);
                    }
                }
            }
        }
    }
}
Result data:
10001,20001 2
10001,20001 2
10001,20001 3
10001,20001 1
10001,20001 2
10001,20002 3
10001,20002 2
10001,20002 2
10001,20002 2
10001,20002 2
... ...
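These partial products are what step 6 adds up: for the pair (10001, 20001) the five partial values shown above sum to 2 + 2 + 3 + 1 + 2 = 10, which matches the entry 10001,20001 10 in the step 6 result below.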
Step 6. Data source: the result of step 5.
package com.briup.mr.six;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:04
 */
// Step 6: sum the partial recommendation values produced in step 5.
public class MakeSumForMultiplication {

    public static class MakeSumForMultiplicationMapper extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
        // Input record: 10006,20007	3
        @Override
        protected void map(Text key, DoubleWritable value, Context context)
                throws IOException, InterruptedException {
            // identity map: pass the pair through so the values group by "userId,productId"
            context.write(key, value);
        }
    }

    public static class MakeSumForMultiplicationReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            for (DoubleWritable value : values) {
                sum += value.get();
            }
            context.write(key, new DoubleWritable(sum));
        }
    }
}
Result data:
10001,20001 10
10001,20002 11
10001,20003 1
10001,20004 2
10001,20005 9
10001,20006 10
... ...
Step 7: remove duplicates, i.e. drop from the recommendation result every product the user has already bought.
Data sources:
1. FirstMapper processes the users' purchase records (the original source data).
2. SecondMapper processes the step 6 recommendation result.
(A concrete example of the deduplication rule follows this list.)
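For instance, the pair (10001, 20001) appears both in the purchase records and in the step 6 result, so it is dropped; (10001, 20003) appears only in the step 6 result, because user 10001 never bought 20003, so it survives into the final recommendation list.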
The JavaBean class UserAndGoods:
package com.briup.mr.seven;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:24
 */
public class UserAndGoods implements WritableComparable<UserAndGoods> {
    private String userId;
    private String goodsId;
    // flag == 1: the record comes from the original source data
    // flag == 0: the record comes from the step 6 result
    private int flag;

    public UserAndGoods() {
    }

    public UserAndGoods(String userId, String goodsId, int flag) {
        this.userId = userId;
        this.goodsId = goodsId;
        this.flag = flag;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(String goodsId) {
        this.goodsId = goodsId;
    }

    @Override
    public int compareTo(UserAndGoods o) {
        int i = this.getUserId().compareTo(o.getUserId());
        // compare by user id first; fall back to the product id when the user ids are equal
        if (i != 0) {
            return i;
        } else
            return this.getGoodsId().compareTo(o.getGoodsId());
    }

    @Override
    public String toString() {
        return "UserAndGoods{" +
                "userId='" + userId + '\'' +
                ", goodsId='" + goodsId + '\'' +
                ", flag=" + flag +
                '}';
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(userId);
        dataOutput.writeUTF(goodsId);
        dataOutput.writeInt(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readUTF();
        this.goodsId = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}
The MapReduce class:
package com.briup.mr.seven;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:19
 */
// Step 7: remove duplicates, i.e. drop from the recommendation result every product
// the user has already bought.
public class DuplicateDataForResult {

    // FirstMapper processes the users' purchase records (flag == 1 marks the original data).
    public static class DuplicateDataForResultFirstMapper extends Mapper<LongWritable, Text, UserAndGoods, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            context.write(new UserAndGoods(line[0], line[1], 1), value);
        }
    }

    // SecondMapper processes the step 6 recommendation result (flag == 0).
    public static class DuplicateDataForResultSecondMapper extends Mapper<Text, DoubleWritable, UserAndGoods, Text> {
        @Override
        protected void map(Text key, DoubleWritable value, Context context)
                throws IOException, InterruptedException {
            String[] line = key.toString().split(",");
            context.write(new UserAndGoods(line[0], line[1], 0),
                    new Text(key.toString() + "\t" + value.get()));
        }
    }

    // Expected reducer output records such as: 10001,20004	2.0
    public static class DuplicateDataForResultReducer extends Reducer<UserAndGoods, Text, Text, NullWritable> {
        @Override
        protected void reduce(UserAndGoods key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();
            // the first element of the group
            Text res = iter.next();
            // If the group contains exactly one record, the (user, product) pair appears only in
            // the recommendation result, i.e. the user has not bought it yet, so write it out.
            // Otherwise the pair also appears in the purchase records and is skipped.
            if (!iter.hasNext()) {
                context.write(res, NullWritable.get());
            }
        }
    }
}
Computed result:
10001 20004 2
10001 20003 1
10002 20002 2
10002 20007 2
10002 20001 2
10002 20005 2
10003 20006 3
10003 20005 3
10003 20001 3
10003 20004 1
10004 20007 5
10004 20004 1
10004 20003 1
10005 20006 2
10005 20002 2
... ...
Step 8. Data source: the result of step 7.
package com.briup.mr.eight;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 16:51
 */
// Step 8: write the recommendation result into the MySQL recommend table.
public class DataInDB {

    public static class DataInDBMapper extends Mapper<Text, NullWritable, RecommendResultBean, NullWritable> {
        @Override
        protected void map(Text key, NullWritable value, Context context)
                throws IOException, InterruptedException {
            // the step 7 output key looks like: 10001,20004	2.0
            String[] line = key.toString().split("\t");
            RecommendResultBean outK = new RecommendResultBean();
            outK.setNums(Double.parseDouble(line[1]));
            String[] split = line[0].split(",");
            outK.setUid(Integer.parseInt(split[0]));
            outK.setGid(Integer.parseInt(split[1]));
            context.write(outK, NullWritable.get());
        }
    }

    // The map output value is NullWritable, so the reducer input value type must match.
    public static class DataInDBReducer extends Reducer<RecommendResultBean, NullWritable, RecommendResultBean, NullWritable> {
        @Override
        protected void reduce(RecommendResultBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
Result:
(Screenshot of the populated recommend table in MySQL; the original image is not available.)
Step 9: build the job workflow and submit the jobs to the cluster.
package com.briup.mr;

import com.briup.mr.eight.DataInDB;
import com.briup.mr.eight.RecommendResultBean;
import com.briup.mr.five.GoodsBean;
import com.briup.mr.five.GoodsGroup;
import com.briup.mr.five.GoodsPartitioner;
import com.briup.mr.five.MultiplyGoodsMatrixAndUserVector;
import com.briup.mr.four.UserBuyGoodsVector;
import com.briup.mr.one.UserBuyGoodsList;
import com.briup.mr.seven.DuplicateDataForResult;
import com.briup.mr.seven.UserAndGoods;
import com.briup.mr.six.MakeSumForMultiplication;
import com.briup.mr.three.GoodsCooccurrenceMatrix;
import com.briup.mr.two.GoodsCooccurrenceList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/27 16:36
 */
public class FinalJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();

        // local paths used during development:
        // Path input = new Path("C:\\Users\\ASUS\\Desktop\\briup\\大数据\\input.txt");
        // Path one_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\大数据\\out11");
        // Path two_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\大数据\\out22");
        // Path three_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\大数据\\out33");
        // Path four_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\大数据\\out44");
        // Path five_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\大数据\\out55");
        // Path six_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\大数据\\out66");
        // Path seven_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\大数据\\out77");
        Path input = new Path("/user/zhudz/goods/input.txt");
        Path one_output = new Path("/user/zhudz/goods/out11");
        Path two_output = new Path("/user/zhudz/goods/out22");
        Path three_output = new Path("/user/zhudz/goods/out33");
        Path four_output = new Path("/user/zhudz/goods/out44");
        Path five_output = new Path("/user/zhudz/goods/out55");
        Path six_output = new Path("/user/zhudz/goods/out66");
        Path seven_output = new Path("/user/zhudz/goods/out77");

        // delete the output paths if they already exist
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(one_output)) {
            fs.delete(one_output, true);
        }
        if (fs.exists(two_output)) {
            fs.delete(two_output, true);
        }
        if (fs.exists(three_output)) {
            fs.delete(three_output, true);
        }
        if (fs.exists(four_output)) {
            fs.delete(four_output, true);
        }
        if (fs.exists(five_output)) {
            fs.delete(five_output, true);
        }
        if (fs.exists(six_output)) {
            fs.delete(six_output, true);
        }
        if (fs.exists(seven_output)) {
            fs.delete(seven_output, true);
        }

        // Step 1: compute the list of products bought by each user
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("Step1: compute the list of products bought by each user");
        job.setMapperClass(UserBuyGoodsList.UserBuyGoodsListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(UserBuyGoodsList.UserBuyGoodsListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        SequenceFileOutputFormat.setOutputPath(job, one_output);

        // Step 2: compute the product co-occurrence pairs
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(this.getClass());
        job1.setJobName("Step2: compute the product co-occurrence pairs");
        job1.setMapperClass(GoodsCooccurrenceList.GoodsCooccurrenceListMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(NullWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(NullWritable.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileInputFormat.addInputPath(job1, one_output);
        SequenceFileOutputFormat.setOutputPath(job1, two_output);

        // Step 3: count the co-occurrences (co-occurrence matrix)
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setJobName("Step3: count the co-occurrences (co-occurrence matrix)");
        job2.setMapperClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        job2.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileInputFormat.addInputPath(job2, two_output);
        SequenceFileOutputFormat.setOutputPath(job2, three_output);

        // Step 4: compute the user purchase vectors
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(this.getClass());
        job3.setJobName("Step4: compute the user purchase vectors");
        job3.setMapperClass(UserBuyGoodsVector.UserBuyGoodsVectorMapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(UserBuyGoodsVector.UserBuyGoodsVectorReducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        // read the original source data
        job3.setInputFormatClass(TextInputFormat.class);
        job3.setOutputFormatClass(SequenceFileOutputFormat.class);
        TextInputFormat.addInputPath(job3, input);
        SequenceFileOutputFormat.setOutputPath(job3, four_output);

        // Step 5: multiply the co-occurrence matrix by the user purchase vectors
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(this.getClass());
        job4.setJobName("Step5: multiply the co-occurrence matrix by the purchase vectors");
        // wire up the two different map tasks
        // mapper for the product co-occurrence matrix
        MultipleInputs.addInputPath(job4, three_output, SequenceFileInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
        // mapper for the user purchase vectors
        MultipleInputs.addInputPath(job4, four_output, SequenceFileInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorSecondMapper.class);
        job4.setMapOutputKeyClass(GoodsBean.class);
        job4.setMapOutputValueClass(Text.class);
        job4.setPartitionerClass(GoodsPartitioner.class);
        job4.setGroupingComparatorClass(GoodsGroup.class);
        job4.setReducerClass(MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorReducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(DoubleWritable.class);
        job4.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job4, five_output);

        // Step 6: sum the partial recommendation values from step 5
        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(this.getClass());
        job5.setJobName("Step6: sum the partial recommendation values");
        job5.setMapperClass(MakeSumForMultiplication.MakeSumForMultiplicationMapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(DoubleWritable.class);
        job5.setReducerClass(MakeSumForMultiplication.MakeSumForMultiplicationReducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(DoubleWritable.class);
        job5.setInputFormatClass(SequenceFileInputFormat.class);
        job5.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileInputFormat.addInputPath(job5, five_output);
        SequenceFileOutputFormat.setOutputPath(job5, six_output);

        // Step 7: remove already-bought products from the recommendations
        Job job6 = Job.getInstance(conf);
        job6.setJarByClass(this.getClass());
        job6.setJobName("Step7: remove already-bought products from the recommendations");
        // wire up the two different map tasks
        // FirstMapper processes the users' purchase records
        MultipleInputs.addInputPath(job6, input, TextInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultFirstMapper.class);
        // SecondMapper processes the step 6 recommendation result
        MultipleInputs.addInputPath(job6, six_output, SequenceFileInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultSecondMapper.class);
        job6.setMapOutputKeyClass(UserAndGoods.class);
        job6.setMapOutputValueClass(Text.class);
        // grouping comparator (not needed here, the key's compareTo already groups by user and product)
        // job6.setGroupingComparatorClass(DuplicateDataGroup.class);
        job6.setReducerClass(DuplicateDataForResult.DuplicateDataForResultReducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(NullWritable.class);
        job6.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job6, seven_output);

        // Step 8: save the recommendation result into MySQL; the data comes from step 7
        Job job7 = Job.getInstance(conf);
        job7.setJarByClass(this.getClass());
        job7.setJobName("Step8: save the recommendation result into MySQL");
        DBConfiguration.configureDB(job7.getConfiguration(),
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.10.131/briup",
                "root", "root");
        DBOutputFormat.setOutput(job7, "recommend", "uid", "gid", "nums");
        job7.setMapperClass(DataInDB.DataInDBMapper.class);
        job7.setMapOutputKeyClass(RecommendResultBean.class);
        job7.setMapOutputValueClass(NullWritable.class);
        job7.setReducerClass(DataInDB.DataInDBReducer.class);
        job7.setOutputKeyClass(RecommendResultBean.class);
        job7.setOutputValueClass(NullWritable.class);
        job7.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(job7, seven_output);
        job7.setOutputFormatClass(DBOutputFormat.class);

        // final: build the job workflow
        ControlledJob contro_job = new ControlledJob(conf);
        contro_job.setJob(job);

        ControlledJob contro_job1 = new ControlledJob(conf);
        contro_job1.setJob(job1);
        contro_job1.addDependingJob(contro_job);

        ControlledJob contro_job2 = new ControlledJob(conf);
        contro_job2.setJob(job2);
        contro_job2.addDependingJob(contro_job1);

        ControlledJob contro_job3 = new ControlledJob(conf);
        contro_job3.setJob(job3);

        ControlledJob contro_job4 = new ControlledJob(conf);
        contro_job4.setJob(job4);
        contro_job4.addDependingJob(contro_job2);
        contro_job4.addDependingJob(contro_job3);

        ControlledJob contro_job5 = new ControlledJob(conf);
        contro_job5.setJob(job5);
        contro_job5.addDependingJob(contro_job4);

        ControlledJob contro_job6 = new ControlledJob(conf);
        contro_job6.setJob(job6);
        contro_job6.addDependingJob(contro_job5);

        ControlledJob contro_job7 = new ControlledJob(conf);
        contro_job7.setJob(job7);
        contro_job7.addDependingJob(contro_job6);

        JobControl jobs = new JobControl("goods_recommends");
        jobs.addJob(contro_job);
        jobs.addJob(contro_job1);
        jobs.addJob(contro_job2);
        jobs.addJob(contro_job3);
        jobs.addJob(contro_job4);
        jobs.addJob(contro_job5);
        jobs.addJob(contro_job6);
        jobs.addJob(contro_job7);

        // JobControl implements Runnable, so run it in its own thread
        Thread t = new Thread(jobs);
        t.start();

        // print progress until all jobs have finished
        while (true) {
            for (ControlledJob c : jobs.getRunningJobList()) {
                c.getJob().monitorAndPrintJob();
            }
            if (jobs.allFinished()) break;
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FinalJob(), args));
    }
}
1. Write the script that migrates the data from MySQL into HDFS.
sudo vi mysqlToHDFS.sh
# after saving the script, make it executable
sudo chmod +x mysqlToHDFS.sh
#!/bin/bash
sqoop import \
  --connect jdbc:mysql://localhost:3306/briup \
  --username root --password root \
  --delete-target-dir \
  --target-dir /user/hdfs/recommend \
  --query 'select s.user_id,d.product_id,d.num from s_order s,order_line d where s.id=d.order_id and $CONDITIONS' \
  --m 1
2. Write the script that submits this project's job to the cluster.
sudo vi recomendMR.sh
# after saving the script, make it executable
sudo chmod +x recomendMR.sh
#!/bin/bash
yarn jar /home/hdfs/GoodsRecommend-1.0-SNAPSHOT.jar com.briup.mr.FinalJob
3. Set up the scheduled (cron) tasks.
crontab -e

# run the MySQL-to-HDFS migration once a week (day-of-week 2) at 7:00
0 7 * * 2 sh ~/bin/mysqlToHDFS.sh
# submit the recommendation job to the cluster once a week (day-of-week 2) at 7:30
30 7 * * 2 sh ~/bin/recomendMR.sh
Project source code: https://gitee.com/zhu-dezhong/recomendGoods