您的位置:首页 > 其它

多线程非阻塞服务器设计

2009-12-17 20:00 162 查看
接着上篇日志写。。一般在设计服务器时,都是非阻塞的,且为了简单,通常都设计为一个线程来操作。
但是这样设计的缺点也很明显,倘若服务器有很多连接,那么每次循环都会处理很多套接字,除了CPU使用率不高外,如果某个套接字的数据传输速度很慢,那么他的调用也会很慢(个人猜测,因为TCP传输速度是双向均衡的),从而势必会影响其他套接字的数据传输。
所以非阻塞+多线程是大型服务器必备的解决方案。
多线程操作同一套接字需要考虑的问题比较复杂,我在http://blog.csdn.net/shallwake/archive/2009/12/15/5014160.aspx有讨论,原则就是:尽量不要多线程同时recv();绝对不能多线程send();可以用两个线程,一个recv(),一个send()。原因简单点就是虽然套接字是线程安全,但是多线程会有发包错乱。
所以,对于服务器,只要保证每个线程操作的套接字间没有交集就行,当然,此交集只是逻辑上没有交集,比如,某个线程操作A套接字send(),另一个线程负责操作A套接字recv(),那么他们还是逻辑上没有交集的,原因上面已讲述。
剩下就是数据结构的设计,个人总结有2种。
一,动态创建线程:
就是规定某个线程处理N个套接字,当总套接字超过此值就另外再创建线程处理。
二,预先分配线程:
就是先分配N个线程,然后随着套接字的增长,将他们均匀地分配给每个线程。
难点:套接字是动态变化的,增加的时候比较好办,但是若是减少,那么方法一与方法二就有难度差距了。
对于方法一:当某个线程负责处理的套接字减少,小于N时,那么下次增加套接字时不能再开线程,要继续利用它,若是套接字减少为0,那么此线程就自己销毁了。所以整个动态过程的维护时比较复杂的,自己只是想会,没试验过。
对于方法二:我下面提供解决方案。
 
方法二的实现:
首先感谢kasicass GG在上篇文章中的指点~~,今天下午按照他的思路用java简单尝试了下
思路还是比较简单,开启5个线程负责异步读取客户端信息,当然,若是没有套接字处理,线程不能退出,所以每个线程都用到了BlockingQueue,而且维护了一个变量sockCount记录自己当前处理的套接字数量。当有连接时,主线程选择sockCount最少的那个线程负责处理该连接,就OK了,没有方法一动态维护那么复杂。
嗯,懒人大招,帖代码了。。。
Server类
import java.nio.channels.*;
import java.util.*;
import java.net.*;
import java.io.*;

public class Server extends Thread{

private ServerSocketChannel sSockChan;

private Selector selector;

private ArrayList readers;

public Server(){
readers = new ArrayList();
for(int i = 0; i < 5; i ++){
EventReader er = new EventReader(this);
er.start();
readers.add(er);
}

initServerSocket();
}

public int getIndex(){
int min = 999999;
int pos = 0;
for(int i = 0; i < 5; i ++){
if(min >= readers.get(i).getSocketCount()){
min = readers.get(i).getSocketCount();
pos = i;
}
}
return pos;
}

private void initServerSocket() {
try {
// open a non-blocking server socket channel
sSockChan = ServerSocketChannel.open();
sSockChan.configureBlocking(false);

// bind to localhost on designated port
InetAddress addr = InetAddress.getLocalHost();
System.out.println("binding to address: " + addr.getHostAddress());
sSockChan.socket().bind(new InetSocketAddress(addr, 8550));

// get a selector
selector = Selector.open();

// register the channel with the selector to handle accepts
SelectionKey acceptKey = sSockChan.register(selector, SelectionKey.OP_ACCEPT);
}
catch (Exception e) {
System.out.println("error initializing ServerSocket");
System.exit(1);
}
}

@Override
public void run(){

while(true){

try {
// blocking select, will return when we get a new connection
selector.select();

// fetch the keys
Set readyKeys = selector.selectedKeys();

// run through the keys and process
Iterator i = readyKeys.iterator();
while (i.hasNext()) {
SelectionKey key = (SelectionKey) i.next();
i.remove();

ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = ssChannel.accept();

// 这里是重点。
readers.get(getIndex()).addNewClient(clientChannel,getIndex());
System.out.println("got connection from: " + clientChannel.socket().getInetAddress());
}
}
catch (IOException ioe) {
System.out.println("error during serverSocket select(): " + ioe.getMessage());
}
catch (Exception e) {
System.out.println("exception in run()");
}
}
}

public static void main(String[] args) {
Server server = new Server();
server.start();
}
}


EventReader类(就是每个线程)


import java.nio.*;
import java.nio.channels.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class EventReader extends Thread{
private Server sv;

private Selector selector;

private BlockingQueue newClients;

private int scoketCount = 0;

public int getSocketCount(){
return scoketCount;
}

public EventReader(Server sv){
this.sv = sv;
newClients = new LinkedBlockingQueue();
}

public void addNewClient(SocketChannel clientChannel, int id) {
try {
scoketCount++;
newClients.put(clientChannel);
selector.wakeup();
System.out.println("I'm in thread" + id);
} catch (InterruptedException ex) {
Logger.getLogger(EventReader.class.getName()).log(Level.SEVERE, null, ex);
}
}

@Override
public void run(){
try {
selector = Selector.open();

while(true){
read();
checkNewConnections();

try { Thread.sleep(30); } catch (InterruptedException e) {}
}
} catch (IOException ex) {
Logger.getLogger(EventReader.class.getName()).log(Level.SEVERE, null, ex);
}

}

private void read(){
try {
selector.select();
Set readyKeys = selector.selectedKeys();

Iterator i = readyKeys.iterator();
while (i.hasNext()) {
SelectionKey key = (SelectionKey) i.next();
i.remove();
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer)key.attachment();

long nbytes = channel.read(buffer);

if(nbytes == -1){
System.out.println("channel has closed");
scoketCount--;
channel.close();
}
System.out.println(buffer.array());

//注意,只是作为简单演示,并未考虑如何读取完整数据包,读多少输出多少然后复原。
buffer.clear();
}
} catch (IOException ex) {
Logger.getLogger(EventReader.class.getName()).log(Level.SEVERE, null, ex);
}
}

private void checkNewConnections(){
try {
SocketChannel clientChannel = (SocketChannel) newClients.take();
clientChannel.configureBlocking( false);
clientChannel.register( selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
} catch (InterruptedException ex) {
Logger.getLogger(EventReader.class.getName()).log(Level.SEVERE, null, ex);
}
catch(IOException ex){

}
}
}


Over。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: