您的位置:首页 > Web前端 > React

Netty源码学习-Java-NIO-Reactor

2014-05-05 00:27 441 查看
Netty里面采用了NIO-based Reactor Pattern

了解这个模式对学习Netty非常有帮助

参考以下两篇文章:

http://jeewanthad.blogspot.com/2013/02/reactor-pattern-explained-part-1.html

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

本文所贴的代码来自第一篇文章,在注释部分加入了我自己的理解

完整代码可以到我的github上下载,仅供参考:

https://github.com/bylijinnan/nettyLearn/tree/master/ljn-netty3-learn/src/main/java/com/ljn/reactor

Java代码


package com.ljn.reactor;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

import java.util.Set;

/*

单线程的实现

Server端用一个Selector利用一个线程(在main方法里面start)来响应所有请求

1.当ACCEPT事件就绪,Acceptor被选中,执行它的run方法:创建一个Handler(例如为handlerA),并将Handler的interestOps初始为READ

2.当READ事件就绪,handlerA被选中,执行它的run方法:它根据自身的当前状态,来执行读或写操作

因此,每一个Client连接过来,Server就创建一个Handler,但都所有操作都在一个线程里面

Selection Key Channel Handler Interested Operation

------------------------------------------------------------------------

SelectionKey 0 ServerSocketChannel Acceptor Accept

SelectionKey 1 SocketChannel 1 Handler 1 Read and Write

SelectionKey 2 SocketChannel 2 Handler 2 Read and Write

SelectionKey 3 SocketChannel 3 Handler 3 Read and Write

如果采用多个selector,那就是所谓的“Multiple Reactor Threads”,大体思路如下:

Selector[] selectors; // also create threads

int next = 0;

class Acceptor { // ...

public synchronized void run() { ...

Socket connection = serverSocket.accept();

if (connection != null)

new Handler(selectors[next], connection);

if (++next == selectors.length) next = 0;

}

}

*/

public class Reactor implements Runnable {

final Selector selector;

final ServerSocketChannel serverSocketChannel;

final boolean isWithThreadPool;

/*Reactor的主要工作:

* 1.给ServerSocketChannel设置一个Acceptor,接收请求

* 2.给每一个一个SocketChannel(代表一个Client)关联一个Handler

* 要注意其实Acceptor也是一个Handler(只是与它关联的channel是ServerSocketChannel而不是SocketChannel)

*/

Reactor(int port, boolean isWithThreadPool) throws IOException {

this.isWithThreadPool = isWithThreadPool;

selector = Selector.open();

serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(port));

serverSocketChannel.configureBlocking(false);

SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

selectionKey0.attach(new Acceptor());

}

public void run() {

System.out.println("Server listening to port: " + serverSocketChannel.socket().getLocalPort());

try {

while (!Thread.interrupted()) {

int readySelectionKeyCount = selector.select();

if (readySelectionKeyCount == 0) {

continue;

}

Set<SelectionKey> selected = selector.selectedKeys();

Iterator<SelectionKey> it = selected.iterator();

while (it.hasNext()) {

dispatch((SelectionKey) (it.next()));

}

//不会自动remove,因此要手动清;下次事件到来会自动添加

selected.clear();

}

} catch (IOException ex) {

ex.printStackTrace();

}

}

//从SelectionKey中取出Handler并执行Handler的run方法,没有创建新线程

void dispatch(SelectionKey k) {

Runnable r = (Runnable) (k.attachment());

if (r != null) {

r.run();

}

}

//主要工作是为每一个连接成功后返回的SocketChannel关联一个Handler,详见Handler的构造函数

class Acceptor implements Runnable {

public void run() {

try {

SocketChannel socketChannel = serverSocketChannel.accept();

if (socketChannel != null) {

if (isWithThreadPool)

new HandlerWithThreadPool(selector, socketChannel);

else

new Handler(selector, socketChannel);

}

System.out.println("Connection Accepted by Reactor2");

} catch (IOException ex) {

ex.printStackTrace();

}

}

}

public static void main(String[] args) throws IOException{

int port = 9900;

boolean withThreadPool = false;

Reactor reactor = new Reactor(port, withThreadPool);

new Thread(reactor).start();

}

}

Java代码


package com.ljn.reactor;

import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

/*

* 单线程版本的Handler

*/

public class Handler implements Runnable {

final SocketChannel socketChannel;

final SelectionKey selectionKey;

ByteBuffer input = ByteBuffer.allocate(1024);

static final int READING = 0, SENDING = 1;

//初始状态

int state = READING;

String clientName = "";

//在handler里面设置interestOps,而且这个interestOps是会随着事件的进行而改变的

Handler(Selector selector, SocketChannel c) throws IOException {

socketChannel = c;

c.configureBlocking(false);

selectionKey = socketChannel.register(selector, 0);

/*

handler作为SellectionKey的attachment。这样,handler就与SelectionKey也就是interestOps对应起来了

反过来说,当interestOps发生、SelectionKey被选中时,就能从SelectionKey中取得handler

*/

selectionKey.attach(this);

selectionKey.interestOps(SelectionKey.OP_READ);

selector.wakeup();

}

//在Reactor的dispatch方法里面被调用,但是直接的方法调用,没有创建新线程

public void run() {

try {

if (state == READING) {

read();

} else if (state == SENDING) {

send();

}

} catch (IOException ex) {

ex.printStackTrace();

}

}

void read() throws IOException {

int readCount = socketChannel.read(input);

if (readCount > 0) {

readProcess(readCount);

}

state = SENDING;

// Interested in writing

selectionKey.interestOps(SelectionKey.OP_WRITE);

}

/**

* Processing of the read message. This only prints the message to stdOut.

* 非IO操作(业务逻辑,实际应用中可能会非常耗时):将Client发过来的信息(clientName)转成字符串形式

* @param readCount

*/

synchronized void readProcess(int readCount) {

StringBuilder sb = new StringBuilder();

input.flip(); //from writing mode to reading mode

byte[] subStringBytes = new byte[readCount];

byte[] array = input.array();

System.arraycopy(array, 0, subStringBytes, 0, readCount);

// Assuming ASCII (bad assumption but simplifies the example)

sb.append(new String(subStringBytes));

input.clear();

clientName = sb.toString().trim();

}

void send() throws IOException {

System.out.println("Saying hello to " + clientName);

ByteBuffer output = ByteBuffer.wrap(("Hello " + clientName + "\n").getBytes());

socketChannel.write(output);

selectionKey.interestOps(SelectionKey.OP_READ);

state = READING;

}

}

Java代码


package com.ljn.reactor;

import java.io.IOException;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

/*

* 多线程版本的Handler

* 思路就是把耗时的操作(非IO操作)放到其他线程里面跑,

* 使得Handler只专注与Channel之间的IO操作;

* Handler快速地从Channel中读或写,可以使Channel及时地、更快地响应其他请求

* 耗时的操作完成后,产生一个事件(改变state),再“通知”(由Handler轮询这个状态是否有改变)

* Handler执行Channel的读写操作

*/

public class HandlerWithThreadPool extends Handler {

static ExecutorService pool = Executors.newFixedThreadPool(2);

static final int PROCESSING = 2;

public HandlerWithThreadPool(Selector sel, SocketChannel c) throws IOException {

super(sel, c);

}

//Handler从SocketChannel中读到数据后,把“数据的处理”这个工作扔到线程池里面执行

void read() throws IOException {

int readCount = socketChannel.read(input);

if (readCount > 0) {

state = PROCESSING;

//execute是非阻塞的,所以要新增一个state(PROCESSING),表示数据在处理当中,Handler还不能执行send操作

pool.execute(new Processer(readCount));

}

//We are interested in writing back to the client soon after read processing is done.

//这时候虽然设置了OP_WRITE,但下一次本Handler被选中时不会执行send()方法,因为state=PROCESSING

//或者可以把这个设置放到Processer里面,等process完成后再设为OP_WRITE

selectionKey.interestOps(SelectionKey.OP_WRITE);

}

//Start processing in a new Processer Thread and Hand off to the reactor thread.

synchronized void processAndHandOff(int readCount) {

readProcess(readCount);

//Read processing done. Now the server is ready to send a message to the client.

state = SENDING;

}

class Processer implements Runnable {

int readCount;

Processer(int readCount) {

this.readCount = readCount;

}

public void run() {

processAndHandOff(readCount);

}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: