您的位置:首页 > 其它

kafka客户端源码解读(client-network包)

2017-07-24 18:37 369 查看

kafka-network层 send类源码解读

概述

send类:实现将消息(byte),通过channel发送到目标broker集群。

主要的类:接口send类,实现类ByteBufferSend implements Send ,继承类NetworkSend extends ByteBufferSend。

主要关联类:channel类。

源码分析

send接口类:
public interface Send {
String destination();
boolean completed();
long writeTo(GatheringByteChannel channel) throws IOException;
long size();
}


destination()–发送的目标集群地址。

completed()–判断是否完成发送。

writeTo(GatheringByteChannel channel)–将字节流写入到channel中。这里channel是生产者端-broker端的链路。将在channle篇章中分析。

size()—用来计算的buffer的字节大小。为什么需要计算发送字节大小,在继承类中NetworkSend中会解释下kafka消息体设计模式。

实现类ByteBufferSend

首先看构造方法:

public ByteBufferSend(String destination, ByteBuffer... buffers) {
this.destination = destination;
this.buffers = buffers;
for (ByteBuffer buffer : buffers)
remaining += buffer.remaining();
this.size = remaining;
}


构造方法完成参数初始化,可以看到size是bytebuffer的大小。

public long writeTo(GatheringByteChannel channel) throws IOException {
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
pending = TransportLayers.hasPendingWrites(channel);
return written;
}
}


从源码可以看出kafka消息最底层发送源码逻辑很简单。传送一个channel,写入buffer。这里kafka进行了解耦,send类和channel类,通过接口在writeTo()方法关联。具体实现类如何,相互不干扰。

pending = TransportLayers.hasPendingWrites(channel);这段源码从官方文档解释是用在阻塞channel中。具体判断条件是 return ((TransportLayer) channel).hasPendingWrites();

看方法的实现, @Override public boolean hasPendingWrites() { return netWriteBuffer.hasRemaining() },从中可以看出,有个buffer对象会记录传输的数据。在channel篇章中,研究其细节。

继承类:NetworkSend

看构造方法:

public NetworkSend(String destination, ByteBuffer buffer) {

super(destination, sizeDelimit(buffer));

}

参数destination是broker地址。buffer是需要传输的消息。构造方法中调用了sizeDelimit()方法。

private static ByteBuffer[] sizeDelimit(ByteBuffer buffer) {

return new ByteBuffer[] {sizeBuffer(buffer.remaining()), buffer};

}

sizeDelimit()方法中调用了sizeBuffer(),生成了一个新的buffer,buffer内容是参数buffer大小。源码如下:

private static ByteBuffer sizeBuffer(int size) {

ByteBuffer sizeBuffer = ByteBuffer.allocate(4);

sizeBuffer.putInt(size);

sizeBuffer.rewind();

return sizeBuffer;

}

从中可以看出构造方法最终传进去的super(buffer1+buffer2,)其中buffer2是我们发送到broker端消息,buffer1是一个4字节的大小的数字,其含义是消息buffer2的大小。从这里可以看出,kafka字节流的设计原则包含两个部分:

buffer=buffer1+buffer2

buffer1内容为buffer2.size

buffer2为消息。

最终在broker读取字节流时候,先去取出头四个字节,感知接下来需要读取多少字节。然后读取。

这三个类构成了send基本组成。高级应用通过创建NetworkSend方法,传入消息byte,调用其writeTo()方法,写入对应目标channel中。底层实现细节不用关心。

总结:send接口及其实现,一个目的是将消息buffer进行传输前进行处理,给消息加上四个字节的buffer,存储消息大小。也说明了kafka一次最大传输字节是有限定的。

同时与channel组合,松耦合,减少代码重复。直接传入一个channel对象即可完成消息发送。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kakfa 源码 Send