JobPlus知识库 互联网 大数据 文章
基于物品的推荐算法

前几天学习了基于map-reduce的物品推荐算法的实现,写下来和大家分享。

首先,整段代码可分为5步:

    step1.根据用户行为列表构建评分矩阵。

    step2.利用评分矩阵,构建物品与物品的相似度矩阵。(在这里利用余弦相似度计算物品与物品的相似度矩阵)


多维向量的余弦相似度:

a(A1,A2,A3.....An)

b(B1,B2,B3,....Bn)

    step3.将评分矩阵转置。

    step4.物品与物品相似度矩阵*评分矩阵(经过步骤3转置)。

    step5.根据评分矩阵,将步骤4的输出中,用户已经有过行为的商品评分置0 。

接下来,我们看看具体实现。

step1.根据用户行为列表构建评分矩阵。

输入矩阵每列代表:用户ID;物品ID;分值   输入文件路径为: /ItemCF/step1_input/ActionList.txt

eg:输入矩阵:

     A,1,1
      C,3,5
      B,2,3
      B,5,3
      B,6,5
      A,2,10
      C,3,10
      C,4,5
      C,1,5
      A,1,1
      A,6,5
      A,4,3

<---------------step1的map阶段----------------->

[java] 

  1. package step1;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.LongWritable;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.Mapper;  
  8. import org.apache.hadoop.mapreduce.Mapper.Context;  
  9.   
  10. public class Mapper1 extends Mapper<LongWritable,Text,Text,Text>{  
  11.       
  12.     private Text outKey = new Text();  
  13.     private Text outValue = new Text();  
  14.       
  15.     protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,Text>.Context context)  
  16.             throws IOException, InterruptedException{  
  17.     try{  
  18.         String[] values = value.toString().split(",");  
  19.         String userID = values[0];  
  20.         String itemID = values[1];  
  21.         String score = values[2];  
  22.           
  23.         outKey.set(itemID);  
  24.         outValue.set(userID+"_"+score);  
  25.           
  26.         context.write(outKey, outValue);  
  27.       }catch(Exception e){  
  28.           e.printStackTrace();  
  29.          }  
  30.     }  
  31. }  

<---------------step1的reduce阶段----------------->

[java] 

  1. package step1;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.HashMap;  
  5. import java.util.Map;  
  6.   
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.mapreduce.Reducer;  
  9. import org.apache.hadoop.mapreduce.Reducer.Context;  
  10.   
  11. public class Reducer1 extends Reducer<Text,Text,Text,Text> {  
  12.     private Text outKey = new Text();  
  13.     private Text outValue = new Text();  
  14.       
  15.     protected void reduce(Text key,Iterable<Text> values,Context context)  
  16.             throws IOException,InterruptedException{  
  17.         try{  
  18.         String itemID = key.toString();  
  19.           
  20.         //<userID,score>  
  21.         Map<String,Integer> map = new HashMap<String,Integer>();  
  22.         for(Text value: values){   
  23.             String userID = value.toString().split("_")[0];  
  24.             String score = value.toString().split("_")[1];  
  25.               
  26.             if(map.get(userID) == null){  
  27.                 map.put(userID, Integer.valueOf(score));  
  28.             }else{  
  29.                 Integer preScore = map.get(userID);  
  30.                 map.put(userID, preScore+Integer.valueOf(score));  
  31.             }  
  32.         }  
  33.           
  34.         StringBuilder sBuilder = new StringBuilder();  
  35.         for(Map.Entry<String, Integer> entry:map.entrySet()){  
  36.             String userID = entry.getKey();  
  37.             String score = String.valueOf(entry.getValue());  
  38.             sBuilder.append(userID + "_" + score + ",");  
  39.   
  40.         }  
  41.           
  42.         String line = null;  
  43.         if(sBuilder.toString().endsWith(",")){  
  44.             line = sBuilder.substring(0,sBuilder.length()-1);  
  45.         }  
  46.           
  47.         outKey.set(itemID);  
  48.         outValue.set(line);  
  49.           
  50.         context.write(outKey, outValue);  
  51.         }catch(Exception e){  
  52.             e.printStackTrace();  
  53.         }     
  54.     }  
  55. }  

<---------------step1的主函数----------------->

[java] 

  1. package step1;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.URI;  
  5. import java.net.URISyntaxException;  
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FileSystem;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14.   
  15.   
  16.   
  17. public class MR1 {  
  18.     private static String inputPath = "/ItemCF/step1_input/ActionList.txt";  
  19.     private static String outputPath = "/ItemCF/step1_output/";  
  20.     private static String hdfsPath = "hdfs://hadoop:9000";  
  21.       
  22.     public int run(){  
  23.         try{  
  24.             //创建job配置类  
  25.             Configuration conf = new Configuration();  
  26.             //设置hdfs地址  
  27.             conf.set("fs.defaultFS",hdfsPath);  
  28.             //创建job实例  
  29.             Job job = Job.getInstance(conf, "step1");  
  30.             //添加分布式缓存文件  
  31.             //job.addCacheArchive(new URI(cache + "matrix2"));  
  32.             //设置Job主类  
  33.             job.setJarByClass(MR1.class);  
  34.             //设置Job的Mapper类和Reducer类  
  35.             job.setMapperClass(Mapper1.class);  
  36.             job.setReducerClass(Reducer1.class);  
  37.             //设置Mapper的输出类型  
  38.             job.setMapOutputKeyClass(Text.class);  
  39.             job.setMapOutputValueClass(Text.class);  
  40.             //设置Reducer的输出类型  
  41.             job.setOutputKeyClass(Text.class);  
  42.             job.setOutputValueClass(Text.class);  
  43.             //设置输入和输出路径  
  44.             FileSystem fs = FileSystem.get(conf);  
  45.             Path inPath = new Path(inputPath);  
  46.             if(fs.exists(inPath)){  
  47.                 FileInputFormat.addInputPath(job, inPath);  
  48.             }  
  49.               
  50.             Path outPath = new Path(outputPath);  
  51.             fs.delete(outPath,true);  
  52.               
  53.             FileOutputFormat.setOutputPath(job, outPath);  
  54.               
  55.             return job.waitForCompletion(true)? 1:-1;  
  56.               
  57.         }catch(IOException e){  
  58.             e.printStackTrace();  
  59.         } catch (ClassNotFoundException e) {  
  60.             // TODO Auto-generated catch block  
  61.             e.printStackTrace();  
  62.         } catch (InterruptedException e) {  
  63.             // TODO Auto-generated catch block  
  64.             e.printStackTrace();  
  65.         }  
  66.         return 1;  
  67.           
  68.    }  
  69.     public static void main(String[] args){  
  70.         try{  
  71.         int result = -1;  
  72.         result = new MR1().run();  
  73.         System.out.println("result0 = "+result);  
  74.         if(result == 1){  
  75.             System.out.println("result1 = "+result);  
  76.             System.out.println("success");  
  77.             }else if(result == -1){  
  78.                 System.out.println("result2 = "+result);  
  79.                 System.out.println("defeat");  
  80.             }  
  81.         }catch(Exception e){  
  82.         e.printStackTrace();  
  83.      }  
  84.     }  
  85. }  

经step1阶段输出结果为:

输出矩阵每列代表: 物品ID(行);用户ID(列);分值  输出路径为 /ItemCF/step1_output/。

输出结果:

      1 A_2,C_5
      2 A_10,B_3
      3 C_15
      4 A_3,C_5
      5 B_3
      6 A_5,B_5

step2.利用评分矩阵,构建物品与物品的相似度矩阵。

  输入:步骤1的输出      /ItemCF/step1_output/
  缓存:步骤1的输出      /ItemCF/step1_output/part-r-00000

<---------------step2的map函数----------------->

[java]

  1. package step2;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.FileReader;  
  5. import java.io.IOException;  
  6. import java.text.DecimalFormat;  
  7. import java.util.ArrayList;  
  8. import java.util.List;  
  9.   
  10. import org.apache.hadoop.io.LongWritable;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13.   
  14. public class Mapper2 extends Mapper<LongWritable,Text,Text,Text> {  
  15.     private Text outKey = new Text();  
  16.     private Text outValue = new Text();  
  17.     private List<String> cacheList = new ArrayList<String>();  
  18.     private DecimalFormat df = new DecimalFormat("0.00");  
  19.       
  20.     protected void setup(Context context) throws IOException,InterruptedException{  
  21.         try{  
  22.         super.setup(context);  
  23.           
  24.         //通过输入流将全局缓存中的右侧矩阵读入List<String>中  
  25.         FileReader fr = new FileReader("itemUserScore");  
  26.         BufferedReader br = new BufferedReader(fr);  
  27.           
  28.       
  29.         String line = null;  
  30.         while((line=br.readLine()) != null){  
  31.             cacheList.add(line);  
  32.         }  
  33.           
  34.         fr.close();  
  35.         br.close();  
  36.     }catch(Exception e){  
  37.         e.printStackTrace();  
  38.       }  
  39.     }  
  40.       
  41.     /* 
  42.      * key:row 
  43.      * value:row tab col_value,col_value,col_value 
  44.      *  
  45.      * */  
  46.     @SuppressWarnings("unused")  
  47.     protected void map(LongWritable key,Text value,Context context)   
  48.             throws IOException, InterruptedException{  
  49.         try{  
  50.         //row  
  51.         String row_matrix1 = value.toString().split("\t")[0];  
  52.         //String[] col_value  
  53.         String[] column_array_matrix1 = value.toString().split("\t")[1].split(",");  
  54.           
  55.         double demoinator1 = 0;  
  56.         //计算左侧矩阵行的空间距离  
  57.         for(String column_value:column_array_matrix1){  
  58.             String score = column_value.split("_")[1];  
  59.             demoinator1 += Double.valueOf(score) * Double.valueOf(score);  
  60.         }  
  61.         demoinator1 = Math.sqrt(demoinator1);  
  62.           
  63.         for(String line:cacheList){  
  64.             //右侧矩阵的行line  
  65.             String row_matrix2 = line.toString().split("\t")[0];  
  66.             String[] column_array_matrix2 = line.toString().split("\t")[1].split(",");  
  67.               
  68.             double demoinator2 = 0;  
  69.             //计算右侧矩阵行的空间距离  
  70.             for(String column_value:column_array_matrix2){  
  71.                 String score = column_value.split("_")[1];  
  72.                 demoinator2 += Double.valueOf(score) * Double.valueOf(score);  
  73.             }  
  74.             demoinator2 = Math.sqrt(demoinator2);  
  75.               
  76.             int numerator = 0;  
  77.             //遍历左矩阵第一行的每一列  
  78.             for(String column_value_matrix1 : column_array_matrix1){  
  79.                 String column_matrix1 = column_value_matrix1.split("_")[0];  
  80.                 String value_matrix1 = column_value_matrix1.split("_")[1];  
  81.                                                   
  82.                 //遍历右矩阵每一行的每一列  
  83.                 for(String column_value_matrix2 : column_array_matrix2){  
  84.                     if(column_value_matrix2.startsWith(column_matrix1+"_")){  
  85.                         String value_matrix2 = column_value_matrix2.split("_")[1];  
  86.                         numerator += Integer.valueOf(value_matrix1)* Integer.valueOf(value_matrix2);  
  87.                     }  
  88.                 }  
  89.                   
  90.             }  
  91.               
  92.             double cos = numerator / (demoinator1 * demoinator2);  
  93.             if(cos == 0){  
  94.                 continue;  
  95.             }  
  96.             outKey.set(row_matrix1);  
  97.             outValue.set(row_matrix2 + "_" + df.format(cos));  
  98.             context.write(outKey, outValue);  
  99.         }  
  100.     } catch(Exception e){  
  101.         e.printStackTrace();  
  102.     }  
  103.     }  
  104. }  

<---------------step2的reduce函数----------------->

[java] 

  1. package step2;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.Reducer;  
  7. import org.apache.hadoop.mapreduce.Reducer.Context;  
  8.   
  9. @SuppressWarnings("unused")  
  10. public class Reducer2 extends Reducer<Text,Text,Text,Text>{  
  11.     private Text outKey = new Text();  
  12.     private Text outValue = new Text();  
  13.       
  14.     protected void reduce(Text key,Iterable<Text> values,Context context)  
  15.             throws IOException,InterruptedException{  
  16.         try{  
  17.         StringBuilder sb = new StringBuilder();  
  18.         for(Text value:values){  
  19.             sb.append(value + ",");  
  20.         }  
  21.           
  22.         String result = null;  
  23.         if(sb.toString().endsWith(",")){  
  24.             result = sb.substring(0, sb.length()-1);  
  25.         }  
  26.           
  27.         outKey.set(key);  
  28.         outValue.set(result);  
  29.         context.write(outKey, outValue);  
  30.     } catch(Exception e){  
  31.         e.printStackTrace();  
  32.         }  
  33.     }  
  34. }  

<---------------step2的主函数----------------->

[java] 

  1. package step2;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.URI;  
  5. import java.net.URISyntaxException;  
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FileSystem;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14.   
  15.   
  16.   
  17. public class MR2 {  
  18.     private static String inputPath = "/ItemCF/step1_output/";//步骤1的输出  
  19.     private static String outputPath = "/ItemCF/step2_output/";//  
  20.     //将step1输出的转置矩阵作为全局缓存  
  21.     private static String cache = "/ItemCF/step1_output/part-r-00000";//步骤1的输出  
  22.     private static String hdfsPath = "hdfs://hadoop:9000";  
  23.       
  24.     public int run(){  
  25.         try{  
  26.             //创建job配置类  
  27.             Configuration conf = new Configuration();  
  28.             //设置hdfs地址  
  29.             conf.set("fs.defaultFS",hdfsPath);  
  30.             //创建job实例  
  31.             Job job = Job.getInstance(conf, "step2");  
  32.             //添加分布式缓存文件  
  33.             job.addCacheArchive(new URI(cache + "#itemUserScore"));  
  34.             //设置Job主类  
  35.             job.setJarByClass(MR2.class);  
  36.             //设置Job的Mapper类和Reducer类  
  37.             job.setMapperClass(Mapper2.class);  
  38.             job.setReducerClass(Reducer2.class);  
  39.             //设置Mapper的输出类型  
  40.             job.setMapOutputKeyClass(Text.class);  
  41.             job.setMapOutputValueClass(Text.class);  
  42.             //设置Reducer的输出类型  
  43.             job.setOutputKeyClass(Text.class);  
  44.             job.setOutputValueClass(Text.class);  
  45.             //设置输入和输出路径  
  46.             FileSystem fs = FileSystem.get(conf);  
  47.             Path inPath = new Path(inputPath);  
  48.             if(fs.exists(inPath)){  
  49.                 FileInputFormat.addInputPath(job, inPath);  
  50.             }  
  51.               
  52.             Path outPath = new Path(outputPath);  
  53.             fs.delete(outPath,true);  
  54.               
  55.             FileOutputFormat.setOutputPath(job, outPath);  
  56.               
  57.             return job.waitForCompletion(true)? 1:0;  
  58.               
  59.         }catch(IOException e){  
  60.             e.printStackTrace();  
  61.         } catch (ClassNotFoundException e) {  
  62.             // TODO Auto-generated catch block  
  63.             e.printStackTrace();  
  64.         } catch (InterruptedException e) {  
  65.             // TODO Auto-generated catch block  
  66.             e.printStackTrace();  
  67.         } catch (URISyntaxException e) {  
  68.             // TODO Auto-generated catch block  
  69.             e.printStackTrace();  
  70.         }  
  71.         return -1;  
  72.    }  
  73.     public static void main(String[] args){  
  74.         int result = -1;  
  75.         result = new MR2().run();  
  76.         if(result == 1){  
  77.             System.out.println("success");  
  78.             }else if(result == -1){  
  79.                 System.out.println("defeat");  
  80.             }  
  81.           
  82.     }  
  83. }  

    经step2阶段输出结果为:

     输出:物品ID(行);物品ID(列)_相似度   /ItemCF/step2_output/
      1 1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
      2 1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
      3 4_0.86,3_1.00,1_0.93
      4 1_0.99,4_1.00,6_0.36,4_0.86,2_0.49
      5 2_0.29,5_1.00,6_0.71

      6 1_0.26,5_0.71,6_1.00,2_0.88,4_0.36

3.将评分矩阵转置。

    输入:步骤1的输出   /ItemCF/step1_output/

<---------------step3的map函数----------------->

[java] 

  1. package step3;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.LongWritable;  
  6. import org.apache.hadoop.io.Text;  
  7. import org.apache.hadoop.mapreduce.Mapper;  
  8. import org.apache.hadoop.mapreduce.Mapper.Context;  
  9.   
  10. @SuppressWarnings("unused")  
  11. public class Mapper3 extends Mapper<LongWritable,Text,Text,Text>{  
  12.   
  13.     private Text outKey = new Text();  
  14.     private Text outValue = new Text();  
  15.       
  16.       
  17.     protected void map(LongWritable key,Text value,Context context)   
  18.             throws IOException, InterruptedException{  
  19.         String[] rowAndLine = value.toString().split("\t");  
  20.           
  21.         //矩阵的行号  
  22.         String row = rowAndLine[0];  
  23.         String[] lines = rowAndLine[1].split(",");  
  24.           
  25.         //lines:1_1,2_2,3_-2,4_0  
  26.         for(int i = 0;i<lines.length;i++){  
  27.             String column = lines[i].split("_")[0];  
  28.             String valueStr = lines[i].split("_")[1];  
  29.           
  30.             outKey.set(column);  
  31.             outValue.set(row + "_" + valueStr);  
  32.             context.write(outKey, outValue);  
  33.         }  
  34.           
  35.     }  
  36. }  

<---------------step3的reduce函数----------------->

[java] 

  1. package step3;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.Reducer;  
  7. import org.apache.hadoop.mapreduce.Reducer.Context;  
  8.   
  9. @SuppressWarnings("unused")  
  10. public class Reducer3 extends Reducer<Text,Text,Text,Text>{  
  11.     private Text outKey = new Text();  
  12.     private Text outValue = new Text();  
  13.       
  14.       
  15.     protected void reduce(Text key,Iterable<Text> values,Context context)  
  16.         throws IOException,InterruptedException{  
  17.             StringBuilder sb = new StringBuilder();  
  18.             for(Text text:values){  
  19.                 //text:row_valueStr  
  20.                 sb.append(text + ",");  
  21.             }  
  22.             String line = null;  
  23.             if(sb.toString().endsWith(",")){  
  24.                 line = sb.substring(0,sb.length()-1);  
  25.             }  
  26.               
  27.             outKey.set(key);  
  28.             outValue.set(line);  
  29.             context.write(outKey, outValue);  
  30.         }  
  31. }  

<---------------step3的主函数----------------->

[java] 

  1. package step3;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.URI;  
  5. import java.net.URISyntaxException;  
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FileSystem;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14.   
  15.   
  16.   
  17. public class MR3 {  
  18.     private static String inputPath = "/ItemCF/step1_output/";//步骤1的输出  
  19.     private static String outputPath ="/ItemCF/step3_output/";  
  20.     //将step1输出的转置矩阵作为全局缓存  
  21.     //private static String cache = "/matend";  
  22.     private static String hdfsPath = "hdfs://hadoop:9000";  
  23.       
  24.     public int run(){  
  25.         try{  
  26.             //创建job配置类  
  27.             Configuration conf = new Configuration();  
  28.             //设置hdfs地址  
  29.             conf.set("fs.defaultFS",hdfsPath);  
  30.             //创建job实例  
  31.             Job job = Job.getInstance(conf, "step3");  
  32.             //添加分布式缓存文件  
  33.             //job.addCacheArchive(new URI(cache + "matrix2"));  
  34.             //设置Job主类  
  35.             job.setJarByClass(MR3.class);  
  36.             //设置Job的Mapper类和Reducer类  
  37.             job.setMapperClass(Mapper3.class);  
  38.             job.setReducerClass(Reducer3.class);  
  39.             //设置Mapper的输出类型  
  40.             job.setMapOutputKeyClass(Text.class);  
  41.             job.setMapOutputValueClass(Text.class);  
  42.             //设置Reducer的输出类型  
  43.             job.setOutputKeyClass(Text.class);  
  44.             job.setOutputValueClass(Text.class);  
  45.             //设置输入和输出路径  
  46.             FileSystem fs = FileSystem.get(conf);  
  47.             Path inPath = new Path(inputPath);  
  48.             if(fs.exists(inPath)){  
  49.                 FileInputFormat.addInputPath(job, inPath);  
  50.             }  
  51.               
  52.             Path outPath = new Path(outputPath);  
  53.             fs.delete(outPath,true);  
  54.               
  55.             FileOutputFormat.setOutputPath(job, outPath);  
  56.               
  57.             return job.waitForCompletion(true)? 1:0;  
  58.               
  59.         }catch(IOException e){  
  60.             e.printStackTrace();  
  61.         } catch (ClassNotFoundException e) {  
  62.             // TODO Auto-generated catch block  
  63.             e.printStackTrace();  
  64.         } catch (InterruptedException e) {  
  65.             // TODO Auto-generated catch block  
  66.             e.printStackTrace();  
  67.         }  
  68.         return -1;  
  69.    }  
  70.     public static void main(String[] args){  
  71.         int result = 0;  
  72.         result = new MR3().run();  
  73.         if(result == 1){  
  74.             System.out.println("success");  
  75.             }else if(result == -1){  
  76.                 System.out.println("defeat");  
  77.             }  
  78.           
  79.     }  
  80. }  

      经step3阶段输出结果为:

      输出:用户ID(行);物品ID(列)_分值  /ItemCF/step3_output/
      A 6_5,4_3,2_10,1_2
      B 6_5,5_3,2_3
      C 4_5,3_15,1_5

step4.物品与物品相似度矩阵*评分矩阵(经过步骤3转置)

    输入:步骤2的输出    /ItemCF/step2_output/

    缓存:步骤3的输出    /ItemCF/step3_output/part-r-00000

<---------------step4的map函数----------------->

[java] 

  1. package step4;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.FileReader;  
  5. import java.io.IOException;  
  6. import java.text.DecimalFormat;  
  7. import java.util.ArrayList;  
  8. import java.util.List;  
  9.   
  10. import org.apache.hadoop.io.LongWritable;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.Mapper.Context;  
  14.   
  15. @SuppressWarnings("unused")  
  16. public class Mapper4 extends Mapper<LongWritable,Text,Text,Text> {  
  17.     private Text outKey = new Text();  
  18.     private Text outValue = new Text();  
  19.     private List<String> cacheList = new ArrayList<String>();  
  20.     private DecimalFormat df = new DecimalFormat("0.00");  
  21.       
  22.     protected void setup(Context context) throws IOException,InterruptedException{  
  23.         super.setup(context);  
  24.           
  25.         //通过输入流将全局缓存中的右侧矩阵读入List<String>中  
  26.         FileReader fr = new FileReader("itemUserScore2");  
  27.         BufferedReader br = new BufferedReader(fr);  
  28.           
  29.         /* 
  30.          * row: row tab col_value,col_value,col_value 
  31.          *  
  32.          * */  
  33.         String line = null;  
  34.         while((line=br.readLine()) != null){  
  35.             cacheList.add(line);  
  36.         }  
  37.           
  38.         fr.close();  
  39.         br.close();  
  40.     }  
  41.       
  42.     /* 
  43.      * key:row 
  44.      * value:row tab col_value,col_value,col_value 
  45.      *  
  46.      * */  
  47.     protected void map(LongWritable key,Text value,Context context)   
  48.             throws IOException, InterruptedException{  
  49.         try{  
  50.                       
  51.         //row  
  52.         String row_matrix1 = value.toString().split("\t")[0];  
  53.         //String[] col_value  
  54.         String[] column_array_matrix1 = value.toString().split("\t")[1].split(",");  
  55.           
  56.         for(String line:cacheList){  
  57.             String row_matrix2 = line.toString().split("\t")[0];  
  58.             String[] column_array_matrix2 = line.toString().split("\t")[1].split(",");  
  59.               
  60.             double result = 0;  
  61.             //遍历左矩阵第一行的每一列  
  62.             for(String column_value_matrix1 : column_array_matrix1){  
  63.                 String column_matrix1 = column_value_matrix1.split("_")[0];  
  64.                 String value_matrix1 = column_value_matrix1.split("_")[1];  
  65.                   
  66.                 //遍历右矩阵每一行的每一列  
  67.                 for(String column_value_matrix2 : column_array_matrix2){  
  68.                     if(column_value_matrix2.startsWith(column_matrix1+"_")){  
  69.                         String value_matrix2 = column_value_matrix2.split("_")[1];  
  70.                         result += Double.valueOf(value_matrix1) * Double.valueOf(value_matrix2);  
  71.                     }  
  72.                 }  
  73.                   
  74.             }  
  75.             if(result == 0){  
  76.                 continue;  
  77.             }  
  78.             outKey.set(row_matrix1);  
  79.             outValue.set(row_matrix2 + "_" + df.format(result));  
  80.               
  81.             context.write(outKey, outValue);  
  82.             }  
  83.         }catch(Exception e){  
  84.             e.printStackTrace();  
  85.         }  
  86.     }  
  87. }  

<---------------step4的reduce函数----------------->

[java] 

  1. package step4;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.Reducer;  
  7. import org.apache.hadoop.mapreduce.Reducer.Context;  
  8.   
  9. @SuppressWarnings("unused")  
  10. public class Reducer4 extends Reducer<Text,Text,Text,Text>{  
  11.     private Text outKey = new Text();  
  12.     private Text outValue = new Text();  
  13.       
  14.     protected void reduce(Text key,Iterable<Text> values,Context context)  
  15.             throws IOException,InterruptedException{  
  16.         StringBuilder sb = new StringBuilder();  
  17.         for(Text value:values){  
  18.             sb.append(value + ",");  
  19.         }  
  20.           
  21.         String result = null;  
  22.         if(sb.toString().endsWith(",")){  
  23.             result = sb.substring(0, sb.length()-1);  
  24.         }  
  25.           
  26.         outKey.set(key);  
  27.         outValue.set(result);  
  28.         context.write(outKey, outValue);  
  29.     }  
  30. }  

<---------------step4的主函数----------------->

[java]

  1. package step4;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.URI;  
  5. import java.net.URISyntaxException;  
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FileSystem;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14.   
  15.   
  16.   
  17. public class MR4 {  
  18.     private static String inputPath = "/ItemCF/step2_output/";//步骤2的输出  
  19.     private static String outputPath = "/ItemCF/step4_output/";  
  20.     //将step1输出的转置矩阵作为全局缓存  
  21.     private static String cache = "/ItemCF/step3_output/part-r-00000";//步骤3的输出  
  22.     private static String hdfsPath = "hdfs://hadoop:9000";  
  23.       
  24.     public int run(){  
  25.         try{  
  26.             //创建job配置类  
  27.             Configuration conf = new Configuration();  
  28.             //设置hdfs地址  
  29.             conf.set("fs.defaultFS",hdfsPath);  
  30.             //创建job实例  
  31.             Job job = Job.getInstance(conf, "step4");  
  32.             //添加分布式缓存文件  
  33.             job.addCacheArchive(new URI(cache + "#itemUserScore2"));  
  34.             //设置Job主类  
  35.             job.setJarByClass(MR4.class);  
  36.             //设置Job的Mapper类和Reducer类  
  37.             job.setMapperClass(Mapper4.class);  
  38.             job.setReducerClass(Reducer4.class);  
  39.             //设置Mapper的输出类型  
  40.             job.setMapOutputKeyClass(Text.class);  
  41.             job.setMapOutputValueClass(Text.class);  
  42.             //设置Reducer的输出类型  
  43.             job.setOutputKeyClass(Text.class);  
  44.             job.setOutputValueClass(Text.class);  
  45.             //设置输入和输出路径  
  46.             FileSystem fs = FileSystem.get(conf);  
  47.             Path inPath = new Path(inputPath);  
  48.             if(fs.exists(inPath)){  
  49.                 FileInputFormat.addInputPath(job, inPath);  
  50.             }  
  51.               
  52.             Path outPath = new Path(outputPath);  
  53.             fs.delete(outPath,true);  
  54.               
  55.             FileOutputFormat.setOutputPath(job, outPath);  
  56.               
  57.             return job.waitForCompletion(true)? 1:0;  
  58.               
  59.         }catch(IOException e){  
  60.             e.printStackTrace();  
  61.         } catch (ClassNotFoundException e) {  
  62.             // TODO Auto-generated catch block  
  63.             e.printStackTrace();  
  64.         } catch (InterruptedException e) {  
  65.             // TODO Auto-generated catch block  
  66.             e.printStackTrace();  
  67.         } catch (URISyntaxException e) {  
  68.             // TODO Auto-generated catch block  
  69.             e.printStackTrace();  
  70.         }  
  71.         return -1;  
  72.    }  
  73.     public static void main(String[] args){  
  74.         try{  
  75.             int result = 0;  
  76.           
  77.         result = new MR4().run();  
  78.         if(result == 1){  
  79.             System.out.println("success");  
  80.             }else if(result == -1){  
  81.                 System.out.println("defeat");  
  82.             }  
  83.      } catch (Exception e){  
  84.          e.printStackTrace();  
  85.      }  
  86.     }  
  87. }  

        经step4阶段输出结果为:

         输出:物品ID(行);用户ID(列)_分值   /ItemCF/step4_output/
      1 A_9.87,B_2.38,C_23.90
      2 A_16.59,B_8.27,C_4.25
      3 C_23.95,A_4.44
      4 B_3.27,C_22.85,A_11.68
      5 A_6.45,B_7.42

      6 C_3.10,A_15.40,B_9.77

step5.根据评分矩阵,将步骤4的输出中,用户已经有过行为的商品评分置0 

    输入:步骤4的输出    /ItemCF/step4_output/

    缓存:步骤1的输出    /ItemCF/step1_output/part-r-00000

<---------------step5的map函数----------------->

[java] 

  1. package step5;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.FileReader;  
  5. import java.io.IOException;  
  6. import java.util.ArrayList;  
  7. import java.util.List;  
  8.   
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Mapper;  
  12. import org.apache.hadoop.mapreduce.Mapper.Context;  
  13.   
  14. public class Mapper5 extends Mapper<LongWritable,Text,Text,Text>{  
  15.     private Text outKey = new Text();  
  16.     private Text outValue = new Text();  
  17.     private List<String> cacheList = new ArrayList<String>();  
  18.       
  19.     protected void setup(Context context) throws IOException,InterruptedException{  
  20.         super.setup(context);  
  21.           
  22.         //通过输入流将全局缓存中的右侧矩阵读入List<String>中  
  23.         FileReader fr = new FileReader("itemUserScore3");  
  24.         BufferedReader br = new BufferedReader(fr);  
  25.           
  26.         String line = null;  
  27.         while((line=br.readLine()) != null){  
  28.             cacheList.add(line);  
  29.         }  
  30.           
  31.         fr.close();  
  32.         br.close();  
  33.     }  
  34.       
  35.     protected void map(LongWritable key,Text value,Context context)   
  36.             throws IOException, InterruptedException{  
  37.         String item_matrix1 = value.toString().split("\t")[0];//物品ID   
  38.         String[] user_score_array_matrix1 = value.toString().split("\t")[1].split("1");  
  39.           
  40.         for(String line: cacheList){  
  41.             String item_matrix2 = value.toString().split("\t")[0];//物品ID   
  42.             String[] user_score_array_matrix2 = value.toString().split("\t")[1].split("1");  
  43.               
  44.             //如果物品ID相同  
  45.             if(item_matrix1 == item_matrix2){  
  46.                 boolean flag = false;  
  47.                 for(String user_score_matrix1:user_score_array_matrix1){  
  48.                     String user_matrix1 = user_score_matrix1.split("_")[0];  
  49.                     String score_matrix1 = user_score_matrix1.split("_")[1];  
  50.                       
  51.                     for(String user_score_matrix2:user_score_array_matrix1){  
  52.                         String user_matrix2 = user_score_matrix2.split("_")[0];  
  53.                         if(user_matrix1 == user_matrix2){  
  54.                             flag = true;  
  55.                         }  
  56.                     }  
  57.                       
  58.                     if(flag == false){  
  59.                         outKey.set(user_matrix1);  
  60.                         outValue.set(item_matrix1 + "_" + score_matrix1);  
  61.                         context.write(outKey, outValue);  
  62.                     }  
  63.                 }  
  64.             }  
  65.         }  
  66.     }  
  67. }  

<---------------step5的reduce函数----------------->

[java] 

  1. package step5;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.Reducer;  
  7. import org.apache.hadoop.mapreduce.Reducer.Context;  
  8.   
  9. @SuppressWarnings("unused")  
  10. public class Reducer5 extends Reducer<Text,Text,Text,Text>{  
  11.     private Text outKey = new Text();  
  12.     private Text outValue = new Text();  
  13.       
  14.     protected void reduce(Text key,Iterable<Text> values,Context context)  
  15.             throws IOException,InterruptedException{  
  16.         StringBuilder sb = new StringBuilder();  
  17.         for(Text value:values){  
  18.             sb.append(value + ",");  
  19.         }  
  20.           
  21.         String line = null;  
  22.         if(sb.toString().endsWith(",")){  
  23.             line = sb.substring(0, sb.length()-1);  
  24.         }  
  25.           
  26.         outKey.set(key);  
  27.         outValue.set(line);  
  28.         context.write(outKey, outValue);  
  29.     }  
  30. }  

<---------------step5的主函数----------------->

[java] 

  1. package step5;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.URI;  
  5. import java.net.URISyntaxException;  
  6.   
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.FileSystem;  
  9. import org.apache.hadoop.fs.Path;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14.   
  15.   
  16.   
  17. public class MR5 {  
  18.     private static String inputPath = "/ItemCF/step4_output/";//步骤4的输出  
  19.     private static String outputPath = "/ItemCF/step5_output/";  
  20.     //将step1输出的转置矩阵作为全局缓存  
  21.     private static String cache = "/ItemCF/step1_output/part-r-00000";//步骤1的输出  
  22.     private static String hdfsPath = "hdfs://hadoop:9000";  
  23.       
  24.     public int run(){  
  25.         try{  
  26.             //创建job配置类  
  27.             Configuration conf = new Configuration();  
  28.             //设置hdfs地址  
  29.             conf.set("fs.defaultFS",hdfsPath);  
  30.             //创建job实例  
  31.             Job job = Job.getInstance(conf, "step5");  
  32.             //添加分布式缓存文件  
  33.             job.addCacheArchive(new URI(cache + "#itemUserScore3"));  
  34.             //设置Job主类  
  35.             job.setJarByClass(MR5.class);  
  36.             //设置Job的Mapper类和Reducer类  
  37.             job.setMapperClass(Mapper5.class);  
  38.             job.setReducerClass(Reducer5.class);  
  39.             //设置Mapper的输出类型  
  40.             job.setMapOutputKeyClass(Text.class);  
  41.             job.setMapOutputValueClass(Text.class);  
  42.             //设置Reducer的输出类型  
  43.             job.setOutputKeyClass(Text.class);  
  44.             job.setOutputValueClass(Text.class);  
  45.             //设置输入和输出路径  
  46.             FileSystem fs = FileSystem.get(conf);  
  47.             Path inPath = new Path(inputPath);  
  48.             if(fs.exists(inPath)){  
  49.                 FileInputFormat.addInputPath(job, inPath);  
  50.             }  
  51.               
  52.             Path outPath = new Path(outputPath);  
  53.             fs.delete(outPath,true);  
  54.               
  55.             FileOutputFormat.setOutputPath(job, outPath);  
  56.               
  57.             return job.waitForCompletion(true)? 1:0;  
  58.               
  59.         }catch(IOException e){  
  60.             e.printStackTrace();  
  61.         } catch (ClassNotFoundException e) {  
  62.             // TODO Auto-generated catch block  
  63.             e.printStackTrace();  
  64.         } catch (InterruptedException e) {  
  65.             // TODO Auto-generated catch block  
  66.             e.printStackTrace();  
  67.         } catch (URISyntaxException e) {  
  68.             // TODO Auto-generated catch block  
  69.             e.printStackTrace();  
  70.         }  
  71.         return -1;  
  72.    }  
  73.     public static void main(String[] args){  
  74.         try{  
  75.             int result = 0;  
  76.           
  77.         result = new MR5().run();  
  78.         if(result == 1){  
  79.             System.out.println("success");  
  80.             }else if(result == -1){  
  81.                 System.out.println("defeat");  
  82.             }  
  83.      } catch (Exception e){  
  84.          e.printStackTrace();  
  85.      }  
  86.     }  
  87. }  

   经step5阶段输出结果为:

   输出:用户ID(行);物品ID(列);分值(最终的推荐列表)/ItemCF/step5_output/
      A 5_6.45,3_4.44
      B 4_3.27,1_2.38

      C 6_3.10,2_4.25

通过以上各个函数分布测试无误后,我们可以编写如下函数,统一运行。(注:在统一运行前应将各步骤的运行结果删除)

[java] 

  1. package Job;  
  2.   
  3. import step1.MR1;  
  4. import step2.MR2;  
  5. import step3.MR3;  
  6. import step4.MR4;  
  7. import step5.MR5;  
  8.   
  9. public class JobRunner {  
  10.     public static void main(String[] args){  
  11.         int status1 = -1;  
  12.         int status2 = -1;  
  13.         int status3 = -1;  
  14.         int status4 = -1;  
  15.         int status5 = -1;  
  16.           
  17.         status1 = new MR1().run();  
  18.         if(status1 == 1){  
  19.             System.out.println("step1 success,begin step2");  
  20.             status2 = new MR2().run();  
  21.         }else{  
  22.             System.out.println("step1 failuer");  
  23.         }  
  24.         if(status2 == 1){  
  25.             System.out.println("step2 success,begin step3");  
  26.             status3 = new MR3().run();  
  27.         }else{  
  28.             System.out.println("step2 failuer");  
  29.         }  
  30.         if(status3 == 1){  
  31.             System.out.println("step3 success,begin step4");  
  32.             status4 = new MR4().run();  
  33.         }else{  
  34.             System.out.println("step3 failuer");  
  35.         }  
  36.         if(status4 == 1){  
  37.             System.out.println("step4 success,begin step5");  
  38.             status5 = new MR5().run();  
  39.         }else{  
  40.             System.out.println("step4 failuer");  
  41.         }  
  42.         if(status5 == 1){  
  43.             System.out.println("step5 success,program over");  
  44.             //status2 = new MR5().run();  
  45.         }else{  
  46.             System.out.println("step5 failuer");  
  47.         }  
  48.     }  
  49. }  


以上,就是基于物品推荐算法的所有代码^_^


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
114人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序