
Hadoop DataNode Source Code Analysis

2013-03-13 02:14
http://blog.csdn.net/flyingpig4/article/details/7667290

DataNode source code analysis:

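1. Introduction: the DataNode is the basic unit of file storage. It stores Blocks in the local file system, keeps each Block's metadata, and periodically reports all the Blocks it holds to the NameNode.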



2. Starting the DataNode from main()



2.1: Starting the DataNode with shell scripts

|-->hadoop/bin/start-all.sh

|-->start-dfs.sh

|-->"$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR start datanode $dataStartOpt



2.2: Analysis of main()

|-->StringUtils.startupShutdownMessage(DataNode.class, args, LOG); |logs the startup message and arranges for the shutdown message

|-->toStartupShutdownString()

|-->Runtime.getRuntime().addShutdownHook() |registers a shutdown hook that logs the shutdown message

|-->DataNode datanode = createDataNode(args, null); |see 2.3

|-->datanode.join(); |the main thread waits for the DataNode thread to finish
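The startup sequence in main() follows a common JVM pattern: print a startup banner, register a shutdown hook for the closing banner, start the worker on a daemon thread (as in 2.3.3), and join() it. The minimal sketch below imitates that pattern with a hypothetical `FakeDataNode` class and `System.out` in place of Hadoop's LOG. It is an illustration of the pattern, not the Hadoop code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for DataNode: a Runnable executed on its own daemon thread.
class FakeDataNode implements Runnable {
    private final List<String> log;
    private final Thread thread;

    FakeDataNode(List<String> log) {
        this.log = log;
        this.thread = new Thread(this, "FakeDataNode");
        this.thread.setDaemon(true); // like dn.dataNodeThread.setDaemon(true)
    }

    void start() { thread.start(); }

    // Like datanode.join(): the calling thread waits for the worker to finish.
    void join() {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() { log.add("serving"); }
}

class StartupSketch {
    // Rough analogue of main(): startup banner, shutdown hook, start, join.
    static List<String> runMain() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        log.add("STARTUP_MSG: Starting FakeDataNode");
        // The hook runs only when the JVM shuts down; it prints the closing banner.
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
                System.out.println("SHUTDOWN_MSG: Shutting down FakeDataNode")));
        FakeDataNode dn = new FakeDataNode(log);
        dn.start();
        dn.join();
        log.add("main exiting");
        return log;
    }
}
```

Because the worker is a daemon thread, the JVM would not wait for it on its own; the explicit join() is what keeps main() alive until the worker ends.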



2.3 createDataNode(args, null) |creates the DataNode instance and starts the DataNode thread

|-->DataNode dn = instantiateDataNode(args, conf);

|-->runDatanodeDaemon(dn);



2.3.1 instantiateDataNode(args, conf) |instantiates the DataNode

|-->parseArguments(args, conf) |parses args and loads the resulting values into conf

|-->String[] dataDirs = conf.getStrings("dfs.data.dir"); |gets the DataNode's local storage directories

|-->makeInstance(dataDirs, conf);



2.3.2 makeInstance(dataDirs, conf); |creates the DataNode instance

|-->for (int i = 0; i < dataDirs.length; i++)

|-->dirs.add(data);

|-->return new DataNode(conf, dirs); |returns the DataNode instance
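The loop above does not show how each dfs.data.dir entry is validated before `dirs.add(data)`. Presumably invalid directories are skipped rather than aborting startup; the sketch below approximates such a check with plain java.io.File, under the assumption that "usable" means the path is (or can be made) a readable, writable directory. `DataDirFilter.usableDataDirs` is a hypothetical helper, not a Hadoop method.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

class DataDirFilter {
    // Hypothetical approximation of the per-directory check in makeInstance():
    // keep an entry only if it is (or can be created as) a readable, writable
    // directory; log and skip the rest instead of failing the whole DataNode.
    static List<File> usableDataDirs(String[] dataDirs) {
        List<File> dirs = new ArrayList<>();
        for (String path : dataDirs) {
            File data = new File(path);
            if (!data.isDirectory() && !data.mkdirs()) {
                System.err.println("Invalid directory in dfs.data.dir: " + path);
                continue;
            }
            if (data.canRead() && data.canWrite()) {
                dirs.add(data);
            } else {
                System.err.println("Directory not readable/writable: " + path);
            }
        }
        return dirs; // the caller would create the DataNode only if this is non-empty
    }
}
```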



2.3.3 runDatanodeDaemon(dn); |runs the DataNode

|-->dn.register(); |registers with the NameNode; from then on the NameNode sends commands to the DataNode in its heartbeat replies

|-->dn.dataNodeThread = new Thread(dn, dnThreadName);

|-->dn.dataNodeThread.setDaemon(true);

|-->dn.dataNodeThread.start();





3. DataNode instantiation, done in startDataNode(conf, dataDirs)

|-->setMachineName |sets machineName

|-->machineName = conf.get("slave.host.name");

|-->machineName = DNS.getDefaultHost()

|-->nameNodeAddr = NameNode.getAddress(conf); |gets the NameNode's address

|-->set the socket timeouts

|-->this.socketTimeout = conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);

|-->this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",

HdfsConstants.WRITE_TIMEOUT);

|-->this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024); |write packet size, 64 KB by default

|-->String address = NetUtils.getServerAddress( |determines the DataNode's address

conf,

"dfs.datanode.bindAddress",

"dfs.datanode.port",

"dfs.datanode.address");

|-->InetSocketAddress socAddr = NetUtils.createSocketAddr(address); |creates the local socket address

|-->int tmpPort = socAddr.getPort(); |port number

|-->storage = new DataStorage(); |DataStorage holds the storage-related state

|-->this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort); |builds the registration object

|-->this.namenode = (DatanodeProtocol) RPC.waitForProxy(); |obtains a NameNode proxy through a Java dynamic proxy

|-->getProxy() in RPC.class

|-->VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(

protocol.getClassLoader(), new Class[] { protocol },

new Invoker(addr, ticket, conf, factory));

|-->NamespaceInfo nsInfo = handshake(); |the reply mainly carries buildVersion and distributedUpgradeVersion, which are used for version checks

|-->nsInfo = namenode.versionRequest();

|-->return namesystem.getNamespaceInfo();

|-->boolean simulatedFSDataset =

conf.getBoolean("dfs.datanode.simulateddatastorage", false);

|-->if (simulatedFSDataset) |uses a simulated dataset (meant for testing); otherwise takes the normal path, which is the one analyzed here

|-->else

|-->storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);

|-->this.dnRegistration.setStorageInfo(storage); |copies the storage info into the registration

|-->this.data = new FSDataset(storage, conf); |builds the FSDataset from storage and conf; it performs the block operations

|-->ServerSocket ss = (socketWriteTimeout > 0) ? |creates the server socket: an NIO channel socket when a write timeout is set, a plain ServerSocket otherwise

ServerSocketChannel.open().socket() : new ServerSocket();

|-->Server.bind(ss, socAddr, 0);

|-->ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); |sets the socket receive buffer size

|-->selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),

tmpPort);

|-->this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer |creates the dataXceiverServer daemon thread, which accepts data-transfer connections

(ss, conf, this));

|-->setInterval |sets the block report interval and the heartbeat interval

|-->blockReportInterval

|-->heartBeatInterval

|-->blockScanner = new DataBlockScanner(this, (FSDataset)data, conf); |blockScanner periodically scans (verifies) the stored blocks

|-->this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort, |creates the HttpServer (Jetty-based) that serves the web monitoring pages

tmpInfoPort == 0, conf);

|-->ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(), |starts the local IPC server, which listens for commands from clients and from other DataNodes

conf.getInt("dfs.datanode.handler.count", 3), false, conf);
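RPC.getProxy() relies on java.lang.reflect.Proxy: every call on the returned interface is routed to an InvocationHandler (Hadoop's Invoker), which sends the method name and arguments to the NameNode. The sketch below shows only the proxy mechanics. The `NamenodeLike` interface and the `LoggingInvoker` handler are hypothetical, and the handler records the call locally and returns a canned value instead of doing network I/O.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical protocol interface standing in for DatanodeProtocol.
interface NamenodeLike {
    String versionRequest();
}

// Hypothetical handler standing in for RPC's Invoker: instead of sending
// (method name, args) over the network, it records the call and returns a canned reply.
class LoggingInvoker implements InvocationHandler {
    final List<String> calls = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        calls.add(method.getName());
        if (method.getName().equals("versionRequest")) {
            return "namespaceInfo";
        }
        throw new UnsupportedOperationException(method.getName());
    }
}

class ProxySketch {
    // Same shape as the Proxy.newProxyInstance(...) call quoted above.
    static NamenodeLike getProxy(LoggingInvoker invoker) {
        return (NamenodeLike) Proxy.newProxyInstance(
                NamenodeLike.class.getClassLoader(),
                new Class<?>[] { NamenodeLike.class },
                invoker);
    }
}
```

Calling `ProxySketch.getProxy(inv).versionRequest()` looks like a normal method call, but the handler sees it first; this is why `namenode.versionRequest()` in handshake() can transparently become a remote call.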



4. The DataNode thread: run()

|-->dataXceiverServer.start(); |starts the dataXceiverServer

|-->new Daemon(datanode.threadGroup, |for each accepted socket, starts a DataXceiver thread; see 4.1

new DataXceiver(s, datanode, this)).start();

|-->startDistributedUpgradeIfNeeded();

|-->offerService(); |sends heartbeats to the NameNode and executes the commands it returns
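offerService() is essentially a timed loop: send a heartbeat whenever heartBeatInterval has elapsed, and a full block report whenever blockReportInterval has elapsed (both set in startDataNode()). The sketch below models only that scheduling, with simulated time so the result is deterministic. `OfferServiceSketch` and its parameters are hypothetical; the real loop also sleeps, handles the NameNode's commands, and reports received blocks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the scheduling inside offerService(), using simulated time.
class OfferServiceSketch {
    static List<String> simulate(long heartBeatInterval, long blockReportInterval,
                                 long tickMs, long durationMs) {
        List<String> events = new ArrayList<>();
        long lastHeartbeat = Long.MIN_VALUE / 2;   // forces a heartbeat on the first tick
        long lastBlockReport = Long.MIN_VALUE / 2; // forces a block report on the first tick
        for (long now = 0; now < durationMs; now += tickMs) {
            if (now - lastHeartbeat >= heartBeatInterval) {
                lastHeartbeat = now;
                events.add("heartbeat@" + now);    // the reply would carry NameNode commands
            }
            if (now - lastBlockReport >= blockReportInterval) {
                lastBlockReport = now;
                events.add("blockReport@" + now);  // full list of stored blocks
            }
        }
        return events;
    }
}
```

For example, `simulate(3000, 10000, 1000, 10000)` yields heartbeats at 0, 3000, 6000 and 9000 ms but only one block report (at 0), illustrating why block reports are far rarer than heartbeats.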



4.1 DataXceiver.run()

|-->in = new DataInputStream( |wraps the input stream from the peer (a client or another DataNode)

new BufferedInputStream(NetUtils.getInputStream(s),

SMALL_BUFFER_SIZE));

|-->short version = in.readShort(); |reads the protocol version

|-->boolean local = s.getInetAddress().equals(s.getLocalAddress()); |checks whether the peer is on the local host

|-->byte op = in.readByte(); |reads the opcode; the main ones are:

|-->DataTransferProtocol.OP_READ_BLOCK |read a block

|-->DataTransferProtocol.OP_WRITE_BLOCK |write a block

|-->DataTransferProtocol.OP_READ_METADATA |read block metadata

|-->DataTransferProtocol.OP_REPLACE_BLOCK |replace a block

|-->DataTransferProtocol.OP_COPY_BLOCK |copy a block

|-->DataTransferProtocol.OP_BLOCK_CHECKSUM |compute a block's checksum
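The start of DataXceiver.run() is a small framing protocol: a version short, an opcode byte, then a switch on the opcode. The sketch below reproduces that dispatch on a DataInputStream. The opcode values (80 to 85) are assumed from the 0.20-era DataTransferProtocol and should be checked against the version at hand; the returned handler names just label the branch taken.

```java
import java.io.DataInputStream;
import java.io.IOException;

class OpDispatchSketch {
    // Opcode values assumed from 0.20-era DataTransferProtocol; verify for your version.
    static final byte OP_WRITE_BLOCK = 80;
    static final byte OP_READ_BLOCK = 81;
    static final byte OP_READ_METADATA = 82;
    static final byte OP_REPLACE_BLOCK = 83;
    static final byte OP_COPY_BLOCK = 84;
    static final byte OP_BLOCK_CHECKSUM = 85;

    // Mirrors the start of DataXceiver.run(): check the version, read the opcode,
    // dispatch. Returns the name of the handler that would be called.
    static String dispatch(DataInputStream in, short expectedVersion) throws IOException {
        short version = in.readShort();
        if (version != expectedVersion) {
            throw new IOException("Version mismatch: got " + version + ", expected " + expectedVersion);
        }
        byte op = in.readByte();
        switch (op) {
            case OP_READ_BLOCK:     return "readBlock";
            case OP_WRITE_BLOCK:    return "writeBlock";
            case OP_READ_METADATA:  return "readMetadata";
            case OP_REPLACE_BLOCK:  return "replaceBlock";
            case OP_COPY_BLOCK:     return "copyBlock";
            case OP_BLOCK_CHECKSUM: return "getBlockChecksum";
            default: throw new IOException("Unknown opcode " + op);
        }
    }
}
```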



4.1.1 OP_READ_BLOCK --> readBlock(DataInputStream in) |reads a block

|-->first read the block descriptor

|-->long blockId = in.readLong();

|-->Block block = new Block( blockId, 0 , in.readLong());

|-->long startOffset = in.readLong();

|-->long length = in.readLong();

|-->String clientName = Text.readString(in); |reads clientName as a UTF-8 string

|-->create the output stream

|-->OutputStream baseStream = NetUtils.getOutputStream(s,

datanode.socketWriteTimeout);

|-->DataOutputStream out = new DataOutputStream(

new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));

|-->blockSender = new BlockSender(block, startOffset, length,

true, true, false, datanode, clientTraceFmt);

|-->out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);

|-->long read = blockSender.sendBlock(out, baseStream, null); |sends the block data

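|-->if (blockSender.isBlockReadFully()) |if the entire block was read (and the client reports a good checksum), the block scanner records it as verified by the client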

|-->datanode.blockScanner.verifiedByClient(block);

|-->datanode.myMetrics.bytesRead.inc((int) read);

|-->datanode.myMetrics.blocksRead.inc();

|-->close the streams

|-->IOUtils.closeStream(out);

|-->IOUtils.closeStream(blockSender);
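The first lines of readBlock() define the read request: four longs (block ID, generation stamp, start offset, length) followed by the client name. The sketch below encodes and decodes such a request so the field order is explicit. `ReadRequest` is a hypothetical class, and DataOutputStream.writeUTF/readUTF stand in for Hadoop's Text.writeString/readString, which use a different length prefix, so the bytes are not wire-compatible with HDFS.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical value class holding the fields readBlock() reads, in order.
class ReadRequest {
    final long blockId, generationStamp, startOffset, length;
    final String clientName;

    ReadRequest(long blockId, long generationStamp, long startOffset, long length, String clientName) {
        this.blockId = blockId;
        this.generationStamp = generationStamp;
        this.startOffset = startOffset;
        this.length = length;
        this.clientName = clientName;
    }

    // Client side: the field order must match what readBlock() reads.
    // writeUTF is a stand-in for Text.writeString (different length encoding).
    byte[] encode() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(blockId);
        out.writeLong(generationStamp);
        out.writeLong(startOffset);
        out.writeLong(length);
        out.writeUTF(clientName);
        out.flush();
        return bytes.toByteArray();
    }

    // DataNode side, mirroring the first lines of readBlock().
    static ReadRequest decode(DataInputStream in) throws IOException {
        long blockId = in.readLong();
        long genStamp = in.readLong();
        long startOffset = in.readLong();
        long length = in.readLong();
        String clientName = in.readUTF();
        return new ReadRequest(blockId, genStamp, startOffset, length, clientName);
    }
}
```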



4.1.1.1 sendBlock(out, baseStream, null) |sends the block data stream for a read

|-->this.throttler = throttler; |sets the throttler, which keeps the transfer rate within the allowed bandwidth

|-->write the header

|-->checksum.writeHeader(out);

|-->out.writeLong( offset );

|-->out.flush();

|-->set the packet size

|-->int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER; |initial size, adjusted afterwards depending on the type of output stream

|-->ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);

|-->while (endOffset > offset) |loops until all the requested data has been sent

|-->long len = sendChunks(pktBuf, maxChunksPerPacket,

streamForSendChunks);

|-->out.writeInt(0); |writes 0 as the end-of-data marker

|-->return totalRead;
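The shape of sendBlock() is: write a header, send packets until endOffset is reached, then write an int 0 so the reader knows the stream is finished. The simplified sketch below keeps only that shape; each packet here is just a length plus payload, whereas real packets also carry a sequence number, a last-packet flag and checksums. `SendBlockSketch` and `maxPacketData` are hypothetical.

```java
import java.io.DataOutputStream;
import java.io.IOException;

class SendBlockSketch {
    // Simplified model of sendBlock(): header, packets of at most maxPacketData
    // bytes until endOffset, then an int 0 as the end-of-stream marker.
    static int sendBlock(byte[] block, long offset, long endOffset,
                         int maxPacketData, DataOutputStream out) throws IOException {
        out.writeLong(offset);                    // header: start offset
        int packets = 0;
        while (endOffset > offset) {
            int len = (int) Math.min(maxPacketData, endOffset - offset);
            out.writeInt(len);                    // per-packet data length
            out.write(block, (int) offset, len);  // packet payload
            offset += len;
            packets++;
        }
        out.writeInt(0);                          // trailing 0 marks the end
        out.flush();
        return packets;
    }
}
```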



4.1.1.2 sendChunks(), in three parts

|-->1: checksums

|-->write the packet header

|-->pkt.putInt(packetLen);

|-->pkt.putLong(offset);

|-->pkt.putLong(seqno);

|-->pkt.put((byte)((offset + len >= endOffset) ? 1 : 0)); |lastPacketInBlock flag

|-->pkt.putInt(len);

|-->checksumIn.readFully(buf, checksumOff, checksumLen);

|-->2: read the data

|-->int dataOff = checksumOff + checksumLen;

|-->IOUtils.readFully(blockIn, buf, dataOff, len); |reads the block data from blockIn

|-->for (int i=0; i<numChunks; i++) |computes the checksum of each chunk

|-->checksum.update(buf, dOff, dLen);

|-->3: write the data

|-->if (blockInPosition >= 0) |blockInPosition >= 0 means out is a SocketOutputStream and the data can be sent with transferTo (zero-copy)

|-->SocketOutputStream sockOut = (SocketOutputStream)out;

|-->sockOut.write(buf, 0, dataOff);

|-->sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),

blockInPosition, len);



|-->else

|-->out.write(buf, 0, dataOff + len);

|-->throttler.throttle(packetLen); |throttles the transfer to the bandwidth limit

|-->return len; |returns the number of data bytes sent
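The packet built by sendChunks() has a fixed layout: packetLen, offset, seqno, a last-packet flag, the data length, then one checksum per chunk, then the data. The sketch below assembles such a packet in a ByteBuffer with one CRC32 value per bytesPerChecksum-byte chunk. `PacketSketch` is hypothetical, and defining packetLen as 4 + checksum bytes + data bytes is an assumption of this sketch rather than a statement about Hadoop's exact definition.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class PacketSketch {
    static final int CHECKSUM_SIZE = 4; // one CRC32 value per chunk

    // Builds one packet with the header order used in sendChunks():
    // packetLen, offset, seqno, lastPacketInBlock, dataLen, checksums, data.
    // packetLen = 4 + checksum bytes + data bytes is an assumption of this sketch.
    static ByteBuffer buildPacket(byte[] data, long offset, long seqno,
                                  boolean last, int bytesPerChecksum) {
        int len = data.length;
        int numChunks = (len + bytesPerChecksum - 1) / bytesPerChecksum;
        int checksumLen = numChunks * CHECKSUM_SIZE;
        int packetLen = 4 + checksumLen + len;
        ByteBuffer pkt = ByteBuffer.allocate(4 + 8 + 8 + 1 + 4 + checksumLen + len);
        pkt.putInt(packetLen);
        pkt.putLong(offset);
        pkt.putLong(seqno);
        pkt.put((byte) (last ? 1 : 0));
        pkt.putInt(len);
        CRC32 crc = new CRC32();
        for (int i = 0; i < numChunks; i++) {     // one checksum per chunk
            int dOff = i * bytesPerChecksum;
            int dLen = Math.min(bytesPerChecksum, len - dOff);
            crc.reset();
            crc.update(data, dOff, dLen);
            pkt.putInt((int) crc.getValue());
        }
        pkt.put(data);
        pkt.flip();                               // ready to be written out
        return pkt;
    }
}
```

With 1100 data bytes and 512-byte chunks, the packet carries 3 checksums (the last chunk holds only 76 bytes), so the whole buffer is 25 + 12 + 1100 bytes.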







4.1.2 OP_WRITE_BLOCK --> writeBlock() |writes a block; more complex than reading because it involves the upstream and downstream DataNodes in the pipeline

1: read the request header

|-->Block block = new Block(in.readLong(),

dataXceiverServer.estimateBlockSize, in.readLong());

|-->int pipelineSize = in.readInt();

|-->boolean isRecovery = in.readBoolean();

|-->String client = Text.readString(in)

|-->boolean hasSrcDataNode = in.readBoolean()

|-->srcDataNode.readFields(in); |only if hasSrcDataNode: the DataNode that issued the command (srcDataNode)

|-->int numTargets = in.readInt(); |number of downstream DataNodes this node must forward to; 0 at the last node of the pipeline

|-->DatanodeInfo targets[] = new DatanodeInfo[numTargets];

|-->for (int i = 0; i < targets.length; i++) |reads each DatanodeInfo from the stream

|-->DatanodeInfo tmp = new DatanodeInfo();

|-->tmp.readFields(in);

|-->targets[i] = tmp;



2: create the input/output streams and the socket

|-->mirrorOut = new DataOutputStream( |output stream to the next (downstream) DataNode

new BufferedOutputStream(

NetUtils.getOutputStream(mirrorSock, writeTimeout),

SMALL_BUFFER_SIZE));

|-->mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock)); |input stream from the next DataNode

|-->replyOut = new DataOutputStream( |output stream for replies to the previous (upstream) node

NetUtils.getOutputStream(s, datanode.socketWriteTimeout));

|-->Socket mirrorSock |socket to the next DataNode

|-->BlockReceiver blockReceiver = new BlockReceiver(block, in, |creates the BlockReceiver, which receives and writes the block data

s.getRemoteSocketAddress().toString(),

s.getLocalSocketAddress().toString(),

isRecovery, client, srcDataNode, datanode);

3: data transfer

|-->mirrorNode = targets[0].getName();

|-->mirrorTarget = NetUtils.createSocketAddr(mirrorNode);

|-->mirrorSock = datanode.newSocket();

|-->NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); |connects to the next DataNode

|-->write the version and the request header to the next DataNode

|-->mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );

|--> mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );

|-->mirrorOut.writeLong( block.getBlockId() );

|-->mirrorOut.writeLong( block.getGenerationStamp() );

|-->mirrorOut.writeInt( pipelineSize );

|-->mirrorOut.writeBoolean( isRecovery );

|-->Text.writeString( mirrorOut, client );

|-->mirrorOut.writeBoolean(hasSrcDataNode);

|-->srcDataNode.write(mirrorOut); |only if hasSrcDataNode

|-->mirrorOut.writeInt( targets.length - 1 );

|-->for ( int i = 1; i < targets.length; i++ )

|-->targets[i].write( mirrorOut );

|-->blockReceiver.writeChecksumHeader(mirrorOut); |writes the checksum header

|-->mirrorOut.flush();

|-->if (client.length() != 0)

|-->firstBadLink = Text.readString(mirrorIn); |for a client write, reads the connect ack (the first bad link, if any) from the downstream node
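The header forwarding above (`mirrorOut.writeInt(targets.length - 1)` followed by targets[1..]) is what makes the write pipeline shrink by one node per hop: each DataNode connects to targets[0] and passes on the rest. The sketch below traces that recursion, assuming the client connects to the first DataNode and sends it the list of the remaining nodes. `PipelineSketch` and the node names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PipelineSketch {
    // Traces how writeBlock() shrinks the target list: a node that receives
    // targets[] connects to targets[0] (the mirror) and forwards targets[1..n-1].
    static List<String> trace(String firstNode, String[] targets) {
        List<String> hops = new ArrayList<>();
        String node = firstNode;
        String[] remaining = targets;
        while (true) {
            hops.add(node + " numTargets=" + remaining.length);
            if (remaining.length == 0) {
                break;                      // last DataNode: no mirror to forward to
            }
            node = remaining[0];            // mirrorNode = targets[0]
            remaining = Arrays.copyOfRange(remaining, 1, remaining.length); // forwarded list
        }
        return hops;
    }
}
```

For a three-replica write, `trace("dn1", new String[] {"dn2", "dn3"})` gives `dn1 numTargets=2`, `dn2 numTargets=1`, `dn3 numTargets=0`: only the last node has no mirror.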



4: receive the block data and forward it to the mirror

|-->blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,

mirrorAddr, null, targets.length);

|-->datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);

|-->datanode.blockScanner.addBlock(block);



4.1: receiving the block, receiveBlock()

|-->BlockMetadataHeader.writeHeader(checksumOut, checksum);

|-->responder = new Daemon(datanode.threadGroup,

new PacketResponder(this, block, mirrIn,

replyOut, numTargets));

|-->responder.start();

|-->while (receivePacket() > 0) {} |receives packets and writes each one to disk until the empty end-of-block packet arrives

|-->mirrorOut.writeInt(0);

|-->((PacketResponder)responder.getRunnable()).close();

|-->if (clientName.length() == 0)

|-->block.setNumBytes(offsetInBlock);

|-->datanode.data.finalizeBlock(block);
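receiveBlock() splits the work between two threads: the receiving thread writes packets and enqueues their sequence numbers, while the PacketResponder thread sends acks upstream in order and stops when closed. The sketch below models that hand-off with a BlockingQueue. `ResponderSketch` is hypothetical and much simpler than the real PacketResponder, which also waits for the downstream mirror's ack before replying.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified PacketResponder: the receiver enqueues each packet's
// seqno after writing it; this thread acks the seqnos upstream in order.
class ResponderSketch implements Runnable {
    private static final long END = -1L;           // sentinel pushed by close()
    private final BlockingQueue<Long> ackQueue = new LinkedBlockingQueue<>();
    final List<Long> acksSentUpstream = Collections.synchronizedList(new ArrayList<>());

    void enqueue(long seqno) { ackQueue.add(seqno); }

    void close() { ackQueue.add(END); }

    @Override
    public void run() {
        try {
            while (true) {
                long seqno = ackQueue.take();       // blocks until a packet is queued
                if (seqno == END) {
                    return;
                }
                acksSentUpstream.add(seqno);        // stands in for writing an ack to replyOut
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Decoupling acks from disk writes lets the receiver keep reading the next packet while acknowledgements flow back up the pipeline.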





4.2: receivePacket() |reads one packet into buf; called repeatedly until the payload length is 0

|-->int payloadLen = readNextPacket(); |reads the next packet; the steps below process and forward it

|-->read the packet header, then reset to the marked start position

|-->buf.mark();

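|-->buf.getInt(); |packet length (skipped)

|-->offsetInBlock = buf.getLong(); |offset of this packet in the block

|-->long seqno = buf.getLong(); |packet sequence number

|-->lastPacketInBlock = (buf.get() != 0); |last-packet flag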

|-->int endOfHeader = buf.position(); |position where the header ends

|-->buf.reset();

|-->setBlockPosition(offsetInBlock);

|-->forward the packet to the next DataNode (the mirror)

|-->mirrorOut.write(buf.array(), buf.position(), buf.remaining()); |forwards the whole packet downstream; position and remaining delimit the packet

|-->mirrorOut.flush(); |flushes so the packet is actually sent

|-->buf.position(endOfHeader); |skips past the header

|-->int len = buf.getInt(); |reads the data length

|-->offsetInBlock += len; |advances the offset within the block

|-->int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize; |checksum bytes: one checksumSize-byte checksum per bytesPerChecksum-byte chunk, rounded up

|-->int checksumOff = buf.position(); |the buffer position is now at the start of the checksums

|-->int dataOff = checksumOff + checksumLen; |the data starts right after the checksums

|-->byte pktBuf[] = buf.array();

|-->buf.position(buf.limit()); |moves to the end of the data

|-->verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff); |verifies each chunk against its checksum

|-->out.write(pktBuf, dataOff, len); |writes the data to the local disk

|-->if partialCrc is set (the block ends in a partial chunk), update it

|-->partialCrc.update(pktBuf, dataOff, len);

|-->checksumOut.write(pktBuf, checksumOff, checksumLen);

|-->flush();

|-->checksumOut.flush()

|-->out.flush()

|-->((PacketResponder)responder.getRunnable()).enqueue(seqno, lastPacketInBlock); |hands the packet to the responder, which sends its ack upstream

|-->throttler.throttle(payloadLen);



|-->return payloadLen;
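The buffer arithmetic in receivePacket() is easiest to follow on a concrete packet: peek at the header with mark()/reset(), jump to the data length, compute checksumLen, and locate the checksums and the data behind them. The sketch below performs those steps and verifies every chunk with CRC32. `ReceivePacketSketch` is hypothetical; it assumes 4-byte CRC32 checksums and a heap ByteBuffer whose arrayOffset() is 0, and it does not forward anything to a mirror.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class ReceivePacketSketch {
    static final int CHECKSUM_SIZE = 4; // assumed: CRC32, 4 bytes per chunk

    // Parses one packet in the order used by receivePacket() and verifies every
    // chunk. Returns the data length; throws if a chunk's checksum does not match.
    static int parse(ByteBuffer buf, int bytesPerChecksum) {
        buf.mark();
        buf.getInt();                               // packet length (unused here)
        long offsetInBlock = buf.getLong();         // offset of this packet in the block
        long seqno = buf.getLong();                 // packet sequence number
        buf.get();                                  // lastPacketInBlock flag
        int endOfHeader = buf.position();
        buf.reset();                                // back to the packet start (where the real code forwards it)

        buf.position(endOfHeader);
        int len = buf.getInt();                     // data length
        int checksumLen = ((len + bytesPerChecksum - 1) / bytesPerChecksum) * CHECKSUM_SIZE;
        int checksumOff = buf.position();           // checksums start here
        int dataOff = checksumOff + checksumLen;    // data follows the checksums
        byte[] pkt = buf.array();                   // assumes arrayOffset() == 0

        CRC32 crc = new CRC32();
        for (int dOff = 0; dOff < len; dOff += bytesPerChecksum) {
            int dLen = Math.min(bytesPerChecksum, len - dOff);
            crc.reset();
            crc.update(pkt, dataOff + dOff, dLen);
            int expected = buf.getInt(checksumOff + (dOff / bytesPerChecksum) * CHECKSUM_SIZE);
            if ((int) crc.getValue() != expected) {
                throw new IllegalStateException("Checksum error in packet " + seqno
                        + " at block offset " + (offsetInBlock + dOff));
            }
        }
        buf.position(buf.limit());                  // the whole packet has been consumed
        return len;
    }
}
```

For a 700-byte payload with 512-byte chunks, checksumLen = ((700 + 511) / 512) * 4 = 8, so the data begins 8 bytes after the data-length field.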





5: close the streams and sockets

|-->IOUtils.closeStream(mirrorOut);

|-->IOUtils.closeStream(mirrorIn);

|-->IOUtils.closeStream(replyOut);

|-->IOUtils.closeSocket(mirrorSock);

|-->IOUtils.closeStream(blockReceiver);
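The cleanup calls above rely on a "close quietly" helper: it tolerates null (a mirror socket may never have been opened) and does not let an IOException from one close() skip the remaining resources or mask the original error. The sketch below is a minimal helper in the spirit of IOUtils.closeStream/closeSocket; `CloseSketch` is hypothetical and simply prints the exception instead of logging it.

```java
import java.io.Closeable;
import java.io.IOException;

class CloseSketch {
    // Null-safe close that swallows IOException, in the spirit of IOUtils.closeStream.
    static void closeQuietly(Closeable c) {
        if (c == null) {
            return;                 // e.g. a mirror stream that was never opened
        }
        try {
            c.close();
        } catch (IOException e) {
            System.err.println("Exception while closing " + c + ": " + e);
        }
    }

    // Closes every resource even if some of them fail.
    static void closeAll(Closeable... resources) {
        for (Closeable c : resources) {
            closeQuietly(c);
        }
    }
}
```

Since java.net.Socket implements Closeable, the same helper also covers the mirrorSock case.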