MapReduce二次排序
默认情况下,Map 输出的结果会对 Key 进行默认的排序,但是有时候需要对 Key 排序的同时再对 Value 进行排序,这时候就要用到二次排序了。下面让我们来介绍一下什么是二次排序。
二次排序原理
我们把二次排序主要分为以下几个阶段。
Map 起始阶段
在Map阶段,使用 job.setInputFormatClass() 定义的 InputFormat ,将输入的数据集分割成小数据块 split,同时 InputFormat 提供一个 RecordReader的实现。本课程中使用的是 TextInputFormat,它提供的 RecordReader 会将文本的行号作为 Key,这一行的文本作为 Value。这就是自定义 Mapper 的输入是 < LongWritable,Text> 的原因。然后调用自定义 Mapper 的map方法,将一个个< LongWritable,Text>键值对输入给 Mapper 的map方法。
Map 最后阶段
在 Map 阶段的最后,会先调用 job.setPartitionerClass() 对这个 Mapper 的输出结果
进行分区,每个分区映射到一个Reducer。每个分区内又调用
job.setSortComparatorClass() 设置的 Key 比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过 job.setSortComparatorClass() 设置 Key 比较函数类,则使用 Key 实现的 compareTo() 方法。我们既可以使用 IntPair 实现的 compareTo() 方法,也可以专门定义 Key 比较函数类。
Reduce 阶段
在 Reduce 阶段,reduce() 方法接受所有映射到这个 Reduce 的 map 输出后,也是会
调用 job.setSortComparatorClass()方法设置的 Key 比较函数类,对所有数据进行排序。然
后开始构造一个 Key 对应的 Value 迭代器。这时就要用到分组,使用
job.setGroupingComparatorClass()方法设置分组函数类。只要这个比较器比较的两个 Key
相同,它们就属于同一组,它们的 Value 放在一个 Value 迭代器,而这个迭代器的 Key 使用属于同一个组的所有Key的第一个Key。最后就是进入 Reducer 的 reduce() 方法, reduce() 方法的输入是所有的 Key 和它的 Value 迭代器,同样注意输入与输出的类型必须与自定义的 Reducer 中声明的一致。
接下来我们通过数据示例,可以很直观的了解二次排序的原理。
输入数据
输入文件sort.txt内容为:
输出文件的内容(从小到大排序)如下:
从输出的结果可以看出Key实现了从小到大的排序,同时相同Key的Value也实现了从小到大的排序,这就是二次排序的结果。
二次排序的具体流程
在 MapReduce 中,所有的 Key 是需要被比较和排序的,而且是二次,先根据Partitioner,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair ,它有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。二次排序的流程分为以下几步。
1、自定义 key
所有自定义的 key 应该实现接口 WritableComparable,因为它是可序列化的并且可
比较的。WritableComparable 的内部方法如下所示。
[java]
- //反序列化,从流中的二进制转换成IntPair public void readFields(DataInput in) throws IOException
- //序列化,将IntPair转化成使用流传送的二进制 public void write(DataOutput out)
- //key的比较
- public int compareTo(IntPair o)
- //默认的分区类 HashPartitioner,使用此方法 public int hashCode()
- //默认实现
- public boolean equals(Object right)
2、自定义分区
自定义分区函数类 FirstPartitioner,是 key 的第一次比较,完成对所有 key 的排序。
[java]
- public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>
- 在 job 中使用 setPartitionerClasss()方法设置 Partitioner。
- job.setPartitionerClasss(FirstPartitioner.Class);
3、Key 的比较类
这是 Key 的第二次比较,对所有的 Key 进行排序,即同时完成IntPair中的first和second排序。该类是一个比较器,可以通过两种方式实现。
1) 继承 WritableComparator。
[java]
- public static class KeyComparator extends WritableComparator
- 必须有一个构造函数,并且重载以下方法。
- public int compare(WritableComparable w1, WritableComparable w2)
2) 实现接口 RawComparator。
[java]
- 上面两种实现方式,在 Job 中,可以通过setSortComparatorClass()方法来设置Key的比较类。
- job.setSortComparatorClass(KeyComparator.Class);
注意:如果没有使用自定义的 SortComparator 类,则默认使用 Key 中compareTo()
方法对 Key 排序分组。
4、定义分组类函数
在 Reduce 阶段,构造一个与 Key 相对应的 Value 迭代器的时候,只要 first 相同就属于同一个组,放在一个 Value 迭代器。定义这个比较器,可以有两种方式。
1) 继承 WritableComparator。
[java]
- public static class GroupingComparator extends WritableComparator
必须有一个构造函数,并且重载以下方法。
[java]
- public int compare(WritableComparable w1, WritableComparable w2)
2) 实现接口 RawComparator。
上面两种实现方式,在 Job 中,可以通过 setGroupingComparatorClass()方法来设置分组类。
[java]
- job.setGroupingComparatorClass(GroupingComparator.Class);
另外注意的是,如果reduce的输入与输出不是同一种类型,则 Combiner和Reducer不能共用 Reducer 类,因为 Combiner 的输出是 reduce 的输入。除非重新定义一个
Combiner。
代码实现
Hadoop 的 example 包中自带了一个 MapReduce 的二次排序算法,下面这个示例对 example 包中的二次排序源码的改进。我们按照以下几步完成二次排序:
第一步:自定义IntPair类,将示例数据中的key/value封装成一个整体作为Key,同时实现 WritableComparable 接口并重写其方法。
[java]
- /**
- * 自己定义的key类应该实现WritableComparable接口 */
- public class IntPair implements WritableComparable<IntPair>{ int first;//第一个成员变量 int second;//第二个成员变量
- public void set(int left, int right){ first = left;
- second = right;
- }
- public int getFirst(){ return first;
- }
- public int getSecond(){ return second;
- }
- @Override
- //反序列化,从流中的二进制转换成IntPair public void readFields(DataInput in) throws IOException{
- first = in.readInt(); second = in.readInt();
- }
- @Override
- //序列化,将IntPair转化成使用流传送的二进制 public void write(DataOutput out) throws IOException{
- out.writeInt(first);
- out.writeInt(second);
- }
- @Override //key的比较
- public int compareTo(IntPair o)
- {
- // TODO Auto‐generated method stub if (first != o.first){
- return first < o.first ? -1 : 1; }else if (second != o.second){
- return second < o.second ? -1 : 1; }else{
- return 0;
- }
- }
- @Override
- public int hashCode(){
- return first * 157 + second;
- }
- @Override
- public boolean equals(Object right){ if (right == null)
- return false; if (this == right) return true;
- if (right instanceof IntPair){ IntPair r = (IntPair) right;
- return r.first == first && r.second == second; }else{
- return false;
- }
- }
- }
- 第二步:自定义分区函数类FirstPartitioner,根据 IntPair 中的first实现分区。
- /**
- * 分区函数类。根据first确定Partition。 */
- public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>{ @Override
- public int getPartition(IntPair key, IntWritable value,int numPartitions){ return Math.abs(key.getFirst() * 127) % numPartitions;
- }
- }
- 第三步:自定义 SortComparator 实现 IntPair 类中的first和second排序。本课程中没
- 有使用这种方法,而是使用 IntPair 中的compareTo()方法实现的。
- 第四步:自定义 GroupingComparator 类,实现分区内的数据分组。
- /** *继承WritableComparator */
- public static class GroupingComparator extends WritableComparator{ protected GroupingComparator(){
- super(IntPair.class, true);
- }
- @Override
- //Compare two WritableComparables.
- public int compare(WritableComparable w1, WritableComparable w2){ IntPair ip1 = (IntPair) w1;
- IntPair ip2 = (IntPair) w2; int l = ip1.getFirst(); int r = ip2.getFirst();
- return l == r ? 0 : (l < r ? -1 : 1);
- }
- }
- 第五步:编写 MapReduce 主程序实现二次排序。
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import java.util.StringTokenizer;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.WritableComparable;
- import org.apache.hadoop.io.WritableComparator;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Partitioner;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- public class SecondarySort{
- // 自定义map
- public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>{
- private final IntPair intkey = new IntPair();
- private final IntWritable intvalue = new IntWritable();
- public void map(LongWritable key, Text value, Context context) throws IOException, Interrupted
- String line = value.toString();
- StringTokenizer tokenizer = new StringTokenizer(line);
- int left = 0;
- int right = 0;
- if (tokenizer.hasMoreTokens()){
- left = Integer.parseInt(tokenizer.nextToken());
- if (tokenizer.hasMoreTokens())
- right = Integer.parseInt(tokenizer.nextToken());
- intkey.set(left, right);
- intvalue.set(right);
- context.write(intkey, intvalue);
- }
- }
- }
- // 自定义reduce
- public static class Reduce extends Reducer< IntPair, IntWritable, Text, IntWritable>{ private final Text left = new Text();
- public void reduce(IntPair key, Iterable< IntWritable> values,Context context) throws IOExcepti left.set(Integer.toString(key.getFirst()));
- for (IntWritable val : values){ context.write(left, val);
- }
- }
- }
- /**
- * @param args */
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundExce // TODO Auto‐generated method stub
- Configuration conf = new Configuration();
- Job job = new Job(conf, "secondarysort"); job.setJarByClass(SecondarySort.class);
- FileInputFormat.setInputPaths(job, new Path(args[0]));//输入路径
- FileOutputFormat.setOutputPath(job, new Path(args[1]));//输出路径
- job.setMapperClass(Map.class);// Mapper job.setReducerClass(Reduce.class);// Reducer
- job.setPartitionerClass(FirstPartitioner.class);// 分区函数//job.setSortComparatorClass(KeyComparator.Class);//本课程并没有自定义SortComparator,而是使用In job.setGroupingComparatorClass(GroupingComparator.class);// 分组函数
- job.setMapOutputKeyClass(IntPair.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
登录 | 立即注册