[align=left]1. [Discussion] Using the Java NIO Selector to build a data-query server[/align]
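[align=left]Posted by: koji Posted on: 2003-10-29 12:40
This is something I discussed with 米花 a while ago, and I would like to share it here for discussion. There is a diagram at the very bottom; the names in it may not be well chosen.

For a school project I have to build the centralized server of a P2P system. It manages the program schedule, and every P2P client has to request its data from this server, so I wrote this program as a test. I chose a Selector because I do not want too many threads to be created, and because I wanted to try NIO. The goal is a fixed number of threads handling many connections.

When a user searches for a program, each connection only performs a database query. The biggest problem I ran into is that the selector blocks while it waits for activity on its connections, so while it is waiting, other connections cannot be registered with it.

This is what the Javadoc says:
--
A thread blocked in one of the select() or select(long) methods may be interrupted by some other thread in one of three ways:
* By invoking the selector's wakeup method,
* By invoking the selector's close method, or
* By invoking the blocked thread's interrupt method, in which case its interrupt status will be set and the selector's wakeup method will be invoked.
--
So I asked 米花 for advice, and in the end I used a queue to hold the connections that are waiting to be registered.

My code is below. I am not sure how good this design is, so comments are welcome.

ServerWindow[/align]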
    package p2p.server;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;

    import org.apache.log4j.Logger; // assumed: log4j 1.x, judging by Logger.getLogger(Class)

    public class ServerWindow
    {
        static Logger log = Logger.getLogger(p2p.server.ServerWindow.class);
        public static int PORT_NUM = 28888;
        public WorkLine wline;

        public static void main(String[] args) throws Exception
        {
            new ServerWindow().go();
        }

        // Accepts connections and hands each new channel to the worker pool.
        public void go() throws Exception
        {
            wline = new WorkLine();
            log.info("Server is Listening on port: " + PORT_NUM);
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.socket().bind(new InetSocketAddress(PORT_NUM));
            serverChannel.configureBlocking(false);
            Selector selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            try
            {
                while (true)
                {
                    if (selector.select() == 0)
                    {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext())
                    {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        // Only OP_ACCEPT is registered here, so no read branch is needed.
                        if (key.isAcceptable())
                        {
                            SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                            // In non-blocking mode accept() returns null if nothing is pending.
                            if (client != null)
                            {
                                wline.enQueue(client);
                            }
                        }
                    }
                }
            }
            catch (IOException ioexception)
            {
                log.error(ioexception);
            }
        }
    }
[align=left]WorkerThread[/align]
    package p2p.server;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;

    import org.apache.log4j.Logger; // assumed: log4j 1.x, judging by Logger.getLogger(Class)

    public class WorkerThread extends Thread implements Comparable<WorkerThread> {

        static Logger log = Logger.getLogger(p2p.server.WorkerThread.class);
        private static final Charset CHARSET = Charset.forName("ISO-8859-1");

        private WorkLine wline;
        private final Selector selector;

        public WorkerThread(String name, WorkLine wline) {
            setName(name);
            setWorkLine(wline);
            try {
                selector = Selector.open();
            } catch (IOException e) {
                // A worker without a selector cannot run, so fail fast.
                throw new IllegalStateException("cannot open selector", e);
            }
        }

        public void setWorkLine(WorkLine wline) {
            this.wline = wline;
        }

        public WorkLine getWorkLine() {
            return wline;
        }

        // Called by the acceptor thread after it has queued a channel.
        public void registChannel() throws Exception {
            selector.wakeup(); // a blocked (or the next) select() returns at once
        }

        // Number of channels registered with this worker. It is read from
        // another thread, so treat it as an approximate load figure.
        public int getSize() {
            return selector.keys().size();
        }

        public void run() {
            try {
                while (true) {
                    registerQueuedChannels();
                    if (selector.select() == 0) {
                        continue; // woken by wakeup(): go back and register
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isValid() && key.isReadable()) {
                            // receive search command
                            log.info("Got the message!! Server is working!!");
                            readDataFromSocket(key);
                        }
                    }
                }
            } catch (Exception e) {
                log.error(e);
            }
        }

        // Registers every queued channel from this thread, that is, at a
        // moment when select() is not blocking.
        private void registerQueuedChannels() throws IOException {
            ChannelQueue queue = wline.getQueue();
            // Same lock as ChannelQueue's own methods, so size() + remove() is atomic.
            synchronized (queue) {
                while (queue.size() > 0) {
                    SocketChannel channel = (SocketChannel) queue.remove();
                    channel.configureBlocking(false);
                    channel.socket().setKeepAlive(true);
                    channel.register(selector, SelectionKey.OP_READ);
                }
            }
        }

        protected void readDataFromSocket(SelectionKey key) throws IOException {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(100);
            try {
                int count;
                // Loop while data is available; the channel is non-blocking.
                while ((count = socketChannel.read(buffer)) > 0) {
                    buffer.flip(); // make the buffer readable
                    log.info("Message-->" + CHARSET.decode(buffer));
                    buffer.clear(); // make room for the next read
                }
                if (count < 0) { // -1: the peer closed the connection
                    key.cancel();
                    socketChannel.close();
                    log.info("connection closed naturally!!");
                }
            } catch (IOException e) {
                log.info("connection closed immediately!!");
                key.cancel();
                socketChannel.close();
            }
        }

        // Orders workers by load; equal loads compare as 0.
        public int compareTo(WorkerThread other) {
            return Integer.compare(getSize(), other.getSize());
        }
    }
[align=left]ChannelQueue[/align]
    package p2p.server;

    import java.util.LinkedList;

    // A small thread-safe FIFO queue for channels waiting to be registered.
    public class ChannelQueue
    {
        private final LinkedList<Object> list = new LinkedList<Object>();

        public synchronized void add(Object obj)
        {
            list.add(obj);
        }

        public synchronized int size()
        {
            return list.size();
        }

        // Removes and returns the oldest entry, or null if the queue is empty.
        // (removeFirst() alone would throw NoSuchElementException on an empty list.)
        public synchronized Object remove()
        {
            return list.isEmpty() ? null : list.removeFirst();
        }
    }
[align=left]WorkLine[/align]
    package p2p.server;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    // Owns the fixed pool of workers and the queue of channels to register.
    public class WorkLine
    {
        private final List<WorkerThread> workers = new ArrayList<WorkerThread>();
        private final ChannelQueue queue = new ChannelQueue();

        public WorkLine()
        {
            for (int i = 0; i < 10; i++)
            {
                WorkerThread worker = new WorkerThread("Thread:" + i, this);
                workers.add(worker);
                worker.start();
            }
        }

        // Queues the channel, then wakes the worker that currently has the fewest channels.
        public void enQueue(Object obj) throws Exception
        {
            queue.add(obj);
            WorkerThread leastLoaded =
                Collections.min(workers, Comparator.comparingInt(WorkerThread::getSize));
            leastLoaded.registChannel();
        }

        public ChannelQueue getQueue()
        {
            return queue;
        }
    }
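The queue-plus-wakeup idea from the post can be tried in isolation. The following is a minimal sketch, not the poster's code: the class name `QueuedRegistrationDemo` and its methods are made up for illustration, a `java.nio.channels.Pipe` stands in for a client socket so the example runs without a network, and `ConcurrentLinkedQueue` replaces the hand-written `ChannelQueue`. It assumes Java 8 or later.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a selector thread that takes registrations through a queue.
class QueuedRegistrationDemo implements Runnable {
    private final Selector selector;
    private final Queue<SelectableChannel> pending = new ConcurrentLinkedQueue<>();
    final CountDownLatch dataSeen = new CountDownLatch(1);

    QueuedRegistrationDemo() throws IOException {
        selector = Selector.open();
    }

    // Callable from any thread: park the channel, then wake the selector.
    void submit(SelectableChannel channel) {
        pending.add(channel);
        selector.wakeup();
    }

    @Override
    public void run() {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Register queued channels on the selector's own thread.
                SelectableChannel channel;
                while ((channel = pending.poll()) != null) {
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                }
                selector.select(); // returns early after wakeup() or interrupt()
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isReadable()) {
                        buffer.clear();
                        if (((ReadableByteChannel) key.channel()).read(buffer) > 0) {
                            dataSeen.countDown();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try { selector.close(); } catch (IOException ignored) { }
        }
    }

    // Returns true if a channel submitted while select() was blocking still gets served.
    static boolean demo() throws Exception {
        QueuedRegistrationDemo loop = new QueuedRegistrationDemo();
        Thread thread = new Thread(loop, "selector");
        thread.start();
        Thread.sleep(100); // give the thread time to block in select()
        Pipe pipe = Pipe.open();
        loop.submit(pipe.source()); // registration request from another thread
        pipe.sink().write(ByteBuffer.wrap(new byte[] {42}));
        boolean served = loop.dataSeen.await(5, TimeUnit.SECONDS);
        thread.interrupt(); // interrupting also wakes the selector
        thread.join();
        pipe.sink().close();
        pipe.source().close();
        return served;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("served after wakeup: " + demo());
    }
}
```

In this sketch `register()` is only ever called by the thread that owns the selector, between two `select()` calls. Other threads just enqueue the channel and call `wakeup()`, and because a `wakeup()` issued while no selection is in progress makes the next `select()` return immediately, a channel queued at an awkward moment is not left waiting.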
[align=left]2. Re: [Discussion] Using the Java NIO Selector to build a data-query server [Re: koji][/align]
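[align=left]Posted by: linexpmail Posted on: 2003-10-29 15:13
I recommend the NIO Reactor pattern; you could implement it and compare:
http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
It separates responsibilities clearly. I once implemented a single-threaded version, and its performance was good.

In my opinion the hard part of the Reactor pattern is seeing when to hit the brakes, that is, when the input is complete. I suggest pulling the socketChannel.read (buffer) > 0 check out into a method such as boolean inputIsComplete() { /* ... */ }, which makes it clearer where that responsibility lies.

In practice the underlying implementation differs between platforms, and in some environments a read returns 0 bytes. Having a separate method also makes this easier to observe and test.

In my own tests the performance bottleneck turned out to be CharsetDecoder; switching to reading bytes directly will make your reactor considerably faster. Unless you have already hit a bottleneck, I would still suggest implementing the single-threaded version first and letting the reactor run for a while to see how it does.[/align]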
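One possible reading of the `inputIsComplete()` suggestion, as a sketch under assumptions the thread does not state: a request is taken to be a single line ending in `\n`, the class `LineHandler` and every method name other than `inputIsComplete()` are hypothetical, and an oversized request is simply treated like a closed connection. The bytes are scanned directly, and decoding happens once per request instead of once per read.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical per-connection handler; the newline-terminated protocol is an assumption.
class LineHandler {
    private final ByteBuffer input = ByteBuffer.allocate(1024);
    private boolean closed;

    // One read per readiness event. A 0-byte read changes nothing; -1 means end of stream.
    void read(ReadableByteChannel channel) throws IOException {
        if (!input.hasRemaining()) {
            closed = true; // request too large for this sketch: give up on it
            return;
        }
        if (channel.read(input) < 0) {
            closed = true;
        }
    }

    // True once a whole request (terminated by '\n') is buffered. Works on raw bytes.
    boolean inputIsComplete() {
        for (int i = 0; i < input.position(); i++) {
            if (input.get(i) == '\n') {
                return true;
            }
        }
        return false;
    }

    boolean isClosed() {
        return closed;
    }

    // Decodes the buffered bytes once, after the request is complete.
    String request() {
        ByteBuffer view = input.duplicate();
        view.flip();
        return StandardCharsets.ISO_8859_1.decode(view).toString().trim();
    }

    // Feeds the handler from an in-memory channel; returns the request, or null if incomplete.
    static String demo(String wire) throws IOException {
        ReadableByteChannel channel = Channels.newChannel(
                new ByteArrayInputStream(wire.getBytes(StandardCharsets.ISO_8859_1)));
        LineHandler handler = new LineHandler();
        while (!handler.inputIsComplete() && !handler.isClosed()) {
            handler.read(channel);
        }
        return handler.inputIsComplete() ? handler.request() : null;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo("search news\n"));
    }
}
```

In a selector loop the handler would be attached to the key; each `OP_READ` event calls `read(channel)` and then checks `inputIsComplete()` before doing any work. A read that returns 0 bytes leaves the state untouched, so the platform difference mentioned above does not need special handling.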
[align=left]3. Re: [Discussion] Using the Java NIO Selector to build a data-query server [Re: koji][/align]
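[align=left]Posted by: koji Posted on: 2003-10-30 13:49
Hmm. My working assumption is that a few hundred to a thousand connections will use the server to search the program schedule, so I am worried that a single thread would make response times too long. With multiple threads, I figured the response time would even out a little.

I have not measured the performance of CharsetDecoder myself, so thanks for that.

Using boolean inputIsComplete() { /* ... */ } would be clearer, but as in the PDF you linked, that means I would have to pull the whole data-processing part out, right? I will try it in the next revision.

koji[/align]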
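If the per-request work (the database query) is what threatens response time, one variant described in the linked slides is to keep a single selector thread and hand only the processing to a fixed pool of worker threads. A rough sketch follows; `QueryDispatcher`, `dispatch` and `lookUp` are hypothetical names, and an in-memory `Map` stands in for the database.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical dispatcher: the selector thread submits queries, pool threads run them.
class QueryDispatcher {
    private final ExecutorService pool;
    private final Map<String, String> schedule; // stand-in for the real database

    QueryDispatcher(int threads, Map<String, String> schedule) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.schedule = new HashMap<>(schedule); // private copy, never modified afterwards
    }

    // Called once a request is complete; returns without waiting for the lookup.
    Future<String> dispatch(final String query) {
        Callable<String> task = () -> lookUp(query);
        return pool.submit(task);
    }

    // The slow part; in the real server this would be the database query.
    private String lookUp(String query) {
        String hit = schedule.get(query);
        return hit != null ? hit : "NOT FOUND";
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> schedule = new HashMap<>();
        schedule.put("news", "19:00");
        QueryDispatcher dispatcher = new QueryDispatcher(4, schedule);
        Future<String> answer = dispatcher.dispatch("news");
        System.out.println("news -> " + answer.get());
        dispatcher.shutdown();
    }
}
```

The number of threads stays fixed however many connections are open, which matches the original goal. In a real server the selector thread would not block on `get()` as `main` does here; the worker would pass the answer back to the selector thread, which then writes it to the channel.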