A few days ago I studied a MapReduce-based implementation of an item-based recommendation algorithm (item CF), and I'm writing it up here to share.
Overall, the code breaks down into five steps:
step1. Build the rating matrix from the list of user actions.
step2. Use the rating matrix to build the item-item similarity matrix. (Cosine similarity is used here to measure how similar two items are; a small standalone sketch of the formula follows this list.)
Cosine similarity of two n-dimensional vectors
a = (A1, A2, A3, ..., An)
b = (B1, B2, B3, ..., Bn)
is: cos(a, b) = (A1*B1 + A2*B2 + ... + An*Bn) / (sqrt(A1^2 + ... + An^2) * sqrt(B1^2 + ... + Bn^2))
step3. Transpose the rating matrix.
step4. Multiply the item-item similarity matrix by the rating matrix transposed in step 3.
step5. Using the rating matrix, set to 0, in the output of step 4, the scores of items the user has already acted on.
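As a quick reference before the MapReduce code, here is a minimal standalone sketch of the cosine formula above. It is not part of the pipeline; the check package and class name are just placeholders for illustration, and the two vectors in main are items 1 and 2 of the sample data introduced below, laid out over users A, B, C.
[java]
package check;   // hypothetical helper package, not part of the MapReduce pipeline

public class CosineSimilarity {

    // cos(a, b) = (A1*B1 + ... + An*Bn) / (|a| * |b|)
    public static double cosine(double[] a, double[] b) {
        double dot = 0;
        double normA = 0;
        double normB = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        if (normA == 0 || normB == 0) {
            return 0;   // convention: similarity with an all-zero vector is 0
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    public static void main(String[] args) {
        // Items 1 and 2 of the sample data, as vectors over the users (A, B, C):
        // item 1 -> A_2, C_5   item 2 -> A_10, B_3
        double[] item1 = {2, 0, 5};
        double[] item2 = {10, 3, 0};
        System.out.println(cosine(item1, item2));   // about 0.36, as in the step 2 output later
    }
}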
Next, let's look at the concrete implementation.
step1. Build the rating matrix from the list of user actions.
Each line of the input holds: user ID, item ID, score. Input file path: /ItemCF/step1_input/ActionList.txt
Example input:
A,1,1
C,3,5
B,2,3
B,5,3
B,6,5
A,2,10
C,3,10
C,4,5
C,1,5
A,1,1
A,6,5
A,4,3
<--------------- step 1: map phase --------------->
[java]
package step1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // Each input line: userID,itemID,score
            String[] values = value.toString().split(",");
            String userID = values[0];
            String itemID = values[1];
            String score = values[2];
            // Emit <itemID, userID_score> so the reducer groups all actions by item
            outKey.set(itemID);
            outValue.set(userID + "_" + score);
            context.write(outKey, outValue);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
<--------------- step 1: reduce phase --------------->
[java]
package step1;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer1 extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        try {
            String itemID = key.toString();
            // Accumulate scores per user: <userID, summed score>
            Map<String, Integer> map = new HashMap<String, Integer>();
            for (Text value : values) {
                String userID = value.toString().split("_")[0];
                String score = value.toString().split("_")[1];
                if (map.get(userID) == null) {
                    map.put(userID, Integer.valueOf(score));
                } else {
                    Integer preScore = map.get(userID);
                    map.put(userID, preScore + Integer.valueOf(score));
                }
            }
            // Join the entries into "userID_score,userID_score,..."
            StringBuilder sBuilder = new StringBuilder();
            for (Map.Entry<String, Integer> entry : map.entrySet()) {
                String userID = entry.getKey();
                String score = String.valueOf(entry.getValue());
                sBuilder.append(userID + "_" + score + ",");
            }
            String line = null;
            if (sBuilder.toString().endsWith(",")) {
                line = sBuilder.substring(0, sBuilder.length() - 1);
            }
            // Output one row of the rating matrix: itemID \t userID_score,...
            outKey.set(itemID);
            outValue.set(line);
            context.write(outKey, outValue);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
<--------------- step 1: driver (main) --------------->
[java]
package step1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR1 {

    private static String inputPath = "/ItemCF/step1_input/ActionList.txt";
    private static String outputPath = "/ItemCF/step1_output/";
    private static String hdfsPath = "hdfs://hadoop:9000";

    public int run() {
        try {
            // Create the job configuration
            Configuration conf = new Configuration();
            // Set the HDFS address
            conf.set("fs.defaultFS", hdfsPath);
            // Create the job instance
            Job job = Job.getInstance(conf, "step1");
            // Set the job's main class
            job.setJarByClass(MR1.class);
            // Set the Mapper and Reducer classes
            job.setMapperClass(Mapper1.class);
            job.setReducerClass(Reducer1.class);
            // Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Set the input and output paths
            FileSystem fs = FileSystem.get(conf);
            Path inPath = new Path(inputPath);
            if (fs.exists(inPath)) {
                FileInputFormat.addInputPath(job, inPath);
            }
            Path outPath = new Path(outputPath);
            fs.delete(outPath, true);
            FileOutputFormat.setOutputPath(job, outPath);
            return job.waitForCompletion(true) ? 1 : -1;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        try {
            int result = new MR1().run();
            if (result == 1) {
                System.out.println("success");
            } else {
                System.out.println("failure");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The output of step 1 is:
Each output row holds: item ID (row), then userID_score pairs (columns). Output path: /ItemCF/step1_output/.
Output:
1 A_2,C_5
2 A_10,B_3
3 C_15
4 A_3,C_5
5 B_3
6 A_5,B_5
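As an optional sanity check, here is a small local sketch that reproduces the step 1 aggregation on the sample input. It is not part of the pipeline; the package and class name are placeholders. It shows in particular how the two duplicate A,1,1 lines are summed into A_2 for item 1.
[java]
package check;   // hypothetical helper package, not part of the MapReduce pipeline

import java.util.LinkedHashMap;
import java.util.Map;

public class Step1LocalCheck {

    public static void main(String[] args) {
        String[] actions = {"A,1,1", "C,3,5", "B,2,3", "B,5,3", "B,6,5", "A,2,10",
                "C,3,10", "C,4,5", "C,1,5", "A,1,1", "A,6,5", "A,4,3"};
        // itemID -> (userID -> summed score), mirroring what Mapper1 + Reducer1 build
        Map<String, Map<String, Integer>> matrix = new LinkedHashMap<String, Map<String, Integer>>();
        for (String action : actions) {
            String[] fields = action.split(",");
            String user = fields[0];
            String item = fields[1];
            Integer score = Integer.valueOf(fields[2]);
            Map<String, Integer> userScores = matrix.get(item);
            if (userScores == null) {
                userScores = new LinkedHashMap<String, Integer>();
                matrix.put(item, userScores);
            }
            Integer pre = userScores.get(user);
            // Duplicate (user, item) actions are summed, e.g. the two "A,1,1" lines become A_2
            userScores.put(user, pre == null ? score : pre + score);
        }
        for (Map.Entry<String, Map<String, Integer>> entry : matrix.entrySet()) {
            // Prints e.g. "1  {A=2, C=5}", matching the "1  A_2,C_5" row of the step 1 output
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}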
step2. Use the rating matrix to build the item-item similarity matrix.
Input: the output of step 1, /ItemCF/step1_output/
Cache: the output of step 1, /ItemCF/step1_output/part-r-00000
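A note on the cache mechanics: the #itemUserScore fragment appended to the cache URI in the driver below asks Hadoop to expose the cached file in each task's working directory under that alias, which is why the mapper can open it simply with new FileReader("itemUserScore"). For a plain text file such as part-r-00000, job.addCacheFile is the usual call; addCacheArchive is generally meant for archives (.zip, .tar, .jar).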
<--------------- step 2: map function --------------->
[java]
package step2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();
    private List<String> cacheList = new ArrayList<String>();
    private DecimalFormat df = new DecimalFormat("0.00");

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            super.setup(context);
            // Read the right-hand matrix (the cached step 1 output) into a List<String>
            FileReader fr = new FileReader("itemUserScore");
            BufferedReader br = new BufferedReader(fr);
            String line = null;
            while ((line = br.readLine()) != null) {
                cacheList.add(line);
            }
            br.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /*
     * key:   offset of the line
     * value: row \t col_value,col_value,col_value
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // Row of the left-hand matrix (an item ID)
            String row_matrix1 = value.toString().split("\t")[0];
            // Columns of that row, each "userID_score"
            String[] column_array_matrix1 = value.toString().split("\t")[1].split(",");
            double denominator1 = 0;
            // Euclidean norm of the left-hand row
            for (String column_value : column_array_matrix1) {
                String score = column_value.split("_")[1];
                denominator1 += Double.valueOf(score) * Double.valueOf(score);
            }
            denominator1 = Math.sqrt(denominator1);
            for (String line : cacheList) {
                // Row of the right-hand (cached) matrix
                String row_matrix2 = line.split("\t")[0];
                String[] column_array_matrix2 = line.split("\t")[1].split(",");
                double denominator2 = 0;
                // Euclidean norm of the right-hand row
                for (String column_value : column_array_matrix2) {
                    String score = column_value.split("_")[1];
                    denominator2 += Double.valueOf(score) * Double.valueOf(score);
                }
                denominator2 = Math.sqrt(denominator2);
                int numerator = 0;
                // Walk the columns of the left-hand row
                for (String column_value_matrix1 : column_array_matrix1) {
                    String column_matrix1 = column_value_matrix1.split("_")[0];
                    String value_matrix1 = column_value_matrix1.split("_")[1];
                    // Find the matching column (same user) in the right-hand row
                    for (String column_value_matrix2 : column_array_matrix2) {
                        if (column_value_matrix2.startsWith(column_matrix1 + "_")) {
                            String value_matrix2 = column_value_matrix2.split("_")[1];
                            numerator += Integer.valueOf(value_matrix1) * Integer.valueOf(value_matrix2);
                        }
                    }
                }
                double cos = numerator / (denominator1 * denominator2);
                if (cos == 0) {
                    continue;
                }
                // Emit <itemID, itemID_similarity>
                outKey.set(row_matrix1);
                outValue.set(row_matrix2 + "_" + df.format(cos));
                context.write(outKey, outValue);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
<--------------- step 2: reduce function --------------->
[java]
package step2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer2 extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        try {
            // Join all "itemID_similarity" pairs for this item into one row
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value + ",");
            }
            String result = null;
            if (sb.toString().endsWith(",")) {
                result = sb.substring(0, sb.length() - 1);
            }
            outKey.set(key);
            outValue.set(result);
            context.write(outKey, outValue);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
<--------------- step 2: driver (main) --------------->
[java]
package step2;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR2 {

    private static String inputPath = "/ItemCF/step1_output/";   // output of step 1
    private static String outputPath = "/ItemCF/step2_output/";
    // Output of step 1, distributed to every mapper as a global cache file
    private static String cache = "/ItemCF/step1_output/part-r-00000";
    private static String hdfsPath = "hdfs://hadoop:9000";

    public int run() {
        try {
            // Create the job configuration
            Configuration conf = new Configuration();
            // Set the HDFS address
            conf.set("fs.defaultFS", hdfsPath);
            // Create the job instance
            Job job = Job.getInstance(conf, "step2");
            // Add the distributed cache file, aliased as "itemUserScore"
            job.addCacheArchive(new URI(cache + "#itemUserScore"));
            // Set the job's main class
            job.setJarByClass(MR2.class);
            // Set the Mapper and Reducer classes
            job.setMapperClass(Mapper2.class);
            job.setReducerClass(Reducer2.class);
            // Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Set the input and output paths
            FileSystem fs = FileSystem.get(conf);
            Path inPath = new Path(inputPath);
            if (fs.exists(inPath)) {
                FileInputFormat.addInputPath(job, inPath);
            }
            Path outPath = new Path(outputPath);
            fs.delete(outPath, true);
            FileOutputFormat.setOutputPath(job, outPath);
            return job.waitForCompletion(true) ? 1 : -1;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        int result = new MR2().run();
        if (result == 1) {
            System.out.println("success");
        } else {
            System.out.println("failure");
        }
    }
}
The output of step 2 is:
Output: item ID (row), then itemID_similarity pairs (columns). Path: /ItemCF/step2_output/
1 1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
2 1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
3 4_0.86,3_1.00,1_0.93
4 1_0.99,4_1.00,6_0.36,4_0.86,2_0.49
5 2_0.29,5_1.00,6_0.71
6 1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
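As a spot check, take items 1 and 2: item 1's row is A_2,C_5 and item 2's row is A_10,B_3, so the only user they share is A. The dot product is 2*10 = 20, the norms are sqrt(2^2+5^2) ≈ 5.39 and sqrt(10^2+3^2) ≈ 10.44, and 20 / (5.39*10.44) ≈ 0.36, which matches the 2_0.36 entry in row 1 above.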
step3. Transpose the rating matrix.
Input: the output of step 1, /ItemCF/step1_output/
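On the sample data, the transpose simply regroups the same cells by user instead of by item: item 1's row (1 A_2,C_5) contributes 1_2 to user A's row and 1_5 to user C's row, so after this step user A's row collects every item A has scored, as shown in the step 3 output below.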
<--------------- step 3: map function --------------->
[java]
package step3;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper3 extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] rowAndLine = value.toString().split("\t");
        // Row number of the matrix (an item ID)
        String row = rowAndLine[0];
        // Columns of that row, e.g. lines: 1_1,2_2,3_-2,4_0
        String[] lines = rowAndLine[1].split(",");
        for (int i = 0; i < lines.length; i++) {
            String column = lines[i].split("_")[0];
            String valueStr = lines[i].split("_")[1];
            // Swap row and column: emit <column, row_value>
            outKey.set(column);
            outValue.set(row + "_" + valueStr);
            context.write(outKey, outValue);
        }
    }
}
<--------------- step 3: reduce function --------------->
[java]
package step3;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer3 extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text text : values) {
            // text: row_valueStr
            sb.append(text + ",");
        }
        String line = null;
        if (sb.toString().endsWith(",")) {
            line = sb.substring(0, sb.length() - 1);
        }
        // Output one row of the transposed matrix: userID \t itemID_score,...
        outKey.set(key);
        outValue.set(line);
        context.write(outKey, outValue);
    }
}
<--------------- step 3: driver (main) --------------->
[java]
package step3;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR3 {

    private static String inputPath = "/ItemCF/step1_output/";   // output of step 1
    private static String outputPath = "/ItemCF/step3_output/";
    private static String hdfsPath = "hdfs://hadoop:9000";

    public int run() {
        try {
            // Create the job configuration
            Configuration conf = new Configuration();
            // Set the HDFS address
            conf.set("fs.defaultFS", hdfsPath);
            // Create the job instance
            Job job = Job.getInstance(conf, "step3");
            // Set the job's main class
            job.setJarByClass(MR3.class);
            // Set the Mapper and Reducer classes
            job.setMapperClass(Mapper3.class);
            job.setReducerClass(Reducer3.class);
            // Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Set the input and output paths
            FileSystem fs = FileSystem.get(conf);
            Path inPath = new Path(inputPath);
            if (fs.exists(inPath)) {
                FileInputFormat.addInputPath(job, inPath);
            }
            Path outPath = new Path(outputPath);
            fs.delete(outPath, true);
            FileOutputFormat.setOutputPath(job, outPath);
            return job.waitForCompletion(true) ? 1 : -1;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        int result = new MR3().run();
        if (result == 1) {
            System.out.println("success");
        } else {
            System.out.println("failure");
        }
    }
}
The output of step 3 is:
Output: user ID (row), then itemID_score pairs (columns). Path: /ItemCF/step3_output/
A 6_5,4_3,2_10,1_2
B 6_5,5_3,2_3
C 4_5,3_15,1_5
step4. Multiply the item-item similarity matrix by the rating matrix transposed in step 3.
Input: the output of step 2, /ItemCF/step2_output/
Cache: the output of step 3, /ItemCF/step3_output/part-r-00000
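For example, user A's predicted score for item 1 is the dot product of item 1's similarity row (1_1.00, 2_0.36, 3_0.93, 4_0.99, 6_0.26) with A's rating row (6_5, 4_3, 2_10, 1_2): 1.00*2 + 0.36*10 + 0.99*3 + 0.26*5 = 9.87 (the 3_0.93 entry contributes nothing because A has not scored item 3). This is the A_9.87 entry in the step 4 output below.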
<--------------- step 4: map function --------------->
[java]
package step4;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper4 extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();
    private List<String> cacheList = new ArrayList<String>();
    private DecimalFormat df = new DecimalFormat("0.00");

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // Read the right-hand matrix (the cached step 3 output) into a List<String>
        // Each cached line: row \t col_value,col_value,col_value
        FileReader fr = new FileReader("itemUserScore2");
        BufferedReader br = new BufferedReader(fr);
        String line = null;
        while ((line = br.readLine()) != null) {
            cacheList.add(line);
        }
        br.close();
    }

    /*
     * key:   offset of the line
     * value: row \t col_value,col_value,col_value
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // Row of the left-hand matrix (an item ID of the similarity matrix)
            String row_matrix1 = value.toString().split("\t")[0];
            // Columns of that row, each "itemID_similarity"
            String[] column_array_matrix1 = value.toString().split("\t")[1].split(",");
            for (String line : cacheList) {
                // Row of the right-hand matrix (a user ID of the transposed rating matrix)
                String row_matrix2 = line.split("\t")[0];
                String[] column_array_matrix2 = line.split("\t")[1].split(",");
                double result = 0;
                // Walk the columns of the left-hand row
                for (String column_value_matrix1 : column_array_matrix1) {
                    String column_matrix1 = column_value_matrix1.split("_")[0];
                    String value_matrix1 = column_value_matrix1.split("_")[1];
                    // Find the matching column (same item) in the right-hand row
                    for (String column_value_matrix2 : column_array_matrix2) {
                        if (column_value_matrix2.startsWith(column_matrix1 + "_")) {
                            String value_matrix2 = column_value_matrix2.split("_")[1];
                            result += Double.valueOf(value_matrix1) * Double.valueOf(value_matrix2);
                        }
                    }
                }
                if (result == 0) {
                    continue;
                }
                // Emit <itemID, userID_predictedScore>
                outKey.set(row_matrix1);
                outValue.set(row_matrix2 + "_" + df.format(result));
                context.write(outKey, outValue);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
<--------------- step 4: reduce function --------------->
[java]
package step4;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer4 extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Join all "userID_score" pairs for this item into one row
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value + ",");
        }
        String result = null;
        if (sb.toString().endsWith(",")) {
            result = sb.substring(0, sb.length() - 1);
        }
        outKey.set(key);
        outValue.set(result);
        context.write(outKey, outValue);
    }
}
<--------------- step 4: driver (main) --------------->
[java]
package step4;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR4 {

    private static String inputPath = "/ItemCF/step2_output/";   // output of step 2
    private static String outputPath = "/ItemCF/step4_output/";
    // Output of step 3 (the transposed rating matrix), distributed as a global cache file
    private static String cache = "/ItemCF/step3_output/part-r-00000";
    private static String hdfsPath = "hdfs://hadoop:9000";

    public int run() {
        try {
            // Create the job configuration
            Configuration conf = new Configuration();
            // Set the HDFS address
            conf.set("fs.defaultFS", hdfsPath);
            // Create the job instance
            Job job = Job.getInstance(conf, "step4");
            // Add the distributed cache file, aliased as "itemUserScore2"
            job.addCacheArchive(new URI(cache + "#itemUserScore2"));
            // Set the job's main class
            job.setJarByClass(MR4.class);
            // Set the Mapper and Reducer classes
            job.setMapperClass(Mapper4.class);
            job.setReducerClass(Reducer4.class);
            // Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Set the input and output paths
            FileSystem fs = FileSystem.get(conf);
            Path inPath = new Path(inputPath);
            if (fs.exists(inPath)) {
                FileInputFormat.addInputPath(job, inPath);
            }
            Path outPath = new Path(outputPath);
            fs.delete(outPath, true);
            FileOutputFormat.setOutputPath(job, outPath);
            return job.waitForCompletion(true) ? 1 : -1;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        try {
            int result = new MR4().run();
            if (result == 1) {
                System.out.println("success");
            } else {
                System.out.println("failure");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The output of step 4 is:
Output: item ID (row), then userID_score pairs (columns). Path: /ItemCF/step4_output/
1 A_9.87,B_2.38,C_23.90
2 A_16.59,B_8.27,C_4.25
3 C_23.95,A_4.44
4 B_3.27,C_22.85,A_11.68
5 A_6.45,B_7.42
6 C_3.10,A_15.40,B_9.77
step5. Using the rating matrix, set to 0, in the output of step 4, the scores of items the user has already acted on.
Input: the output of step 4, /ItemCF/step4_output/
Cache: the output of step 1, /ItemCF/step1_output/part-r-00000
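For example, step 4 gives user A a predicted score for every item, but the rating matrix shows that A has already acted on items 1, 2, 4 and 6, so those entries are dropped and only 5_6.45 and 3_4.44 remain; that is exactly A's row in the final recommendation list below.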
<--------------- step 5: map function --------------->
[java]
package step5;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Mapper5 extends Mapper<LongWritable, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();
    private List<String> cacheList = new ArrayList<String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // Read the cached step 1 output (the original rating matrix) into a List<String>
        FileReader fr = new FileReader("itemUserScore3");
        BufferedReader br = new BufferedReader(fr);
        String line = null;
        while ((line = br.readLine()) != null) {
            cacheList.add(line);
        }
        br.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Item ID and "userID_score" columns from the step 4 output
        String item_matrix1 = value.toString().split("\t")[0];
        String[] user_score_array_matrix1 = value.toString().split("\t")[1].split(",");
        for (String line : cacheList) {
            // Item ID and "userID_score" columns from the cached rating matrix
            String item_matrix2 = line.split("\t")[0];
            String[] user_score_array_matrix2 = line.split("\t")[1].split(",");
            // Only compare rows that describe the same item
            if (item_matrix1.equals(item_matrix2)) {
                for (String user_score_matrix1 : user_score_array_matrix1) {
                    boolean flag = false;
                    String user_matrix1 = user_score_matrix1.split("_")[0];
                    String score_matrix1 = user_score_matrix1.split("_")[1];
                    // Has this user already acted on the item?
                    for (String user_score_matrix2 : user_score_array_matrix2) {
                        String user_matrix2 = user_score_matrix2.split("_")[0];
                        if (user_matrix1.equals(user_matrix2)) {
                            flag = true;
                        }
                    }
                    // Keep only items the user has not acted on yet
                    if (flag == false) {
                        outKey.set(user_matrix1);
                        outValue.set(item_matrix1 + "_" + score_matrix1);
                        context.write(outKey, outValue);
                    }
                }
            }
        }
    }
}
<--------------- step 5: reduce function --------------->
[java]
package step5;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reducer5 extends Reducer<Text, Text, Text, Text> {

    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Join all "itemID_score" pairs for this user into the final recommendation row
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value + ",");
        }
        String line = null;
        if (sb.toString().endsWith(",")) {
            line = sb.substring(0, sb.length() - 1);
        }
        outKey.set(key);
        outValue.set(line);
        context.write(outKey, outValue);
    }
}
<--------------- step 5: driver (main) --------------->
[java]
package step5;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MR5 {

    private static String inputPath = "/ItemCF/step4_output/";   // output of step 4
    private static String outputPath = "/ItemCF/step5_output/";
    // Output of step 1 (the original rating matrix), distributed as a global cache file
    private static String cache = "/ItemCF/step1_output/part-r-00000";
    private static String hdfsPath = "hdfs://hadoop:9000";

    public int run() {
        try {
            // Create the job configuration
            Configuration conf = new Configuration();
            // Set the HDFS address
            conf.set("fs.defaultFS", hdfsPath);
            // Create the job instance
            Job job = Job.getInstance(conf, "step5");
            // Add the distributed cache file, aliased as "itemUserScore3"
            job.addCacheArchive(new URI(cache + "#itemUserScore3"));
            // Set the job's main class
            job.setJarByClass(MR5.class);
            // Set the Mapper and Reducer classes
            job.setMapperClass(Mapper5.class);
            job.setReducerClass(Reducer5.class);
            // Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Set the input and output paths
            FileSystem fs = FileSystem.get(conf);
            Path inPath = new Path(inputPath);
            if (fs.exists(inPath)) {
                FileInputFormat.addInputPath(job, inPath);
            }
            Path outPath = new Path(outputPath);
            fs.delete(outPath, true);
            FileOutputFormat.setOutputPath(job, outPath);
            return job.waitForCompletion(true) ? 1 : -1;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        return -1;
    }

    public static void main(String[] args) {
        try {
            int result = new MR5().run();
            if (result == 1) {
                System.out.println("success");
            } else {
                System.out.println("failure");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
The output of step 5 is:
Output: user ID (row), then itemID_score pairs (columns), i.e. the final recommendation list. Path: /ItemCF/step5_output/
A 5_6.45,3_4.44
B 4_3.27,1_2.38
C 6_3.10,2_4.25
Once each of the functions above has been tested individually and works, we can write the following class to run all five steps in one go. (Note: delete the outputs of the individual steps before the unified run.)
[java]
package Job;

import step1.MR1;
import step2.MR2;
import step3.MR3;
import step4.MR4;
import step5.MR5;

public class JobRunner {

    public static void main(String[] args) {
        int status1 = -1;
        int status2 = -1;
        int status3 = -1;
        int status4 = -1;
        int status5 = -1;
        // Run the five jobs in order; each step starts only if the previous one succeeded
        status1 = new MR1().run();
        if (status1 == 1) {
            System.out.println("step1 success, begin step2");
            status2 = new MR2().run();
        } else {
            System.out.println("step1 failure");
        }
        if (status2 == 1) {
            System.out.println("step2 success, begin step3");
            status3 = new MR3().run();
        } else {
            System.out.println("step2 failure");
        }
        if (status3 == 1) {
            System.out.println("step3 success, begin step4");
            status4 = new MR4().run();
        } else {
            System.out.println("step3 failure");
        }
        if (status4 == 1) {
            System.out.println("step4 success, begin step5");
            status5 = new MR5().run();
        } else {
            System.out.println("step4 failure");
        }
        if (status5 == 1) {
            System.out.println("step5 success, program over");
        } else {
            System.out.println("step5 failure");
        }
    }
}
That's all the code for the item-based recommendation algorithm ^_^