利用Spark通过nginx日志离线统计网站每日pv

摘 要

本文将介绍通过Apache Spark实现离线统计网站每日pv的思路及代码。

前言

在此之前,利用mapreduce实现了一版通过nginx日志离线分析网站每日pv,感兴趣的可以去看一下。本文实现思路与之前mapreduce的思路一致。可以很好的比较mapreduce和Spark的写法。在个人看来,Spark写起来更加优美简洁,有一种四两拨千斤的感觉。

利用Spark通过nginx日志离线统计网站每日pv

想了解实现思路的,可以看一下利用Mapreduce实现的文章,详细思路已经阐述。

点击查看->利用HadoopMareduce实现pv统计分析

本文与Hadoop Mapreduce采用的数据集为同一个,为标准的nginx日志文件。在上文中已经提供了下载附件。感兴趣的可以去下载。

代码实现

  1. package com.itunic.rdd
  2. import java.text.SimpleDateFormat
  3. import java.util.{Date, Locale}
  4. import org.apache.spark.{SparkConf, SparkContext}
  5. import scala.collection.mutable
  6. /**
  7.   * Created by c on 2017/1/11.
  8.   * 通过nginx日志统计每日pv,并按照日期和pv排序
  9.   * by me:
  10.   * 我本沉默是关注互联网以及分享IT相关工作经验的博客,
  11.   * 主要涵盖了操作系统运维、计算机编程、项目开发以及系统架构等经验。
  12.   * 博客宗旨:把最实用的经验,分享给最需要的你,
  13.   * 希望每一位来访的朋友都能有所收获!
  14.   *
  15.   */
  16. object NginxLogPV {
  17.   /**
  18.     * 设置需要统计的页面
  19.     */
  20.   val pages = new mutable.HashSet[String]()
  21.   pages += ".php"
  22.   /**
  23.     * 封装KPI实体类
  24.     *
  25.     * @param line
  26.     * @return KPI
  27.     */
  28.   def parser(line: String): KPI = {
  29.     //
  30.     val fields = line.split(" ")
  31.     val remote_addr = fields(0)
  32.     val time_local = fields(3).substring(1)
  33.     val request = fields(6)
  34.     val status = fields(8)
  35.     var valid = true
  36.     if (fields.length <= 11) {
  37.       valid = false
  38.     } else {
  39.       valid = if (status.toInt >= 400false else true
  40.     }
  41.     val url = if (request.indexOf("?") != -1) request.substring(0, request.indexOf("?")) else request
  42.     KPI(remote_addr, time_local, url, status, valid)
  43.   }
  44.   /**
  45.     * 过滤无效数据
  46.     *
  47.     * @param line
  48.     * @return
  49.     */
  50.   def filterPVs(line: String): KPI = {
  51.     val kpi: KPI = parser(line)
  52.     /**
  53.       * 过滤需要统计的URL
  54.       */
  55.     kpi.valid = false
  56.     for (page <- pages) {
  57.       if (kpi.request != null) {
  58.         if (kpi.request.contains(page)) {
  59.           kpi.valid = true
  60.         }
  61.       }
  62.     }
  63.     return kpi;
  64.   }
  65.   /**
  66.     * 将nginx日志时间转换为常规日期
  67.     *
  68.     * @param time_local
  69.     * @return
  70.     */
  71.   def getTime_local_Date(time_local: String): Date = {
  72.     val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US)
  73.     df.parse(time_local)
  74.   }
  75.   /**
  76.     * 日期格式化
  77.     *
  78.     * @param time_local
  79.     * @return
  80.     */
  81.   def getTime_local_day(time_local: String): String = {
  82.     val df = new SimpleDateFormat("yyyy-MM-dd");
  83.     df.format(getTime_local_Date(time_local));
  84.   }
  85.   def main(args: Array[String]): Unit = {
  86.     // StreamingExamples.setStreamingLogLevels()
  87.     val conf = new SparkConf().setAppName("NginxLogPV").setMaster("local")
  88.     val sc = new SparkContext(conf)
  89.     val rdd = sc.textFile("F:\\test\\input\\access.log").map(x => {
  90.       /**
  91.         * 封装并过滤数据
  92.         */
  93.       filterPVs(x)
  94.     }).filter(x => {
  95.       /**
  96.         * 过滤有效数据
  97.         */
  98.       x.valid
  99.     }).map(x => {
  100.       /**
  101.         * 封装 key-value数据
  102.         */
  103.       ((getTime_local_day(x.time_local), x.request), 1)
  104.     }).reduceByKey(_ + _) //聚合
  105.     /**
  106.       * 二次排序
  107.       */
  108.     val rdd6 = rdd.sortBy(x => PVSort(x._1._1, x._2))
  109.     /**
  110.       * 格式化数据并输出到磁盘
  111.       */
  112.     rdd6.map(x => {
  113.       x._1._1 + "\t" + x._1._2 + "\t" + x._2
  114.     }).saveAsTextFile("F:\\test\\input\\wc231")
  115.     // println(rdd5.collect().toBuffer)
  116.     sc.stop()
  117.   }
  118. }
  119. /**
  120.   * 自定义排序,日期升序,点击量降序
  121.   *
  122.   * @param date
  123.   * @param count
  124.   */
  125. case class PVSort(date: String, count: Int) extends Ordered[PVSort] with Serializable {
  126.   override def compare(that: PVSort): Int = {
  127.     val i = this.date.compareTo(that.date)
  128.     if (i == 0) {
  129.       return -this.count.compareTo(that.count)
  130.     } else {
  131.       return i
  132.     }
  133.   }
  134. }
  135. /**
  136.   * kpi样例类
  137.   *
  138.   * @param remote_addr
  139.   * @param time_local
  140.   * @param request
  141.   * @param status
  142.   * @param valid
  143.   */
  144. case class KPI(
  145.                 remote_addr: String, //来访ip
  146.                 time_local: String, //来访时间
  147.                 request: String, //受访页面
  148.                 status: String, //状态
  149.                 var valid: Boolean = true //判断是否合法
  150.               ) extends Serializable

nginx 日志示例

  1. 50.116.27.194 - - [18/Sep/2013:07:11:29 +0000"POST /wp-cron.php?doing_wp_cron=1379488288.8893849849700927734375 HTTP/1.0" 200 0 "-" "WordPress/3.6; http://itunic.com"

统计结果示例

  1. 2013-09-18  /wp-admin/admin-ajax.php    200
  2. 2013-09-18  /wp-cron.php    73
  3. 2013-09-18  /batch.manage.php   21
  4. 2013-09-18  /index.php  10
  5. 2013-09-18  /tag/waitoutputthreads/index.php    10
  6. 2013-09-19  /wp-admin/admin-ajax.php    120
  7. 2013-09-19  /wp-cron.php    24
  8. 2013-09-19  /index.php  13
  9. 2013-09-19  /register.php   9
  10. 2013-09-19  /wp-admin/post.php  4
  11. 2013-09-19  /wp-admin/async-upload.php  3

 

avatar

评论已关闭!

目前评论:2   其中:访客  2   博主  0

    • avatar 萨瓦底卡 0

      很简练,正好今天学习到这里,就看到博主的这篇文章,继续关注博主更新。

        • avatar 沙拉黑油 0

          @萨瓦底卡 我也是,约个伴一起学习,共同进步,娃哈哈