您的位置:首页 > 其它

IntelliJ IDEA使用SBT构建一个AKKA Scala程序

2016-08-03 11:33 513 查看
1、下载安装 IntelliJ IDEA

2、下载安装 Scala

3、在IntelliJ IDEA中安装Scala插件



4、创建工程











5、设置build.sbt

在项目中的build.sbt中追加下图所示的resolvers和libraryDependencies

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "com.typesafe.akka" % "akka-actor_2.11" % "2.3.4"




添加后,若当前电脑没有设置版本的akka,则会显示红线,保存后会自动下载akka

6、开始写代码

例子1

我们要做的设计是由一个 主 actor来启动整个计算过程,创建一组 工作 actor. 整个工作会被分割成具体的小段, 各小段会以round-robin的方式发送到不同的工作
actor. 主actor等待所有的工作actor完全各自的工作并将其回送的结果进行汇总。当计算完成以后,主actor将结果发送给 监听器 acotr, 由它来输出结果。

/**
* Created by yangruimin on 2016/8/3.
*/

import java.util.concurrent.TimeUnit

import akka.actor._
import akka.routing.RoundRobinRouter

import scala.concurrent.duration.Duration

object Pi extends App {

calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)

sealed trait PiMessage
case object Calculate extends PiMessage
case class Work(start: Int, nrOfElements: Int) extends PiMessage
case class Result(value: Double) extends PiMessage
case class PiApproximation(pi: Double, duration: Duration)

class Worker extends Actor {

def calculatePiFor(start: Int, nrOfElements: Int): Double = {
var acc = 0.0
for (i ← start until (start + nrOfElements))
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
acc
}

def receive = {
case Work(start, nrOfElements) ⇒
sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}

class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef)
extends Actor {

var pi: Double = _
var nrOfResults: Int = _
val start: Long = System.currentTimeMillis

val workerRouter = context.actorOf(
Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")

def receive = {
case Calculate ⇒
for (i ← 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)
case Result(value) ⇒
pi += value
nrOfResults += 1
if (nrOfResults == nrOfMessages) {
// Send the result to the listener
listener ! PiApproximation(pi, duration = Duration.create(System.currentTimeMillis - start, TimeUnit.MILLISECONDS))
// Stops this actor and all its supervised children
context.stop(self)
}
}

}

class Listener extends Actor {
def receive = {
case PiApproximation(pi, duration) ⇒
println("\n\tPi approximation: \t\t%s\n\tCalculation time: \t\t%s\n"
.format(pi, duration))
context.system.shutdown()
}
}

def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
// Create an Akka system
val system = ActorSystem("PiSystem")

// create the result listener, which will print the result and shutdown the system
val listener = system.actorOf(Props[Listener], name = "listener")

// create the master
val master = system.actorOf(Props(new Master(
nrOfWorkers, nrOfMessages, nrOfElements, listener)),
name = "master")

// start the calculation
master ! Calculate

}
}
运行结果如下:



例子2

使用?标识符,从feature获取计算文件行数的的结果

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

import scala.concurrent.Await

/**
* Created by yangruimin on 2016/8/3.
*/
object Sample extends App
{

import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.dispatch.ExecutionContexts._

implicit val ec = global

override def main(args: Array[String])
{
val system = ActorSystem("System")
val actor = system.actorOf(Props(new WordCounterActor("C:\\Users\\Administrator\\Desktop\\file.txt")))
implicit val timeout = Timeout(25 seconds)
val future = actor ? StartProcessFileMsg()
val result = Await.result(future, timeout.duration)
println("Total number of words " + result)
system.shutdown
}

/**
* main actor
*/
case class StartProcessFileMsg()
class WordCounterActor(filename: String) extends Actor
{
private var running = false
private var totalLines = 0
private var linesProcessed = 0
private var result = 0
private var fileSender: Option[ActorRef] = None

def receive =
{
case StartProcessFileMsg() =>
{
if (running)
{
// println just used for example purposes;
// Akka logger should be used instead
println("Warning: duplicate start message received")
} else
{
running = true
fileSender = Some(sender) // save reference to process invoker
import scala.io.Source._
fromFile(filename).getLines.foreach
{ line =>
context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)
totalLines += 1
}
}
}
case StringProcessedMsg(words) =>
{
result += words
linesProcessed += 1
if (linesProcessed == totalLines)
{
fileSender.map(_ ! result) // provide result to process invoker
}
}
case _ => println("message not recognized!")
}
}

/**
* secondary actor
*/
case class ProcessStringMsg(string: String)
case class StringProcessedMsg(words: Integer)
class StringCounterActor extends Actor
{
def receive =
{
case ProcessStringMsg(string) =>
{
val wordsInLine = string.split(" ").length
sender ! StringProcessedMsg(wordsInLine)
}
case _ => println("Error: message not recognized")
}
}
}
运行结果如下:



Demo源码:http://download.csdn.net/detail/lyyybz/9609243
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: