您的位置:首页 > 其它

spark入门学习(2)---利用akka建立基于心跳基础的通信框架

2017-01-12 10:59 435 查看

1、架构图



2、业务要求

通信业务逻辑:

首先启动master,然后启动所有的worker

1.worker启动后,在preStart方法中与master建立连接,向master发送注册,将worker的信息通过case class封装起来发送给master

2.master接收到worker的注册消息后,将worker的信息保存起来,然后向worker反馈注册成功

3.worker向master定期发送心跳,报活

4.master定时清理超时的worker

3、代码实现

3.1、master

Master.scala

package org.tianjun.rpc

import akka.actor.{Props, ActorSystem, Actor}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable
import scala.concurrent.duration._

/**
 * Akka-based RPC master.
 *
 * Keeps an in-memory registry of workers, records the time of each worker's
 * last heartbeat, and periodically evicts workers whose last heartbeat is
 * older than `CHECK_INTERVAL` milliseconds.
 *
 * Created by tianjun on 2017/1/11 0011.
 *
 * @param host host name used to build the master URL returned to workers
 * @param port port used to build the master URL returned to workers
 */
class Master(val host: String, val port: Int) extends Actor {

  // workerId --> WorkInfo
  val idToWorker = new mutable.HashMap[String, WorkInfo]()
  // All currently registered workers
  val workes = new mutable.HashSet[WorkInfo]
  // Interval between timeout sweeps in milliseconds; the sweep also uses it
  // as the maximum allowed silence before a worker is evicted.
  val CHECK_INTERVAL = 15000

  // Handle of the periodic sweep, kept so it can be cancelled in postStop.
  private var checkTask: Option[akka.actor.Cancellable] = None

  /** Starts the periodic timeout sweep by scheduling `CheckTimeOutWorker` to self. */
  override def preStart(): Unit = {
    println("preStart invoked!")
    // Brings the implicit ExecutionContext required by the scheduler into scope.
    import context.dispatcher
    checkTask = Some(
      context.system.scheduler.schedule(0.millis, CHECK_INTERVAL.millis, self, CheckTimeOutWorker))
  }

  /** Cancels the periodic sweep so the scheduler stops messaging a stopped actor. */
  override def postStop(): Unit = {
    checkTask.foreach(_.cancel())
    checkTask = None
  }

  override def receive: Receive = {

    case RegisterWorker(id, memory, cores) =>
      // Only first-time registrations are stored and acknowledged.
      if (!idToWorker.contains(id)) {
        val workerInfo = new WorkInfo(id, memory, cores)
        // Treat registration as the first sign of life; otherwise a sweep that
        // runs before the first heartbeat would evict the worker immediately.
        workerInfo.lastHearBeatTime = System.currentTimeMillis()
        idToWorker(id) = workerInfo
        workes += workerInfo
        sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")
      }

    case HeartBeat(id) =>
      // Heartbeats from ids that are not registered are ignored.
      idToWorker.get(id).foreach { info =>
        info.lastHearBeatTime = System.currentTimeMillis()
      }

    case CheckTimeOutWorker =>
      val now = System.currentTimeMillis()
      // filter builds a new set, so removing from `workes` while iterating it is safe.
      val timedOut = workes.filter(w => now - w.lastHearBeatTime > CHECK_INTERVAL)
      timedOut.foreach { w =>
        workes -= w
        idToWorker -= w.id
      }

      // Prints the number of workers still registered after the sweep.
      println(workes.size)
  }
}

/** Entry point that boots the master's ActorSystem and the `Master` actor. */
object Master {

  /**
   * Starts the master.
   *
   * @param args `args(0)` is the host to bind, `args(1)` is the port to bind
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      System.err.println("Usage: Master <host> <port>")
      sys.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
         |""".stripMargin
    val config = ConfigFactory.parseString(configStr)
    // The ActorSystem creates and supervises the actors of this process.
    val actorSystem = ActorSystem("MasterSystem", config)

    actorSystem.actorOf(Props(new Master(host, port)), "Master")

    // Block the main thread until the actor system terminates.
    actorSystem.awaitTermination()
  }
}


3.2、worker

Worker.scala

package org.tianjun.rpc

import java.util.UUID

import akka.actor.{Props, ActorSystem, ActorSelection, Actor}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

/**
 * Akka-based RPC worker.
 *
 * On start it registers itself with the master; once the master acknowledges
 * the registration it sends a heartbeat every `Heart_INTERVAL` milliseconds.
 *
 * Created by tianjun on 2017/1/11 0011.
 *
 * @param masterHost host of the master's ActorSystem
 * @param masterPort port of the master's ActorSystem
 * @param memory     memory value reported to the master on registration
 * @param cores      core count reported to the master on registration
 */
class Worker(val masterHost: String, val masterPort: Int, val memory: Int, val cores: Int) extends Actor {

  // Selection pointing at the master actor; assigned in preStart.
  var master: ActorSelection = _

  // Random id that identifies this worker instance to the master.
  val workId = UUID.randomUUID().toString

  // Interval between heartbeats in milliseconds.
  val Heart_INTERVAL = 10000

  // Handle of the heartbeat timer, kept so it can be cancelled in postStop
  // and so a repeated acknowledgement does not start a second timer.
  private var heartbeatTask: Option[akka.actor.Cancellable] = None

  /** Resolves the master actor and sends it the registration message. */
  override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
    master ! RegisterWorker(workId, memory, cores)
  }

  /** Cancels the heartbeat timer so the scheduler stops messaging a stopped actor. */
  override def postStop(): Unit = {
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }

  override def receive: Receive = {
    case RegisteredWorker(masterUrl) =>
      println(masterUrl)
      // Start the heartbeat timer once; ignore any repeated acknowledgement.
      if (heartbeatTask.isEmpty) {
        // Brings the implicit ExecutionContext required by the scheduler into scope.
        import context.dispatcher
        heartbeatTask = Some(
          context.system.scheduler.schedule(0.millis, Heart_INTERVAL.millis, self, SendHeartbeat))
      }

    case SendHeartbeat =>
      println("send heartbeat to master")
      master ! HeartBeat(workId)
  }
}

/** Entry point that boots the worker's ActorSystem and the `Worker` actor. */
object Worker {

  /**
   * Starts a worker.
   *
   * @param args in order: host to bind, port to bind, master host, master port,
   *             memory, cores
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 6) {
      System.err.println("Usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
      sys.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt

    val memory = args(4).toInt
    val cores = args(5).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
         |""".stripMargin
    val config = ConfigFactory.parseString(configStr)
    // The ActorSystem creates and supervises the actors of this process.
    val actorSystem = ActorSystem("WorkSystem", config)

    actorSystem.actorOf(Props(new Worker(masterHost, masterPort, memory, cores)), "Worker")

    // Block the main thread until the actor system terminates.
    actorSystem.awaitTermination()
  }
}


RemoteMessage.scala

package org.tianjun.rpc

/**
 * Marker trait for messages exchanged between processes.
 * Created by tianjun on 2017/1/11 0011.
 */
trait RemoteMessage extends Serializable

/** Worker --> Master: registration request carrying the worker's id and resources. */
case class RegisterWorker(id: String, memory: Int, cores: Int) extends RemoteMessage

/**
 * Worker --> Master: heartbeat for the worker with the given id.
 * Extends RemoteMessage like the other cross-process messages.
 */
case class HeartBeat(id: String) extends RemoteMessage

/** Master --> Worker: registration acknowledgement carrying the master's URL. */
case class RegisteredWorker(masterUrl: String) extends RemoteMessage

/** Master --> self: trigger for the periodic timeout sweep. */
case object CheckTimeOutWorker

/** Worker --> self: trigger for sending one heartbeat. */
case object SendHeartbeat


WorkInfo.scala

package org.tianjun.rpc

/**
 * Mutable record describing one registered worker.
 * Created by tianjun on 2017/1/11 0011.
 *
 * @param id     worker id
 * @param memory memory value reported by the worker
 * @param cores  core count reported by the worker
 */
class WorkInfo(val id: String, val memory: Int, val cores: Int) {

  // Time of the last heartbeat in epoch milliseconds. Starts at creation time
  // rather than 0, so a record that has not yet received a heartbeat is not
  // considered timed out from the moment it is created.
  var lastHearBeatTime: Long = System.currentTimeMillis()

}


4、运行

master:

preStart invoked!
0
1
1


worker:

akka.tcp://MasterSystem@192.168.64.1:8888/user/Master
send heartbeat to master
send heartbeat to master
send heartbeat to master
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark
相关文章推荐