spark入门学习(2)---利用akka建立基于心跳基础的通信框架
2017-01-12 10:59
435 查看
1、架构图
2、业务要求
通信业务逻辑:首先启动master,然后启动所有的worker
1.worker启动后,在preStart方法中与master建立连接,向master发送注册,将worker的信息通过case class封装起来发送给master
2.master接受到worker的注册消息后将worker的消息保存起来,然后向worker反馈注册成功
3.worker向master定期发送心跳,报活
4.master定时清理超时的worker
3、代码实现
3.1、master
Master.scalapackage org.tianjun.rpc import akka.actor.{Props, ActorSystem, Actor} import com.typesafe.config.ConfigFactory import scala.collection.mutable import scala.concurrent.duration._ /** * 基于akka的rpc * Created by tianjun on 2017/1/11 0011. */ class Master(val host:String,val port:Int) extends Actor{ // println("constructor invoked!") //workerId-->workerInfo val idToWorker = new mutable.HashMap[String,WorkInfo]() //workerINfo val workes = new mutable.HashSet[WorkInfo] //超时检测的间隔 val CHECK_INTERVAL = 15000 override def preStart(): Unit = { println("preStart invoked!") //导入隐式转换 import context.dispatcher context.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker) } override def receive: Receive = { case RegisterWorker(id,memory,cores) => { //判断是否已经注册 if(!idToWorker.contains(id)){ //没有注册,就把worker的信息封装保存到内存中 val workerInfo = new WorkInfo(id,memory,cores) idToWorker(id) = workerInfo workes+=workerInfo sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master") } } case HeartBeat(id)=>{ if(idToWorker.contains(id)){ val workInfo = idToWorker(id) //报活 val currentTime = System.currentTimeMillis() workInfo.lastHearBeatTime = currentTime } } case CheckTimeOutWorker => { val currentTime = System.currentTimeMillis() val toRemove = workes.filter(x=>currentTime-x.lastHearBeatTime > CHECK_INTERVAL) for(w <- toRemove){ workes -= w idToWorker -= w.id } println(workes.size) } } } object Master{ def main(args: Array[String]) { val host = args(0) val port = args(1).toInt val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,辅助创建和监控actor,它是单例的 val actorSystem = ActorSystem("MasterSystem",config) val master = actorSystem.actorOf(Props(new Master(host,port)),"Master") actorSystem.awaitTermination() } }
3.2、worker
Worker.scalapackage org.tianjun.rpc import java.util.UUID import akka.actor.{Props, ActorSystem, ActorSelection, Actor} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ /** * rpc入门 * Created by tianjun on 2017/1/11 0011. */ class Worker(val masterHost:String,val masterPort:Int,val memory:Int,val cores:Int) extends Actor{ var master:ActorSelection = _ val workId = UUID.randomUUID().toString val Heart_INTERVAL=10000 override def preStart(): Unit = { //跟Master建立链接 master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //向Master发送注册信息 master ! RegisterWorker(workId,memory,cores) } override def receive: Receive = { case RegisteredWorker(masterUrl) => { println(masterUrl) //启动定时器发送心跳 import context.dispatcher context.system.scheduler.schedule(0 millis,Heart_INTERVAL millis,self,SendHeartbeat) } case SendHeartbeat =>{ println("send heartbeat to master") master ! HeartBeat(workId) } } } object Worker{ def main(args: Array[String]) { val host = args(0) val port = args(1).toInt val masterHost=args(2) val masterPort = args(3).toInt val memory = args(4).toInt val cores = args(5).toInt val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,辅助创建和监控actor,它是单例的 val actorSystem = ActorSystem("WorkSystem",config) actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory ,cores)),"Worker") actorSystem.awaitTermination() } }
RemoteMessage.scala
package org.tianjun.rpc /** * 跨进程通信 * Created by tianjun on 2017/1/11 0011. */ trait RemoteMessage extends Serializable //Worker-->Master case class RegisterWorker(id:String,memory:Int,cores:Int) extends RemoteMessage //worker->master case class HeartBeat(id:String) //Master-->Worker case class RegisteredWorker(masterUrl:String) extends RemoteMessage //master->self case object CheckTimeOutWorker //worker-->self case object SendHeartbeat
WorkInfo.scala
package org.tianjun.rpc /** * Created by tianjun on 2017/1/11 0011. */ class WorkInfo(val id:String,val memory:Int,cores:Int) { //TODO 上一次心跳 var lastHearBeatTime : Long = _ }
4、运行
master:preStart invoked! 0 1 1
worker:
akka.tcp://MasterSystem@192.168.64.1:8888/user/Master send heartbeat to master send heartbeat to master send heartbeat to master
相关文章推荐
- spark入门学习(1)---利用akka建立最基础的通信框架
- Spark入门到精通视频学习资料--第四章:基于Spark的流处理框架Spark Streaming(2讲)
- 安卓网络通信框架Volley学习(二)基于Volley高效加载网络图片
- 股票入门基础知识43:利用多时间框架分析
- 快速入门过程与方法:设计与思路;如何学习新的知识框架,建立思维模式,熟悉应用场景体系
- Spark MLlib 入门学习笔记 - RDD基础
- 初解,Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Scala语言的Actor而产生的消息驱动框架Akka的使用,
- OpenCV基础入门 基于官方文档解读(3)--core模块学习
- 零基础入门深度学习之-002-徒手搭建Spark
- OpenCV基础入门 基于官方文档解读(5)--core模块学习
- Spark为何使用Netty通信框架替代Akka
- Android基础入门教程——7.6.2 基于TCP协议的Socket通信(1)
- SSM框架基础入门学习1——servlet入门实例(包含mysql增查,实例免费下载)
- 【零基础入门学习Python笔记007】了不起的分支和循环1:打飞机框架
- 深度学习Deeplearning4j 入门实战(5):基于多层感知机的Mnist压缩以及在Spark实现
- 2014-11-12--Hadoop的基础学习(三)--Hadoop中MapReduce框架入门
- Android基础入门教程——7.6.3 基于TCP协议的Socket通信(2)
- 基于ASP.NET MVC的ABP框架入门学习教程
- 基于ASP.NET MVC的ABP框架入门学习教程
- ci框架基础详解(入门学习)