您的位置:首页 > 其它

Scala用actor编写简单WordCount

2018-01-16 16:58 423 查看
package cn.allengao.actor

import java.io.File

import scala.actors.{Actor, Future}

//旧版本还是使用actors的actor,而不是使用akka的actor
import scala.collection.mutable
import scala.io.Source

class Task extends Actor {

override def act(): Unit = {
//希望程序线程池可以复用,效率会更高一些
loop {
react {
//首先提交任务,告诉程序要执行的文件
case SubmitTask(fileName) => {
//getLines取出的是一个迭代器,可以实现从文件中取一些,读一些的操作。进行局部汇总。
val result = Source.fromFile(fileName).getLines().flatMap(_.split(" ")).map((_, 1)).toList.groupBy(_._1).mapValues(_.size)
//把result的结果发送给case class ResultTask。
sender ! ResultTask(result)
}
//如果任务执行完就停止
case StopTask => {
exit()
}
}
}
}
}

case class SubmitTask(fileName: String)

case class ResultTask(result: Map[String, Int])

case object StopTask

object MyWordCount {
def main(args: Array[String]) {
val files = Array[String]("d://files//word1.txt", "d://files//word2.log")
//用HashSet可变集合将replySet装起来
val replySet = new mutable.HashSet[Future[Any]]
val resultList = new mutable.ListBuffer[ResultTask]

//在这里有多少个文件就new多少个actor
for (f <- files) {
val actor = new Task
//使用异步发送消息,返回值是 Future[Any]
val reply = actor.start() !! SubmitTask(f)
replySet += reply
}
//通过while循环读取replySet的内容,如果replySet.size=0,程序停止计算。
while (replySet.size > 0) {
//通过filter方法生成一个新的HashSet,里面装载的是任务完成的Future。
val toCumpute = replySet.filter(_.isSet)
for (r <- toCumpute) {
//将结果取出来,相当于java中的get方法,取出来的是一个[Any]。
val result = r.apply().asInstanceOf[ResultTask]
resultList += result
//将计算完的replaySet
replySet -= r
}
//防止局部任务执行冲突,设置一个延迟
Thread.sleep(100)
}
//汇总的功能
//List(Map(),Map(),Map()),flat之后将Map()压平,生成List((hello,5),(tom,2),(hello,2),(jerry,2))
val finalResult = resultList.flatMap(_.result).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)).toList.sortBy(_._2).reverse
println(finalResult)
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: