1.MapReduce 编程模型的5个步骤:
1)迭代,将输入数据解析成 key/value 对;
2)将解析的 key/value经过Map处理映射成另一组key/value对;
3)根据key进行分组;
4)以分组为单位进行归约(Reduce 过程);
5)迭代,输出最终结果。
2.MapReduce编程模型模板:
在进行编程过程只需改变Map()和Reduce()方法,如果没有Reduce过程时需要对run()作适当调整。
1 import java.io.IOException;
2 import java.util.jar.JarException;
3 import org.apache.hadoop.conf.Configuration;
4 import org.apache.hadoop.conf.Configured;
5 import org.apache.hadoop.fs.Path;
6 import org.apache.hadoop.io.LongWritable;
7 import org.apache.hadoop.io.NullWritable;
8 import org.apache.hadoop.io.Text;
9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.Mapper;
11 import org.apache.hadoop.mapreduce.Reducer;
12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
16 import org.apache.hadoop.util.Tool;
17 import org.apache.hadoop.util.ToolRunner;
18 import org.apache.jasper.compiler.JavacErrorDetail;
19
20 public class Example extends Configured implements Tool{
21
22 enum Counter{
23 LINESKIP; //输出错误行
24 }
25
26 /*MapClass
27 * Mapper<
28 * LongWritable 输入的 key
29 * Text 输入的 value
30 * NullWritable/Text 输出的 key
31 * Text 输出的 value
32 * >
33 * */
34 //public static class Map extends Mapper<LongWritable,Text,NullWritable,Text> //没有Reduce过程时
35 public static class Map extends Mapper<LongWritable,Text,Text,Text>{
36 public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException
37 {
38 String mydata=value.toString(); //读取源数据
39 try{
40 //数据处理、
41 String[] mydataSplite=mydata.split(""); //数据切分
42 String aData=mydataSplite[0];
43 String bData=mydataSplite[1];
44
45 /*
46 * 没有Reduce过程时
47 * Text outText=new Text(aData+""+bData);
48 * context.write(new Text(aData), new Text(bData)); //输出 key/value ,NullWritable.get()避免输出制表符
49 */
50
51 context.write(new Text(aData), new Text(bData)); //输出 key/value
52 }catch(java.lang.ArrayIndexOutOfBoundsException e)
53 {
54 context.getCounter(Counter.LINESKIP).increment(1); //出错计数+1
55 return;
56 }
57 }
58 }
59
60 /*Reduce静态类
61 * Reducer<
62 * Text, 输入的 key
63 * Text, 输入的 value
64 * Text, 输出的 key
65 * Text 输出的 value
66 *
67 * Reduce 的输入格式应与 Map的输出格式一致
68 * >
69 * */
70 public static class Reduce extends Reducer<Text,Text,Text,Text>
71 {
72public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException
73 {
74 String valuString;
75 String outString="";
76 for (Text value : values)
77 {
78 valuString=value.toString();
79 outString+=valuString+",";
80 }
81 context.write(key, new Text(outString)); //输出参数与定义格式一致,如果不是制表符分离的要换成空值
82 }
83 }
84
85 /*run设置运行任务*/
86 public int run(String[] args) throws Exception {
87 Configuration conf = getConf();
88
89 Job job = new Job(conf,"Example"); //作务名
90 job.setJarByClass(Example.class); //选择class
91
92 FileInputFormat.setInputPaths(job, new Path(args[0])); //输入路径
93 FileOutputFormat.setOutputPath(job, new Path(args[1])); //输出路径
94
95 job.setMapperClass(Map.class); //调用 Map class启动Map task
96 job.setReducerClass(Reduce.class); //调用 Reduce class 雇用 Reduce task
97 job.setCombinerClass(Reduce.class);
98 job.setInputFormatClass(TextInputFormat.class); //输入格式
99 job.setOutputFormatClass(TextOutputFormat.class); //输出格式
100 //job.setOutputKeyClass(NullWritable.class); //没有Reduce过程时,输出 key 格式,应该与指定的格式一致
101 job.setOutputKeyClass(Text.class); //输出 key 格式,应该与指定的格式一致
102 job.setOutputValueClass(Text.class); //输出 value 格式
103
104 System.exit(job.waitForCompletion(true)?0:1);
105 return 0;
106 }
107
108 /*主函数入口*/
109 public static void main(String[] args) throws Exception {
110 int res = ToolRunner.run(new Configuration(),new Example(),args);
111 System.exit(res);
112 }
113
114 }
登录 | 立即注册