JAVA Socket编程学习5--IO模型
2017-11-08 22:36
330 查看
我们首先模拟一个比较糟糕的TCP客户端:
BIO,即为Blocking I/O,阻塞IO,大致流程为
1)服务端建立ServerSocket,以一个端口启动,
2)等待客户端建立socket连接,如果没有连接,一直阻塞(等待),3)一个socket建立连接之后,从线程池中取一个线程取处理socket
对于这种模型的总结:
缺点:如果请求量过大,线程池不够用,那么会严重影响性能。
目前tomcat I/O模型默认还是BIO,对于连接不大的,该模型代码编写简单,只需要关注线程内的连接即可
/127.0.0.1:49505 connect!
/127.0.0.1:49504 connect!
/127.0.0.1:49506 connect!
/127.0.0.1:49508 connect!
/127.0.0.1:49507 connect!
(在这里停顿了一会儿)
spend:6003ms
spend:6003ms
spend:6003ms
spend:6003ms
spend:6003ms
客户端输出:
from server: Hello!
from server: Hello!
from server: Hello!
from server: Hello!
from server: Hello!
NIO:JDK1.4及以后的版本非阻塞IO
即是Non Blocking I/O,非阻塞IO,jdk1.4之后提供了一套专门的api专门操作非阻塞IO,接口以及类定义在java.nio包
NIO API由四个主要的部分组成:缓冲区(Buffers)、通道(Channels)、选择器(Selector)是其核心组成类。
NIO的工作大致流程为:
1、通道注册一个监听到事件处理器
2、有事件发生时,事件处理器会通知相应的通道处理
Accepted connection from 127.0.0.1.
Accepted connection from 127.0.0.1.
Accepted connection from 127.0.0.1.
Accepted connection from 127.0.0.1.
Accepted connection from 127.0.0.1.
(在这里停顿了一会儿)
spend:0ms
spend:1ms
spend:2ms
spend:1ms
spend:1ms
客户端输出:
from server: Hello!
from server: Hello!
from server: Hello!
from server: Hello!
from server: Hello!
代码中的主要流程为:
1、open ServerSocketChannel,configureBlocking false,bind host and port
2、open Selector
3、ServerSocketChannel register on Selector
4、有客户端连接的事件发生,事件处理器通知ServerSocketChannel去处理
对这一模型的总结:
NIO本身是基于事件驱动思想来完成的
NIO基于Selector,当有感兴趣的事件发生时,就通知对应的事件处理器去处理事件,如果没有,则不处理。当socket有流可读或可写入socket时,操作系统会相应的通知引用程序进行处理,应用再将流读取到缓冲区或写入操作系统。所以使用一个线程做轮询就可以了
Buffer,也是NIO的一个新特性,可以块状的读/写数据,效率得到极大的提高。
JDK1.7之后,AIO异步非阻塞IO
AIO,即是Asynchronous I/O,异步非阻塞I/O
JDK1.7之后,也叫作AIO,工作方式是异步非阻塞
AIO主要工作流程为:
客户端发起一个IO调用
服务端接受IO之后,异步回调接收成功后的IO,不会阻挡当前主流程,主流程继续接受下一个请求
注意:后来我想用AIO来实现UDP的发送与接收,但是却做不到,这是我自己发的一个帖子http://bbs.csdn.net/topics/392282605,哎,也没人回答,还得自问自答。。。
网上说是有AsynchronousDatagramChannel可以,但为什么我在jdk1.7.0_25根本就没有找到这个类啊。看到这么个连接http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6993126,我理解的意思是AsynchronousDatagramChannel在jdk1.7中已经被移除了,在jdk1.8中再观察观察情况再做决定。我看了jdk1.8也没有找到这个类啊,难道也被移除了?不知道我的理解是否正确。
补充:一开始模拟的客户端是发送TCP数据的,后来我想将这个代码改为发送UDP数据的,可是达不到那样的效果(无法用PrintWriter写数据和BufferedReader读数据,只能用DatagramSocket类的send和receive方法来发送和接收数据,所以也就无法发送一些数据和睡眠一定时间再发送。不知道我理解还是方法有问题,如果有知道的一起讨论下)。下面是我修改后的代码:
参考:http://blog.csdn.net/wanghang88/article/details/51922117
推荐:可以浏览下该博主http://blog.csdn.net/column/details/sys-communication.html?&page=3的专栏,其中对io通信模型该系列的第1-5篇文章写得就很不错
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.LockSupport; public class HeavyTCPClient { private static ExecutorService tp = Executors.newCachedThreadPool(); private static final int sleep_time = 1000*1000*1000; public static class EchoClient implements Runnable{ public void run(){ Socket client = null; PrintWriter writer = null; BufferedReader reader = null; try { client = new Socket(); client.connect(new InetSocketAddress("localhost", 7788)); writer = new PrintWriter(client.getOutputStream(), true); writer.print("H"); LockSupport.parkNanos(sleep_time); writer.print("e"); LockSupport.parkNanos(sleep_time); writer.print("l"); LockSupport.parkNanos(sleep_time); writer.print("l"); LockSupport.parkNanos(sleep_time); writer.print("o"); LockSupport.parkNanos(sleep_time); writer.print("!"); LockSupport.parkNanos(sleep_time); writer.println(); writer.flush(); reader = new BufferedReader(new InputStreamReader(client.getInputStream())); System.out.println("from server: " + reader.readLine()); reader.close(); writer.close(); client.close(); } catch (UnknownHostException e){ e.printStackTrace(); } catch (IOException e){ e.printStackTrace(); } finally { try { if (writer != null) writer.close(); if (reader != null) reader.close(); if (client != null) client.close(); } catch (IOException e){ } } } } public static void main(String args[]) throws IOException { EchoClient ec = new EchoClient(); for(int i=0;i<5;i++) tp.execute(ec); // tp.shutdown(); } }BIO:JDK1.4之前的阻塞IO
BIO,即为Blocking I/O,阻塞IO,大致流程为
1)服务端建立ServerSocket,以一个端口启动,
2)等待客户端建立socket连接,如果没有连接,一直阻塞(等待),3)一个socket建立连接之后,从线程池中取一个线程取处理socket
对于这种模型的总结:
缺点:如果请求量过大,线程池不够用,那么会严重影响性能。
目前tomcat I/O模型默认还是BIO,对于连接不大的,该模型代码编写简单,只需要关注线程内的连接即可
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MultiThreaEchoServer { private static ExecutorService tp = Executors.newCachedThreadPool(); static class HandleMsg implements Runnable{ Socket clientSocket; public HandleMsg(Socket clientSocket){ this.clientSocket = clientSocket; } @Override public void run() { BufferedReader is = null; PrintWriter os = null; try{ is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); os = new PrintWriter(clientSocket.getOutputStream(), true); //从InputStream中读取客户端所发送的数据 String inputLine = null; long b = System.currentTimeMillis(); while((inputLine = is.readLine()) != null){ os.println(inputLine); } long e = System.currentTimeMillis(); System.out.println("spend:"+(e-b)+"ms"); } catch (IOException e){ e.printStackTrace(); }finally{ try{ if(is!=null)is.close(); if(os!=null)os.close(); clientSocket.close(); } catch (IOException e){ e.printStackTrace(); } } } } public static void main(String args[]){ ServerSocket echoServer = null; Socket clientSocket = null; try{ echoServer = new ServerSocket(7788); } catch (IOException e){ System.out.println(e); } while(true){ try{ clientSocket = echoServer.accept(); System.out.println(clientSocket.getRemoteSocketAddress() + " connect!"); tp.execute(new HandleMsg(clientSocket)); } catch (IOException e){ System.out.println(e); } } } }服务端输出:
/127.0.0.1:49505 connect!
/127.0.0.1:49504 connect!
/127.0.0.1:49506 connect!
/127.0.0.1:49508 connect!
/127.0.0.1:49507 connect!
(在这里停顿了一会儿)
spend:6003ms
spend:6003ms
spend:6003ms
spend:6003ms
spend:6003ms
客户端输出:
from server: Hello!
from server: Hello!
from server: Hello!
from server: Hello!
from server: Hello!
NIO:JDK1.4及以后的版本非阻塞IO
即是Non Blocking I/O,非阻塞IO,jdk1.4之后提供了一套专门的api专门操作非阻塞IO,接口以及类定义在java.nio包
NIO API由四个主要的部分组成:缓冲区(Buffers)、通道(Channels)、选择器(Selector)是其核心组成类。
NIO的工作大致流程为:
1、通道注册一个监听到事件处理器
2、有事件发生时,事件处理器会通知相应的通道处理
import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MultiThreadNIOEchoServer { public static Map<Socket,Long> geym_time_stat = new HashMap<Socket,Long>(1024); class EchoClient { private LinkedList<ByteBuffer> outq; EchoClient(){ outq = new LinkedList<ByteBuffer>(); } //Return the output queue. public LinkedList<ByteBuffer> getOutputQueue() { return outq; } //Enqueue a ByteBuffer on the output queue. public void enqueue(ByteBuffer bb) { outq.addFirst(bb); } } class HandleMsg implements Runnable{ SelectionKey sk; ByteBuffer bb; public HandleMsg(SelectionKey sk,ByteBuffer bb){ this.sk = sk; this.bb = bb; } public void run(){ EchoClient echoClient = (EchoClient) sk.attachment(); echoClient.enqueue(bb); sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); //强迫selector立即返回 selector.wakeup(); } } private Selector selector; private ExecutorService tp = Executors.newCachedThreadPool(); //Accept a new client and set it up for reading. private void doAccept(SelectionKey sk){ ServerSocketChannel server = (ServerSocketChannel) sk.channel(); SocketChannel clientChannel; try { clientChannel = server.accept(); clientChannel.configureBlocking(false); //Register this channel for reading. SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ); //Allocate an EchoClient instance and attach it to this selection key. EchoClient echoClient = new EchoClient(); clientKey.attach(echoClient); InetAddress clientAddress = clientChannel.socket().getInetAddress(); System.out.println("Accepted connection from " + clientAddress.getHostAddress() + "."); } catch (Exception e){ System.out.println("Failed to accept new client"); e.printStackTrace(); } } private void doRead(SelectionKey sk) { SocketChannel channel = (SocketChannel) sk.channel(); ByteBuffer bb = ByteBuffer.allocate(8192); int len; try { len = channel.read(bb); if (len < 0) { disconnect(sk); return; } }catch (Exception e) { System.out.println("Failed to read from client."); e.printStackTrace(); disconnect(sk); return; } //Flip the buffer. bb.flip(); tp.execute(new HandleMsg(sk,bb)); } //Called when a SelectionKey id ready for writing. private void doWrite(SelectionKey sk) { SocketChannel channel = (SocketChannel) sk.channel(); EchoClient echoClient = (EchoClient) sk.attachment(); LinkedList<ByteBuffer> outq = echoClient.getOutputQueue(); ByteBuffer bb = outq.getLast(); try { int len = channel.write(bb); if (len == -1) { disconnect(sk); return; } if (bb.remaining() == 0) { //The buffer was completely written, remove it. outq.removeLast(); } } catch (Exception e) { System.out.println("Failed to write to client."); e.printStackTrace(); disconnect(sk); } if (outq.size() == 0) { sk.interestOps(SelectionKey.OP_READ); } } private void disconnect(SelectionKey sk) { } private void startServer() throws Exception { selector = SelectorProvider.provider().openSelector(); //Create non-blocking server socket. ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); InetSocketAddress isa = new InetSocketAddress(7788); ssc.socket().bind(isa); //Register the socket for select events. SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT); //Loop forever. for(;;) { selector.select(); // if(selector.selectNow()==0){ // continue; // } Set readyKeys = selector.selectedKeys(); Iterator i = readyKeys.iterator(); long e = 0; while(i.hasNext()){ SelectionKey sk = (SelectionKey) i.next(); i.remove(); if(sk.isAcceptable()) { doAccept(sk); } else if(sk.isValid() && sk.isReadable()){ if(!geym_time_stat.containsKey(((SocketChannel)sk.channel()).socket())) geym_time_stat.put(((SocketChannel)sk.channel()).socket(), System.currentTimeMillis()); doRead(sk); } else if (sk.isValid() && sk.isWritable()) { doWrite(sk); e = System.currentTimeMillis(); long b = geym_time_stat.remove(((SocketChannel)sk.channel()).socket()); System.out.println("spend:"+(e-b)+"ms"); } } } } //Main entry point. public static void main(String args[]){ MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer(); try { echoServer.startServer(); } catch (Exception e) { System.out.println("Exception caught, program exiting.."); e.printStackTrace(); } } }服务端输出:
Accepted connection from 127.0.0.1.
Accepted connection from 127.0.0.1.
Accepted connection from 127.0.0.1.
Accepted connection from 127.0.0.1.
Accepted connection from 127.0.0.1.
(在这里停顿了一会儿)
spend:0ms
spend:1ms
spend:2ms
spend:1ms
spend:1ms
客户端输出:
from server: Hello!
from server: Hello!
from server: Hello!
from server: Hello!
from server: Hello!
代码中的主要流程为:
1、open ServerSocketChannel,configureBlocking false,bind host and port
2、open Selector
3、ServerSocketChannel register on Selector
4、有客户端连接的事件发生,事件处理器通知ServerSocketChannel去处理
对这一模型的总结:
NIO本身是基于事件驱动思想来完成的
NIO基于Selector,当有感兴趣的事件发生时,就通知对应的事件处理器去处理事件,如果没有,则不处理。当socket有流可读或可写入socket时,操作系统会相应的通知引用程序进行处理,应用再将流读取到缓冲区或写入操作系统。所以使用一个线程做轮询就可以了
Buffer,也是NIO的一个新特性,可以块状的读/写数据,效率得到极大的提高。
JDK1.7之后,AIO异步非阻塞IO
AIO,即是Asynchronous I/O,异步非阻塞I/O
JDK1.7之后,也叫作AIO,工作方式是异步非阻塞
AIO主要工作流程为:
客户端发起一个IO调用
服务端接受IO之后,异步回调接收成功后的IO,不会阻挡当前主流程,主流程继续接受下一个请求
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.nio.charset.Charset; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class AsynchronousIOServer { private static Charset charset = Charset.forName("UTF-8"); public static void main(String[] args) { int port = 9999; int processors = Runtime.getRuntime().availableProcessors(); ExecutorService threadPool = Executors.newFixedThreadPool(processors); try { AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool); AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group); server.bind(new InetSocketAddress(port)); doAccept(server); group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } catch (Exception e) { // } catch (IOException | InterruptedException e) {//原文章中时这么写的,我导入却报错,换成jdk1.8再看看报错不 e.printStackTrace(); System.out.println("close server"); System.exit(0); } } private static void doAccept(final AsynchronousServerSocketChannel server) { server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { @Override public void completed(AsynchronousSocketChannel client, Void attachment) { server.accept(null, this);// accept next client connect doRead(client, attachment); } @Override public void failed(Throwable exc, Void attachment) { exc.printStackTrace(); } }); } private static void doRead(final AsynchronousSocketChannel client, Void attachment) { ByteBuffer buffer = ByteBuffer.allocate(1024); client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { if (result <= 0) { try { System.out.println("客户端断线:" + client.getRemoteAddress().toString()); attachment = null; } catch (IOException e) { e.printStackTrace(); } return; } attachment.flip(); String req = charset.decode(attachment).toString(); attachment.compact(); client.read(attachment, attachment, this);// next client read /** do service code **/ System.out.println(req); ByteBuffer resBuffer = ByteBuffer.wrap(("response:" + req).getBytes()); doWrite(client, resBuffer, resBuffer); } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); } private static <V> void doWrite(final AsynchronousSocketChannel client, ByteBuffer resBuffer, ByteBuffer attachment) { client.write(attachment, attachment, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { if (result <= 0) { try { System.out.println("客户端断线:" + client.getRemoteAddress().toString()); attachment = null; } catch (IOException e) { e.printStackTrace(); } } } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); } }
注意:后来我想用AIO来实现UDP的发送与接收,但是却做不到,这是我自己发的一个帖子http://bbs.csdn.net/topics/392282605,哎,也没人回答,还得自问自答。。。
网上说是有AsynchronousDatagramChannel可以,但为什么我在jdk1.7.0_25根本就没有找到这个类啊。看到这么个连接http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6993126,我理解的意思是AsynchronousDatagramChannel在jdk1.7中已经被移除了,在jdk1.8中再观察观察情况再做决定。我看了jdk1.8也没有找到这个类啊,难道也被移除了?不知道我的理解是否正确。
补充:一开始模拟的客户端是发送TCP数据的,后来我想将这个代码改为发送UDP数据的,可是达不到那样的效果(无法用PrintWriter写数据和BufferedReader读数据,只能用DatagramSocket类的send和receive方法来发送和接收数据,所以也就无法发送一些数据和睡眠一定时间再发送。不知道我理解还是方法有问题,如果有知道的一起讨论下)。下面是我修改后的代码:
import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class HeavyUDPClient { private static ExecutorService tp = Executors.newCachedThreadPool(); final private static String TAG = "SocketUdp: "; public static class EchoClient implements Runnable{ public void run(){ try { DatagramSocket getSocket = new DatagramSocket(); InetSocketAddress toAddress = new InetSocketAddress(InetAddress.getLocalHost(), 7777); byte[] buf = "Hello!".getBytes(); DatagramPacket datapacket = new DatagramPacket(buf, buf.length); datapacket.setSocketAddress(toAddress); getSocket.send(datapacket); System.out.println("==message sended"); System.out.println("==block for receive messages..."); getSocket.receive(datapacket); buf = datapacket.getData(); System.out.println("from server: " + new String(buf)); getSocket.close(); } catch (SocketException e) { System.out.println(TAG + e.getMessage()); e.printStackTrace(); } catch (UnknownHostException e) { System.out.println(TAG + e.getMessage()); e.printStackTrace(); } catch (IOException e) { System.out.println(TAG + e.getMessage()); e.printStackTrace(); } } } public static void main(String args[]) throws IOException { EchoClient ec = new EchoClient(); // for(int i=0;i<15000;i++) for(int i=0;i<5;i++) tp.execute(ec); } }
参考:http://blog.csdn.net/wanghang88/article/details/51922117
推荐:可以浏览下该博主http://blog.csdn.net/column/details/sys-communication.html?&page=3的专栏,其中对io通信模型该系列的第1-5篇文章写得就很不错
相关文章推荐
- UNIX环境高级编程学习之第十六章网络IPC:套接字 - 非阻塞的Socket通信EPoll模型(多路复用), 实用Socket通信模板
- 关于重叠IO网络编程模型的学习!
- 网络编程IO模型--学习笔记
- 18天Java学习---Java的Socket网络编程以及多线程
- java网络编程Socket学习(一)
- day12 Java IO 中的文件路径 Propertity 文件 Socket 网络编程
- [零散篇]Java学习笔记---Java的Socket网络编程以及多线程
- Java Socket多线程编程、通信模型及socket协议详解
- JAVA学习笔记(网络编程 之 socket)
- 黑马程序员_Java学习日记20_Socket编程2
- Java学习疑惑(8)----可视化编程, 对Java中事件驱动模型的理解
- 黑马程序员_Java学习日记19_Socket编程1
- Java学习疑惑(8)----可视化编程, 对Java中事件驱动模型的理解
- java NIO非阻塞式IO网络编程学习笔记(一)
- SOCKET编程进阶之Overlapped IO完成例程模型
- java网络编程Socket学习(二)
- UNIX环境高级编程学习之第十六章网络IPC:套接字 - 非阻塞的Socket通信Select模型(多路复用), 实用Socket通信模板。
- java网络编程Socket学习(二)
- UNIX环境高级编程学习之第十六章网络IPC:套接字 - 非阻塞的Socket通信Poll模型(多路复用), 实用Socket通信模板
- java学习日记(9)———socket,网络编程的学习