您的位置:首页 > 编程语言 > Java开发

Java NIO 总结与示例

2016-08-14 21:03 323 查看
Java NIO是New IO的简称,它是一种可以替代Java IO的一套新的IO机制。它提供了一套不同于Java标准IO的操作机制。

Java NIO中涉及的基础内容有通道(Channel)和缓冲区(Buffer)、文件IO和网络IO。有关通道、缓冲区以及文件IO在这里不打算进行详细的介绍。这里参考《实战Java高并发程序设计》利用NIO实现一个Echo服务器的服务端与客户端。

在看完Echo服务器实现之后,发现使用NIO进行网络编程跟Linux中的epoll模型是非常类似的。同样是将Channel注册到Selector上,并且说明感兴趣的事件。注册之后调用selector.select()进行阻塞等待。而对于epoll来说,需要使用

epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

进行事件的注册,使用

epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

来进行事件阻塞等待。epoll在Linux中是一种IO复用技术,感觉NIO对于Java的作用是类似的,一个Selector可以监听多个Channel上的事件,有一个事件(可能有多个)触发时,则进行相应。

对于NIO中的socket来说,感兴趣的事件有以下几类,在SelectionKey中定义:

    public static final int OP_READ = 1
<< 0;
    public static final int OP_WRITE= 1
<< 2;
    public static final int OP_CONNECT=
1 << 3;
    public
static final
int OP_ACCEPT
=1 << 4;

在Java中使用Socket进行编程的过程跟在Linux上类似,这里大体总结一下:

服务端:

1、使用静态工厂产生一个Selector实例,

    private Selectorselector =null;
    selector = SelectorProvider.provider().openSelector();

2、使用静态工厂产生一个SerSocketChannel,也就是一个服务端的通道

       ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
       serverSocketChannel.configureBlocking(false);

3、将上面的通道(Channel)绑定(bind)到服务器的地址。

       InetSocketAddressinet SocketAddress = new InetSocketAddress("127.0.0.1",8001);
       serverSocketChannel.socket().bind(inetSocketAddress);

4、将服务端Channel注册到Selector,并说明感兴趣的事件。这里的SelectionKey就关联了对应的Channel和Selector。

       SelectionKey acceptKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

5、服务端等待事件发生(这个过程在一个无限while循环中),这个过程是阻塞的。

      int readyEventNum =selector.select();

6、有事件发生了,获得所有的通道的SelectionKey,进行遍历检查,检查主要是检查这个SelectionKey是对什么事件感兴趣,不同事件有不同的处理。

           Set<SelectionKey> readyKeys = selector.selectedKeys();
           Iterator<SelectionKey> iterator = readyKeys.iterator();
           while(iterator.hasNext()){
              SelectionKeyreadyKey=iterator.next();
              //将正在处理的实例移除,否则就会重复处理相同的SelectionKey
              iterator.remove();
               //TODO 检查并处理
               ……
           }//endwhile
客户端编程类似,大家直接看代码吧。

服务端代码:

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NioEchoServer {

//selector用于处理每一个网络连接,一个服务端程序使用一个selector实例就够了
//这个实例使用静态工厂生成
private Selector selector = null;

//用于对每一个客户端进行相应的处理,每一个请求都会委托给线程池中的线程进行实际处理
private ExecutorService pool = Executors.newCachedThreadPool();

//统计服务器线程在一个客户端花费了多少时间
public static Map<Socket,Long> time_stat = new HashMap<Socket,Long>(10240);

private void startServer() throws IOException{

//服务套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);

//一个socket地址。
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1",8001);

//UnknownHostException
serverSocketChannel.socket().bind(inetSocketAddress);

//通过工厂方法获得一个Selector对象的实例
selector = SelectorProvider.provider().openSelector();

/**
* 注意每向selector注册一个channel就返回一个SelectionKey实例
* 一个SelectionKey表示一个SelectableChannel到一个Selector的注册
* 一个Selector可以管理多个SelectableChannel, SocketChannel是SelectableChannel的一个子类
*/
//这里注册感兴趣的事件为 Accept
SelectionKey acceptKey =  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

for(;;){

//这个select方法是一个阻塞方法,如果没有任何数据准备好,他就会等待,一旦有数据可读
//他就会返回,它的返回值是已经准备就绪的SelectionKey数量。
int readyEventNum = selector.select();

//获取那些准备好的SelectionKey。因为可能多个Chennel已经准备好
Set<SelectionKey> readyKeys = selector.selectedKeys();

Iterator<SelectionKey> iterator = readyKeys.iterator();

while(iterator.hasNext()){

SelectionKey readyKey = iterator.next();

//将正在处理的实例移除,否则就会重复处理相同的SelectionKey
iterator.remove();

if(readyKey.isAcceptable()){
doAccept(readyKey);
}else if(readyKey.isValid() && readyKey.isReadable()){
if(!time_stat.containsKey(((SocketChannel)readyKey.channel()).socket())){
time_stat.put(((SocketChannel)readyKey.channel()).socket(), System.currentTimeMillis());
doRead(readyKey);
}
}else if(readyKey.isValid() && readyKey.isWritable()){
doWrite(readyKey);
long end = System.currentTimeMillis();
long begin = time_stat.remove(((SocketChannel)readyKey.channel()).socket());
System.out.println("spend: "+(end-begin)+" ms");
}
}//end while

}//end for
}

private void doAccept(SelectionKey selectionKey){
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel clientChannel = null;

try{
//生成的clientChannel表示和新的客户端通信的通道
clientChannel = server.accept();
clientChannel.configureBlocking(false);

//Register this channel for reading,将新生成的Channel注册到selector选择器上,并告诉Selector,
//我现在对读OP_READ操作很感兴趣。这样当Selector发现这个Channel已经准备好读时,就能给线程一个通知
SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);

//Allocate an EchoClient instance and attach it to this selection key.
//将一个客户端实例作为附件,附加到这个连接的SelectionKey上,这样在连接的处理过程中
//我们都可以共享这个EchoClient实例
EchoClient echoClient = new EchoClient();
clientKey.attach(echoClient);

InetAddress clientAddress = clientChannel.socket().getInetAddress();
System.out.println("accepted connection from "+ clientAddress.getHostAddress());
}catch(Exception e){
System.out.println("failed to accept new client.");
e.printStackTrace();
}
}

//读完了要
private void doRead(SelectionKey selectionKey){
SocketChannel channel = (SocketChannel)selectionKey.channel();
ByteBuffer bb = ByteBuffer.allocate(8192);

int len;
try{
len = channel.read(bb);
if(len<0){
disconnect(selectionKey);
return;
}
}catch(Exception e){
System.out.println("Failed to read from client.");
e.printStackTrace();
disconnect(selectionKey);
return;
}
bb.flip();
pool.execute(new HandleMsg(selectionKey,bb));
}

private void doWrite(SelectionKey sk){
SocketChannel channel = (SocketChannel) sk.channel();
EchoClient echoClient = (EchoClient) sk.attachment();

LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();
ByteBuffer byteBuffer = outq.getLast();

try{
int len = channel.write(byteBuffer);

if(len==-1){
disconnect(sk);
return;
}
if(byteBuffer.remaining()==0){
outq.removeLast();
}
}catch(Exception e){
System.out.println("Failed to write to client");
e.printStackTrace();
disconnect(sk);
}

if(outq.size()==0){
sk.interestOps(SelectionKey.OP_READ);
}

}

private void disconnect(SelectionKey sk){
try {
sk.channel().close();
} catch (IOException e) {
e.printStackTrace();
}
}

//启动一个新的线程任务来处理消息
class HandleMsg implements Runnable{
SelectionKey selectionKey;
ByteBuffer byteBuffer;
public HandleMsg(SelectionKey sk,ByteBuffer bb){
this.byteBuffer = bb;
this.selectionKey = sk;
}
@Override
public void run(){
EchoClient echoClient = (EchoClient)selectionKey.attachment();
echoClient.enqueue(byteBuffer);

//从此对写也很感兴趣,那这个通道空了就可以写了?
selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);

//强迫selector立即返回
selector.wakeup();
}
}

public static void main(String[] args) throws IOException {
NioEchoServer server = new NioEchoServer();
server.startServer();
}

/**
* 一个EchoClient代表一个客户端,保存所有收到的ByteBuffer缓冲
* @version 创建时间:2016年8月14日 下午7:27:51
*/
class EchoClient {

private LinkedList<ByteBuffer> outq = null;

EchoClient(){
outq = new LinkedList<ByteBuffer>();
}
public LinkedList<ByteBuffer> getOutputQueue(){
return outq;
}
public void enqueue(ByteBuffer bb){
outq.addFirst(bb);
}
}

}


客户端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;

/**
* 用Nio实现EchoServer的客户端
* @version 创建时间:2016年8月14日 下午5:17:26
*/
public class ClientNio {

//同样是一个客户端使用一个selector实例,这个实例也是使用静态工厂生成的
private Selector selector = null;

public void init(String ip,int port) throws IOException{

//使用静态工厂产生一个channel
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
this.selector = SelectorProvider.provider().openSelector();

//将这个通道连接到服务端
channel.connect(new InetSocketAddress(ip,port));

//这个通道暂时只对 "读" 感兴趣
channel.register(selector, SelectionKey.OP_CONNECT);
}

public void working()throws IOException{
while(true){
if(!selector.isOpen())
break;

//阻塞,直到有事件发生
selector.select();

//获取所有发生的时间,并进行遍历
Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();

if(key.isConnectable()){  //连接事件
connect(key);
}else if(key.isReadable()){  //读事件
read(key);
}
}
}
}

public void connect(SelectionKey selectionKey) throws IOException{
SocketChannel channel = (SocketChannel)selectionKey.channel();

//如果正在连接,则完成连接
if(channel.isConnectionPending()){
channel.finishConnect();
}

channel.configureBlocking(false);

//这里进行了一系列转换:String -> byte[] -> ByteBuffer
//调用的方法依次是 String.getBytes() ByteBuffer.wrap()
channel.write(ByteBuffer.wrap(new String("hello server.\r\n").getBytes()));

//从此这个channel对读感兴趣
channel.register(this.selector, SelectionKey.OP_READ);
}

public void read(SelectionKey key) throws IOException{
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(100);
channel.read(buffer);

byte[] data = buffer.array();
String msg = new String(data).trim();

System.out.println("client receive: "+msg);
channel.close();
key.selector().close();
}

public static void main(String[] args) {
ClientNio client = new ClientNio();
try {
client.init("127.0.0.1", 8001);
client.working();
} catch (IOException e) {
e.printStackTrace();
}
}
}


参考《实战Java高并发程序设计》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: