您的位置:首页 > 大数据 > 人工智能

AIO原理

2016-05-05 15:48 453 查看
 Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:

 java.nio.channels.AsynchronousChannel

       标记一个channel支持异步IO操作。

 java.nio.channels.AsynchronousServerSocketChannel

       ServerSocket的aio版本,创建TCP服务端,绑定地址,监听端口等。

 java.nio.channels.AsynchronousSocketChannel

       面向流的异步socket channel,表示一个连接。

 java.nio.channels.AsynchronousChannelGroup

       异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandler。AsynchronousServerSocketChannel创建的时候可以传入一个 AsynchronousChannelGroup,那么通过AsynchronousServerSocketChannel创建的 AsynchronousSocketChannel将同属于一个组,共享资源。

 java.nio.channels.CompletionHandler

       异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果:返回的Future模式或者注册CompletionHandler,我更推荐用CompletionHandler的方式,这些handler的调用是由 AsynchronousChannelGroup的线程池派发的。显然,线程池的大小是性能的关键因素。AsynchronousChannelGroup允许绑定不同的线程池,通过三个静态方法来创建:

Java代码  


public static AsynchronousChannelGroup withFixedThreadPool(int nThreads,  

                                                              ThreadFactory threadFactory)  

       throws IOException  

  

public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,  

                                                               int initialSize)  

  

public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)  

       throws IOException  

 

     需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。

     在介绍完了aio引入的TCP的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用Reactor模式,Reacot负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责 CompletionHandler的派发,查看一个典型的IO写操作的流程来看两者的区别:

     Reactor:  send(msg) -> 消息队列是否为空,如果为空  -> 向Reactor注册OP_WRITE,然后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(很多人遇到的cpu 100%的问题就在于没有注销),处理Writeable,如果没有完全写入,继续注册OP_WRITE。注意到,写入的工作还是用户线程在处理。

     Proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册CompletionHandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的API 中,AsynchronousChannelGroup就扮演了Proactor的角色

 
4000
   CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:

Java代码  


public interface CompletionHandler<V,A> {  

  

     void completed(V result, A attachment);  

  

    void failed(Throwable exc, A attachment);  

  

     

    void cancelled(A attachment);  

}  

 

    其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。

    在初步介绍完aio引入的类和接口后,我们看看一个典型的tcp服务端是怎么启动的,怎么接受连接并处理读和写,这里引用的代码都是yanf4j 的aio分支中的代码,可以从svn checkout,svn地址: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio

    第一步,创建一个AsynchronousServerSocketChannel,创建之前先创建一个 AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以绑定一个 AsynchronousChannelGroup,那么通过这个AsynchronousServerSocketChannel建立的连接都将同属于一个AsynchronousChannelGroup并共享资源:

Java代码  


this.asynchronousChannelGroup = AsynchronousChannelGroup  

                    .withCachedThreadPool(Executors.newCachedThreadPool(),  

                            this.threadPoolSize);  

     然后初始化一个AsynchronousServerSocketChannel,通过open方法:

Java代码  


this.serverSocketChannel = AsynchronousServerSocketChannel  

                .open(this.asynchronousChannelGroup);  

 

    通过nio 2.0引入的SocketOption类设置一些TCP选项:

Java代码  


this.serverSocketChannel  

                    .setOption(  

                            StandardSocketOption.SO_REUSEADDR,true);  

this.serverSocketChannel  

                    .setOption(  

                            StandardSocketOption.SO_RCVBUF,16*1024);  

 

    绑定本地地址:

Java代码  


this.serverSocketChannel  

                    .bind(new InetSocketAddress("localhost",8080), 100);  

 

   

    其中的100用于指定等待连接的队列大小(backlog)。完了吗?还没有,最重要的监听工作还没开始,监听端口是为了等待连接上来以便accept产生一个AsynchronousSocketChannel来表示一个新建立的连接,因此需要发起一个accept调用,调用是异步的,操作系统将在连接建立后,将最后的结果——AsynchronousSocketChannel返回给你:

Java代码  


public void pendingAccept() {  

        if (this.started && this.serverSocketChannel.isOpen()) {  

            this.acceptFuture = this.serverSocketChannel.accept(null,  

                    new AcceptCompletionHandler());  

  

        } else {  

            throw new IllegalStateException("Controller has been closed");  

        }  

    }  

 

   注意,重复的accept调用将会抛出PendingAcceptException,后文提到的read和write也是如此。accept方法的第一个参数是你想传给CompletionHandler的attchment,第二个参数就是注册的用于回调的CompletionHandler,最后返回结果Future<AsynchronousSocketChannel>。你可以对future做处理,这里采用更推荐的方式就是注册一个CompletionHandler。那么accept的CompletionHandler中做些什么工作呢?显然一个赤裸裸的
AsynchronousSocketChannel是不够的,我们需要将它封装成session,一个session表示一个连接(mina里就叫 IoSession了),里面带了一个缓冲的消息队列以及一些其他资源等。在连接建立后,除非你的服务器只准备接受一个连接,不然你需要在后面继续调用pendingAccept来发起另一个accept请求

Java代码  


private final class AcceptCompletionHandler implements  

            CompletionHandler<AsynchronousSocketChannel, Object> {  

  

        @Override  

        public void cancelled(Object attachment) {  

            logger.warn("Accept operation was canceled");  

        }  

  

        @Override  

        public void completed(AsynchronousSocketChannel socketChannel,  

                Object attachment) {  

            try {  

                logger.debug("Accept connection from "  

                        + socketChannel.getRemoteAddress());  

                configureChannel(socketChannel);  

                AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);  

                Session session = new AioTCPSession(sessionConfig,  

                        AioTCPController.this.configuration  

                                .getSessionReadBufferSize(),  

                        AioTCPController.this.sessionTimeout);  

                session.start();  

                registerSession(session);  

            } catch (Exception e) {  

                e.printStackTrace();  

                logger.error("Accept error", e);  

                notifyException(e);  

            } finally {  

                <strong>pendingAccept</strong>();  

            }  

        }  

  

        @Override  

        public void failed(Throwable exc, Object attachment) {  

            logger.error("Accept error", exc);  

            try {  

                notifyException(exc);  

            } finally {  

                <strong>pendingAccept</strong>();  

            }  

        }  

    }  

 

  

    注意到了吧,我们在failed和completed方法中在最后都调用了pendingAccept来继续发起accept调用,等待新的连接上来。有的同学可能要说了,这样搞是不是递归调用,会不会堆栈溢出?实际上不会,因为发起accept调用的线程与CompletionHandler回调的线程并非同一个,不是一个上下文中,两者之间没有耦合关系。要注意到,CompletionHandler的回调共用的是 AsynchronousChannelGroup绑定的线程池,因此千万别在CompletionHandler回调方法中调用阻塞或者长时间的操作,例如sleep,回调方法最好能支持超时,防止线程池耗尽。

    连接建立后,怎么读和写呢?回忆下在nonblocking nio框架中,连接建立后的第一件事是干什么?注册OP_READ事件等待socket可读。异步IO也同样如此,连接建立后马上发起一个异步read调用,等待socket可读,这个是Session.start方法中所做的事情:

Java代码  


public class AioTCPSession {  

    protected void start0() {  

        pendingRead();  

    }  

  

    protected final void pendingRead() {  

        if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {  

            if (!this.readBuffer.hasRemaining()) {  

                this.readBuffer = ByteBufferUtils  

                        .increaseBufferCapatity(this.readBuffer);  

            }  

            this.readFuture = this.asynchronousSocketChannel.read(  

                    this.readBuffer, this, this.readCompletionHandler);  

        } else {  

            throw new IllegalStateException(  

                    "Session Or Channel has been closed");  

        }  

    }  

     

}  

 

     AsynchronousSocketChannel的read调用与AsynchronousServerSocketChannel的accept调用类似,同样是非阻塞的,返回结果也是一个Future,但是写的结果是整数,表示写入了多少字节,因此read调用返回的是 Future<Integer>,方法的第一个参数是读的缓冲区,操作系统将IO读到数据拷贝到这个缓冲区,第二个参数是传递给 CompletionHandler的attchment,第三个参数就是注册的用于回调的CompletionHandler。这里保存了read的结果Future,这是为了在关闭连接的时候能够主动取消调用,accept也是如此。现在可以看看read的CompletionHandler的实现:

Java代码  


public final class ReadCompletionHandler implements  

        CompletionHandler<Integer, AbstractAioSession> {  

  

    private static final Logger log = LoggerFactory  

            .getLogger(ReadCompletionHandler.class);  

    protected final AioTCPController controller;  

  

    public ReadCompletionHandler(AioTCPController controller) {  

        this.controller = controller;  

    }  

  

    @Override  

    public void cancelled(AbstractAioSession session) {  

        log.warn("Session(" + session.getRemoteSocketAddress()  

                + ") read operation was canceled");  

    }  

  

    @Override  

    public void completed(Integer result, AbstractAioSession session) {  

        if (log.isDebugEnabled())  

            log.debug("Session(" + session.getRemoteSocketAddress()  

                    + ") read +" + result + " bytes");  

        if (result < 0) {  

            session.close();  

            return;  

        }  

        try {  

            if (result > 0) {  

                session.updateTimeStamp();  

                session.getReadBuffer().flip();  

                session.decode();  

                session.getReadBuffer().compact();  

            }  

        } finally {  

            try {  

                session.pendingRead();  

            } catch (IOException e) {  

                session.onException(e);  

                session.close();  

            }  

        }  

        controller.checkSessionTimeout();  

    }  

  

    @Override  

    public void failed(Throwable exc, AbstractAioSession session) {  

        log.error("Session read error", exc);  

        session.onException(exc);  

        session.close();  

    }  

  

}  

 

   如果IO读失败,会返回失败产生的异常,这种情况下我们就主动关闭连接,通过session.close()方法,这个方法干了两件事情:关闭channel和取消read调用:

Java代码  


if (null != this.readFuture) {  

            this.readFuture.cancel(true);  

        }  

this.asynchronousSocketChannel.close();  

 

   在读成功的情况下,我们还需要判断结果result是否小于0,如果小于0就表示对端关闭了,这种情况下我们也主动关闭连接并返回。如果读到一定字节,也就是result大于0的情况下,我们就尝试从读缓冲区中decode出消息,并派发给业务处理器的回调方法,最终通过pendingRead继续发起read调用等待socket的下一次可读。可见,我们并不需要自己去调用channel来进行IO读,而是操作系统帮你直接读到了缓冲区,然后给你一个结果表示读入了多少字节,你处理这个结果即可。而nonblocking
IO框架中,是reactor通知用户线程socket可读了,然后用户线程自己去调用read进行实际读操作。这里还有个需要注意的地方,就是decode出来的消息的派发给业务处理器工作最好交给一个线程池来处理,避免阻塞group绑定的线程池。

  

   IO写的操作与此类似,不过通常写的话我们会在session中关联一个缓冲队列来处理,没有完全写入或者等待写入的消息都存放在队列中,队列为空的情况下发起write调用:

Java代码  


protected void write0(WriteMessage message) {  

      boolean needWrite = false;  

      synchronized (this.writeQueue) {  

          needWrite = this.writeQueue.isEmpty();  

          this.writeQueue.offer(message);  

      }  

      if (needWrite) {  

          pendingWrite(message);  

      }  

  }  

  

  protected final void pendingWrite(WriteMessage message) {  

      message = preprocessWriteMessage(message);  

      if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {  

          this.asynchronousSocketChannel.write(message.getWriteBuffer(),  

                  this, this.writeCompletionHandler);  

      } else {  

          throw new IllegalStateException(  

                  "Session Or Channel has been closed");  

      }  

  }  

 

    write调用返回的结果与read一样是一个Future<Integer>,而write的CompletionHandler处理的核心逻辑大概是这样:

Java代码  


@Override  

    public void completed(Integer result, AbstractAioSession session) {  

        if (log.isDebugEnabled())  

            log.debug("Session(" + session.getRemoteSocketAddress()  

                    + ") writen " + result + " bytes");  

                  

        WriteMessage writeMessage;  

        Queue<WriteMessage> writeQueue = session.getWriteQueue();  

        synchronized (writeQueue) {  

            writeMessage = writeQueue.peek();  

            if (writeMessage.getWriteBuffer() == null  

                    || !writeMessage.getWriteBuffer().hasRemaining()) {  

                writeQueue.remove();  

                if (writeMessage.getWriteFuture() != null) {  

                    writeMessage.getWriteFuture().setResult(Boolean.TRUE);  

                }  

                try {  

                    session.getHandler().onMessageSent(session,  

                            writeMessage.getMessage());  

                } catch (Exception e) {  

                    session.onException(e);  

                }  

                writeMessage = writeQueue.peek();  

            }  

        }  

        if (writeMessage != null) {  

            try {  

                session.pendingWrite(writeMessage);  

            } catch (IOException e) {  

                session.onException(e);  

                session.close();  

            }  

        }  

    }  

 

   compete方法中的result就是实际写入的字节数,然后我们判断消息的缓冲区是否还有剩余,如果没有就将消息从队列中移除,如果队列中还有消息,那么继续发起write调用。

   重复一下,这里引用的代码都是yanf4j aio分支中的源码,感兴趣的朋友可以直接check out出来看看:http://yanf4j.googlecode.com/svn/branches/yanf4j-aio

   在引入了aio之后,java对于网络层的支持已经非常完善,该有的都有了,java也已经成为服务器开发的首选语言之一。java的弱项在于对内存的管理上,由于这一切都交给了GC,因此在高性能的网络服务器上还是Cpp的天下。java这种单一堆模型比之erlang的进程内堆模型还是有差距,很难做到高效的垃圾回收和细粒度的内存管理。

   这里仅仅是介绍了aio开发的核心流程,对于一个网络框架来说,还需要考虑超时的处理、缓冲buffer的处理、业务层和网络层的切分、可扩展性、性能的可调性以及一定的通用性要求。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  AIO原理 异步IO