前言
在此之前,利用mapreduce实现了一版通过nginx日志离线分析网站每日pv,感兴趣的可以去看一下。本文实现思路与之前mapreduce的思路一致。可以很好的比较mapreduce和Spark的写法。在个人看来,Spark写起来更加优美简洁,有一种四两拨千斤的感觉。
想了解实现思路的,可以看一下利用Mapreduce实现的文章,详细思路已经阐述。
点击查看->利用HadoopMareduce实现pv统计分析
本文与Hadoop Mapreduce采用的数据集为同一个,为标准的nginx日志文件。在上文中已经提供了下载附件。感兴趣的可以去下载。
代码实现
- package com.itunic.rdd
- import java.text.SimpleDateFormat
- import java.util.{Date, Locale}
- import org.apache.spark.{SparkConf, SparkContext}
- import scala.collection.mutable
- /**
- * Created by c on 2017/1/11.
- * 通过nginx日志统计每日pv,并按照日期和pv排序
- * by me:
- * 我本沉默是关注互联网以及分享IT相关工作经验的博客,
- * 主要涵盖了操作系统运维、计算机编程、项目开发以及系统架构等经验。
- * 博客宗旨:把最实用的经验,分享给最需要的你,
- * 希望每一位来访的朋友都能有所收获!
- *
- */
- object NginxLogPV {
- /**
- * 设置需要统计的页面
- */
- val pages = new mutable.HashSet[String]()
- pages += ".php"
- /**
- * 封装KPI实体类
- *
- * @param line
- * @return KPI
- */
- def parser(line: String): KPI = {
- //
- val fields = line.split(" ")
- val remote_addr = fields(0)
- val time_local = fields(3).substring(1)
- val request = fields(6)
- val status = fields(8)
- var valid = true
- if (fields.length <= 11) {
- valid = false
- } else {
- valid = if (status.toInt >= 400) false else true
- }
- val url = if (request.indexOf("?") != -1) request.substring(0, request.indexOf("?")) else request
- KPI(remote_addr, time_local, url, status, valid)
- }
- /**
- * 过滤无效数据
- *
- * @param line
- * @return
- */
- def filterPVs(line: String): KPI = {
- val kpi: KPI = parser(line)
- /**
- * 过滤需要统计的URL
- */
- kpi.valid = false
- for (page <- pages) {
- if (kpi.request != null) {
- if (kpi.request.contains(page)) {
- kpi.valid = true
- }
- }
- }
- return kpi;
- }
- /**
- * 将nginx日志时间转换为常规日期
- *
- * @param time_local
- * @return
- */
- def getTime_local_Date(time_local: String): Date = {
- val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US)
- df.parse(time_local)
- }
- /**
- * 日期格式化
- *
- * @param time_local
- * @return
- */
- def getTime_local_day(time_local: String): String = {
- val df = new SimpleDateFormat("yyyy-MM-dd");
- df.format(getTime_local_Date(time_local));
- }
- def main(args: Array[String]): Unit = {
- // StreamingExamples.setStreamingLogLevels()
- val conf = new SparkConf().setAppName("NginxLogPV").setMaster("local")
- val sc = new SparkContext(conf)
- val rdd = sc.textFile("F:\\test\\input\\access.log").map(x => {
- /**
- * 封装并过滤数据
- */
- filterPVs(x)
- }).filter(x => {
- /**
- * 过滤有效数据
- */
- x.valid
- }).map(x => {
- /**
- * 封装 key-value数据
- */
- ((getTime_local_day(x.time_local), x.request), 1)
- }).reduceByKey(_ + _) //聚合
- /**
- * 二次排序
- */
- val rdd6 = rdd.sortBy(x => PVSort(x._1._1, x._2))
- /**
- * 格式化数据并输出到磁盘
- */
- rdd6.map(x => {
- x._1._1 + "\t" + x._1._2 + "\t" + x._2
- }).saveAsTextFile("F:\\test\\input\\wc231")
- // println(rdd5.collect().toBuffer)
- sc.stop()
- }
- }
- /**
- * 自定义排序,日期升序,点击量降序
- *
- * @param date
- * @param count
- */
- case class PVSort(date: String, count: Int) extends Ordered[PVSort] with Serializable {
- override def compare(that: PVSort): Int = {
- val i = this.date.compareTo(that.date)
- if (i == 0) {
- return -this.count.compareTo(that.count)
- } else {
- return i
- }
- }
- }
- /**
- * kpi样例类
- *
- * @param remote_addr
- * @param time_local
- * @param request
- * @param status
- * @param valid
- */
- case class KPI(
- remote_addr: String, //来访ip
- time_local: String, //来访时间
- request: String, //受访页面
- status: String, //状态
- var valid: Boolean = true //判断是否合法
- ) extends Serializable
nginx 日志示例
- 50.116.27.194 - - [18/Sep/2013:07:11:29 +0000] "POST /wp-cron.php?doing_wp_cron=1379488288.8893849849700927734375 HTTP/1.0" 200 0 "-" "WordPress/3.6; http://itunic.com"
统计结果示例
- 2013-09-18 /wp-admin/admin-ajax.php 200
- 2013-09-18 /wp-cron.php 73
- 2013-09-18 /batch.manage.php 21
- 2013-09-18 /index.php 10
- 2013-09-18 /tag/waitoutputthreads/index.php 10
- 2013-09-19 /wp-admin/admin-ajax.php 120
- 2013-09-19 /wp-cron.php 24
- 2013-09-19 /index.php 13
- 2013-09-19 /register.php 9
- 2013-09-19 /wp-admin/post.php 4
- 2013-09-19 /wp-admin/async-upload.php 3
2017年01月13日 下午10:56 沙发
很简练,正好今天学习到这里,就看到博主的这篇文章,继续关注博主更新。
2017年01月13日 下午11:00 1层
@萨瓦底卡 我也是,约个伴一起学习,共同进步,娃哈哈