JobPlus知识库 IT 大数据 文章
mapreduce初期学习

在我最初使用Java写MapReduce程序之前,总有一个疑问:既然可以用SQL这么通俗易懂的语句直接操作数据,而且不需要过多了解MapReduce执行过程,为什么还要费力地用Java垒代码,去了解MapReduce的底层执行过程。什么样的应用场景需要我们来开发MapReduce呢?

首先,SQL非常适用于处理结构化数据,对于非结构化数据以及需要特殊函数处理的数据比如文本数据,SQL则会力不从心。举一个小例子,从海量文本数据中提取各种字符编码并翻译为中文,过程中还涉及自动识别是utf-8还是ANSI亦或是其他编码格式,这个需求用MapReduce程序实现起来更为合理;另外,在处理业务逻辑较为复杂的任务时,使用SQL很难实现,其执行效率方面也很难满足业务需求。举例来说,我们需要将业务日志中的域名识别为相应的互联网应用,现实操作中需要分多种情况使用多重判断进行规则匹配,并剔除钓鱼网站和fake url,使用SQL很难实现业务逻辑。再例如,使用SQL进行多表join并叠加复杂的数学运算时,其效率也很难满足业务需求。

在我们的机器学习工具开发过程中,为了使用原有数据建立特征向量,我们需要对原有表结构进行转化,需要迭代原始数据生成具有较多特征值的特征向量。原始数据量为13亿条,共13.2GB,我们尝试使用Hive SQL进行实现,经过测试,任务执行时间过长无法满足需求。而使用MapReduce编写两个Job实现业务逻辑,同时使用哈希算法优化字符串查询效率,最终处理时长为15分钟。应对这些复杂情况,使用MapReduce编程可以使我们获得更多对程序实现的控制和方法选择,通过底层算法优化实现效率提升。

基于不同的业务场景,结合不同工具特点,我们采用SQL脚本和MapReduce开发程序结合的策略,使日常数据处理任务在效率上得到了很好地满足。在我们平台中,MapReduce程序承担了如关键字提取、应用匹配和标签规则运算等近30%的日常数据处理任务。

总之,我们在实际应用中依据灵活性和效率来选择是否自己开发程序。

概览MapReduce

认识MapReduce先从架构入手,在此我们一图以蔽之:

 

图 1

 

现在广泛使用的MapReduce v2基于YARN架构,其角色包括Resource Manager(RM)、NodeManager(NM)、Application Master(AM)。RM由Master主机承担,主要负责任务调度和资源调配,NM和AM由各工作节点Slave承担,负责任务的处理和资源读写,其计算单位抽象为container。MapReduce的计算流程可以抽象为Splitting、Mapping、Shuffling、Reducing阶段,其中shuffling包括了Grouping、Sorting、Partitioning过程。以WordCount为例,如下图:

 

图 2

 

在掌握了MapReduce架构和原理的基础上,从代码的角度认识MapReduce才是程序员的正确打开方式。

开发MapReduce

MapReduce程序中,Map和Reduce逻辑功能分别通过扩展Mapper类和Reducer类实现。具体在实现过程中,我们在主类中将Mapper和Reducer类扩展并作为内部类调用,最后通过main函数定义输入输出以及Job配置,从而作为程序主入口。

Map实现

Mapper类扩展需要实现map方法,如下:

  1. private static class MyMapper extends

  2. Mapper<NullWritable, Writable, IntWritable, Text> {

  3. @Override

  4. protected void map(

  5.                NullWritable key,

  6.                Writable value,

  7.                Mapper<NullWritable, Writable, IntWritable, Text>.Context context)

  8. throws IOException, InterruptedException {

  9. }

  10. }

根据需求可以扩展setup、cleanup和自定义方法等,扩展Mapper类时需要声明键值对类型,如 Mapper< NullWritable,Writable,IntWritable,Text >,依次分别为输入输出< key,value >类型,其中< NullWritable,Writable >是orc文件格式输入< key,value >类型。

需要强调的是,MapReduce中所有输入输出字段类型都必须实现Writable或者WritableComparable类型,这是因为MapReduce中磁盘读写和节点数据传输过程涉及到数据的序列化和反序列化,需要通过这两类来实现。经常用到的IntWritable、LongWritable、Text等都是实现自WritableComparable类,如果需要,我们也可以扩展这两类实现自定义数据类型。例如,在通过MapReduce实现两表和多表Join的过程中,我通过实现WritableComparable类自定义Map输出的key字段类型,来实现对于Grouping和Sorting阶段不同比较字段的控制。

setup方法在类调用起始阶段运行,可以实现初始阶段对于参数读取和变量赋值的操作。在app应用识别案例中,我们在setup阶段实现对于平台DPI文件的读取操作,以在之后的map阶段实现MapJoin操作,代码如下:

  1. protected void setup(Context context) throws IOException,

  2. InterruptedException {

  3. Configuration conf = context.getConfiguration();

  4. //读取DPI文件HDFS存放路径

  5. String ini=DefaultStringifier.load(conf, "ini", Text.class).toString();

  6. FileSystem fs = FileSystem.get(conf);

  7. FSDataInputStream in = fs.open(new Path(ini));

  8. BufferedReader bf = new BufferedReader(new InputStreamReader(in));

  9. String str = null;

  10. while ((str = bf.readLine()) != null) {

  11. String[] rules = str.split("\\|\\|");

  12. String host = rules[5];

  13. String appname = rules[1]

  14. DPIMap.put(host,appname);

  15. }

  16. if (bf != null) {

  17. bf.close();

  18. }

  19. }

其中DPIMap是需要在主类中定义的HashMap变量,在map阶段将使用HashMap实现快速查找。

map方法是实现Mapper类的核心方法,map阶段主要逻辑都需要在map方法中实现。map方法参数定义包括输入< key,value >和上下文对象context声明。Context对象负责在MapReduce执行过程中平台配置和Job配置的传递。Job执行过程中,写入的业务逻辑会对每一条数据进行操作,并将中间结果< key,value >值通过context对象写入后台进行之后的shuffle和reduce操作。

例如我需要将业务数据中的host字段与DPI数据的host字段进行等值连接,统计出使用app的次数。我们可以在map方法中实现如下:

  1. protected void map(

  2.        NullWritable key,

  3.        Writable value,

  4.        Mapper<NullWritable, Writable, Text, IntWritable>.Context context)

  5.        throws IOException, InterruptedException {

  6. String appname = new String();

  7. //读入orc格式数据;

  8. OrcStruct struct = (OrcStruct)value;

  9. TypeInfo typeInfo =

  10. TypeInfoUtils.getTypeInfoFromTypeString(SCHEMA);

  11. StructObjectInspector inspector = (StructObjectInspector)

  12. OrcStruct.createObjectInspector(typeInfo);

  13. //读入host字段;

  14. try{

  15. host = inspector.getStructFieldData(struct, inspector.getStructFieldRef("host")).toString();

  16. }catch(Exception e){

  17. host = "";

  18. }

  19. if ((appname = DpiList.get(host)) != null){

  20. context.write(new Text(appname),new IntWritable(1));

  21. }

  22. }

在此默认输入数据为ORC格式,代码中涉及对ORC文件读取方法。

Reduce实现

同Mapper类类似,扩展Reducer类需要实现reduce方法。继续以统计app次数为例,Reducer类扩展实现为:

  1. private static class MyReducer extends

  2. Reducer<Text, IntWritable, NullWritable, Writable> {

  3. @Override

  4. protected void reduce(Text key, Iterable<IntWritable> values, Context context)

  5. throws IOException, InterruptedException {

  6. OrcSerde orcSerde = new OrcSerde();

  7. //写orc格式文件操作;

  8. Writable row;

  9. int sum = 0;

  10. StructObjectInspector inspector =

  11. (StructObjectInspector) ObjectInspectorFactory

  12. .getReflectionObjectInspector(MyRow.class,

  13. ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

  14. for(IntWritable val:values){

  15. sum = sum + val.get();

  16. }

  17. String[] result = {key.toString, Integer.toString(sum)};

  18. row = orcSerde.serialize(new MyRow(result), inspector);

  19. context.write(NullWritable.get(), row);

  20. }

  21. }

其中reduce方法实现的逻辑为对依据key值group之后的value值集合进行加和,并写入HDFS。

在reduce方法中,接收到的value集合通过Iterable接口实现,我们可以通过iterator对象提供的API实现对value值集合的遍历。Reduce的输出我们最终写为ORC格式。

程序主入口main()方法

通过在主类中定义main()方法作为程序的入口,我们需要在此完成对程序参数传递、输入输出配置和HDFS平台配置声明等工作,以app应用识别为例,代码如下:

  1. public static void main(String[] args) throws IOException,

  2. URISyntaxException, InterruptedException, ClassNotFoundException {

  3. String inputPath = args[0];

  4. String outputPath = args[1];

  5. String ini = args[2];

  6. Configuration conf = new Configuration();

  7. //向Mapper传递DPI文件位置;

  8. DefaultStringifier.store(conf,ini,"ini");

  9. Job job = new Job(conf);

  10. //设置任务队列;

  11. conf.set("mapreduce.job.queuename", "background");

  12. job.setJarByClass(StrMatching_dpi_orc.class);

  13. //设置reduce数量;

  14. job.setNumReduceTasks(40);

  15. //定义输入输出文件类型;

  16. job.setInputFormatClass(OrcNewInputFormat.class);

  17. job.setOutputFormatClass(OrcNewOutputFormat.class);

  18. //配置输入输出文件路径;

  19. FileInputFormat.addInputPath(job, new Path(inputPath));

  20. FileSystem fs = FileSystem.get(conf);

  21. if (fs.exists(new Path(outputPath))) {

  22. fs.delete(new Path(outputPath), true);

  23. }

  24. FileOutputFormat.setOutputPath(job, new Path(outputPath));

  25. //设置Map输出的<key,value>类型;

  26. job.setMapOutputKeyClass(Text.class);

  27. job.setMapOutputValueClass(IntWritable.class);

  28. //设置最终输出结果<key,value>类型;

  29. job.setOutputKeyClass(NullWritable.class);

  30. job.setOutputValueClass(Writable.class);

  31. //声明Mapper类和Reducer类;

  32. job.setMapperClass(MyMapper.class);

  33. job.setReducerClass(MyReducer.class);

  34. //执行任务,结束后自动退出;

  35. System.exit(job.waitForCompletion(true) ? 0 : 1);

  36. }

此例main()方法主要完成了对输入输出类型和路径的配置、任务执行队列和资源配置的定义。main()方法主要完成对程序接口的定义和资源调配,以上代码展示了一个最基本main()方法的定义。如果任务需要,我们还可以完成诸如自定义Group Comparator、Sort Comparator、Partitoner等对象的定义,并在main()方法中声明,作为MapReduce程序的comparator。

在我们平台的日常任务中,我们放弃使用占用空间较大的Text和Sequence文件格式,完全使用ORC文件格式作为数据存储格式。这样可以实现自定义MapReduce程序与Hive平台的无缝结合,更重要的是,可以为平台节省十倍的存储空间。

ORC存储方法

ORC File是Optimized Row Columnar (ORC) file的简称,它基于RCFile格式进行了优化。ORC文件格式的设计初衷是为了提高Hive数据读写以及数据处理能力,由于其实现了一定的数据压缩,可以占用更小的数据存储。

我们使用ORC格式作为MapReduce和Hive工具的统一存储格式,可以节省平台大量的存储空间,同时也实现了MapReduce程序与Hive的更好结合。

经过我们平台日常任务的实测积累,ORC文件格式可以为Hive提供稳定快速的数据读写,并且与Text文件存储相比,可以节省十倍的存储空间,可以大幅提升平台数据存储和处理能力。对于MapReduce程序读写ORC文件,无法像未压缩的Text文件一样直接读写,还需要做关于表数据结构声明等工作。

读ORC文件

仍然以app应用识别为例,主类中需要定义变量SCHEMA,声明读入表结构:

private static final String SCHEMA = "struct<ID:string,Name:string, time:string,fst_uri:string,host:string>";

读取ORC文件格式的代码如下:

  1. OrcStruct struct = (OrcStruct)value;

  2. TypeInfo typeInfo =

  3. TypeInfoUtils.getTypeInfoFromTypeString(SCHEMA);

  4. StructObjectInspector inspector = (StructObjectInspector)

  5. OrcStruct.createObjectInspector(typeInfo);

  6. //读入host字段;

  7. try{

  8. host = inspector.getStructFieldData(struct, inspector.getStructFieldRef("host")).toString();

  9. }catch(Exception e){

  10. host = "";

  11. }

首先,需要将读入的value值强制类型转换为OrcStruct,然后根据表结构实例化StructObjectInspector对象为inspector,最后使用StructObjectInspector类提供的API对字段进行读取。

写ORC文件

与读入过程相对应,写ORC文件代码如下:

  1. OrcSerde orcSerde = new OrcSerde();

  2. Writable row;

  3. int sum = 0;

  4. StructObjectInspector inspector =

  5. (StructObjectInspector) ObjectInspectorFactory

  6. .getReflectionObjectInspector(MyRow.class,

  7. ObjectInspectorFactory.ObjectInspectorOptions.JAVA);

  8. for(IntWritable val:values){

  9. sum = sum + val.get();

  10. }

  11. String[] result = {key.toString, Integer.toString(sum)};

  12. row = orcSerde.serialize(new MyRow(result), inspector);

  13. context.write(NullWritable.get(), row);

我们需要根据自定义的数据类型MyRow类实例化StructObjectInspector为inspector,然后使用OrcSerde对象将最终计算结果进行序列化并写入HDFS。其中MyRow类是通过扩展Writable类,对输出数据类型进行了定义,在类中完成了对输出表结构字段的定义和赋值,代码如下:

  1. public class MyRow implements Writable {

  2. String appname;

  3. int cnt;

  4. MyRow(String[] val){

  5. this.appname = val[0];

  6. this.cnt = Integer.parseInt(val[1]);

  7. }

  8. @Override

  9. public void readFields(DataInput arg0) throws IOException {

  10. throw new UnsupportedOperationException("no write");

  11. }

  12. @Override

  13. public void write(DataOutput arg0) throws IOException {

  14. throw new UnsupportedOperationException("no read");

  15. }

  16. }

在上面的章节中,我们介绍了MapReduce开发在北京移动大数据平台上的应用背景和部分应用案例。尽管MapReduce由于处理机制中大量的磁盘读写带来了数据处理效率的瓶颈,但在日常离线数据处理任务中由于其成熟稳定的性能,MapReduce仍然扮演着十分重要的角色。


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
35人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序