
Cassandra Source Code Analysis 2: File Streaming and Object Serialization

2010-06-29 17:31

org.apache.cassandra.streaming

File streaming in Java

Sender: read the file and write it to the socket, using FileChannel.transferTo with a SocketChannel as the target

//FileStreamTask
SocketChannel channel = SocketChannel.open();
// force local binding on correctly specified interface.
channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
// obey the unwritten law that all nodes on a cluster must use the same storage port.
channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
// write the comm header, which marks this connection as a stream socket
ByteBuffer buffer = MessagingService.constructStreamHeader(false);
channel.write(buffer);
RandomAccessFile raf = new RandomAccessFile(new File(file), "r");
FileChannel fc = raf.getChannel();
while (start < endPosition)
{
    long bytesTransferred = fc.transferTo(start, CHUNK_SIZE, channel);
    start += bytesTransferred;
}
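
The loop above can be tried outside Cassandra. The following is a minimal sketch, not Cassandra's code: the class name TransferToSketch, the send method, and the 64 KB chunk size are assumptions made for illustration. It also adds two guards that the excerpt does not show: it caps each request at the bytes remaining in the range, and it stops if the file turns out to be shorter than the requested range.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToSketch {
    // Hypothetical chunk size for this sketch; not necessarily the value Cassandra uses.
    static final long CHUNK_SIZE = 64 * 1024;

    /** Sends bytes [start, end) of the file to the target channel; returns the number of bytes sent. */
    static long send(Path file, long start, long end, WritableByteChannel target) {
        try (FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = start;
            while (position < end) {
                // Never ask for more than what remains in the requested range.
                long toTransfer = Math.min(CHUNK_SIZE, end - position);
                long transferred = fc.transferTo(position, toTransfer, target);
                if (transferred == 0 && position >= fc.size()) {
                    break; // the file is shorter than the requested range
                }
                position += transferred;
            }
            return position - start;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Any WritableByteChannel works as the target here, so the sketch can be exercised with an in-memory channel instead of a real SocketChannel.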


Receiver: read from the socket and write to the file, using FileChannel.transferFrom

//IncomingStreamReader
FileOutputStream fos = new FileOutputStream(pendingFile.getTargetFile(), true);
FileChannel fc = fos.getChannel();
long bytesRead = 0;
while (bytesRead < pendingFile.getExpectedBytes())
{
    bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE);
}
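
The receiving loop can be sketched in the same self-contained way. Again this is an illustration under assumptions, not Cassandra's code: TransferFromSketch, receive, and the chunk size are hypothetical names and values. One detail is worth making explicit: on a blocking source, transferFrom returns 0 once the source reaches end of stream, so a loop that only checks the byte count would spin forever if the sender disconnected early. The sketch turns that case into an error.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferFromSketch {
    // Hypothetical chunk size for this sketch.
    static final long CHUNK_SIZE = 64 * 1024;

    /** Reads expectedBytes from the source channel into the target file; returns the bytes received. */
    static long receive(ReadableByteChannel source, Path target, long expectedBytes) {
        try (FileChannel fc = FileChannel.open(target, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long received = 0;
            while (received < expectedBytes) {
                long toTransfer = Math.min(CHUNK_SIZE, expectedBytes - received);
                long n = fc.transferFrom(source, received, toTransfer);
                if (n == 0) {
                    // Assumes a blocking source, where 0 means the stream ended early.
                    throw new EOFException("stream ended after " + received + " of " + expectedBytes + " bytes");
                }
                received += n;
            }
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```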


Separation of control flow and data flow

The number and sizes of the files are communicated through the Message mechanism described in the previous article.

First, the sender sends a STREAM_INITIATE Message to the receiver, announcing the name and size (startPosition, endPosition) of each file to be transferred.

//StreamOut.transferSSTables
StreamOutManager.get(target).addFilesToStream(pendingFiles);
StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles);
Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
message.setHeader(StreamOut.TABLE_NAME, table.getBytes());
MessagingService.instance.sendOneWay(message, target);


When the receiver gets the STREAM_INITIATE Message, it adds the files to be transferred to its Context, registers a StreamCompletionHandler, and then replies with a STREAM_INITIATE_DONE Message.

//StreamInitiateVerbHandler
StreamInManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler());
Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0] );
MessagingService.instance.sendOneWay(doneMessage, message.getFrom());


When the sender receives the STREAM_INITIATE_DONE Message, it starts transferring the first file (by starting the FileStreamTask shown at the beginning of this article).

// StreamInitiateDoneVerbHandler
StreamOutManager.get(message.getFrom()).startNext();


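The receiver then starts receiving the file (the IncomingStreamReader shown at the beginning of this article). When it finishes, it uses the previously registered StreamCompletionHandler to send a STREAM_FINISHED Message carrying a CompletedFileStatus whose action is either Delete or Stream (Stream means an error occurred and the file must be resent).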

//IncomingStreamReader
handleStreamCompletion(remoteAddress.getAddress());
//handleStreamCompletion
IStreamComplete streamComplete = StreamInManager.getStreamCompletionHandler(remoteHost);
if (streamComplete != null)
    streamComplete.onStreamCompletion(remoteHost, pendingFile, streamStatus);
//StreamCompletionHandler
MessagingService.instance.sendOneWay(streamStatus.makeStreamStatusMessage(), host);


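The sender handles the STREAM_FINISHED Message in StreamFinishedVerbHandler: if the transfer succeeded, it deletes the local file and starts the next one; otherwise it sends the file again.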

switch (streamStatus.getAction())
{
    case DELETE:
        StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
        break;
    case STREAM:
        if (logger.isDebugEnabled())
            logger.debug("Need to re-stream file " + streamStatus.getFile());
        StreamOutManager.get(message.getFrom()).startNext();
        break;
    default:
        break;
}
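
The sender-side bookkeeping implied by this switch can be sketched as a small queue. This is an assumed model for illustration only: OutgoingQueueSketch and its members are hypothetical names, and the real StreamOutManager may differ in detail. The point it shows is that a file stays at the head of the queue until the receiver acknowledges it with DELETE, so a STREAM reply simply sends the same head again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class OutgoingQueueSketch {
    enum Action { DELETE, STREAM }

    private final Deque<String> pending = new ArrayDeque<>();
    // Records every transfer attempt; stands in for actually streaming the file.
    final List<String> sent = new ArrayList<>();

    void add(String file) {
        pending.addLast(file);
    }

    /** Streams the file at the head of the queue, if any. The file stays queued until acknowledged. */
    void startNext() {
        String head = pending.peekFirst();
        if (head != null) {
            sent.add(head);
        }
    }

    /** Drops the acknowledged file and moves on to the next one. */
    void finishAndStartNext(String file) {
        if (!file.equals(pending.peekFirst())) {
            throw new IllegalStateException("unexpected acknowledgement for " + file);
        }
        pending.removeFirst(); // the real code would also delete the local file here
        startNext();
    }

    /** Mirrors the switch above: DELETE advances the queue, STREAM retries the head. */
    void onStreamFinished(String file, Action action) {
        switch (action) {
            case DELETE:
                finishAndStartNext(file);
                break;
            case STREAM:
                startNext();
                break;
        }
    }

    boolean isDone() {
        return pending.isEmpty();
    }
}
```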


STREAM_REQUEST, StreamRequestVerbHandler

STREAM_INITIATE, StreamInitiateVerbHandler

STREAM_INITIATE_DONE, StreamInitiateDoneVerbHandler

STREAM_FINISHED, StreamFinishedVerbHandler
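
The verb-to-handler pairing listed above amounts to a lookup table keyed by verb. A minimal sketch of that idea follows, assuming a hypothetical VerbHandler interface and register/deliver methods; it does not reproduce how Cassandra's MessagingService actually registers handlers.

```java
import java.util.EnumMap;
import java.util.Map;

final class VerbDispatchSketch {
    enum Verb { STREAM_REQUEST, STREAM_INITIATE, STREAM_INITIATE_DONE, STREAM_FINISHED }

    /** Hypothetical handler interface: reacts to a message from the given host. */
    interface VerbHandler {
        void doVerb(String from);
    }

    private final Map<Verb, VerbHandler> handlers = new EnumMap<>(Verb.class);

    void register(Verb verb, VerbHandler handler) {
        handlers.put(verb, handler);
    }

    /** Routes an incoming control message to the handler registered for its verb. */
    void deliver(Verb verb, String from) {
        VerbHandler handler = handlers.get(verb);
        if (handler == null) {
            throw new IllegalStateException("no handler registered for " + verb);
        }
        handler.doVerb(from);
    }
}
```

Because only these small control messages pass through the dispatcher, the bulk file bytes can travel over the separate stream socket without touching the message path.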

Java object serialization

Serialization in Java often requires converting between byte[] and basic types such as long, int, char, and String. A concise way to do this is to combine DataOutputStream with ByteArrayOutputStream.

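import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeInt(42);            // example value: 4 bytes, big-endian
dos.writeUTF("example");     // example value: 2-byte length prefix, then modified UTF-8
dos.writeLong(1234567890L);  // example value: 8 bytes, big-endian
// the bytes live in the ByteArrayOutputStream, so read them back from bos
byte[] data = bos.toByteArray();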
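
The reverse direction uses DataInputStream over a ByteArrayInputStream, reading the fields in the same order they were written. The sketch below is only an illustration: RoundTripSketch, encode, decode, and the three-field layout are assumptions, not a Cassandra format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class RoundTripSketch {
    /** Serializes three fields into a byte[] in a fixed order. */
    static byte[] encode(int id, String name, long timestamp) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeInt(id);
            dos.writeUTF(name);
            dos.writeLong(timestamp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Reads the fields back in the order they were written and joins them for display. */
    static String decode(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            int id = dis.readInt();
            String name = dis.readUTF();
            long timestamp = dis.readLong();
            return id + ":" + name + ":" + timestamp;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For an ASCII name of 7 characters the encoded form is 4 + (2 + 7) + 8 = 21 bytes, since writeUTF prefixes the string with a 2-byte length.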


Cassandra's approach is to couple the two directly into a single new class.

/**
 * An implementation of the DataOutputStream interface. This class is completely thread
 * unsafe.
 */
public class DataOutputBuffer extends DataOutputStream
{
    private static class Buffer extends ByteArrayOutputStream
    {
        public byte[] getData()
        {
            return buf;
        }

        public int getLength()
        {
            return count;
        }

        public void reset()
        {
            count = 0;
        }

        public void write(DataInput in, int len) throws IOException
        {
            int newcount = count + len;
            if (newcount > buf.length)
            {
                byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
                System.arraycopy(buf, 0, newbuf, 0, count);
                buf = newbuf;
            }
            in.readFully(buf, count, len);
            count = newcount;
        }
    }

    private Buffer buffer;

    /** Constructs a new empty buffer. */
    public DataOutputBuffer()
    {
        this(new Buffer());
    }

    private DataOutputBuffer(Buffer buffer)
    {
        super(buffer);
        this.buffer = buffer;
    }

    /**
     * Returns the current contents of the buffer. Data is only valid to
     * {@link #getLength()}.
     */
    public byte[] getData()
    {
        return buffer.getData();
    }

    /** Returns the length of the valid data currently in the buffer. */
    public int getLength()
    {
        return buffer.getLength();
    }

    /** Resets the buffer to empty. */
    public DataOutputBuffer reset()
    {
        this.written = 0;
        buffer.reset();
        return this;
    }

    /** Writes bytes from a DataInput directly into the buffer. */
    public void write(DataInput in, int length) throws IOException
    {
        buffer.write(in, length);
    }
}
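
What this coupling buys is reuse without copying: getData() hands out the backing array itself, while toByteArray() on a plain ByteArrayOutputStream allocates a fresh copy on every call. The following stripped-down sketch shows just that property. ReusableBufferSketch is a hypothetical reduction of the class above written for this illustration, not the Cassandra class itself.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

final class ReusableBufferSketch extends DataOutputStream {
    private static final class Buffer extends ByteArrayOutputStream {
        byte[] data() {
            return buf; // the backing array itself, not a copy
        }

        int length() {
            return count;
        }
    }

    private final Buffer buffer;

    ReusableBufferSketch() {
        this(new Buffer());
    }

    private ReusableBufferSketch(Buffer buffer) {
        super(buffer);
        this.buffer = buffer;
    }

    /** Backing array; only the first getLength() bytes are valid. */
    byte[] getData() {
        return buffer.data();
    }

    int getLength() {
        return buffer.length();
    }

    /** Empties the buffer but keeps the allocated array for the next use. */
    ReusableBufferSketch reset() {
        written = 0;    // protected byte counter inherited from DataOutputStream
        buffer.reset(); // ByteArrayOutputStream.reset() sets count to 0 and keeps the array
        return this;
    }
}
```

Writing a long, calling reset(), and then writing an int leaves getLength() at 4 while getData() still returns the same array instance, so a caller must always pair getData() with getLength() rather than rely on the array's own length.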