java NIO源码分析
2018-01-21 12:32
239 查看
用一份样例代码进行分析
/**
 * Runs a single-threaded, selector-driven NIO server bound to 0.0.0.0:8080.
 *
 * <p>Every accepted connection is switched to non-blocking mode and registered for
 * {@code OP_READ}. When a client sends data, up to 1024 bytes are read, the fixed reply
 * {@code "server received"} is written back, and the received bytes are logged. A client
 * that has reached end-of-stream is deregistered and closed.
 *
 * <p>The method loops forever; it only terminates by throwing.
 *
 * @throws IOException if opening, binding, selecting, or any channel read/write fails
 */
public static void nioServerStart() throws IOException {
    // try-with-resources so the listening channel and the selector are released
    // when an IOException escapes the loop.
    try (ServerSocketChannel channel = ServerSocketChannel.open();
         Selector selector = Selector.open()) {
        channel.configureBlocking(false);
        ServerSocket serverSocket = channel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress("0.0.0.0", 8080));
        channel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            if (selector.select(1000) <= 0) {
                // Timed out (or woken up) with nothing ready.
                continue;
            }
            Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
            while (selectionKeys.hasNext()) {
                SelectionKey key = selectionKeys.next();
                if (key.isValid() && key.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    // In non-blocking mode accept() returns null when no connection is pending.
                    if (socketChannel != null) {
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                } else if (key.isValid() && key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int count = socketChannel.read(buffer);
                    if (count >= 0) {
                        // Switch the buffer from filling to draining before copying out.
                        buffer.flip();
                        byte[] bytes = new byte[count];
                        buffer.get(bytes);
                        // Wrap the reply in a ready-to-read buffer. Putting it into a cleared
                        // buffer and writing without flip() would send the unused tail of the
                        // buffer instead of the message.
                        // NOTE: a non-blocking write may be partial; acceptable for this
                        // short fixed reply in sample code.
                        socketChannel.write(ByteBuffer.wrap("server received".getBytes()));
                        LOGGER.info(new String(bytes, 0, count));
                    } else {
                        // read() returned -1: the peer closed the connection. Cancel the key
                        // and close the channel so the descriptor is not leaked.
                        key.cancel();
                        socketChannel.close();
                    }
                }
                // The selector never clears the selected-key set itself.
                selectionKeys.remove();
            }
        }
    }
}
第一步中,ServerSocketChannel.open() 方法实例化 ServerSocketChannelImpl 对象,并通过 this.fd = Net.serverSocket(true); 将 C 方法创建的 socket 文件描述符封装后保存在 fd 中(下面给出的 C 代码是 Windows 平台的实现),源码分别如下:
/**
 * Creates a native socket through {@code socket0} and wraps the returned descriptor
 * value in a {@link FileDescriptor}.
 *
 * @param stream forwarded to {@code socket0} as its {@code stream} argument
 * @return the descriptor object produced by {@code IOUtil.newFD}
 */
static FileDescriptor serverSocket(boolean stream) {
    final boolean preferIPv6 = isIPv6Available();
    // The third argument (reuse) is always true for sockets created here.
    final int rawSocket = socket0(preferIPv6, stream, true, fastLoopback);
    return IOUtil.newFD(rawSocket);
}
/*
 * Native implementation of sun.nio.ch.Net.socket0 (Windows / Winsock).
 *
 * preferIPv6   - non-zero selects AF_INET6, otherwise AF_INET
 * stream       - non-zero selects SOCK_STREAM, otherwise SOCK_DGRAM
 * reuse        - not used by this implementation
 * fastLoopback - non-zero requests NET_EnableFastTcpLoopback for stream sockets
 *
 * Returns the socket handle cast to jint. If socket() fails, a Java exception
 * is raised through NET_ThrowNew and INVALID_SOCKET (cast to jint) is returned.
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse, jboolean fastLoopback)
{
    SOCKET s;
    int domain = (preferIPv6) ? AF_INET6 : AF_INET;

    s = socket(domain, (stream ? SOCK_STREAM : SOCK_DGRAM), 0);
    if (s == INVALID_SOCKET) {
        /*
         * Stop here. Falling through would call NET_EnableFastTcpLoopback on an
         * invalid handle and could raise a second exception while this one is
         * still pending.
         */
        NET_ThrowNew(env, WSAGetLastError(), "socket");
        return (jint)s;
    }

    /* Clear the inherit flag so the handle is not inherited by child processes. */
    SetHandleInformation((HANDLE)s, HANDLE_FLAG_INHERIT, 0);

    /* IPV6_V6ONLY is true by default */
    if (domain == AF_INET6) {
        int opt = 0;
        setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,
                   (const char *)&opt, sizeof(opt));
    }

    /* Disable WSAECONNRESET errors for initially unconnected UDP sockets */
    if (!stream) {
        setConnectionReset(s, FALSE);
    }

    if (stream && fastLoopback) {
        /* Remembered across calls: cleared once the option is reported as unsupported. */
        static int loopback_available = 1;
        if (loopback_available) {
            int rv = NET_EnableFastTcpLoopback((jint)s);
            if (rv) {
                if (rv == WSAEOPNOTSUPP || rv == WSAEINVAL) {
                    loopback_available = 0;
                } else {
                    NET_ThrowNew(env, rv, "fastLoopback");
                }
            }
        }
    }
    return (jint)s;
}
第二步 channel.register(selector, SelectionKey.OP_ACCEPT);的作用是将关注的对象保存在selector中;
第三步 int s = selector.select(1000);的关键代码是
/**
 * Flushes pending registration changes via {@code updateRegistrations()} and then
 * calls {@code kevent0} with the given timeout.
 *
 * @param timeout forwarded unchanged to {@code kevent0}
 * @return the value returned by {@code kevent0}
 */
int poll(long timeout) {
    updateRegistrations();
    return kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout);
}
其中updateRegistrations()方法遍历selector中的关注对象,注册在kqueue中,注册的代码是
/**
 * Drains {@code updateList} and passes each queued update whose channel is still open
 * to {@code register0}. Updates for closed channels are discarded.
 */
void updateRegistrations() {
    synchronized (updateList) {
        for (Update pending = updateList.poll(); pending != null; pending = updateList.poll()) {
            SelChImpl target = pending.channel;
            if (target.isOpen()) {
                int fd = target.getFDVal();
                // Non-zero means "register this filter", zero means "remove it".
                int readInterest = pending.events & Net.POLLIN;
                int writeInterest = pending.events & Net.POLLOUT;
                register0(kq, fd, readInterest, writeInterest);
            }
        }
    }
}
register0 的 C 实现代码如下,本质上是调用 kevent 进行注册:
/*
 * Native implementation of KQueueArrayWrapper.register0.
 *
 * Submits two changes for descriptor fd to the kqueue kq in one kevent() call:
 * the read filter is added when r is non-zero and deleted otherwise, and the
 * write filter is added when w is non-zero and deleted otherwise.
 * The return value of kevent() is ignored.
 */
JNIEXPORT void JNICALL
Java_sun_nio_ch_KQueueArrayWrapper_register0(JNIEnv *env, jobject this,
                                             jint kq, jint fd, jint r, jint w)
{
    struct kevent changeList[2];
    struct kevent errorList[2];
    struct timespec noWait = {0, 0};   /* zero timeout: kevent() must not block */
    int readAction;
    int writeAction;

    if (r) {
        readAction = EV_ADD;
    } else {
        readAction = EV_DELETE;
    }
    if (w) {
        writeAction = EV_ADD;
    } else {
        writeAction = EV_DELETE;
    }

    /*
     * Errors are ignored - they are probably complaints about deleting
     * filters that were never added - but an error array is still supplied
     * because kqueue behaves erratically if some registrations fail.
     */
    EV_SET(&changeList[0], fd, EVFILT_READ, readAction, 0, 0, 0);
    EV_SET(&changeList[1], fd, EVFILT_WRITE, writeAction, 0, 0, 0);
    kevent(kq, changeList, 2, errorList, 2, &noWait);
}
所有的关注事件注册完后,调用 int updated = kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout); 进行 select 操作,其 C 实现如下:
/*
 * Native implementation of KQueueArrayWrapper.kevent0.
 *
 * Waits on the kqueue kq for events and stores them in the kevent array
 * located at address kevAddr, which has room for kevCount entries.
 *
 * timeout is in milliseconds: a negative value waits indefinitely, zero
 * returns immediately, a positive value bounds the wait.
 *
 * Returns the value returned by kevent(), or 0 if the call was interrupted
 * (EINTR). On any other failure an IOException is raised and the negative
 * result is returned.
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_KQueueArrayWrapper_kevent0(JNIEnv *env, jobject this, jint kq,
                                           jlong kevAddr, jint kevCount,
                                           jlong timeout)
{
    struct kevent *kevs = (struct kevent *)jlong_to_ptr(kevAddr);
    struct timespec ts;
    struct timespec *tsp;
    int result;

    // Java timeout is in milliseconds. Convert to struct timespec.
    // Java timeout == -1 : wait forever : timespec timeout of NULL
    // Java timeout == 0  : return immediately : timespec timeout of zero
    if (timeout >= 0) {
        ts.tv_sec = timeout / 1000;
        // Remaining milliseconds converted to nanoseconds (1 ms = 1,000,000 ns).
        ts.tv_nsec = (timeout % 1000) * 1000000;
        tsp = &ts;
    } else {
        tsp = NULL;
    }

    // No change list is passed; this call only collects pending events.
    result = kevent(kq, NULL, 0, kevs, kevCount, tsp);

    if (result < 0) {
        if (errno == EINTR) {
            // ignore EINTR, pretend nothing was selected
            result = 0;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "KQueueArrayWrapper: kqueue failed");
        }
    }

    return result;
}
select 完成后,响应事件列表存放在 KQueueArrayWrapper.keventArrayAddress 指向的内存中(它本质上是一个 kevent 数组的地址);根据这个地址取出所有的响应事件,把取出的数据保存到 selector 的 publicSelectedKeys 中,最后由 Java 代码遍历这些 key,进行相应的处理。
/**
 * Runs a single-threaded, selector-driven NIO server bound to 0.0.0.0:8080.
 *
 * <p>Every accepted connection is switched to non-blocking mode and registered for
 * {@code OP_READ}. When a client sends data, up to 1024 bytes are read, the fixed reply
 * {@code "server received"} is written back, and the received bytes are logged. A client
 * that has reached end-of-stream is deregistered and closed.
 *
 * <p>The method loops forever; it only terminates by throwing.
 *
 * @throws IOException if opening, binding, selecting, or any channel read/write fails
 */
public static void nioServerStart() throws IOException {
    // try-with-resources so the listening channel and the selector are released
    // when an IOException escapes the loop.
    try (ServerSocketChannel channel = ServerSocketChannel.open();
         Selector selector = Selector.open()) {
        channel.configureBlocking(false);
        ServerSocket serverSocket = channel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress("0.0.0.0", 8080));
        channel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            if (selector.select(1000) <= 0) {
                // Timed out (or woken up) with nothing ready.
                continue;
            }
            Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
            while (selectionKeys.hasNext()) {
                SelectionKey key = selectionKeys.next();
                if (key.isValid() && key.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    // In non-blocking mode accept() returns null when no connection is pending.
                    if (socketChannel != null) {
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                } else if (key.isValid() && key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int count = socketChannel.read(buffer);
                    if (count >= 0) {
                        // Switch the buffer from filling to draining before copying out.
                        buffer.flip();
                        byte[] bytes = new byte[count];
                        buffer.get(bytes);
                        // Wrap the reply in a ready-to-read buffer. Putting it into a cleared
                        // buffer and writing without flip() would send the unused tail of the
                        // buffer instead of the message.
                        // NOTE: a non-blocking write may be partial; acceptable for this
                        // short fixed reply in sample code.
                        socketChannel.write(ByteBuffer.wrap("server received".getBytes()));
                        LOGGER.info(new String(bytes, 0, count));
                    } else {
                        // read() returned -1: the peer closed the connection. Cancel the key
                        // and close the channel so the descriptor is not leaked.
                        key.cancel();
                        socketChannel.close();
                    }
                }
                // The selector never clears the selected-key set itself.
                selectionKeys.remove();
            }
        }
    }
}
第一步中,ServerSocketChannel.open() 方法实例化 ServerSocketChannelImpl 对象,并通过 this.fd = Net.serverSocket(true); 将 C 方法创建的 socket 文件描述符封装后保存在 fd 中(下面给出的 C 代码是 Windows 平台的实现),源码分别如下:
/**
 * Creates a native socket through {@code socket0} and wraps the returned descriptor
 * value in a {@link FileDescriptor}.
 *
 * @param stream forwarded to {@code socket0} as its {@code stream} argument
 * @return the descriptor object produced by {@code IOUtil.newFD}
 */
static FileDescriptor serverSocket(boolean stream) {
    final boolean preferIPv6 = isIPv6Available();
    // The third argument (reuse) is always true for sockets created here.
    final int rawSocket = socket0(preferIPv6, stream, true, fastLoopback);
    return IOUtil.newFD(rawSocket);
}
/*
 * Native implementation of sun.nio.ch.Net.socket0 (Windows / Winsock).
 *
 * preferIPv6   - non-zero selects AF_INET6, otherwise AF_INET
 * stream       - non-zero selects SOCK_STREAM, otherwise SOCK_DGRAM
 * reuse        - not used by this implementation
 * fastLoopback - non-zero requests NET_EnableFastTcpLoopback for stream sockets
 *
 * Returns the socket handle cast to jint. If socket() fails, a Java exception
 * is raised through NET_ThrowNew and INVALID_SOCKET (cast to jint) is returned.
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse, jboolean fastLoopback)
{
    SOCKET s;
    int domain = (preferIPv6) ? AF_INET6 : AF_INET;

    s = socket(domain, (stream ? SOCK_STREAM : SOCK_DGRAM), 0);
    if (s == INVALID_SOCKET) {
        /*
         * Stop here. Falling through would call NET_EnableFastTcpLoopback on an
         * invalid handle and could raise a second exception while this one is
         * still pending.
         */
        NET_ThrowNew(env, WSAGetLastError(), "socket");
        return (jint)s;
    }

    /* Clear the inherit flag so the handle is not inherited by child processes. */
    SetHandleInformation((HANDLE)s, HANDLE_FLAG_INHERIT, 0);

    /* IPV6_V6ONLY is true by default */
    if (domain == AF_INET6) {
        int opt = 0;
        setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,
                   (const char *)&opt, sizeof(opt));
    }

    /* Disable WSAECONNRESET errors for initially unconnected UDP sockets */
    if (!stream) {
        setConnectionReset(s, FALSE);
    }

    if (stream && fastLoopback) {
        /* Remembered across calls: cleared once the option is reported as unsupported. */
        static int loopback_available = 1;
        if (loopback_available) {
            int rv = NET_EnableFastTcpLoopback((jint)s);
            if (rv) {
                if (rv == WSAEOPNOTSUPP || rv == WSAEINVAL) {
                    loopback_available = 0;
                } else {
                    NET_ThrowNew(env, rv, "fastLoopback");
                }
            }
        }
    }
    return (jint)s;
}
第二步 channel.register(selector, SelectionKey.OP_ACCEPT);的作用是将关注的对象保存在selector中;
第三步 int s = selector.select(1000);的关键代码是
/**
 * Flushes pending registration changes via {@code updateRegistrations()} and then
 * calls {@code kevent0} with the given timeout.
 *
 * @param timeout forwarded unchanged to {@code kevent0}
 * @return the value returned by {@code kevent0}
 */
int poll(long timeout) {
    updateRegistrations();
    return kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout);
}
其中updateRegistrations()方法遍历selector中的关注对象,注册在kqueue中,注册的代码是
/**
 * Drains {@code updateList} and passes each queued update whose channel is still open
 * to {@code register0}. Updates for closed channels are discarded.
 */
void updateRegistrations() {
    synchronized (updateList) {
        for (Update pending = updateList.poll(); pending != null; pending = updateList.poll()) {
            SelChImpl target = pending.channel;
            if (target.isOpen()) {
                int fd = target.getFDVal();
                // Non-zero means "register this filter", zero means "remove it".
                int readInterest = pending.events & Net.POLLIN;
                int writeInterest = pending.events & Net.POLLOUT;
                register0(kq, fd, readInterest, writeInterest);
            }
        }
    }
}
register0 的 C 实现代码如下,本质上是调用 kevent 进行注册:
/*
 * Native implementation of KQueueArrayWrapper.register0.
 *
 * Submits two changes for descriptor fd to the kqueue kq in one kevent() call:
 * the read filter is added when r is non-zero and deleted otherwise, and the
 * write filter is added when w is non-zero and deleted otherwise.
 * The return value of kevent() is ignored.
 */
JNIEXPORT void JNICALL
Java_sun_nio_ch_KQueueArrayWrapper_register0(JNIEnv *env, jobject this,
                                             jint kq, jint fd, jint r, jint w)
{
    struct kevent changeList[2];
    struct kevent errorList[2];
    struct timespec noWait = {0, 0};   /* zero timeout: kevent() must not block */
    int readAction;
    int writeAction;

    if (r) {
        readAction = EV_ADD;
    } else {
        readAction = EV_DELETE;
    }
    if (w) {
        writeAction = EV_ADD;
    } else {
        writeAction = EV_DELETE;
    }

    /*
     * Errors are ignored - they are probably complaints about deleting
     * filters that were never added - but an error array is still supplied
     * because kqueue behaves erratically if some registrations fail.
     */
    EV_SET(&changeList[0], fd, EVFILT_READ, readAction, 0, 0, 0);
    EV_SET(&changeList[1], fd, EVFILT_WRITE, writeAction, 0, 0, 0);
    kevent(kq, changeList, 2, errorList, 2, &noWait);
}
所有的关注事件注册完后,调用 int updated = kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout); 进行 select 操作,其 C 实现如下:
/*
 * Native implementation of KQueueArrayWrapper.kevent0.
 *
 * Waits on the kqueue kq for events and stores them in the kevent array
 * located at address kevAddr, which has room for kevCount entries.
 *
 * timeout is in milliseconds: a negative value waits indefinitely, zero
 * returns immediately, a positive value bounds the wait.
 *
 * Returns the value returned by kevent(), or 0 if the call was interrupted
 * (EINTR). On any other failure an IOException is raised and the negative
 * result is returned.
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_KQueueArrayWrapper_kevent0(JNIEnv *env, jobject this, jint kq,
                                           jlong kevAddr, jint kevCount,
                                           jlong timeout)
{
    struct kevent *kevs = (struct kevent *)jlong_to_ptr(kevAddr);
    struct timespec ts;
    struct timespec *tsp;
    int result;

    // Java timeout is in milliseconds. Convert to struct timespec.
    // Java timeout == -1 : wait forever : timespec timeout of NULL
    // Java timeout == 0  : return immediately : timespec timeout of zero
    if (timeout >= 0) {
        ts.tv_sec = timeout / 1000;
        // Remaining milliseconds converted to nanoseconds (1 ms = 1,000,000 ns).
        ts.tv_nsec = (timeout % 1000) * 1000000;
        tsp = &ts;
    } else {
        tsp = NULL;
    }

    // No change list is passed; this call only collects pending events.
    result = kevent(kq, NULL, 0, kevs, kevCount, tsp);

    if (result < 0) {
        if (errno == EINTR) {
            // ignore EINTR, pretend nothing was selected
            result = 0;
        } else {
            JNU_ThrowIOExceptionWithLastError(env, "KQueueArrayWrapper: kqueue failed");
        }
    }

    return result;
}
select 完成后,响应事件列表存放在 KQueueArrayWrapper.keventArrayAddress 指向的内存中(它本质上是一个 kevent 数组的地址);根据这个地址取出所有的响应事件,把取出的数据保存到 selector 的 publicSelectedKeys 中,最后由 Java 代码遍历这些 key,进行相应的处理。
相关文章推荐
- Netty 源码分析之 番外篇 Java NIO 的前生今世
- Kafka源码分析-序列3 -Producer -Java NIO(Reactor VS Peactor)
- Netty源码分析之番外篇【Java NIO的前生今世】
- Java NIO——Selector机制源码分析系列——转
- Java NIO(一) Buffer类源码分析
- Java NIO——Selector机制源码分析---转
- 深入java NIO系列之通道分析与源码解读(二)
- memcache源码分析:slab结构与初始化
- ceph源码分析--Monitor对osd report进行报down处
- 第二人生的源码分析(106)脚本的词法分析(4)
- Java GC 源码分析(2)
- lguest 三步曲之三 (源码分析)
- 优秀的轻量级网络开发框架spserver源码分析(二)
- jquery源码分析
- Netty源码分析
- Okhttp使用和源码分析一(OkHttp2.x用法)
- RxJava源码分析之subscribeOn和observeOn
- linux源码分析之字节序(4)-- little_endian.h
- 源码分析RocketMQ消息PULL-长轮询模式
- u-boot makefile分析和源码分析