网络编程-BIO、NIO、AIO
2017-10-31 23:59
363 查看
基本概念
Socket又称“套接字”,应用程序通常通过“套接字”向网络发出请求或者应答网络请求。Socket类库位于java.net包中,ServerSocket用于服务器端,Socket是建立网络连接时使用的。在连接成功时,应用程序两端都会产生一个Socket实例,操作这个实例,完成所需的会话。对于一个网络连接来说,套接字是平等的,不因为在服务器端或在客户端而产生不同级别。不管是Socket还是ServerSocket都是通过SocketImpl类及子类完成的。
Socket通信四步骤:
1. 服务器监听,是服务器端套接字并不定位具体的客户端套接字,而是处于等待连接的状态,实时监控网络状态。
2. 客户端请求,由客户端的套接字提出连接请求,要连接的目标是服务器端的套接字。客户端的套接字必须首先描述它要连接的服务器的套接字,指出服务器套接字的地址和端口号,然后就向服务器端套接字提出连接请求。
3. 服务器端连接确认,服务器端套接字监听到或者接收到客户端套接字的连接请求,它就响应客户端套接字的请求,建立一个新的线程,把服务器端套接字的描述发给客户端。
4. 客户端连接确认,一旦客户端确认了此描述,连接就建立好了。双方开始进行通信,而服务器端套接字继续处于监听状态,继续接收其他客户端套接字的连接请求。
public class Server { final static int PORT = 6666; public static void main(String[] args) { ServerSocket server = null; try { server = new ServerSocket(PORT); System.out.println("server start.."); //进行阻塞 Socket socket = server.accept(); //新建一个线程执行客户端任务 new Thread(new ServerHandler(socket)).start();; } catch (IOException e) { e.printStackTrace(); } finally { if (server != null) { try { server.close(); } catch (IOException e) { e.printStackTrace(); } } server = null; } } } public class Client { final static String ADDRESS = "127.0.0.1"; final static int PORT = 6666; public static void main(String[] args) { Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(ADDRESS,PORT); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); //向服务端发送数据 out.println("接收到客户端请求数据.."); String response = in.readLine(); System.out.println("Client:" + response); } catch (IOException e) { e.printStackTrace(); } finally { if(in != null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { out.close(); } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } in = null; out = null; socket = null; } } }
伪异步IO,采用线程池和任务队列可以实现一种伪异步的IO通信框架。将客户端的socket封装成一个task任务(实现runnable接口的类)然后投递到线程池中,配置相应的队列进行实现。
阻塞:应用程序在获取网络数据的时候,如果网络传输数据很慢,那么程序就一直等待,直到传输完毕为止。
非阻塞:应用程序直接可以获取已经准备就绪好的数据,无需等待。
同步:应用程序会直接参与IO读写操作,并且我们的应用程序会直接阻塞到某一个方法上,直到数据准备就绪,或者采用轮询的策略实时检查数据的就绪状态,如果就绪则获取数据。
异步时,则所有的IO读写操作交给操作系统处理,与我们的应用程序没有直接关系,我们程序不需要关心IO读写,当操作系统完成了IO读写操作时,会给我们应用程序发送通知,应用程序直接拿走数据即可。
BIO为同步阻塞,NIO为同步非阻塞,AIO为异步非阻塞。
Buffer是一个对象,它包含一些要写入或者要读取的数据。在NIO类库中加入Buffer对象,体现了新库与原IO的的一个重要的区别。在面向流的IO中,可以将数据直接写入或读取到Stream对象中。在NIO库中,所有数据都是用缓冲区处理的(读写)。缓冲区实质上是一个数组,通常是一个字节数组(ByteBuffer),也可以使用其他类型的数组。这个数组为缓冲区提供了数据的访问读写等操作属性,如位置、容量、上限等概念。
IntBuffer buffer = IntBuffer.allocate(10); buffer.put(6); buffer.put(8); buffer.put(10); buffer.flip();//buffer位置复位 System.out.println("使用flip复位:" + buffer); System.out.println("容量为:" + buffer.capacity()); System.out.println("限制为:" + buffer.limit()); System.out.println("获取下标为1的元素:" + buffer.get(1)); System.out.println("get(index)方法,position位置不改变:" + buffer); buffer.put(1, 4); System.out.println("put(index,change)方法,position位置不变:" + buffer); for (int i = 0; i < buffer.limit(); i++) { //调用get方法会使其缓冲区位置(position)向后递增一位 System.out.println(buffer.get()); } System.out.println("buffer对象遍历之后为:" + buffer);
//wrap方法使用 int[] arr = new int[]{1,2,3}; IntBuffer buffer = IntBuffer.wrap(arr); System.out.println(buffer); IntBuffer intBuffer = IntBuffer.wrap(arr, 0, 2); //容量为数组arr的长度,但是可操作的元素只有实际进入缓存区的元素长度 System.out.println(intBuffer);*
IntBuffer buffer = IntBuffer.allocate(10); int[] arr = new int[]{1,2,3}; buffer.put(arr); System.out.println(buffer); //一种复制方法 IntBuffer intBuffer = buffer.duplicate(); System.out.println(intBuffer); //设置buffer的位置属性 // buffer.position(0); buffer.flip();//复位 System.out.println(buffer); System.out.println("可读数据为:" + buffer.remaining()); int[] arr2 = new int[buffer.remaining()]; //将缓冲区数据放入arr2中去 buffer.get(arr2); for (int i : arr2) { System.out.print(Integer.toString(i) + ","); }
BIO编程
public class ServerHandler1 implements Runnable { private Socket socket; public ServerHandler1(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader br = null ; PrintWriter pw = null; try { br = new BufferedReader(new InputStreamReader(socket.getInputStream())); pw = new PrintWriter(socket.getOutputStream(),true); String body = null; while (true) { body = br.readLine(); if (body == null) { break; } System.out.println("server:" + body); pw.println("服务端回应数据"); } } catch (IOException e) { e.printStackTrace(); } finally { if (br != null) { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } } } } public class Server1 { private final static int PORT = 6666; public static void main(String[] args) { ServerSocket serverSocket = null; Socket socket = null; try { //创建serverSocket serverSocket = new ServerSocket(PORT); System.out.println("server start"); //阻塞 socket = serverSocket.accept(); new Thread(new ServerHandler1(socket)).start(); } catch (IOException e) { e.printStackTrace(); } finally { if(serverSocket != null){ try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } public class Client1 { private final static String ADDRESS = "127.0.0.1"; private final static int PORT = 6666; public static void main(String[] args) { BufferedReader br = null; PrintWriter pw = null; Socket socket = null; try { //创建socket socket = new Socket(ADDRESS, PORT); br = new BufferedReader(new InputStreamReader(socket.getInputStream())); pw = new PrintWriter(socket.getOutputStream(),true); pw.println("客户端向服务端发送信息"); String readLine = br.readLine(); System.out.println(readLine); } catch (IOException e) { e.printStackTrace(); } finally { if (pw != null) { pw.close(); } if (br != null) { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
伪异步
public class ServerHandler1 implements Runnable{ private Socket socket; public ServerHandler1(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); String body = null; while (true) { body = in.readLine(); if(body == null) break; System.out.println("客户端发送信息: " + body); out.println("server response"); } } catch (IOException e) { e.printStackTrace(); } finally { if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { out.close(); } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } socket = null; } } } //自定义线程池 public class HandlerExecutorPool1 { private ExecutorService executors; public HandlerExecutorPool1(int maxPoolSize, int queueSize) { this.executors = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize)); } public void execute(Runnable task){ this.executors.submit(task); } } //服务端 public class Server1 { final static int PORT = 6666; public static void main(String[] args) { try { ServerSocket serverSocket = new ServerSocket(PORT); System.out.println("server start"); HandlerExecutorPool1 pool = new HandlerExecutorPool1(50,1000); Socket socket = null; while (true) { socket = serverSocket.accept(); pool.execute(new ServerHandler1(socket)); } } catch (IOException e) { e.printStackTrace(); } } } //客户端 public class Client1 { final static String ADDRESS = "127.0.0.1"; final static int PORT = 6666; public static void main(String[] args) { BufferedReader in = null; PrintWriter out = null; Socket socket = null; try { socket = new Socket(ADDRESS, PORT); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(),true); out.println("client response"); String response = in.readLine(); System.out.println("服务端发送的信息:" + response); } catch (IOException e) { e.printStackTrace(); } finally { if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { out.close(); } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
NIO编程
public class Server1 implements Runnable{ //1 多路复用器(管理所有的通道) private Selector seletor; //2 建立缓冲区 private ByteBuffer readBuf = ByteBuffer.allocate(1024); //3 private ByteBuffer writeBuf = ByteBuffer.allocate(1024); public Server1(int port){ try { //1 打开路复用器 this.seletor = Selector.open(); //2 打开服务器通道 ServerSocketChannel ssc = ServerSocketChannel.open(); //3 设置服务器通道为非阻塞模式 ssc.configureBlocking(false); //4 绑定地址 ssc.bind(new InetSocketAddress(port)); //5 把服务器通道注册到多路复用器上,并且监听阻塞事件 ssc.register(this.seletor, SelectionKey.OP_ACCEPT); System.out.println("Server start, port :" + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { while(true){ try { //1 必须要让多路复用器开始监听 this.seletor.select(); //2 返回多路复用器已经选择的结果集 Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator(); //3 进行遍历 while(keys.hasNext()){ //4 获取一个选择的元素 SelectionKey key = keys.next(); //5 直接从容器中移除就可以了 keys.remove(); //6 如果是有效的 if(key.isValid()){ //7 如果为阻塞状态 if(key.isAcceptable()){ this.accept(key); } //8 如果为可读状态 if(key.isReadable()){ this.read(key); } //9 写数据 if(key.isWritable()){ //this.write(key); //ssc } } } } catch (IOException e) { e.printStackTrace(); } } } private void write(SelectionKey key){ //ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //ssc.register(this.seletor, SelectionKey.OP_WRITE); } private void read(SelectionKey key) { try { //1 清空缓冲区旧的数据 this.readBuf.clear(); //2 获取之前注册的socket通道对象 SocketChannel sc = (SocketChannel) key.channel(); //3 读取数据 int count = sc.read(this.readBuf); //4 如果没有数据 if(count == -1){ key.channel().close(); key.cancel(); return; } //5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位) this.readBuf.flip(); //6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据 byte[] bytes = new byte[this.readBuf.remaining()]; //7 接收缓冲区数据 this.readBuf.get(bytes); //8 打印结果 String body = new String(bytes).trim(); System.out.println("Server : " + body); // 9..可以写回给客户端数据 } catch (IOException e) { e.printStackTrace(); } } private void accept(SelectionKey key) { try { //1 获取服务通道 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); //2 执行阻塞方法 SocketChannel sc = ssc.accept(); //3 设置阻塞模式 sc.configureBlocking(false); //4 注册到多路复用器上,并设置读取标识 sc.register(this.seletor, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { new Thread(new Server1(8765)).start();; } } public class Client1 { //需要一个Selector public static void main(String[] args) { //创建连接的地址 InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765); //声明连接通道 SocketChannel sc = null; //建立缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); try { //打开通道 sc = SocketChannel.open(); //进行连接 sc.connect(address); while(true){ //定义一个字节数组,然后使用系统录入功能: byte[] bytes = new byte[1024]; System.in.read(bytes); //把数据放到缓冲区中 buf.put(bytes); //对缓冲区进行复位 buf.flip(); //写出数据 sc.write(buf); //清空缓冲区数据 buf.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { if(sc != null){ try { sc.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
AIO编程,在NIO基础之上引入了异步通道,并提供了异步文件和异步套接字通道的实现。
- AsynchronousServerScoketChannel
- AsynchronousScoketChanel
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> { @Override public void completed(AsynchronousSocketChannel asc, Server attachment) { //当有下一个客户端接入的时候 直接调用Server的accept方法,这样反复执行下去,保证多个客户端都可以阻塞 attachment.assc.accept(attachment, this); read(asc); } private void read(final AsynchronousSocketChannel asc) { //读取数据 ByteBuffer buf = ByteBuffer.allocate(1024); asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer resultSize, ByteBuffer attachment) { //进行读取之后,重置标识位 attachment.flip(); //获得读取的字节数 System.out.println("Server -> " + "收到客户端的数据长度为:" + resultSize); //获取读取的数据 String resultData = new String(attachment.array()).trim(); System.out.println("Server -> " + "收到客户端的数据信息为:" + resultData); String response = "服务器响应, 收到了客户端发来的数据: " + resultData; write(asc, response); } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); } private void write(AsynchronousSocketChannel asc, String response) { try { ByteBuffer buf = ByteBuffer.allocate(1024); buf.put(response.getBytes()); buf.flip(); asc.write(buf).get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, Server attachment) { exc.printStackTrace(); } } public class Server { //线程池 private ExecutorService executorService; //线程组 private AsynchronousChannelGroup threadGroup; //服务器通道 public AsynchronousServerSocketChannel assc; public Server(int port){ try { //创建一个缓存池 executorService = Executors.newCachedThreadPool(); //创建线程组 threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); //创建服务器通道 assc = AsynchronousServerSocketChannel.open(threadGroup); //进行绑定 assc.bind(new InetSocketAddress(port)); System.out.println("server start , port : " + port); //进行阻塞 assc.accept(this, new ServerCompletionHandler()); //一直阻塞 不让服务器停止 Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { Server server = new Server(8765); } } public class Client implements Runnable{ private AsynchronousSocketChannel asc ; public Client() throws Exception { asc = AsynchronousSocketChannel.open(); } public void connect(){ asc.connect(new InetSocketAddress("127.0.0.1", 8765)); } public void write(String request){ try { asc.write(ByteBuffer.wrap(request.getBytes())).get(); read(); } catch (Exception e) { e.printStackTrace(); } } private void read() { ByteBuffer buf = ByteBuffer.allocate(1024); try { asc.read(buf).get(); buf.flip(); byte[] respByte = new byte[buf.remaining()]; buf.get(respByte); System.out.println(new String(respByte,"utf-8").trim()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void run() { while(true){ } } public static void main(String[] args) throws Exception { Client c1 = new Client(); c1.connect(); Client c2 = new Client(); c2.connect(); Client c3 = new Client(); c3.connect(); new Thread(c1, "c1").start(); new Thread(c2, "c2").start(); new Thread(c3, "c3").start(); Thread.sleep(1000); c1.write("c1 aaa"); c2.write("c2 bbbb"); c3.write("c3 ccccc"); } }
相关文章推荐
- 详解Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- 网络编程之BIO、NIO、AIO
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- 11 Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- 网络IO编程BIO、NIO、AIO区别
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- 互联网架构(7):Socket网络通信编程--BIO/NIO/AIO
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- 【转】Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)