涉及到的知识:
- Yarn调度MapReduce的过程
- Mapper
- Reduce
- 排序
- 分区
- Combiner
- Shuffle
Yarn调度MapReduce的过程
Yarn和MapReduce的关系,就跟我们web项目中和tomcat的关系一样,Yarn是MapReduce运行的容器。下面我们先介绍一下Yarn的一些概念。
Yarn也是主从结构:ResourceManager(主节点)、NodeManager(从节点–真正干活的)。
(1)ResourceManager资源管理器
- 接收客户端的请求:执行任务
- 分配资源
- 分配任务
(2)NodeManager阶段管理器(运行任务MapReduce)
- 从DataNode上获取数据,执行任务
下面来一张Yarn调度MapReduce的原理图:
在原理图里面涉及到一个资源分配的关系,Yarn资源分配的方式有三种:
FIFO Scheduler:先来先得。缺点:没有考虑任务的优先级
Capacity Scheduler:容器管理。
Fair Scheduler:公平调度。注意:安装配置Hive on Spark,需要配置Yarn为Fair Scheduler
前提:假设每个任务具有相同的优先级,平均分配系统的资源(不过可以配置任务权重,把优先级考虑进去了)
后面会涉及到这一方面的问题,到时候在说明。
MapReduce的原理和运用
在Hadoop背景知识中,我们已经介绍过什么是MapReduce,并且在Hadoop环境搭建里面运行过系统的wordcount jar,获取到以下结果:
现在我们通过详细分析wordcount的MapReduce过程来实现自己的wordcount程序,并且加入排序(默认是字典顺序,即上面的结果,我们将其改为逆字典顺序)。在写代码之前我们先来分析wordcount的MapReduce流程:
从上面的原理图我们可以看出一共是分为两个阶段:Mapper、Reduce,Mapper的输出作为Reduce的输入,Reduce完成之后输出到HDFS。下面我们就来实现一个自己的MapReduce:
//Mapper k1 v1 k2 v2
class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
/**
* context表示Mapper的上下文
* 上文:HDFS
* 下文:Mapper
*/
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//获取数据:I love Beijing
String data = v1.toString();
//分词
String[] words = data.split(" ");
//输出k2 v2
for (String word : words) {
context.write(new Text(word),new IntWritable(1));
}
}
}
// Reduce k3 v3 k4 v4
class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
/**
*
context表示Reducer的上下文
* 上文:Mapper
* 下文:HDFS
*/
@Override
protected void reduce(Text k3, Iterable<IntWritable> v3, Context context) throws IOException, InterruptedException {
//对v3求和
int total = 0;
for (IntWritable v : v3) {
total += v.get();
}
//输出 k4单词 v4频率
context.write(k3,new IntWritable(total));
}
}
//自己的比较器(根据不同类型,继承不同的Comparator,这里使用过的是Text)
class MyNumberComparator extends Text.Comparator {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
//定义自己的排序比较规则:改成降序
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
//程序入口class WordCountMain {
public static void main(String[] args) throws Exception{
// 创建一个job和任务入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(WordCountMain.class); //main方法所在的class
//指定job的mapper和输出的类型<k2 v2>
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class); //k2的类型
job.setMapOutputValueClass(IntWritable.class); //v2的类型
//指定自己的比较器
job.setSortComparatorClass(MyNumberComparator.class);
//指定job的reducer和输出的类型<k4 v4>
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class); //k4的类型
job.setOutputValueClass(IntWritable.class); //v4的类型
//指定job的输入和输出
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//执行job
job.waitForCompletion(true);
}
}
把这个程序导出成jar,然后上传到linux,执行hadoop jar xxx.jar /input/data.txt /output/0506/word_count2命令即可。结果如下:
我们得到了想要的结果,MapReduce的排序,默认情况下:数字(升序)、字符串(字典顺序)。到这里关于Mapper、Reducer、排序就已经完成了。
注意:写mapreduce程序要导入jar包作为依赖,jar在common和mapreduce中,自行拷贝即可。
分区partition
我们先以Oracle数据库为例,解释什么是分区、什么是Hash分区,然后使用MapReduce实现分区的功能。
下面是分区的原理图:
从上面可以看出,分区的本质就是将全表进行划分,从而提高查询效率。上图中讲述到一个比较重要的概念叫作Hash分区,我们这里一起说明一下,然后开始动手写代码。
关于分区的概念已经都说明完了,现在我们开始上代码了:
//数据类:员工表的一行数据-->7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements Writable{
private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private int sal;
private int comm;
private int deptno;
@Override
public String toString() {
return "Employee [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
}
@Override
public void readFields(DataInput input) throws IOException {
// 反序列化
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
@Override
public void write(DataOutput output) throws IOException {
// 序列化
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}
// k2 部门号 v2 员工
public class PartEmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
@Override
protected void map(LongWritable key1, Text value1, Context context)
throws IOException, InterruptedException {
//数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
String data = value1.toString();
//分词
String[] words = data.split(",");
//创建员工对象
Employee e = new Employee();
//设置员工的属性
//员工号
e.setEmpno(Integer.parseInt(words[0]));
//姓名
e.setEname(words[1]);
//职位
e.setJob(words[2]);
//老板号: 注意 可能没有老板号
try{
e.setMgr(Integer.parseInt(words[3]));
}
catch(Exception ex){
//没有老板号
e.setMgr(-1);
}
//入职日期
e.setHiredate(words[4]);
//月薪
e.setSal(Integer.parseInt(words[5]));
//奖金:注意:奖金也可能没有
try{
e.setComm(Integer.parseInt(words[6]));
}catch(Exception ex){
//没有奖金
e.setComm(0);
}
//部门号
e.setDeptno(Integer.parseInt(words[7]));
//输出:k2 部门号 v2 员工对象
context.write(new IntWritable(e.getDeptno()),e);
}
}
public class PartEmployeeReducer extends Reducer<IntWritable, Employee, IntWritable, Employee> {
@Override
protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)
throws IOException, InterruptedException {
/*
* k3 部门号
* v3 部门的员工
*/
for(Employee e:v3){
context.write(k3, e);
}
}
}
/*
* 建立自己的分区规则:根据员工的部门号进行分区
*/// k2 v2
public class MyEmployeeParitioner extends Partitioner<IntWritable, Employee>{
/**
* numPartition参数:建立多少个分区,我们在main中调用setNumReduceTasks函数时传入
*/
@Override
public int getPartition(IntWritable k2, Employee v2, int numPartition) {
// 如何建立分区
if(v2.getDeptno() == 10){
//放入1号分区中
return 1%numPartition;
}
else if(v2.getDeptno() == 20){
//放入2号分区中
return 2%numPartition;
}
else{
//放入0号分区中
return 3%numPartition;
}
}
}
public class PartEmployeeMain { public static void main(String[] args) throws Exception {
// 创建一个job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(PartEmployeeMain.class);
//指定job的mapper和输出的类型 k2 v2
job.setMapperClass(PartEmployeeMapper.class);
job.setMapOutputKeyClass(IntWritable.class); //部门号
job.setMapOutputValueClass(Employee.class); //员工
//指定任务的分区规则
job.setPartitionerClass(MyEmployeeParitioner.class);
//指定建立几个分区
job.setNumReduceTasks(3);
//指定job的reducer和输出的类型 k4 v4
job.setReducerClass(PartEmployeeReducer.class);
job.setOutputKeyClass(IntWritable.class); //部门号
job.setOutputValueClass(Employee.class); //员工
//指定job的输入和输出的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//执行任务
job.waitForCompletion(true);
}
}
将上述代码导出为jar包,执行Hadoop jar xxx.jar /input/emp.csv /output/0506/partition1,得到如下结果:
这样我们就把一个表通过分区分为了3份。分区的知识就到这里,后面我们开始解释合并。
合并Combiner
合并是一种特殊的Reducer。在Mapper端,先执行一次Reducer —–> 提高效率(减少Mapper输出到Reduce的数据量)。下面是Combiner的原理图:
现在开始上代码:
//Mapper k1 v1 k2 v2
class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
/**
*context表示Mapper的上下文
* 上文:HDFS
* 下文:Mapper
*/
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//获取数据:I love Beijing
String data = v1.toString();
//分词
String[] words = data.split(" ");
//输出k2 v2
for (String word : words) {
context.write(new Text(word),new IntWritable(1));
}
}
}
// Reduce k3 v3 k4 v4
class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
/**
* context表示Reducer的上下文
* 上文:Mapper
* 下文:HDFS
*/
@Override
protected void reduce(Text k3, Iterable<IntWritable> v3, Context context) throws IOException, InterruptedException {
//对v3求和
int total = 0;
for (IntWritable v : v3) {
total += v.get();
}
//输出 k4单词 v4频率
context.write(k3,new IntWritable(total));
}
}
//自己的比较器(根据不同类型,继承不同的Comparator,这里使用过的是Text)
class MyNumberComparator extends Text.Comparator {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
//定义自己的排序比较规则:改成降序
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
//程序入口
class WordCountMain {
public static void main(String[] args) throws Exception{
// 创建一个job和任务入口
Job job = Job.getInstance(new Configuration());
job.setJarByClass(WordCountMain.class); //main方法所在的class
//指定job的mapper和输出的类型<k2 v2>
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class); //k2的类型
job.setMapOutputValueClass(IntWritable.class); //v2的类型
//指定自己的比较器
job.setSortComparatorClass(MyNumberComparator.class);
//指定combiner
job.setCombinerClass(WordCountReduce.class);
//指定job的reducer和输出的类型<k4 v4>
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class); //k4的类型
job.setOutputValueClass(IntWritable.class); //v4的类型
//指定job的输入和输出
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//执行job
job.waitForCompletion(true);
}
}
仔细看上面的代码,发现和MapReduce的原理和运用中的基本上一样,只有一个地方不一样,在WordCountMain中多了一句job.setCombinerClass(WordCountReduce.class);,我们把WordCountReduce直接作为Combiner传入job当中,运行结果如下:
combiner就是这么简单,不过有一些点需要注意一下:
(*)有些情况不能使用Combiner —-> 举例:求平均值
(*)保证引入Combiner以后,不能改变原来的逻辑
我们以第一点为例:1+2+3求平均值,结果很明显为2,我们假设现在使用combiner先将1+2求平均值为1.5,然后输出给Reduce,Reduce求1.5+3的平均值,明显已经为不为2了,所以使用的使用要注意。
洗牌(Shuffle)
最后我们要说明的知识点就是洗牌,其实说的简单一点,shuffle就是对之前这些特性的一个功能汇总,在一个MapReduce程序中加入分区、合并、排序等等。还是原来的流程,来一张原理图:
到这里,MapReduce的原理以及一些常用高级功能就已经完成了。
登录 | 立即注册