// 基于 Scala Actor 的单词计数（WordCount）
// 用 Actor 并发编程实现一个单机版的 WordCount：以多个文件作为输入，各文件分别统计完成后，将多个任务的结果汇总，得到最终结果。
package com.zhiyou100.ScalaActor_akka
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source
/**
 * Worker actor for the "map + combine" phase of a single-machine word count.
 *
 * Messages handled:
 *  - `MapTask(filename)`: reads the file, splits every line on whitespace, counts the
 *    occurrences of each word locally and replies to the sender with `ReduceTask(counts)`.
 *    If the file cannot be read, the error is reported on stderr and an empty
 *    `ReduceTask` is sent instead, so a caller blocked on the reply is not left waiting forever.
 *  - `ExistTask`: terminates this actor.
 */
class ActorWordCount extends Actor {
  override def act(): Unit = {
    loop {
      react {
        case MapTask(filename) =>
          // Reply to whoever sent the task (for `!!` this completes the caller's Future).
          sender ! ReduceTask(countWords(filename))
        case ExistTask =>
          exit()
      }
    }
  }

  /**
   * Counts word occurrences in `filename` (the local "combiner" step).
   *
   * Words are separated by runs of whitespace; empty tokens (from leading whitespace or
   * blank lines) are ignored. The underlying file handle is always closed.
   *
   * @param filename path of the file to read
   * @return a map from word to its number of occurrences in the file, or an empty map
   *         if the file could not be read
   */
  private def countWords(filename: String): Map[String, Int] =
    try {
      val source = Source.fromFile(filename)
      try {
        // Fold straight into the counts instead of materializing all words and grouping them.
        source.getLines()
          .flatMap(_.split("\\s+"))
          .filter(_.nonEmpty)
          .foldLeft(Map.empty[String, Int]) { (counts, word) =>
            counts.updated(word, counts.getOrElse(word, 0) + 1)
          }
      } finally {
        source.close()
      }
    } catch {
      // An exception escaping `react` would kill the actor without replying,
      // leaving the driver's `!!` future unset forever.
      case e: java.io.IOException =>
        Console.err.println(s"Failed to read $filename: ${e.getMessage}")
        Map.empty[String, Int]
    }
}
/** Driver: fans input files out to [[ActorWordCount]] workers and merges their partial counts. */
object ActorWordCount {
  /** Input used when no file paths are given on the command line. */
  private val DefaultFiles: Array[String] = Array("E:\\1\\2\\1\\MR.txt")

  /**
   * Runs the word count and prints the per-file partial results followed by the totals.
   *
   * @param args paths of the input files; when empty, `DefaultFiles` is used
   */
  def main(args: Array[String]): Unit = {
    val files = if (args.nonEmpty) args else DefaultFiles

    // Map phase: one worker per file. `!!` sends the task and returns a Future of the reply.
    val jobs: List[(ActorWordCount, Future[Any])] = files.toList.map { filename =>
      val worker = new ActorWordCount
      (worker, worker.start() !! MapTask(filename))
    }

    // Future.apply() blocks until the reply arrives, so no busy-wait polling is needed.
    // Each worker is told to exit once its reply has been collected.
    val resultList: List[ReduceTask] = jobs.map { case (worker, future) =>
      val reply = future.apply()
      worker ! ExistTask
      reply match {
        case task: ReduceTask => task
        case other => throw new IllegalStateException(s"Unexpected reply from worker: $other")
      }
    }
    println(resultList)

    // Reduce phase: sum each word's per-file counts into the global word count.
    // (Counting list entries per word would give the number of files containing it instead.)
    val totals: Map[String, Int] =
      resultList.flatMap(_.result).foldLeft(Map.empty[String, Int]) {
        case (acc, (word, count)) => acc.updated(word, acc.getOrElse(word, 0) + count)
      }
    println(totals)
  }
}
/** Request to a worker: count the words in the file at `filename`. */
final case class MapTask(filename: String)

/** A worker's reply: word -> number of occurrences within a single file. */
final case class ReduceTask(result: Map[String, Int])

/** Tells a worker actor to terminate (the name is kept as-is for compatibility; it means "exit"). */
case object ExistTask