JobPlus知识库 IT 大数据 文章
mapreduce编程模板

1.MapReduce 编程模型的5个步骤:

  1)迭代,将输入数据解析成 key/value 对;

  2)将解析的 key/value经过Map处理映射成另一组key/value对;

  3)根据key进行分组;

  4)以分组为单位进行归约(Reduce 过程);

  5)迭代,输出最终结果。

2.MapReduce编程模型模板:

  在进行编程过程只需改变Map()和Reduce()方法,如果没有Reduce过程时需要对run()作适当调整。

1 import java.io.IOException;  

2 import java.util.jar.JarException;  

3 import org.apache.hadoop.conf.Configuration;  

4 import org.apache.hadoop.conf.Configured;  

5 import org.apache.hadoop.fs.Path;  

6 import org.apache.hadoop.io.LongWritable;  

7 import org.apache.hadoop.io.NullWritable;  

8 import org.apache.hadoop.io.Text;  

9 import org.apache.hadoop.mapreduce.Job; 

10 import org.apache.hadoop.mapreduce.Mapper; 

11 import org.apache.hadoop.mapreduce.Reducer; 

12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 

13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 

14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

16 import org.apache.hadoop.util.Tool; 

17 import org.apache.hadoop.util.ToolRunner; 

18 import org.apache.jasper.compiler.JavacErrorDetail; 

19 

20 public class Example extends Configured implements Tool{ 

21     

22     enum Counter{ 

23         LINESKIP; //输出错误行 

24     } 

25     

26     /*MapClass 

27      * Mapper< 

28      * LongWritable            输入的 key 

29      * Text                 输入的 value 

30      * NullWritable/Text    输出的 key 

31      * Text                    输出的 value 

32      * > 

33      * */ 

34     //public static class Map extends Mapper<LongWritable,Text,NullWritable,Text>   //没有Reduce过程时 

35     public static class Map extends Mapper<LongWritable,Text,Text,Text>{ 

36         public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException 

37         { 

38             String mydata=value.toString();   //读取源数据 

39              try{ 

40                 //数据处理、

41                 String[] mydataSplite=mydata.split("");  //数据切分

42                 String aData=mydataSplite[0]; 

43                 String bData=mydataSplite[1]; 

44                 

45                 /* 

46                  * 没有Reduce过程时 

47                  * Text outText=new Text(aData+""+bData); 

48        * context.write(new Text(aData), new Text(bData));  //输出 key/value ,NullWritable.get()避免输出制表符 

49                 */ 

50                 

51                 context.write(new Text(aData), new Text(bData));  //输出 key/value 

52             }catch(java.lang.ArrayIndexOutOfBoundsException e) 

53             { 

54                 context.getCounter(Counter.LINESKIP).increment(1);  //出错计数+1 

55                 return; 

56             } 

57         } 

58     } 

59     

60     /*Reduce静态类 

61      * Reducer< 

62      * Text,    输入的 key 

63      * Text,    输入的 value 

64      * Text,    输出的 key

65      * Text     输出的 value 

66      * 

67      * Reduce 的输入格式应与 Map的输出格式一致

68      * > 

69      * */ 

70     public static class Reduce extends Reducer<Text,Text,Text,Text> 

71     { 

72public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException 

73         { 

74             String valuString; 

75             String outString=""; 

76             for (Text value : values) 

77             { 

78                 valuString=value.toString(); 

79                 outString+=valuString+","; 

80             } 

81             context.write(key, new Text(outString));  //输出参数与定义格式一致,如果不是制表符分离的要换成空值 

82         } 

83     } 

84     

85     /*run设置运行任务*/

86     public int run(String[] args) throws Exception { 

87         Configuration conf = getConf(); 

88        

89         Job job = new Job(conf,"Example");   //作务名 

90         job.setJarByClass(Example.class);    //选择class 

91         

92         FileInputFormat.setInputPaths(job, new Path(args[0]));    //输入路径 

93         FileOutputFormat.setOutputPath(job, new Path(args[1]));   //输出路径 

94         

95         job.setMapperClass(Map.class);           //调用 Map class启动Map task     

96         job.setReducerClass(Reduce.class);       //调用 Reduce class 雇用 Reduce task 

97         job.setCombinerClass(Reduce.class);

98         job.setInputFormatClass(TextInputFormat.class);    //输入格式 

99         job.setOutputFormatClass(TextOutputFormat.class);  //输出格式 

100         //job.setOutputKeyClass(NullWritable.class);     //没有Reduce过程时,输出 key 格式,应该与指定的格式一致 

101         job.setOutputKeyClass(Text.class);                 //输出 key 格式,应该与指定的格式一致 

102         job.setOutputValueClass(Text.class);               //输出 value 格式   

103         

104         System.exit(job.waitForCompletion(true)?0:1); 

105         return 0; 

106     } 

107     

108     /*主函数入口*/ 

109     public static void main(String[] args) throws Exception { 

110         int res = ToolRunner.run(new Configuration(),new Example(),args); 

111         System.exit(res); 

112     } 

113     

114 }


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
182人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序