利用Hadoop Mapreduce实现pv统计分析

摘 要

本文将介绍通过Hadoop Mapreduce实现离线统计网站每日pv的思路及代码。

前言

利用网站的kpi数据来分析出网站潜在的价值,那么了解网站的PV、UV、IP的状况,是一项必不可少的任务。本文将介绍通过Hadoop Mapreduce实现离线统计网站每日pv的思路及代码。

什么是PV

pv是指页面的浏览量或点击量(Page View),用户每访问一次或刷新一下即被计算一次。

需求

对网站以往的访问数据进行日pv、月PV、年PV统计。

技术选型

对于访问量大的网站来说,普通程序计算实现成本非常大。我们可以利用Hadoop来实现分布式计算,将固有的数据量分散到多台机器进行计算,无疑加快了计算速度,也降低了宕机的风险。

 

实现思路

在map阶段将数据清洗,并进行排序、分组。在reduce阶段完成统计。非常简单。

下面具体看实现代码:

KPI.java 用于封装日志信息及排序

  1. package com.itunic.mr.kpi;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.text.ParseException;
  6. import java.text.SimpleDateFormat;
  7. import java.util.Date;
  8. import java.util.HashSet;
  9. import java.util.Locale;
  10. import java.util.Set;
  11. import org.apache.hadoop.io.WritableComparable;
  12. /**
  13.  * 封装一个实体类,将nginx的日志分解成有用的信息。
  14.  * @author itunic
  15.  *
  16.  */
  17. public class KPI implements WritableComparable<KPI> {
  18.     private String remote_addr;// 来访ip
  19.     private String remote_user;// 来访用户名称,忽略属性“-”
  20.     private String time_local;// 记录时间与时区
  21.     private String request;// 访问页面
  22.     private String status;// 返回状态
  23.     private String body_bytes_sent;// 返回客户端内容主体大小
  24.     private String http_referer;// 来访页面
  25.     private String http_user_agent;// 客户浏览器的相关信息
  26.     private boolean valid = true;// 检验数据是否合法
  27.     //设置需要统计的页面类型
  28.     static Set<String> pages = null;
  29.     static {
  30.         pages = new HashSet<String>();
  31.         pages.add(".php");
  32.         /*pages.add(".jsp");
  33.         pages.add(".png");*/
  34.     }
  35.     /**
  36.      * 将传过来的数据封装成bean
  37.      * 
  38.      * @param line
  39.      * @return
  40.      */
  41.     private static KPI parser(String line) {
  42.         KPI kpi = new KPI();
  43.         String[] arr = line.split(" ");
  44.         if (arr.length > 11) {
  45.             kpi.setRemote_addr(arr[0]);
  46.             kpi.setRemote_user(arr[1]);
  47.             kpi.setTime_local(arr[3].substring(1));
  48.             String url = arr[6].indexOf("?") != -1 ? arr[6].substring(0, arr[6].indexOf("?")) : arr[6];
  49.             kpi.setRequest(url);
  50.             kpi.setStatus(arr[8]);
  51.             kpi.setBody_bytes_sent(arr[9]);
  52.             kpi.setHttp_referer(arr[10]);
  53.             if (arr.length > 12) {
  54.                 kpi.setHttp_user_agent(arr[11] + " " + arr[12]);
  55.             } else {
  56.                 kpi.setHttp_user_agent(arr[11]);
  57.             }
  58.             if (Integer.parseInt(kpi.getStatus()) >= 400) {
  59.                 kpi.setValid(false);
  60.             }
  61.         } else {
  62.             kpi.setValid(false);
  63.         }
  64.         return kpi;
  65.     }
  66.     /**
  67.      * 提取需要的url
  68.      * 
  69.      * @param line
  70.      * @return
  71.      */
  72.     public static KPI filterPVs(String line) {
  73.         KPI kpi = parser(line);
  74.         kpi.setValid(false);
  75.         for (String page : pages) {
  76.             if (kpi.getRequest() != null) {
  77.                 if (kpi.getRequest().contains(page)) {
  78.                     kpi.setValid(true);
  79.                     break;
  80.                 }
  81.             }
  82.         }
  83.         return kpi;
  84.     }
  85.     public String getRemote_addr() {
  86.         return remote_addr;
  87.     }
  88.     public void setRemote_addr(String remote_addr) {
  89.         this.remote_addr = remote_addr;
  90.     }
  91.     public String getRemote_user() {
  92.         return remote_user;
  93.     }
  94.     public void setRemote_user(String remote_user) {
  95.         this.remote_user = remote_user;
  96.     }
  97.     public String getTime_local() {
  98.         return time_local;
  99.     }
  100.     public Date getTime_local_Date() {
  101.         SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
  102.         try {
  103.             return df.parse(this.time_local);
  104.         } catch (ParseException e) {
  105.             return null;
  106.         }
  107.     }
  108.     public String getTime_local_day() throws ParseException {
  109.         SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
  110.         return df.format(this.getTime_local_Date());
  111.     }
  112.     public String getTime_local_Date_hour() throws ParseException {
  113.         SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
  114.         return df.format(this.getTime_local_Date());
  115.     }
  116.     public void setTime_local(String time_local) {
  117.         this.time_local = time_local;
  118.     }
  119.     public String getRequest() {
  120.         return request;
  121.     }
  122.     public void setRequest(String request) {
  123.         this.request = request;
  124.     }
  125.     public String getStatus() {
  126.         return status;
  127.     }
  128.     public void setStatus(String status) {
  129.         this.status = status;
  130.     }
  131.     public String getBody_bytes_sent() {
  132.         return body_bytes_sent;
  133.     }
  134.     public void setBody_bytes_sent(String body_bytes_sent) {
  135.         this.body_bytes_sent = body_bytes_sent;
  136.     }
  137.     public String getHttp_referer() {
  138.         return http_referer;
  139.     }
  140.     public void setHttp_referer(String http_referer) {
  141.         this.http_referer = http_referer;
  142.     }
  143.     public String getHttp_user_agent() {
  144.         return http_user_agent;
  145.     }
  146.     public void setHttp_user_agent(String http_user_agent) {
  147.         this.http_user_agent = http_user_agent;
  148.     }
  149.     public boolean isValid() {
  150.         return valid;
  151.     }
  152.     public void setValid(boolean valid) {
  153.         this.valid = valid;
  154.     }
  155.     @Override
  156.     public void write(DataOutput out) throws IOException {
  157.         out.writeUTF(this.getRemote_addr());
  158.         out.writeUTF(this.getRemote_user());
  159.         out.writeUTF(this.getTime_local());
  160.         out.writeUTF(this.getRequest());
  161.         out.writeUTF(this.getStatus());
  162.         out.writeUTF(this.getBody_bytes_sent());
  163.         out.writeUTF(this.getHttp_referer());
  164.     }
  165.     @Override
  166.     public void readFields(DataInput in) throws IOException {
  167.         this.setRemote_addr(in.readUTF());
  168.         this.setRemote_user(in.readUTF());
  169.         this.setTime_local(in.readUTF());
  170.         this.setRequest(in.readUTF());
  171.         this.setStatus(in.readUTF());
  172.         this.setBody_bytes_sent(in.readUTF());
  173.         this.setHttp_referer(in.readUTF());
  174.     }
  175.     /**
  176.      * 排序,按照日期降序、url升序处理。
  177.      */
  178.     @Override
  179.     public int compareTo(KPI o) {
  180.         int i;
  181.         try {
  182.             i = this.getTime_local_day().compareTo(o.getTime_local_day());
  183.             if (i != 0) {
  184.                 return -i;
  185.             }
  186.         } catch (ParseException e) {
  187.             e.printStackTrace();
  188.         }
  189.         return this.getRequest().compareTo(o.getRequest());
  190.     }
  191. }

PageViewGroup.java 用于将日志信息分组

  1. package com.itunic.mr.kpi;
  2. import java.text.ParseException;
  3. import org.apache.hadoop.io.WritableComparable;
  4. import org.apache.hadoop.io.WritableComparator;
  5. /**
  6.  * 分组,
  7.  * 由于需要按照每日的维度统计pv,必须将日期分组。
  8.  * @author itunic
  9.  *
  10.  */
  11. public class PageViewGroup extends WritableComparator {
  12.     public PageViewGroup() {
  13.         super(KPI.classtrue);
  14.     }
  15.     @SuppressWarnings("rawtypes")
  16.     @Override
  17.     public int compare(WritableComparable a, WritableComparable b) {
  18.         KPI kpi1 = (KPI) a;
  19.         KPI kpi2 = (KPI) b;
  20.         try {
  21.             /**
  22.              * 判断当前bean与传入的bean的日期是否一致。如果一致则需要判断是否为同一个url
  23.              */
  24.             int i = kpi1.getTime_local_day().compareTo(kpi2.getTime_local_day());
  25.             if (i != 0) {
  26.                 return -i;
  27.             }
  28.             return kpi1.getRequest().compareTo(kpi2.getRequest());
  29.         } catch (ParseException e) {
  30.             // TODO Auto-generated catch block
  31.             e.printStackTrace();
  32.         }
  33.         return 0;
  34.     }
  35. }

PageView.java Mapreduce计算类

  1. package com.itunic.mr.kpi;
  2. import java.io.IOException;
  3. import java.text.ParseException;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. public class PageView {
  14.     /**
  15.      * 利用mapreduce 离线统计每天的pv
  16.      * 
  17.      * @author itunic
  18.      *
  19.      */
  20.     public static class PageVisitsMapper extends Mapper<LongWritable, Text, KPI, LongWritable> {
  21.         @Override
  22.         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  23.             // 校验每一行URL的合法性
  24.             KPI kpi = KPI.filterPVs(value.toString());
  25.             if (kpi.isValid()) {
  26.                 // 利用mapper特性输出给reducer
  27.                 context.write(kpi, new LongWritable(1));
  28.             }
  29.         }
  30.     }
  31.     public static class PageVisitsReducer extends Reducer<KPI, LongWritable, Text, Text> {
  32.         @Override
  33.         protected void reduce(KPI key, Iterable<LongWritable> value, Context context)
  34.                 throws IOException, InterruptedException {
  35.             long count = 0;
  36.             // 将相同的分组相同的页面循环叠加
  37.             for (LongWritable l : value) {
  38.                 count += l.get();
  39.             }
  40.             String out = key.getRequest() + "\t" + count;
  41.             try {
  42.                 context.write(new Text(key.getTime_local_day()), new Text(out));
  43.             } catch (ParseException e) {
  44.                 // TODO Auto-generated catch block
  45.                 e.printStackTrace();
  46.             }
  47.         }
  48.     }
  49.     public static void main(String[] args) throws Exception {
  50.         Configuration conf = new Configuration();
  51.         Job job = Job.getInstance(conf);
  52.         job.setJarByClass(PageView.class);
  53.         job.setMapperClass(PageVisitsMapper.class);
  54.         job.setReducerClass(PageVisitsReducer.class);
  55.         job.setMapOutputKeyClass(KPI.class);
  56.         job.setMapOutputValueClass(LongWritable.class);
  57.         job.setOutputKeyClass(Text.class);
  58.         job.setOutputValueClass(Text.class);
  59.         job.setGroupingComparatorClass(PageViewGroup.class);
  60.         FileInputFormat.setInputPaths(job, new Path("F:\\test\\input\\access.log.fensi"));
  61.         FileOutputFormat.setOutputPath(job, new Path("F:\\test\\output9"));
  62.         int i = job.waitForCompletion(true) ? 0 : 1;
  63.         System.exit(i);
  64.     }
  65. }

源数据实例:nginx log

  1. 194.237.142.21 - - [18/Sep/2013:06:49:18 +0000"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"

 

运行分析部分结果如下:

  1. 2013-09-19  /tag/hadoop/page/3//images/stories/3xp.php  2
  2. 2013-09-19  /tag/rhadoop//images/stories/3xp.php    2
  3. 2013-09-19  /ucp.php    1
  4. 2013-09-19  /wp-admin/admin-ajax.php    120
  5. 2013-09-19  /wp-admin/async-upload.php  3
  6. 2013-09-19  /wp-admin/edit-comments.php 1
  7. 2013-09-19  /wp-admin/post.php  4
  8. 2013-09-19  /wp-cron.php    24
  9. 2013-09-19  /wp-login.php   2
  10. 2013-09-19  /xmlrpc.php 3
  11. 2013-09-18  //components/com_jnews/includes/openflashchart/php-ofc-library/ofc_upload_image.php 1
  12. 2013-09-18  //images/stories/3xp.php    6
  13. 2013-09-18  //images/stories/70cpx.php  2
  14. 2013-09-18  //images/stories/70pet.php  2
  15. 2013-09-18  //images/stories/cr0t.php   1
  16. 2013-09-18  /admin.php  2
  17. 2013-09-18  /administrator/index.php    2
  18. 2013-09-18  /batch.manage.php   21
  19. 2013-09-18  /category/hadoop-action/page/4//images/stories/3xp.php  2
  20. 2013-09-18  /images/stories/cr0t.php    1
  21. 2013-09-18  /index.php  10
  22. 2013-09-18  /index.php/blog/14  1
  23. 2013-09-18  /index.php/quiz/1   1
  24. 2013-09-18  /index.php/quiz/2   1

点击下载测试数据及分析结果数据

相关

利用Apache Spark实现pv统计分析

文件下载

广告也精彩
  • 利用Hadoop Mapreduce实现pv统计分析已关闭评论
  • 456 views
  • A+
所属分类:未分类
avatar