您的位置:首页 > 编程语言 > Java开发

jdk 源码分析(20)java NIO包简单分析

2017-08-08 22:05 330 查看
BIO 是一种阻塞的io,主要体现在:

1)accept 时候或者客户端尝试连接时是阻塞的,

2)数据读写是阻塞的,即使是没有读到数据,而且每次都是读写一个字节。

对于服务端一般系统中常用的方式是没接收一个请求new 一个thread,然后由这个handler去处理,如果请求数目过多new 的thread 将会很多,有的时候会建立一个线程池,将由线程池管理。

如果线程过多,在线程中相互切换会消耗大量的性能,而实际上这些线程并没有做什么事情。同时因为重连接到数据读取结束是一个阻塞过程,如果网络,或者服务器性能问题,会极大消耗客户端的性能去等待服务端的返回结果。

基于以上问题,nio出来了,

Buffer

nio定义了很多的Buffer,buffer是一个对象,存储需要写入或者读出的数据。底层就是一个数组。

下面是一个位的buffer。

public abstract class ByteBuffer

extends Buffer

implements Comparable<ByteBuffer>

{

//数组,数据直接放在堆区。

final byte[] hb; // Non-null only for heap buffers
final int offset;

boolean isReadOnly;                 // Valid only for heap buffers



ByteBuffer(int mark, int pos, int lim, int cap,   // package-private

byte[] hb, int offset)

{

super(mark, pos, lim, cap);

this.hb = hb;

this.offset = offset;

}

[/code]

下面是个floatBuffer的定义的数组。

final float[] hb;


其分配方法: ByteBuffer buffer = ByteBuffer.allocate(10);

public static ByteBuffer allocate(int capacity) {

if (capacity < 0)

throw new IllegalArgumentException();

return new HeapByteBuffer(capacity, capacity);

}

[/code]

直接调用,在这里new 的数组,

super(-1, 0, lim, cap, new byte[cap], 0);

[/code]

channel

channel定义了一个管道(pile),用于数据读写,双向的,可以同时读写,其 实现可以分为两大类,FileChannel(文件)和SelectorChannel(网络)。nio 会用到ServiceSocketChannel 和SocketChannel。

1)ServerSocketChannel serverChannel = ServerSocketChannel.open();

public static ServerSocketChannel open() throws IOException {

return SelectorProvider.provider().openServerSocketChannel();

}

[/code]

这个SelectorProvider.provider() 会调用

provider = sun.nio.ch.DefaultSelectorProvider.create();

[/code]

而这个就是

public static SelectorProvider create() {

return new WindowsSelectorProvider();

}

[/code]

最后openServerSocketChannel 会new 一个ServerSocketChannelImpl

new ServerSocketChannelImpl(this);

[/code]

这个会删除一个FileDescriptor

ServerSocketChannelImpl(SelectorProvider var1) throws IOException {

super(var1);

this.fd = Net.serverSocket(true);

this.fdVal = IOUtil.fdVal(this.fd);

this.state = 0;

}

[/code]

selector

选择器(多路复用器),通过channel注册个selector ,selector去查询是是否有channel准备好数据了,如果有数据准备好了,将启动新的线程处理。一般selector会轮询方式查询。

selector底层代码参考的openjdk。selector 同步open生存,

public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}


底层是一个WindowsSelectorImpl

public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}


底层会定义一个pollWrapper 同时会启动一个管道pipe

WindowsSelectorImpl(SelectorProvider sp) throws IOException {

super(sp);

pollWrapper = new PollArrayWrapper(INIT_CAP);

wakeupPipe = Pipe.open();

wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();


// Disable the Nagle algorithm so that the wakeup is more immediate

SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();

(sink.sc).socket().setTcpNoDelay(true);

wakeupSinkFd = ((SelChImpl)sink).getFDVal();


pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

}

[/code]

selector.select();

底层主要操作

这个方法主要是查询channel是否准备好,如果准备就将准备好的channel写到publicSelectedKeys。

protected int doSelect(long timeout) throws IOException {

if (channelArray == null)

throw new ClosedSelectorException();

this.timeout = timeout; // set selector timeout

processDeregisterQueue();

if (interruptTriggered) {

resetWakeupSocket();

return 0;

}

// Calculate number of helper threads needed for poll. If necessary

// threads are created here and start waiting on startLock

adjustThreadsCount();

finishLock.reset(); // reset finishLock

// Wakeup helper threads, waiting on startLock, so they start polling.

// Redundant threads will exit here after wakeup.

startLock.startThreads();

// do polling in the main thread. Main thread is responsible for

// first MAX_SELECTABLE_FDS entries in pollArray.

try {

begin();

    try {

subSelector.poll();

} catch (IOException e) {

finishLock.setException(e); // Save this exception

}

// Main thread is out of poll(). Wakeup others and wait for them

if (threads.size() > 0)

finishLock.waitForHelperThreads();

  } finally {

end();

  }

// Done with poll(). Set wakeupSocket to nonsignaled  for the next run.

finishLock.checkForException();

processDeregisterQueue();

int updated = updateSelectedKeys();

// Done with poll(). Set wakeupSocket to nonsignaled  for the next run.

resetWakeupSocket();

return updated;

}

[/code]

updateSelectedKeys最后会调用processFDSet,将SelectionKeyImpl
放入publicSelectedKeys集合中,然后selector 通过迭代器遍历SelectionKeyImpl,因为SelectionKeyImpl包含channel信息,可以重channel读取数据。

private int processFDSet(long updateCount, int[] fds, int rOps,

boolean isExceptFds)

    {

int numKeysUpdated = 0;

for (int i = 1; i <= fds[0]; i++) {

int desc = fds[i];

if (desc == wakeupSourceFd) {

synchronized (interruptLock) {

interruptTriggered = true;

}

continue;

}

MapEntry me = fdMap.get(desc);

// If me is null, the key was deregistered in the previous

// processDeregisterQueue.

if (me == null)

continue;

SelectionKeyImpl sk = me.ski;


// The descriptor may be in the exceptfds set because there is

// OOB data queued to the socket. If there is OOB data then it

// is discarded and the key is not added to the selected set.

if (isExceptFds &&

(sk.channel() instanceof SocketChannelImpl) &&

discardUrgentData(desc))

    {

continue;

}


if (selectedKeys.contains(sk)) { // Key in selected set

if (me.clearedCount != updateCount) {

if (sk.channel.translateAndSetReadyOps(rOps, sk) &&

(me.updateCount != updateCount)) {

me.updateCount = updateCount;

numKeysUpdated++;

}

} else { // The readyOps have been set; now add

if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&

(me.updateCount != updateCount)) {

me.updateCount = updateCount;

numKeysUpdated++;

}

}

me.clearedCount = updateCount;

} else { // Key is not in selected set yet

if (me.clearedCount != updateCount) {

sk.channel.translateAndSetReadyOps(rOps, sk);

if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {

//

selectedKeys.add(sk);

me.updateCount = updateCount;

numKeysUpdated++;

}

} else { // The readyOps have been set; now add

sk.channel.translateAndUpdateReadyOps(rOps, sk);

if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {

selectedKeys.add(sk);

me.updateCount = updateCount;

numKeysUpdated++;

}

}

me.clearedCount = updateCount;

}

}

return numKeysUpdated;

}

}

[/code]

分析到此结束。

优点,采用多路复用,而且因为采用buffer机制,当读写buffer时不需要阻塞。

nio 也有缺点,因为nio需要很多代码去出去半包问题,而底层采用epoll也是有问题,这些问题在多并发是可能出现,因为这些问题,所以出现netty,netty能快速开发出稳定的通信框架,所以spark/kafka都有netty。

推荐书netty 权威指南
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  jdk 源代码
相关文章推荐