JobPlus知识库 IT 工业智能4.0 文章
MapperReduce的使用及高级功能

涉及到的知识:

  1. Yarn调度MapReduce的过程
  2. Mapper
  3. Reduce
  4. 排序
  5. 分区
  6. Combiner
  7. Shuffle

Yarn调度MapReduce的过程

Yarn和MapReduce的关系,就跟我们web项目中和tomcat的关系一样,Yarn是MapReduce运行的容器。下面我们先介绍一下Yarn的一些概念。

Yarn也是主从结构:ResourceManager(主节点)、NodeManager(从节点–真正干活的)。

(1)ResourceManager资源管理器

  • 接收客户端的请求:执行任务
  • 分配资源
  • 分配任务

(2)NodeManager阶段管理器(运行任务MapReduce)

  • 从DataNode上获取数据,执行任务

下面来一张Yarn调度MapReduce的原理图:

在原理图里面涉及到一个资源分配的关系,Yarn资源分配的方式有三种:

  1. FIFO Scheduler:先来先得。缺点:没有考虑任务的优先级

  2. Capacity Scheduler:容器管理。

  3. Fair Scheduler:公平调度。注意:安装配置Hive on Spark,需要配置Yarn为Fair Scheduler

    前提:假设每个任务具有相同的优先级,平均分配系统的资源(不过可以配置任务权重,把优先级考虑进去了)

后面会涉及到这一方面的问题,到时候在说明。

MapReduce的原理和运用

在Hadoop背景知识中,我们已经介绍过什么是MapReduce,并且在Hadoop环境搭建里面运行过系统的wordcount jar,获取到以下结果: 

现在我们通过详细分析wordcount的MapReduce过程来实现自己的wordcount程序,并且加入排序(默认是字典顺序,即上面的结果,我们将其改为逆字典顺序)。在写代码之前我们先来分析wordcount的MapReduce流程:

从上面的原理图我们可以看出一共是分为两个阶段:Mapper、Reduce,Mapper的输出作为Reduce的输入,Reduce完成之后输出到HDFS。下面我们就来实现一个自己的MapReduce:

//Mapper                                k1          v1    k2      v2

class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {

    /**

     *  context表示Mapper的上下文

     *  上文:HDFS

     *  下文:Mapper

     */

    @Override

    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

        //获取数据:I love Beijing

        String data = v1.toString();

        //分词

        String[] words = data.split(" ");

        //输出k2 v2

        for (String word : words) {

            context.write(new Text(word),new IntWritable(1));

        }

    }

 }


// Reduce                                k3     v3        k4     v4

class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {

    /**

     *

  context表示Reducer的上下文

     *  上文:Mapper

     *  下文:HDFS

     */

    @Override

    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context) throws IOException, InterruptedException {

        //对v3求和

        int total = 0;

        for (IntWritable v : v3) {

            total += v.get();

        }

        //输出        k4单词        v4频率

        context.write(k3,new IntWritable(total));

    }

 }


//自己的比较器(根据不同类型,继承不同的Comparator,这里使用过的是Text)

class MyNumberComparator extends Text.Comparator {

    @Override

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

        //定义自己的排序比较规则:改成降序

        return -super.compare(b1, s1, l1, b2, s2, l2);

    }

 }


//程序入口class WordCountMain {

    public static void main(String[] args) throws Exception{

        // 创建一个job和任务入口

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WordCountMain.class);  //main方法所在的class

        //指定job的mapper和输出的类型<k2 v2>

        job.setMapperClass(WordCountMapper.class);

        job.setMapOutputKeyClass(Text.class);    //k2的类型

        job.setMapOutputValueClass(IntWritable.class);  //v2的类型

        //指定自己的比较器

        job.setSortComparatorClass(MyNumberComparator.class); 

       //指定job的reducer和输出的类型<k4  v4>

        job.setReducerClass(WordCountReduce.class);

        job.setOutputKeyClass(Text.class);  //k4的类型

        job.setOutputValueClass(IntWritable.class);  //v4的类型

        //指定job的输入和输出

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行job

        job.waitForCompletion(true);

    } 

 }


把这个程序导出成jar,然后上传到linux,执行hadoop jar xxx.jar /input/data.txt /output/0506/word_count2命令即可。结果如下:

我们得到了想要的结果,MapReduce的排序,默认情况下:数字(升序)、字符串(字典顺序)。到这里关于Mapper、Reducer、排序就已经完成了。

注意:写mapreduce程序要导入jar包作为依赖,jar在common和mapreduce中,自行拷贝即可。

分区partition

我们先以Oracle数据库为例,解释什么是分区、什么是Hash分区,然后使用MapReduce实现分区的功能。

下面是分区的原理图:

从上面可以看出,分区的本质就是将全表进行划分,从而提高查询效率。上图中讲述到一个比较重要的概念叫作Hash分区,我们这里一起说明一下,然后开始动手写代码。

关于分区的概念已经都说明完了,现在我们开始上代码了:

//数据类:员工表的一行数据-->7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30

public class Employee implements Writable{

    private int empno;

    private String ename;

    private String job;

    private int mgr;

    private String hiredate;

    private int sal;

    private int comm;

    private int deptno;

    @Override

    public String toString() {

        return "Employee [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";

    }

    @Override

    public void readFields(DataInput input) throws IOException {

        // 反序列化

        this.empno = input.readInt();

        this.ename = input.readUTF();

        this.job = input.readUTF();

        this.mgr = input.readInt();

        this.hiredate = input.readUTF();

        this.sal = input.readInt();

        this.comm = input.readInt();

        this.deptno = input.readInt();

    }

    @Override

    public void write(DataOutput output) throws IOException {

        // 序列化

        output.writeInt(this.empno);

        output.writeUTF(this.ename);

        output.writeUTF(this.job);

        output.writeInt(this.mgr);

        output.writeUTF(this.hiredate);

        output.writeInt(this.sal);

        output.writeInt(this.comm);

        output.writeInt(this.deptno);

    }

      public int getEmpno() {

        return empno;

    }

    public void setEmpno(int empno) {

        this.empno = empno;

    }

    public String getEname() {

        return ename;

    }

    public void setEname(String ename) {

        this.ename = ename;

    }

    public String getJob() {

        return job;

    }

    public void setJob(String job) {

        this.job = job;

    }

    public int getMgr() {

        return mgr;

    }

    public void setMgr(int mgr) {

        this.mgr = mgr;

    }

    public String getHiredate() {

        return hiredate;

    }

    public void setHiredate(String hiredate) {

        this.hiredate = hiredate;

    }

    public int getSal() {

        return sal;

    }

    public void setSal(int sal) {

        this.sal = sal;

    }

    public int getComm() {

        return comm;

    }

    public void setComm(int comm) {

        this.comm = comm;

    }

    public int getDeptno() {

        return deptno;

    }

    public void setDeptno(int deptno) {

        this.deptno = deptno;

    }

 }


//                                                                      k2 部门号  v2 员工

public class PartEmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {

    @Override

    protected void map(LongWritable key1, Text value1, Context context)

            throws IOException, InterruptedException {

        //数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30

        String data = value1.toString();

        //分词

        String[] words = data.split(",");

        //创建员工对象

        Employee e = new Employee();

        //设置员工的属性

        //员工号

        e.setEmpno(Integer.parseInt(words[0]));

        //姓名

        e.setEname(words[1]);

        //职位

        e.setJob(words[2]);

        //老板号: 注意 可能没有老板号

        try{

            e.setMgr(Integer.parseInt(words[3]));

        }

catch(Exception ex){

            //没有老板号

            e.setMgr(-1);

        }

        //入职日期

        e.setHiredate(words[4]);

        //月薪

        e.setSal(Integer.parseInt(words[5]));

        //奖金:注意:奖金也可能没有

        try{

            e.setComm(Integer.parseInt(words[6]));

        }catch(Exception ex){

            //没有奖金

            e.setComm(0);

        }

        //部门号

        e.setDeptno(Integer.parseInt(words[7]));

        //输出:k2 部门号    v2  员工对象

        context.write(new IntWritable(e.getDeptno()),e);

    }

 }


public class PartEmployeeReducer extends Reducer<IntWritable, Employee, IntWritable, Employee> {

    @Override

    protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)

            throws IOException, InterruptedException {

        /*

         * k3 部门号

         * v3 部门的员工

         */

        for(Employee e:v3){

            context.write(k3, e);

        }

    }

 }



/*

 * 建立自己的分区规则:根据员工的部门号进行分区

 *///                                                          k2          v2

public class MyEmployeeParitioner extends Partitioner<IntWritable, Employee>{

    /**

     * numPartition参数:建立多少个分区,我们在main中调用setNumReduceTasks函数时传入

     */

    @Override

    public int getPartition(IntWritable k2, Employee v2, int numPartition) {

        // 如何建立分区

        if(v2.getDeptno() == 10){

            //放入1号分区中

            return 1%numPartition;

        }

else if(v2.getDeptno() == 20){

            //放入2号分区中

            return 2%numPartition;

        }

else{

            //放入0号分区中

            return 3%numPartition;

        }

    }

 }


public class PartEmployeeMain {    public static void main(String[] args) throws Exception {

        // 创建一个job

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(PartEmployeeMain.class);

        //指定job的mapper和输出的类型   k2  v2

        job.setMapperClass(PartEmployeeMapper.class);

        job.setMapOutputKeyClass(IntWritable.class);  //部门号

        job.setMapOutputValueClass(Employee.class);   //员工

        //指定任务的分区规则

       job.setPartitionerClass(MyEmployeeParitioner.class);

        //指定建立几个分区

        job.setNumReduceTasks(3);

        //指定job的reducer和输出的类型  k4   v4        

job.setReducerClass(PartEmployeeReducer.class);

        job.setOutputKeyClass(IntWritable.class);  //部门号

        job.setOutputValueClass(Employee.class);   //员工

        //指定job的输入和输出的路径

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行任务

        job.waitForCompletion(true);

    }

 }


将上述代码导出为jar包,执行Hadoop jar xxx.jar /input/emp.csv /output/0506/partition1,得到如下结果:


这样我们就把一个表通过分区分为了3份。分区的知识就到这里,后面我们开始解释合并。

合并Combiner

合并是一种特殊的Reducer。在Mapper端,先执行一次Reducer —–> 提高效率(减少Mapper输出到Reduce的数据量)。下面是Combiner的原理图:

现在开始上代码:

//Mapper                                k1          v1    k2      v2

class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {

    /**

     *context表示Mapper的上下文

     *  上文:HDFS

     *  下文:Mapper

     */

    @Override

    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

        //获取数据:I love Beijing

        String data = v1.toString();

        //分词

        String[] words = data.split(" ");

        //输出k2 v2

        for (String word : words) {

            context.write(new Text(word),new IntWritable(1));

        }

    }

 }


// Reduce                                k3     v3        k4     v4

class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {

    /**

     *  context表示Reducer的上下文

     *  上文:Mapper

     *  下文:HDFS

     */

    @Override

    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context) throws IOException, InterruptedException {

        //对v3求和

        int total = 0;

        for (IntWritable v : v3) {

            total += v.get();

        }

        //输出        k4单词        v4频率

        context.write(k3,new IntWritable(total));

    }

 }


//自己的比较器(根据不同类型,继承不同的Comparator,这里使用过的是Text)

class MyNumberComparator extends Text.Comparator {

    @Override

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

        //定义自己的排序比较规则:改成降序

        return -super.compare(b1, s1, l1, b2, s2, l2);

    }

 }


//程序入口

class WordCountMain {

    public static void main(String[] args) throws Exception{

        // 创建一个job和任务入口

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(WordCountMain.class);  //main方法所在的class

        //指定job的mapper和输出的类型<k2 v2>

        job.setMapperClass(WordCountMapper.class);

        job.setMapOutputKeyClass(Text.class); //k2的类型

        job.setMapOutputValueClass(IntWritable.class);  //v2的类型

        //指定自己的比较器

        job.setSortComparatorClass(MyNumberComparator.class);

        //指定combiner

        job.setCombinerClass(WordCountReduce.class);

        //指定job的reducer和输出的类型<k4  v4>

        job.setReducerClass(WordCountReduce.class);

        job.setOutputKeyClass(Text.class);  //k4的类型

        job.setOutputValueClass(IntWritable.class);  //v4的类型

        //指定job的输入和输出

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行job

        job.waitForCompletion(true);

    }

 }


仔细看上面的代码,发现和MapReduce的原理和运用中的基本上一样,只有一个地方不一样,在WordCountMain中多了一句job.setCombinerClass(WordCountReduce.class);,我们把WordCountReduce直接作为Combiner传入job当中,运行结果如下:

combiner就是这么简单,不过有一些点需要注意一下:

(*)有些情况不能使用Combiner —-> 举例:求平均值 
(*)保证引入Combiner以后,不能改变原来的逻辑

我们以第一点为例:1+2+3求平均值,结果很明显为2,我们假设现在使用combiner先将1+2求平均值为1.5,然后输出给Reduce,Reduce求1.5+3的平均值,明显已经为不为2了,所以使用的使用要注意。

洗牌(Shuffle)

最后我们要说明的知识点就是洗牌,其实说的简单一点,shuffle就是对之前这些特性的一个功能汇总,在一个MapReduce程序中加入分区、合并、排序等等。还是原来的流程,来一张原理图: 

到这里,MapReduce的原理以及一些常用高级功能就已经完成了。


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
327人赞 举报
分享到
用户评价(1)
扫码APP

扫描使用APP

扫码使用

扫描使用小程序