JobPlus知识库 IT 大数据 文章
使用mapreduce清洗web访问日志并导入hive数据库流程

编写一个简单的日志清洗脚本,原始访问日志如下:

[html]

  1. 192.168.18.1 - - [16/Feb/2017:13:53:49 +0800] "GET /favicon.ico HTTP/1.1" 404 288  
  2. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  3. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  4. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  5. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  6. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  7. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  8. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  9. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  10. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  11. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  12. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a001 HTTP/1.1" 404 288  
  13. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a007 HTTP/1.1" 404 288  
  14. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a003 HTTP/1.1" 404 288  
  15. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/运动鞋/a003 HTTP/1.1" 404 288  
  16. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/皮鞋/b001 HTTP/1.1" 404 288  
  17. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/皮鞋/b002 HTTP/1.1" 404 288  
  18. 192.168.18.2 - - [16/Feb/2017:13:53:49 +0800] "GET /鞋子/男鞋/皮鞋/b003 HTTP/1.1" 404 288  

1,按照格式做好样式数据后,将原始数据导入到/user/hadoop/name目录中;

2,创建java数据清洗执行文件:

    vim Namecount.java

[html] 

  1. import java.lang.String;  
  2. import java.io.IOException;  
  3. import java.util.*;  
  4. import java.text.SimpleDateFormat;  
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.IntWritable;  
  8. import org.apache.hadoop.io.LongWritable;  
  9. import org.apache.hadoop.io.Text;  
  10. import org.apache.hadoop.mapreduce.Job;  
  11. import org.apache.hadoop.mapreduce.Mapper;  
  12. import org.apache.hadoop.mapreduce.Reducer;  
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  17. import org.apache.hadoop.util.GenericOptionsParser;  
  18. import org.apache.hadoop.io.NullWritable;  
  19.    
  20. public class Namecount {  
  21.   
  22.          public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH); //原时间格式  
  23.          public static final SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyy-MM-dd");//现时间格式  
  24.        private Date parseDateFormat(String string) {         //转换时间格式  
  25.             Date parse = null;  
  26.             try {  
  27.                 parse = FORMAT.parse(string);  
  28.             } catch (Exception e) {  
  29.                 e.printStackTrace();  
  30.             }  
  31.             return parse;  
  32.         }  
  33.           
  34.         public String[] parse(String line) {  
  35.             String ip = parseIP(line);       //ip  
  36.             String time = parseTime(line);   //时间  
  37.             String url = parseURL(line);     //url  
  38.             String status = parseStatus(line); //状态  
  39.             String traffic = parseTraffic(line);//流量  
  40.   
  41.             return new String[] { ip, time, url, status, traffic };  
  42.         }   
  43.         private String parseTraffic(String line) {    //流量  
  44.             final String trim = line.substring(line.lastIndexOf("\"") + 1)  
  45.                     .trim();  
  46.             String traffic = trim.split(" ")[1];  
  47.             return traffic;  
  48.         }  
  49.        private String parseStatus(String line) {     //状态  
  50.             final String trim = line.substring(line.lastIndexOf("\"") + 1)  
  51.                     .trim();  
  52.             String status = trim.split(" ")[0];  
  53.             return status;  
  54.         }  
  55.   
  56.         private String parseURL(String line) {       //url  
  57.             final int first = line.indexOf("\"");  
  58.             final int last = line.lastIndexOf("\"");  
  59.             String url = line.substring(first + 1, last);  
  60.             return url;  
  61.         }  
  62.         private String parseTime(String line) {    //时间  
  63.             final int first = line.indexOf("[");  
  64.             final int last = line.indexOf("+0800]");  
  65.             String time = line.substring(first + 1, last).trim();  
  66.             Date date = parseDateFormat(time);  
  67.             return dateformat1.format(date);  
  68.         }  
  69.         private String parseIP(String line) {     //ip  
  70.             String ip = line.split("- -")[0].trim();  
  71.             return ip;  
  72.         }  
  73.     public static class Map extends  
  74.             Mapper<LongWritable, Text, Text, IntWritable> {  
  75.                   
  76.         public void map(LongWritable key, Text value, Context context)  
  77.                 throws IOException, InterruptedException {  
  78.             // 将输入的纯文本文件的数据转化成String  
  79.             Text outputValue = new Text();  
  80.             String line = value.toString();  
  81.              Namecount aa=new Namecount();  
  82.             StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");  
  83.    
  84.             // 分别对每一行进行处理  
  85.             while (tokenizerArticle.hasMoreElements()) {  
  86.                 // 每行按空格划分  
  87.               String stra=tokenizerArticle.nextToken().toString();  
  88.               String [] Newstr=aa.parse(stra);  
  89.   
  90.            if (Newstr[2].startsWith("GET /")) { //过滤开头字符串  
  91.                 Newstr[2] = Newstr[2].substring("GET /".length());  
  92.             }   
  93.           else if (Newstr[2].startsWith("POST /")) {  
  94.                 Newstr[2] = Newstr[2].substring("POST /".length());  
  95.             }  
  96.            if (Newstr[2].endsWith(" HTTP/1.1")) { //过滤结尾字符串  
  97.                 Newstr[2] = Newstr[2].substring(0, Newstr[2].length()  
  98.                         - " HTTP/1.1".length());  
  99.             }  
  100.               String[] words = Newstr[2].split("/");  
  101.               if(words.length==4){  
  102.                   outputValue.set(Newstr[0] + "\t" + Newstr[1] + "\t" + words[0]+"\t"+words[1]+"\t"+words[2]+"\t"+words[3]+"\t"+"0");  
  103.                    context.write(outputValue,new IntWritable(1));                   
  104. }      
  105.     }  
  106.   }  
  107. }  
  108.    
  109.     public static class Reduce extends  
  110.             Reducer<Text, IntWritable, Text, IntWritable> {  
  111.         // 实现reduce函数  
  112.         public void reduce(Text key, Iterable<IntWritable> values,  
  113.                 Context context) throws IOException, InterruptedException {  
  114.           int sum = 0;  
  115.             Iterator<IntWritable> iterator = values.iterator();  
  116.             while (iterator.hasNext()) {  
  117.                 sum += iterator.next().get();  
  118.             }  
  119.             context.write(key, new IntWritable(sum));  
  120.         }  
  121.     }  
  122.     public static void main(String[] args) throws Exception {  
  123.         Configuration conf = new Configuration();  
  124.           
  125.     conf.set("mapred.jar","Namecount.jar");  
  126.    
  127.         String[] ioArgs = new String[] { "name", "name_out" };  
  128.         String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();  
  129.         if (otherArgs.length != 2) {  
  130.             System.err.println("Usage: Score Average <in> <out>");  
  131.             System.exit(2);  
  132.         }  
  133.    
  134.         Job job = new Job(conf, "name_goods_count");  
  135.         job.setJarByClass(Namecount.class);  
  136.    
  137.         // 设置Map、Combine和Reduce处理类  
  138.         job.setMapperClass(Map.class);  
  139.         job.setCombinerClass(Reduce.class);  
  140.         job.setReducerClass(Reduce.class);  
  141.    
  142.         // 设置输出类型  
  143.         job.setOutputKeyClass(Text.class);  
  144.         job.setOutputValueClass(IntWritable.class);  
  145.    
  146.         // 将输入的数据集分割成小数据块splites,提供一个RecordReder的实现  
  147.         job.setInputFormatClass(TextInputFormat.class);  
  148.         // 提供一个RecordWriter的实现,负责数据输出  
  149.         job.setOutputFormatClass(TextOutputFormat.class);  
  150.    
  151.         // 设置输入和输出目录  
  152.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
  153.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
  154.         System.exit(job.waitForCompletion(true) ? 0 : 1);  
  155.     }  
  156. }  


3,编译执行

[html]

  1. [hadoop@h85 mr]$ /usr/jdk1.7.0_25/bin/javac Namecount.java  
  2. [hadoop@h85 mr]$ /usr/jdk1.7.0_25/bin/jar cvf Namecount.jar Namecount*class  
  3. [hadoop@h85 mr]$ hadoop jar Namecount.jar Namecount  

输出的结果被保存在/user/hadoop/name_out/part-r-00000

4,hive中创建有相应字段的表:(字段)

[html] 

  1. 例如: ip string       acc_date string    wp string   sex string(鞋子种类)   type(鞋子种类) string     nid(鞋子编号) string   quanzhong(权重) int         count int  

[html] 

  1. 例如:192.168.18.2    20170216            鞋子            男鞋                   运动鞋                          a001                0                    13  

创建表:

[html] 

  1. create table acc_log(ip string,acc_date string,wp string,sex string,type string,nid string,quanzhong int,count int) row format delimited fields terminated by '\t';  

抽取数据:

[html] 

  1. load data inpath '/user/hadoop/name_out/part-r-00000' into table acc_log;  


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

¥ 打赏支持
69人赞 举报
分享到
用户评价(0)

暂无评价,你也可以发布评价哦:)

扫码APP

扫描使用APP

扫码使用

扫描使用小程序