Cassandra 源码解析 2: 文件流传输 对象序列化
2010-06-29 17:31
429 查看
org.apache.cassandra.streaming
java文件流传输发起端:读文件,写入socket:FileChannel.transferTo SocketChannel
//FileStreamTask SocketChannel channel = SocketChannel.open(); // force local binding on correctly specified interface. channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0)); // obey the unwritten law that all nodes on a cluster must use the same storage port. channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort())); //写入Comm header,标记stream socket ByteBuffer buffer = MessagingService.constructStreamHeader(false); channel.write(buffer); RandomAccessFile raf = new RandomAccessFile(new File(file), "r"); FileChannel fc = raf.getChannel(); while (start < endPosition) { long bytesTransferred = fc.transferTo(start, CHUNK_SIZE, channel); start += bytesTransferred; }
接收端:读socket,写入文件:FileChannel.transferFrom
//IncomingStreamReader FileOutputStream fos = new FileOutputStream(pendingFile.getTargetFile(), true) FileChannel fc = fos.getChannel(); long bytesRead = 0; while (bytesRead < pendingFile.getExpectedBytes()) { bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE); }
控制流和数据流的分离
文件的大小和个数是由前一篇中介绍的Message来传输首先由发起端发STREAM_INITIATE Message给接收端,通知要传输的每个文件名称和大小(startPosition, endPosition)
//StreamOut.transferSSTables StreamOutManager.get(target).addFilesToStream(pendingFiles); StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles); Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage); message.setHeader(StreamOut.TABLE_NAME, table.getBytes()); MessagingService.instance.sendOneWay(message, target);
接收端接到STREAM_INITIATE Message后,将待传送的File添加到Context中,并注册一个StreamCompletionHandler,然后回送STREAM_INITIATE_DONE Message
//StreamInitiateVerbHandler StreamInManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler()); Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] ); MessagingService.instance.sendOneWay(doneMessage, message.getFrom());
发送端接收到STREAM_INITIATE_DONE Message,开始第一个文件的传输(start本文刚开始的FileStreamTask)
// StreamInitiateDoneVerbHandler StreamOutManager.get(message.getFrom()).startNext();
接收端开始接收文件(本文刚开始的IncomingStreamReader),完毕后通过前面注册的StreamCompletionHandler发送Delete或者Stream的CompletedFileStatus(异常,重发), STREAM_FINISHED Message
//IncomingStreamReader handleStreamCompletion(remoteAddress.getAddress()); //handleStreamCompletion IStreamComplete streamComplete = StreamInManager.getStreamCompletionHandler(remoteHost); if (streamComplete != null) streamComplete.onStreamCompletion(remoteHost, pendingFile, streamStatus); //StreamCompletionHandler MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), host);
发送端收到STREAM_FINISHED Message,在StreamFinishedVerbHandler中处理,如果发送成功,则删除本地文件并开始下一文件,否则重新发送
switch (streamStatus.getAction()) { case DELETE: StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile()); break; case STREAM: if (logger.isDebugEnabled()) logger.debug("Need to re-stream file " + streamStatus.getFile()); StreamOutManager.get(message.getFrom()).startNext(); break; default: break; }
STREAM_REQUEST, StreamRequestVerbHandler
STREAM_INITIATE, StreamInitiateVerbHandler
STREAM_INITIATE_DONE, StreamInitiateDoneVerbHandler
STREAM_FINISHED, StreamFinishedVerbHandler
java对象序列化
在java中序列化经常要进行byte[]和long, int, char, string基本类型之间的相互转换,一种简洁的方法是使用DataOutputStream + ByteArrayOutputStreamByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos); dos.writeInt(); dos.writeUTF(); dos.writeLong(); byte[] data = dos.toByteArray();
cassandra 中的做法是直接将两者耦合成一个新的对象
/** * An implementation of the DataOutputStream interface. This class is completely thread * unsafe. */ public class DataOutputBuffer extends DataOutputStream { private static class Buffer extends ByteArrayOutputStream { public byte[] getData() { return buf; } public int getLength() { return count; } public void reset() { count = 0; } public void write(DataInput in, int len) throws IOException { int newcount = count + len; if (newcount > buf.length) { byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)]; System.arraycopy(buf, 0, newbuf, 0, count); buf = newbuf; } in.readFully(buf, count, len); count = newcount; } } private Buffer buffer; /** Constructs a new empty buffer. */ public DataOutputBuffer() { this(new Buffer()); } private DataOutputBuffer(Buffer buffer) { super(buffer); this.buffer = buffer; } /** * Returns the current contents of the buffer. Data is only valid to * {@link #getLength()}. */ public byte[] getData() { return buffer.getData(); } /** Returns the length of the valid data currently in the buffer. */ public int getLength() { return buffer.getLength(); } /** Resets the buffer to empty. */ public DataOutputBuffer reset() { this.written = 0; buffer.reset(); return this; } /** Writes bytes from a DataInput directly into the buffer. */ public void write(DataInput in, int length) throws IOException { buffer.write(in, length); } }
相关文章推荐
- JAVA 源码浅解析之为什么对象序列化要实现Serializable接口
- jquery 1.7.2源码解析(二)构造jquery对象
- spring boot 集成 activeMQ 传输序列化对象
- JQuery源码之“对象的结构解析”
- js 将json字符串转换为json对象的方法解析 作者: 字体:[增加 减小] 类型:转载 将json字符串转换为json对象的方法。在数据传输过程中,json是以文本,即字符串的形式传递的,而J
- netty 对象序列化传输示例
- underscore.js源码解析【对象】
- 把Javascript对象序列化后作为参数传输
- Java 对象序列化详解以及实例实现和源码下载
- 对象序列化在客户端和服务端传输实例
- 使用mina2对象传输数据是,传输的数据序列化错误与解决方案
- 第一天:Java源码级实战速成(通过动手实战类、对象等,通过Spark和Hadoop案例代码和源码解析具体指知识的应用、深度详解匿名接口在Spark开发中的运用)
- jquery源码解析:jQuery延迟对象Deferred(工具方法)详解2
- 常见对象_Arrays工具类的源码解析
- Java对象的XML序列化与反序列化实例解析
- Android -- 创建XML文件对象及其序列化, pull解析XML文件
- JSON对象序列化传输格式设置
- .Net网络通讯编程[利用Socket实现字串、文件、序列化对象传输]--使用封装的网络服务2[使用IE浏览本页]
- Java笔记(7)-输入、输出流、File类、InputStream、Reader、文件字节流、文件字符流、缓冲流、随机流、数据流、对象流、序列化和对象克隆、Scanner解析文件、Console流
- Java 对象序列化 NIO NIO2详细介绍及解析