基于Scala Acotor实现多线程单词统计(WordCount)

摘 要

基于Scala Acotor实现多线程单词统计(WordCount)

  1. package com.itunic.scala
  2. import scala.io.Source
  3. import scala.actors.{Actor, Future}
  4. import scala.collection.mutable
  5. /**
  6.   * Created by itunic.com on 2016/12/9.
  7.   */
  8. class WordCountActor extends Actor {
  9.   override def act(): Unit = {
  10.     loop {
  11.       react {
  12.         case SubmitTask(fileName) => {
  13.           //Map(tom -> 1, jm -> 2, 666 -> 2, hello -> 3)
  14.           //每个线程单独计算一次
  15.           val result = Source.fromFile(fileName).getLines().flatMap(_.split("\t")).map((_, 1)).toList.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
  16.           sender ! ResultTask(result)
  17.         }
  18.         case StopTask => {
  19.           exit()
  20.         }
  21.       }
  22.     }
  23.   }
  24. }
  25. //提交任务样例类
  26. case class SubmitTask(fileName: String)
  27. //返回结果样例类
  28. case class ResultTask(result: Map[String, Int])
  29. //结束任务样例类
  30. case object StopTask
  31. object WordCountActor {
  32.   def main(args: Array[String]): Unit = {
  33.     //存放返回结果集
  34.     val resultSet = new mutable.HashSet[Future[Any]]()
  35.     //汇总结果集
  36.     val resultList = new mutable.ListBuffer[ResultTask]
  37.     //文件池
  38.     val files = Array("F:\\test.txt""F:\\test1.txt""F:\\test.txt""F:\\test1.txt")
  39.     //循环读取并启动线程
  40.     for (i <- files) {
  41.       val actor = new WordCountActor
  42.       //启动并异步接收结果
  43.       val result = actor.start() !! SubmitTask(i)
  44.       resultSet += result
  45.     }
  46.     //合并结果集
  47.     while (resultSet.size > 0) {
  48.       val toCumpute = resultSet.filter(_.isSet)
  49.       //println(toCumpute.toBuffer)
  50.       for (i <- toCumpute) {
  51.         val res = i.apply().asInstanceOf[ResultTask]
  52.         resultList += res
  53.         resultSet.remove(i)
  54.       }
  55.       //防止死鎖休眠100毫秒
  56.       Thread.sleep(100)
  57.     }
  58.     //计算最终结果
  59.     val count = resultList.flatMap(_.result).toList.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
  60.     println(count)
  61.   }
  62. }
  • 基于Scala Acotor实现多线程单词统计(WordCount)已关闭评论
  • 71 views
  • A+
所属分类:未分类
avatar