
Spark Study Notes 7: Implementing Shared Memory (Fast Data Sharing)

2014-07-22 16:16


Storage Subsystem Overview (*important*)



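The figure above sketches how the main modules of Spark's storage subsystem relate to one another. In brief:
CacheManager  An RDD obtains its data through CacheManager during computation, and stores its computed results through CacheManager as well.
BlockManager  CacheManager relies mainly on the BlockManager interface for reading and writing data. BlockManager decides whether data is fetched from memory (MemoryStore) or from disk (DiskStore).
MemoryStore  Saves data in memory and reads it back from memory.
DiskStore  Writes data to disk and reads it back from disk.
BlockManagerWorker  Writing data to the local MemoryStore or DiskStore is a synchronous operation. For fault tolerance the data also has to be copied to other compute nodes, so that it can be recovered if it is lost. This replication is done asynchronously and is handled by BlockManagerWorker.
ConnectionManager  Establishes connections to other compute nodes and handles sending and receiving data.
BlockManagerMaster  Note that this module runs only on the driver, that is, in the process hosting the driver application. It records which slave worker stores each BlockId. For example, suppose an RDD task runs on machine A and needs the block with BlockId 3, but machine A does not hold that block. The slave worker then asks BlockManagerMaster, through BlockManager, where the data is stored, and fetches it through ConnectionManager.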
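
To make the division of labour concrete, here is a minimal sketch of the same layering with plain in-process maps. The names ToyStore, ToyMaster and ToyBlockManager are hypothetical and are not Spark classes; the sketch assumes string block ids and ignores networking, serialization and replication.

```scala
import scala.collection.mutable

// Toy stand-in for MemoryStore or DiskStore: just a map from block id to bytes.
class ToyStore {
  private val blocks = mutable.Map.empty[String, Array[Byte]]
  def put(id: String, data: Array[Byte]): Unit = blocks.update(id, data)
  def get(id: String): Option[Array[Byte]] = blocks.get(id)
}

// Toy master: keeps metadata only, i.e. which node holds which block.
class ToyMaster {
  private val locations = mutable.Map.empty[String, Set[String]]
  def register(id: String, node: String): Unit =
    locations.update(id, locations.getOrElse(id, Set.empty[String]) + node)
  def locationsOf(id: String): Set[String] =
    locations.getOrElse(id, Set.empty[String])
}

// Toy block manager: owns both stores and reports every write to the master.
class ToyBlockManager(node: String, master: ToyMaster) {
  private val memory = new ToyStore
  private val disk = new ToyStore

  def put(id: String, data: Array[Byte], useMemory: Boolean): Unit = {
    if (useMemory) memory.put(id, data) else disk.put(id, data)
    master.register(id, node)
  }

  // Memory is consulted first, disk only on a memory miss.
  def getLocal(id: String): Option[Array[Byte]] =
    memory.get(id).orElse(disk.get(id))
}
```

A node that misses locally would ask the master for locationsOf(id) and then fetch from one of the returned nodes, which is the role ConnectionManager plays in the description above.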




Supported Operations

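Because BlockManager is the component that actually manages storage, the supported operations are described here using BlockManager's public API:
put  writes data
get  reads data
removeRdd  deletes data; once the whole job has finished, all intermediate results can be deleted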
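
The three operations can be pictured with a small in-memory store. This is only a sketch: MiniBlockStore is a hypothetical class, its method signatures are simplified for illustration, and it assumes RDD blocks are named "rdd_<rddId>_<partition>".

```scala
import scala.collection.mutable

// Hypothetical miniature of put / get / removeRdd; not Spark's BlockManager.
class MiniBlockStore {
  private val blocks = mutable.Map.empty[String, Seq[Any]]

  // Assumed naming scheme for RDD blocks: "rdd_<rddId>_<partition>".
  private def blockId(rddId: Int, partition: Int): String =
    s"rdd_${rddId}_$partition"

  def put(rddId: Int, partition: Int, values: Seq[Any]): Unit =
    blocks.update(blockId(rddId, partition), values)

  def get(rddId: Int, partition: Int): Option[Iterator[Any]] =
    blocks.get(blockId(rddId, partition)).map(_.iterator)

  // Drops every block that belongs to one RDD and returns how many were removed.
  def removeRdd(rddId: Int): Int = {
    val prefix = s"rdd_${rddId}_"
    val doomed = blocks.keys.filter(_.startsWith(prefix)).toList
    doomed.foreach(id => blocks.remove(id))
    doomed.size
  }
}
```

Keying blocks by RDD id plus partition is what lets removeRdd clean up a finished job's intermediate results in one pass.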




Startup Analysis

The modules above are created by SparkEnv; the creation takes place in SparkEnv.create.

val blockManagerMaster = new BlockManagerMaster(
  registerOrLookup("BlockManagerMaster", new BlockManagerMasterActor(isLocal, conf)),
  conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)
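
This code is easy to misread: it looks as if a BlockManagerMasterActor were created on every cluster node. That is not the case, as a close look at the implementation of registerOrLookup shows. If the current node is the driver, the actor is created; otherwise a connection to the driver is established. Because newActor is a by-name parameter (newActor: => Actor), the new BlockManagerMasterActor(...) expression is evaluated only on the driver branch.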

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
  if (isDriver) {
    // Driver: create the actor locally under the given name.
    logInfo("Registering " + name)
    actorSystem.actorOf(Props(newActor), name = name)
  } else {
    // Any other node: resolve a reference to the actor that lives on the driver.
    val driverHost: String = conf.get("spark.driver.host", "localhost")
    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
    Utils.checkHost(driverHost, "Expected hostname")
    val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
    val timeout = AkkaUtils.lookupTimeout(conf)
    logInfo(s"Connecting to $name: $url")
    Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
  }
}

A key step during initialization is that BlockManager registers itself with BlockManagerMaster.
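
The create-on-driver, look-up-elsewhere behaviour of registerOrLookup can be reproduced without Akka. In this sketch ToyRegistry and ToyEnv are hypothetical names, and a shared in-process table stands in for the driver's actor system; the point it illustrates is that a by-name argument is never evaluated on the non-driver branch.

```scala
import scala.collection.mutable

// Stand-in for the driver's actor system: a name -> endpoint table (hypothetical).
class ToyRegistry {
  private val endpoints = mutable.Map.empty[String, AnyRef]

  def register(name: String, endpoint: AnyRef): AnyRef = {
    endpoints.update(name, endpoint)
    endpoint
  }

  def lookup(name: String): AnyRef =
    endpoints.getOrElse(name, throw new NoSuchElementException(s"No endpoint named $name"))
}

class ToyEnv(isDriver: Boolean, driverRegistry: ToyRegistry) {
  // newEndpoint is by-name, so the expression passed in runs only when isDriver is true.
  def registerOrLookup(name: String, newEndpoint: => AnyRef): AnyRef =
    if (isDriver) driverRegistry.register(name, newEndpoint)
    else driverRegistry.lookup(name)
}
```

Calling registerOrLookup with the same constructor expression on a driver ToyEnv and on a non-driver ToyEnv builds exactly one endpoint, and both callers end up holding a reference to it.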


Write Path Analysis (*important*)



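The write path in brief:
RDD.iterator is the entry point for interacting with the storage subsystem.
CacheManager.getOrCompute calls BlockManager's put interface to write the data.
Data is written to MemoryStore (memory) first. If MemoryStore is full, the least recently used blocks are evicted from memory and, if their storage level allows it, written to disk.
BlockManagerMaster is notified that new data has been written and stores the corresponding metadata.
The written data is synchronized with other slave workers: when the storage level requests it, a block written locally is also copied to one other machine as a backup, i.e. one replica (in StorageLevel terms replication = 2, as in MEMORY_ONLY_2).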
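
The memory-first, spill-the-least-recently-used step can be sketched with an access-ordered java.util.LinkedHashMap. SpillingStore is a hypothetical class, capacity is counted in blocks rather than bytes, and the "disk" is just a second map; all of these are simplifying assumptions.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable

// Toy write path: memory first, least recently used blocks spill to "disk" when full.
// maxMemoryBlocks is a hypothetical capacity counted in blocks, not bytes.
class SpillingStore(maxMemoryBlocks: Int) {
  // accessOrder = true: iteration goes from least to most recently used entry.
  private val memory = new JLinkedHashMap[String, String](16, 0.75f, true)
  private val disk = mutable.Map.empty[String, String]

  def put(id: String, data: String): Unit = {
    memory.put(id, data)
    while (memory.size > maxMemoryBlocks) {
      // The first entry in iteration order is the least recently used one.
      val eldest = memory.entrySet().iterator().next()
      val key = eldest.getKey
      val value = eldest.getValue
      memory.remove(key)
      disk.update(key, value)
    }
  }

  // A hit in memory also refreshes the block's position in the LRU order.
  def get(id: String): Option[String] =
    Option(memory.get(id)).orElse(disk.get(id))

  def inMemory(id: String): Boolean = memory.containsKey(id)
  def onDisk(id: String): Boolean = disk.contains(id)
}
```

With a capacity of two, writing a and b, reading a, and then writing c leaves a and c in memory and moves b to disk, because the read made b the least recently used block.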




Read Path Analysis

def get(blockId: BlockId): Option[Iterator[Any]] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo("Found block %s locally".format(blockId))
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo("Found block %s remotely".format(blockId))
    return remote
  }
  None
}
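
The local-then-remote order of get can also be written with Option.orElse. This is a sketch rather than Spark's code: BlockLookup and the two lookup functions are hypothetical and are passed in as parameters so the example stays self-contained.

```scala
// Hypothetical lookups; each returns None when the block is not found.
object BlockLookup {
  def get[A](blockId: String,
             getLocal: String => Option[A],
             getRemote: String => Option[A]): Option[A] =
    // orElse takes its argument by name, so getRemote runs only on a local miss.
    getLocal(blockId).orElse(getRemote(blockId))
}
```

The behaviour matches the early-return version above: a local hit never triggers a remote fetch, and None is returned only when both lookups miss.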


Remote Reads

The call path for a remote fetch is getRemote -> doGetRemote. The most important thing doGetRemote does is call BlockManagerWorker.syncGetBlock to fetch the data from the remote node.

def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
  val blockManager = blockManagerWorker.blockManager
  val connectionManager = blockManager.connectionManager
  val blockMessage = BlockMessage.fromGetBlock(msg)
  val blockMessageArray = new BlockMessageArray(blockMessage)
  val responseMessage = connectionManager.sendMessageReliablySync(
    toConnManagerId, blockMessageArray.toBufferMessage)
  responseMessage match {
    case Some(message) => {
      val bufferMessage = message.asInstanceOf[BufferMessage]
      logDebug("Response message received " + bufferMessage)
      BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
        logDebug("Found " + blockMessage)
        return blockMessage.getData
      })
    }
    case None => logDebug("No response message received")
  }
  null
}

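The most interesting part of the code above is sendMessageReliablySync. Reading remote data is without question an asynchronous I/O operation, so how can the code here read as if it were performing a synchronous one? In other words, how does the caller find out that the other side has sent back its response?

Not so fast. Let's go on and look at the definition of sendMessageReliably, the asynchronous method that sendMessageReliablySync builds on: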

def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
    : Future[Option[Message]] = {
  val promise = Promise[Option[Message]]
  // The completion handler fulfils the promise once the ack has been recorded.
  val status = new MessageStatus(
    message, connectionManagerId, s => promise.success(s.ackMessage))
  messageStatuses.synchronized {
    messageStatuses += ((message.id, status))
  }
  sendMessage(connectionManagerId, message)
  promise.future
}

If I told you the secret is right here you would probably say I was talking nonsense, but this really is the place. Notice the names Promise and Future.
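
How a call can look synchronous while the I/O stays asynchronous is easy to show in isolation. In the sketch below SyncOverAsync, sendAsync and sendSync are hypothetical names, a background thread plays the remote peer, and the 5 second timeout is an arbitrary assumption; only Promise, Future and Await come from the Scala standard library.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object SyncOverAsync {
  // Asynchronous side: returns a Future at once and completes it later from another thread.
  def sendAsync(payload: String): Future[Option[String]] = {
    val promise = Promise[Option[String]]()
    val replier = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(50) // pretend network latency
        promise.success(Some("ack:" + payload)) // the moment the response arrives
      }
    })
    replier.start()
    promise.future
  }

  // Synchronous side: the same call, but the caller blocks until the Future is completed.
  def sendSync(payload: String): Option[String] =
    Await.result(sendAsync(payload), 5.seconds)
}
```

The caller of sendSync never sees the callback; it simply waits on the Future until whoever holds the Promise completes it.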

When this future completes, it yields s.ackMessage. So where does ackMessage get written? Take a look at this snippet from ConnectionManager.handleMessage:

case bufferMessage: BufferMessage => {
  if (authEnabled) {
    val res = handleAuthentication(connection, bufferMessage)
    if (res == true) {
      // message was security negotiation so skip the rest
      logDebug("After handleAuth result was true, returning")
      return
    }
  }
  if (bufferMessage.hasAckId) {
    // Look up and remove the status entry registered when the message was sent.
    val sentMessageStatus = messageStatuses.synchronized {
      messageStatuses.get(bufferMessage.ackId) match {
        case Some(status) => {
          messageStatuses -= bufferMessage.ackId
          status
        }
        case None => {
          throw new Exception("Could not find reference for received ack message " +
            message.id)
          null
        }
      }
    }
    // Record the ack, then fire the completion handler.
    sentMessageStatus.synchronized {
      sentMessageStatus.ackMessage = Some(message)
      sentMessageStatus.attempted = true
      sentMessageStatus.acked = true
      sentMessageStatus.markDone()
    }

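Note that the sentMessageStatus.markDone call here invokes the completion handler passed to MessageStatus, which is the promise.success call defined in sendMessageReliably.
Take a look at the definition of MessageStatus.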

class MessageStatus(
    val message: Message,
    val connectionManagerId: ConnectionManagerId,
    completionHandler: MessageStatus => Unit) {

  var ackMessage: Option[Message] = None
  var attempted = false
  var acked = false

  def markDone() { completionHandler(this) }
}

I think the call chain is clear by now, although Future and Promise in Scala still take some effort to understand.
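
The whole chain (register a handler when sending, look it up when the ack arrives, complete the promise) fits in a few lines once the networking is removed. AckTable is a hypothetical class; Int message ids and String payloads are assumptions made to keep the sketch self-contained.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

// Toy version of the pending-ack table; Int ids and String payloads are assumptions.
class AckTable {
  // One entry per in-flight message: the callback to run when its ack shows up.
  private val pending = mutable.Map.empty[Int, Option[String] => Unit]

  // Sender side: remember a completion handler and return the Future it will complete.
  def expectAck(messageId: Int): Future[Option[String]] = {
    val promise = Promise[Option[String]]()
    pending.synchronized {
      pending.update(messageId, ack => { promise.success(ack); () })
    }
    promise.future
  }

  // Receiver side: an ack arrived, so find its handler, drop the entry, and run the handler.
  // Returns false when no message with that id was waiting.
  def handleAck(ackId: Int, body: String): Boolean = {
    val handler = pending.synchronized { pending.remove(ackId) }
    handler.foreach(h => h(Some(body)))
    handler.isDefined
  }
}
```

Here expectAck plays the part of sendMessageReliably, handleAck the part of the ack branch in handleMessage, and the stored closure the part of MessageStatus.markDone.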




TachyonStore: a project worth watching (a distributed in-memory file system)
