您的位置:首页 > 编程语言 > Java开发

java NIO源码分析

2018-01-21 12:32 239 查看
用一份样例代码进行分析

public static void nioServerStart() throws IOException  {

        ServerSocketChannel channel = ServerSocketChannel.open();

        channel.configureBlocking(false);

        ServerSocket serverSocket = channel.socket();

        serverSocket.setReuseAddress(true);

        serverSocket.bind(new InetSocketAddress("0.0.0.0", 8080));

        Selector selector = Selector.open();

        channel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {

            int s = selector.select(1000);

            if( s <= 0) {

            }

            else{

                Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator();

//                LOGGER.info("keys:" + selector.selectedKeys().size());

                int accepts = 0;

                while(selecionKeys.hasNext()) {

                    SelectionKey key = selecionKeys.next();

                    if(key.isValid() && key.isAcceptable()) {

                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

                        SocketChannel socketChannel = serverSocketChannel.accept();

                        socketChannel.configureBlocking(false);

                        socketChannel.register(selector, SelectionKey.OP_READ);

                        accepts++;

                    }

                    if(key.isValid() && key.isReadable()) {

                        SocketChannel socketChannel = (SocketChannel) key.channel();

                        ByteBuffer buffer = ByteBuffer.allocate(1024);

                        int count = socketChannel.read(buffer);

                        if(count >= 0) {

                            byte[] bytes = new byte[1024];

                            StringBuilder sb = new StringBuilder();

                            buffer.flip();

                            while (buffer.hasRemaining()) {

                                buffer.get(bytes, 0, count);

                            }

                            buffer.clear();

                            buffer.put("server received".getBytes());

                            socketChannel.write(buffer);

                            sb.append(bytes);

                            LOGGER.info(new String(bytes, 0, count));

                        }

                        else {

                            key.cancel();

                        }

                    }

                    selecionKeys.remove();

                }

//                LOGGER.info("accepts:" + accepts);

            }

        }
    }

第一步中ServerSocketChannel.open()方法实例化ServerSocketChannelImpl对象,通过this.fd =  Net.serverSocket(true);方法将c方法创建的socket文件描述符封装保存在fd中,源码分别是

static FileDescriptor serverSocket(boolean stream) {

        return IOUtil.newFD(socket0(isIPv6Available(), stream, true, fastLoopback));

    }

JNIEXPORT jint JNICALL

Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,

                            jboolean stream, jboolean reuse, jboolean fastLoopback)

{

    SOCKET s;

    int domain = (preferIPv6) ? AF_INET6 : AF_INET;

    s = socket(domain, (stream ? SOCK_STREAM : SOCK_DGRAM), 0);

    if (s != INVALID_SOCKET) {

        SetHandleInformation((HANDLE)s, HANDLE_FLAG_INHERIT, 0);

        /* IPV6_V6ONLY is true by default */

        if (domain == AF_INET6) {

            int opt = 0;

            setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,

                       (const char *)&opt, sizeof(opt));

        }

        /* Disable WSAECONNRESET errors for initially unconnected UDP sockets */

        if (!stream) {

            setConnectionReset(s, FALSE);

        }

    } else {

        NET_ThrowNew(env, WSAGetLastError(), "socket");

    }

    if (stream && fastLoopback) {

        static int loopback_available = 1;

        if (loopback_available) {

            int rv = NET_EnableFastTcpLoopback((jint)s);

            if (rv) {

                if (rv == WSAEOPNOTSUPP || rv == WSAEINVAL) {

                    loopback_available = 0;

                } else {

                    NET_ThrowNew(env, rv, "fastLoopback");

                }

            }

        }

    }

    return (jint)s;

}

第二步   channel.register(selector, SelectionKey.OP_ACCEPT);的作用是将关注的对象保存在selector中;

第三步   int s = selector.select(1000);的关键代码是

int poll(long timeout) {

        updateRegistrations();

        int updated = kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout);

        return updated;

    }

其中updateRegistrations()方法遍历selector中的关注对象,注册在kqueue中,注册的代码是

void updateRegistrations() {

        synchronized (updateList) {

            Update u = null;

            while ((u = updateList.poll()) != null) {

                SelChImpl ch = u.channel;

                if (!ch.isOpen())

                    continue;

                register0(kq, ch.getFDVal(), u.events & Net.POLLIN, u.events & Net.POLLOUT);

            }

        }

    }

register0的c实现代码是以下,本质是调用了kevent进行注册

JNIEXPORT void JNICALL

Java_sun_nio_ch_KQueueArrayWrapper_register0(JNIEnv *env, jobject this,

                                             jint kq, jint fd, jint r, jint w)

{

    struct kevent changes[2];

    struct kevent errors[2];

    struct timespec dontBlock = {0, 0};

    // if (r) then { register for read } else { unregister for read }

    // if (w) then { register for write } else { unregister for write }

    // Ignore errors - they're probably complaints about deleting non-

    //   added filters - but provide an error array anyway because

    //   kqueue behaves erratically if some of its registrations fail.

    EV_SET(&changes[0], fd, EVFILT_READ,  r ? EV_ADD : EV_DELETE, 0, 0, 0);

    EV_SET(&changes[1], fd, EVFILT_WRITE, w ? EV_ADD : EV_DELETE, 0, 0, 0);

    kevent(kq, changes, 2, errors, 2, &dontBlock);

}

所有的关注事件注册完后调用int updated = kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout);进行select操作,

JNIEXPORT jint JNICALL

Java_sun_nio_ch_KQueueArrayWrapper_kevent0(JNIEnv *env, jobject this, jint kq,

                                           jlong kevAddr, jint kevCount,

                                           jlong timeout)

{

    struct kevent *kevs = (struct kevent *)jlong_to_ptr(kevAddr);

    struct timespec ts;

    struct timespec *tsp;

    int result;

    // Java timeout is in milliseconds. Convert to struct timespec.

    // Java timeout == -1 : wait forever : timespec timeout of NULL

    // Java timeout == 0  : return immediately : timespec timeout of zero

    if (timeout >= 0) {

        ts.tv_sec = timeout / 1000;

        ts.tv_nsec = (timeout % 100
4000
0) * 1000000; //nanosec = 1 million millisec

        tsp = &ts;

    } else {

        tsp = NULL;

    }

    result = kevent(kq, NULL, 0, kevs, kevCount, tsp);

    if (result < 0) {

        if (errno == EINTR) {

            // ignore EINTR, pretend nothing was selected

            result = 0;

        } else {

            JNU_ThrowIOExceptionWithLastError(env, "KQueueArrayWrapper: kqueue failed");

        }

    }

    return result;

}

select完成后的响应事件列表存放在KQueueArrayWrapper.keventArrayAddress上,本质上是一个数组指针的地址,根据这个地址取出所有的响应事件列表,取出所有的数据保存到selecor的publicSelectedKeys中,最后java代码遍历这些key,进行相应的处理。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: