二、基于BIO、伪异步IO、NIO的Java代码实例
2017-05-16 11:13
381 查看
1、BIO代码实例
服务端:package BIODemo; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * Created by L_kanglin on 2017/5/11. */ public class TimeServer { public static void main(String[] args) throws IOException { int port=9030; if(args !=null && args.length >0){ port=Integer.valueOf(args[0]); } ServerSocket server=null; try{ server= new ServerSocket(port); System.out.println("The time server is start in port: "+port); Socket socket=null; while(true){ //当没有客户端连接时,程序会阻塞在accept这里,当有客户端访问时,就会创建新的线程去重新执行。 socket=server.accept(); //每有一个客户端访问,就添加一个线程执行 new Thread(new TimeServerHandler(socket)).start(); } }finally{ if(server !=null){ System.out.println("The time server close"); server.close(); server=null; } } } }
package BIODemo; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.Scanner; /** * Created by L_kanglin on 2017/5/12. */ public class TimeServerHandler implements Runnable { private Socket socket; public TimeServerHandler(Socket socket) { this.socket=socket; } @Override public void run() { BufferedReader in=null; PrintWriter out=null; try { //获取输入流 in= new BufferedReader(new InputStreamReader(this.socket.getInputStream())); //构造输出流 out= new PrintWriter(this.socket.getOutputStream(),true); String currentTime=null; String body=null; Scanner sc= new Scanner(System.in); while(true){ while(sc.hasNext()){ body=in.readLine(); if(body==null){break;} System.out.println("server receive : "+body); String str=sc.nextLine(); out.println(str); // System.out.println("the server send: "+str); } /*currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date (System.currentTimeMillis()).toString():"BAD ORDER"; out.println(currentTime);*/ } } catch(Exception e){ if(in != null){ try{ in.close(); }catch (IOException el) { el.printStackTrace(); } } if(out !=null){ out.close(); out=null; } if(this.socket !=null){ try { this.socket.close(); } catch (IOException e1) { e1.printStackTrace(); } this.socket=null; } } } }
客户端:
package BIODemo; import java.io.BufferedReader; import java.io.IOException; import java.io.In 4000 putStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.nio.Buffer; /** * Created by L_kanglin on 2017/5/12. */ public class TimeClient { public static void main(String[] args) throws IOException { int port = 9030; if(args !=null && args.length>0){ port =Integer.valueOf(args[0]); } Socket socket = null; BufferedReader in =null; PrintWriter out=null; while(true){ socket = new Socket("127.0.0.1",port); in= new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println("QUERY TIME ORDER"); //输出给服务端 System.out.println("Send order 2 server succeed."); String resp=in.readLine(); //获取输入流 System.out.println("NOW is: "+resp); } /*catch (Exception e) { }finally{ //client自行依次关闭in、out和socket,否则会造成内存泄漏 if(out!=null){ out.close(); out=null; } if(in !=null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(socket !=null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket=null; } }*/ } }
2、伪异步IO
服务端:package BBIODemo; import BIODemo.TimeServerHandler; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * Created by L_kanglin on 2017/5/12. * @description 首先创建一个时间服务器处理类的线程池,当接收到新的客户端连接的时候, * 将请求socket封装成一个Task,然后调用线程池的execute方法执行,从而避免了每个请求接 * 入都创建一个新的线程。 */ public class TimeServer { public static void main(String[] args) throws IOException { int port=9030; if(args !=null && args.length >0){ port=Integer.valueOf(args[0]); } ServerSocket server=null; try { server = new ServerSocket(port); System.out.println("The time server is start in port: "+port); Socket socket = null; TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool (50,10000); //创建IO任务线程池 while(true){ socket = server.accept(); //由于线程池和消息队列都是有界的,因此,无论客户端并发连接数多大,它都不会 //导致线程个数过于膨胀或者内存溢出,相比于传统的一连接一线程模型,是一种改良 singleExecutor.execute(new TimeServerHandler(socket)); } } finally{ if(server !=null){ System.out.println("The time server close"); server.close(); server=null; } } } }
package BBIODemo; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by L_kanglin on 2017/5/12. */ public class TimeServerHandlerExecutePool { private ExecutorService executor; public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){ executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() ,maxPoolSize,120L, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize)); } public void execute(Runnable task){ executor.execute(task); } }
客户端:
package BBIODemo; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /** * Created by L_kanglin on 2017/5/12. * 客户端通过socket创建,发送查询时间服务器的“QUERY TIME ORDER”指令,然后 * 读取服务端的响应并将结果打印出来,随后关闭连接,释放资源,程序退出执行。 */ public class TimeClient { public static void main(String[] args) throws IOException { int port = 9030; if(args !=null && args.length>0){ port =Integer.valueOf(args[0]); } Socket socket = null; BufferedReader in =null; PrintWriter out=null; try{ socket = new Socket("127.0.0.1",port); in= new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println("QUERY TIME ORDER"); //输出给服务端 System.out.println("Send order 2 server succeed."); String resp=in.readLine(); //获取输入流 System.out.println("NOW is: "+resp); } catch (Exception e) { }finally{ if(out!=null){ out.close(); out=null; } if(in !=null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } in = null; } if(socket !=null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket=null; } } } }
3、NIO代码实例
服务端:package NIODemo; /** * Created by L_kanglin on 2017/5/13. */ public class TimeServer { public static void main(String[] args) { int port=9030; if (args !=null && args.length>0){ port=Integer.valueOf(args[0]); } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); //创建Reactor线程,并启动线程 new Thread(timeServer,"NIO-MultiplexerTimerServer-001").start(); } }
package NIODemo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * Created by L_kanglin on 2017/5/13. */ public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; //初始化多路复用器、绑定监听端口 public MultiplexerTimeServer(int port){ try { //创建多路复用器,并开启 selector = Selector.open(); //打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接 //的父管道 servChannel = ServerSocketChannel.open(); //绑定监听端口,设置连接为非阻塞模式 servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port),1024); //将ServerSocketChannel注册到Reactor线程的多路复用器Selector上, //监听ACCEPT事件 servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port: "+port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop(){ this.stop=true; } @Override public void run() { while(!stop){ try { //多路复用器在线程run方法的无限循环体内轮询准备就绪的Key selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it=selectionKeys.iterator(); SelectionKey key=null; while(it.hasNext()){ key=it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key !=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (Throwable t) { t.printStackTrace(); } } //多路复用器关闭后,所有注册在上面的Channel和 // Pipe等资源都会被自动去注册并关闭,所以不需 // 要重复释放资源 if(selector !=null) try{ selector.close(); }catch(IOException e){ e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()){ //处理新接入的请求消息 ServerSocketChannel ssc= (ServerSocketChannel) key.channel(); //多路复用器监听到有新的客户端接入,处理新的接入请求, //完成TCP三次握手,建立物理链路 SocketChannel sc= ssc.accept(); //设置客户端链路为非阻塞模式 sc.configureBlocking(false); //将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作, //用来读取客户端发送的网络消息 sc.register(selector,SelectionKey.OP_READ); } if(key.isReadable()){ //read the data SocketChannel sc= (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); //异步读取客户端请求消息到缓冲区 int readBytes = sc.read(readBuffer); if(readBytes>0){ readBuffer.flip(); byte[] bytes=new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body=new String(bytes,"UTF-8"); System.out.println("the time server receive order: "+body); String currentTime ="QUERY TIME ORDER".equalsIgnoreCase(body)? new java. util.Date(System.currentTimeMillis()).toString():"BAD ORDER"; doWrite(sc,currentTime); }else if(readBytes<0){ //对端链路关闭 key.cancel(); sc.close(); }else{ ; //读到0字节,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException { if(response!=null && response.trim().length() >0){ byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
客户端:
package NIODemo; import BIODemo.TimeServerHandler; import java.io.IOException; /** * Created by L_kanglin on 2017/5/15. */ public class TimeClient { public static void main(String[] args) throws IOException { int port = 9030; if (args != null && args.length > 0) { port = Integer.valueOf(args[0]); } new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start(); } }
package NIODemo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * Created by L_kanglin on 2017/5/15. */ public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host,int port){ this.host =host==null?"127.0.0.1":host; this.port=port; try { selector=Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while(!stop){ try { selector.select(1000); Set<SelectionKey> selectedkeys = selector.selectedKeys(); Iterator<SelectionKey> it =selectedkeys.iterator(); SelectionKey key=null; while(it.hasNext()){ key =it.next(); it.remove(); handleInput(key); if(key !=null){ key.cancel(); if(key.channel()!=null) key.channel().close(); } } } catch (IOException e) { e.printStackTrace(); System.exit(1); } } //多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去 //注册并关闭,所以不需要重复释放资源 if(selector !=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException { if(key.isValid()){ //判断是否连接成功 SocketChannel sc = (SocketChannel) key.channel(); if(key.isConnectable()){ if(sc.finishConnect()){ sc.register(selector,SelectionKey.OP_READ); doWrite(sc); }else{ System.exit(1); //连接失败,进程退出 } } if(key.isReadable()){ ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if(readBytes>0){ readBuffer.flip(); byte[] bytes=new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body=new String(bytes,"UTF-8"); System.out.println("Now is : "+body); this.stop=true; }else if(readBytes<0){ //对端链路关闭 key.cancel(); sc.close(); }else{ ; //读到0字节,忽略 } } } } private void doConnect() throws IOException { //如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答 if(socketChannel.connect(new InetSocketAddress(host,port))){ socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else{ socketChannel.register(selector,SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel sc) throws IOException { byte[] req ="QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer=ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if(!writeBuffer.hasRemaining()) System.out.println("Send order 2 server succeed."); } }
相关文章推荐
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- 【转】Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- 详解Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)