使用scala的Actor模拟计算多文件WordCount
2017-03-22 15:50
399 查看
scala的Actor是基于事件模型的,具体的模型可以自己查询资料,这里根据别人的demo代码自己也写了一个基于Actor的事件模型的多文件计算WordCount,代码中我写了详细的注释,仅供参考
首先在D盘下面创建三个文件,里面写一些单词用空格分开:
具体代码如下:
运行结果如下:
首先在D盘下面创建三个文件,里面写一些单词用空格分开:
具体代码如下:
package com.lijie.actor import scala.actors._ import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.io.Source /** * Created by Lijie on 2017/3/22. */ class MyActor extends Actor { //重写Actor中的act()方法,和java中的run()一样 override def act(): Unit = { //相当于while(true) 这里用react可以复用线程池 loop { //这是个偏函数,定义两个模式匹配 react { case MapCalculation(fileName) => { //计算文件中的word的数量,并用sender把消息发送回去 sender ! MapResult(Source.fromFile(fileName).getLines().flatMap(_.split(" ")).map((_, 1)).toList.groupBy(_._1).mapValues(_.size)) } } } } } //定义一个case class 用来模拟map的word统计 case class MapCalculation(fileName: String) //定义一个case class 用来模拟map的word统计的返回结果封装 case class MapResult(mapResult: Map[String, Int]) object MyActor { def main(args: Array[String]): Unit = { //用来存储异步返回的future val futures = new mutable.HashSet[Future[Any]]() //用来存储map返回的值 val listRes = new ListBuffer[Map[String, Int]]() //定义文件的位置,类似于mapreduce的FileInputFormat.addInputPath val files = Array("D:\\lijietest01.txt", "D:\\lijietest02.log", "D:\\lijietest03.txt") //循环向MyActor发送消息 for (fileName <- files) { //创建消息对象 并启动 且发送异步消息 //每启动一个发送一个异步消息并返回一个Future,这里同java中的Callable val actor = new MyActor //发送异步消息并返回Future(! !? !!) val future = actor.start() !! MapCalculation(fileName) //扔到hashSet中 futures += future } //模拟reduce while (futures.size > 0) { //这里取出已经计算完成的异步返回 isSet类似于Callable的isDone返回boolean,将计算完的结果放到res val res = futures.filter(_.isSet) for (r <- res) { //这里的apply()相当于Callable的get(),强转成MapResult(上面那个case class),取出里面的map并且放入到一个list中 listRes += r.apply().asInstanceOf[MapResult].mapResult //移除futures已经处理的对象,直到移除完 这个while循环就终止 futures -= r //如果文件很大,最后设定睡眠时间,不然cpu会空跑 Thread.sleep(100) } } //对上面处理完成的map结果进行reduce //其中这个listRes打印出来如下:(因为我这里三个文件内容一样,所以每个map里面的数据一样) // ListBuffer( // Map(world -> 1, soga -> 1, scala -> 2, lijie -> 4, abcd -> 1, hello -> 3, nihao -> 1), // Map(world -> 1, soga -> 1, scala -> 2, lijie -> 4, abcd -> 1, hello -> 3, nihao -> 1), // Map(world -> 1, soga -> 1, scala -> 2, lijie -> 4, abcd -> 1, hello -> 3, nihao -> 1) // ) val wordCount = listRes.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)) //测试打印结果 //Map(world -> 3, soga -> 3, scala -> 6, lijie -> 12, abcd -> 3, hello -> 9, nihao -> 3) println(wordCount) } }
运行结果如下:
相关文章推荐
- 使用Scala编写WordCount详细分析
- Scala简单单机actorwordcount
- 云计算(二十三)-编写WordCount并使用MRUnit测试
- Scala中使用两种方式对单词进行次数统计(wordCount)
- 快学Scala-Actor并发编程实现WordCount
- scala akka 修炼之路1(使用actor实现一个job的并发计算和task失败重启)
- Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率
- idea+maven+scala创建wordcount,打包jar并在spark on yarn上运行(可以使用)
- Spark来监控hdfs里的文件,并用wordcount计算
- 使用 scala 实现单机版 WordCount
- wordcount-使用eclipse插件中对文件路径的修改
- Spark实战----(1)使用Scala开发本地测试的Spark WordCount程序
- Scala用actor编写简单WordCount
- 使用POI来处理Excel和Word文件格式
- 同时启动了多个word的exe,word退出的时候,会报警告“此文件正由另一个应用程序或用户使用” normal.dot
- 使用js比较word文件
- 使用iframe模拟无刷新上传文件。
- 【OS课程设计二】模拟DOS系统文件的物理结构和管理使用
- MASM32编程使用PE文件头信息计算文件长度
- 使用Delphi获取Word文件中的数据