
Java Socket Programming Study Notes 5: I/O Models

2017-11-08 22:36
We first simulate a rather badly behaved TCP client:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

public class HeavyTCPClient {
    private static ExecutorService tp = Executors.newCachedThreadPool();
    // One second, expressed in nanoseconds.
    private static final int sleep_time = 1000*1000*1000;

    public static class EchoClient implements Runnable {
        public void run() {
            Socket client = null;
            PrintWriter writer = null;
            BufferedReader reader = null;
            try {
                client = new Socket();
                client.connect(new InetSocketAddress("localhost", 7788));
                writer = new PrintWriter(client.getOutputStream(), true);
                // Produce one character per second to simulate a slow client.
                // PrintWriter auto-flush applies to println/printf/format only, so
                // these print() calls stay buffered until the println() below.
                writer.print("H");
                LockSupport.parkNanos(sleep_time);
                writer.print("e");
                LockSupport.parkNanos(sleep_time);
                writer.print("l");
                LockSupport.parkNanos(sleep_time);
                writer.print("l");
                LockSupport.parkNanos(sleep_time);
                writer.print("o");
                LockSupport.parkNanos(sleep_time);
                writer.print("!");
                LockSupport.parkNanos(sleep_time);
                writer.println();
                writer.flush();

                reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
                System.out.println("from server: " + reader.readLine());
            } catch (IOException e) {
                // UnknownHostException is a subclass of IOException.
                e.printStackTrace();
            } finally {
                // Release the resources exactly once, with or without an exception.
                try {
                    if (writer != null)
                        writer.close();
                    if (reader != null)
                        reader.close();
                    if (client != null)
                        client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String args[]) throws IOException {
        EchoClient ec = new EchoClient();
        // EchoClient keeps no state in fields, so one instance can be run by five threads.
        for (int i = 0; i < 5; i++)
            tp.execute(ec);
        // Already submitted clients still run to completion; the JVM can then exit.
        tp.shutdown();
    }
}
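One detail of the client above is worth checking in isolation: the PrintWriter is created with auto-flush enabled, but the JDK documents auto-flush as applying to println, printf, and format only. The following minimal sketch (the class name AutoFlushDemo and the in-memory sink are assumptions for illustration, not part of the original client) suggests that the six print() calls stay in the writer's buffer and only reach the stream at the final println():

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;

public class AutoFlushDemo {
    /** Returns {bytes visible after print(), bytes visible after println()}. */
    public static int[] bytesVisible() {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for the socket stream
        PrintWriter writer = new PrintWriter(sink, true);         // auto-flush enabled
        writer.print("Hello!");      // buffered inside the writer, nothing reaches the sink yet
        int afterPrint = sink.size();
        writer.println();            // println triggers the automatic flush
        int afterPrintln = sink.size();
        return new int[] { afterPrint, afterPrintln };
    }

    public static void main(String[] args) {
        int[] r = bytesVisible();
        System.out.println("after print(): " + r[0] + " bytes, after println(): " + r[1] + " bytes");
    }
}
```

If that reading is right, the server sees nothing for about six seconds and then receives the whole line at once, which matters when interpreting the timings printed by the servers below.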
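BIO: the blocking I/O available before JDK 1.4
BIO stands for Blocking I/O. The rough flow is:

  1) The server creates a ServerSocket and starts listening on a port.

  2) It waits for a client to establish a socket connection; while there is no connection, accept() blocks.

  3) Once a socket connection is established, a thread is taken from the thread pool to handle that socket.

Summary of this model:

 Drawback: every connection occupies one thread for as long as it stays open, so a slow client ties up its thread. If the request volume is too high and the thread pool cannot keep up, performance suffers badly.

Tomcat up to version 7 used BIO as its default connector I/O model (Tomcat 8 and later default to NIO). For a modest number of connections this model is simple to code: you only need to care about the connection inside each thread.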
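The drawback above can be made concrete with a small sketch. Note that the servers in this post use Executors.newCachedThreadPool(), which keeps creating threads instead of running out; the sketch below assumes a deliberately tiny bounded pool (two threads, one queue slot) to show what "the pool is not enough" looks like. The class name PoolExhaustionDemo and the latch that stands in for a slow client are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolExhaustionDemo {
    /** Submits `connections` blocking handlers and returns how many were rejected. */
    public static int countRejected(int connections) {
        // Two worker threads and a queue with a single slot.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1));
        final CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < connections; i++) {
            try {
                pool.execute(new Runnable() {
                    public void run() {
                        try {
                            release.await(); // stands in for a handler blocked in readLine()
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // both threads busy and the queue full
            }
        }
        release.countDown(); // let the blocked handlers finish
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(5) + " of 5 handlers were rejected");
    }
}
```

With five simulated connections, two handlers run, one waits in the queue, and the remaining two are rejected. An unbounded pool avoids the rejection but pays for it with one thread per connection.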

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThreaEchoServer {
    private static ExecutorService tp = Executors.newCachedThreadPool();

    static class HandleMsg implements Runnable {
        Socket clientSocket;

        public HandleMsg(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }

        @Override
        public void run() {
            BufferedReader is = null;
            PrintWriter os = null;
            try {
                is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                os = new PrintWriter(clientSocket.getOutputStream(), true);
                // Read the data sent by the client from the InputStream and echo it back line by line.
                String inputLine = null;
                long b = System.currentTimeMillis();
                while ((inputLine = is.readLine()) != null) {
                    os.println(inputLine);
                }
                long e = System.currentTimeMillis();
                // Time this thread spent on the connection, mostly blocked in readLine().
                System.out.println("spend:" + (e - b) + "ms");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (is != null) is.close();
                    if (os != null) os.close();
                    clientSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String args[]) {
        ServerSocket echoServer = null;
        Socket clientSocket = null;
        try {
            echoServer = new ServerSocket(7788);
        } catch (IOException e) {
            System.out.println(e);
            // Without a listening socket, accept() below would throw NullPointerException forever.
            return;
        }
        while (true) {
            try {
                // Blocks until a client connects, then hands the socket to a pool thread.
                clientSocket = echoServer.accept();
                System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
                tp.execute(new HandleMsg(clientSocket));
            } catch (IOException e) {
                System.out.println(e);
            }
        }
    }
}
Server output:

/127.0.0.1:49505 connect!

/127.0.0.1:49504 connect!

/127.0.0.1:49506 connect!

/127.0.0.1:49508 connect!

/127.0.0.1:49507 connect!

(there is a pause here)

spend:6003ms

spend:6003ms

spend:6003ms

spend:6003ms

spend:6003ms

Client output:

from server: Hello!

from server: Hello!

from server: Hello!

from server: Hello!

from server: Hello!

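NIO: non-blocking I/O in JDK 1.4 and later

NIO is commonly read as Non-blocking I/O (the JDK itself calls it New I/O). Starting with JDK 1.4, a dedicated API for non-blocking I/O is provided; its interfaces and classes are defined in the java.nio package and its subpackages.

   The NIO API has four main parts: buffers (Buffers), charsets (Charsets), channels (Channels), and selectors (Selectors). Buffers, channels, and selectors are its core classes.

   The rough NIO workflow is:

1. A channel registers the events it is interested in with a selector, which acts as the event dispatcher.

2. When an event occurs, the selector reports it and the corresponding channel is handled.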

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThreadNIOEchoServer {
    // Time of the first read on each socket, used to print how long the request took.
    public static Map<Socket,Long> geym_time_stat = new HashMap<Socket,Long>(1024);

    // Per-connection state: the queue of buffers waiting to be written back.
    // LinkedList is not thread-safe; enqueue() runs on a pool thread while doWrite()
    // runs on the selector thread. This is tolerable for a demo only.
    class EchoClient {
        private LinkedList<ByteBuffer> outq;

        EchoClient() {
            outq = new LinkedList<ByteBuffer>();
        }

        // Return the output queue.
        public LinkedList<ByteBuffer> getOutputQueue() {
            return outq;
        }

        // Enqueue a ByteBuffer on the output queue.
        public void enqueue(ByteBuffer bb) {
            outq.addFirst(bb);
        }
    }

    class HandleMsg implements Runnable {
        SelectionKey sk;
        ByteBuffer bb;

        public HandleMsg(SelectionKey sk, ByteBuffer bb) {
            this.sk = sk;
            this.bb = bb;
        }

        public void run() {
            EchoClient echoClient = (EchoClient) sk.attachment();
            echoClient.enqueue(bb);

            sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            // Force the selector to return immediately.
            selector.wakeup();
        }
    }

    private Selector selector;
    private ExecutorService tp = Executors.newCachedThreadPool();

    // Accept a new client and set it up for reading.
    private void doAccept(SelectionKey sk) {
        ServerSocketChannel server = (ServerSocketChannel) sk.channel();
        SocketChannel clientChannel;
        try {
            clientChannel = server.accept();
            clientChannel.configureBlocking(false);

            // Register this channel for reading.
            SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
            // Allocate an EchoClient instance and attach it to this selection key.
            EchoClient echoClient = new EchoClient();
            clientKey.attach(echoClient);

            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("Accepted connection from " + clientAddress.getHostAddress() + ".");
        } catch (Exception e) {
            System.out.println("Failed to accept new client");
            e.printStackTrace();
        }
    }

    // Called when a SelectionKey is ready for reading.
    private void doRead(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        ByteBuffer bb = ByteBuffer.allocate(8192);
        int len;

        try {
            len = channel.read(bb);
            if (len < 0) {
                // End of stream: the client closed the connection.
                disconnect(sk);
                return;
            }
        } catch (Exception e) {
            System.out.println("Failed to read from client.");
            e.printStackTrace();
            disconnect(sk);
            return;
        }

        // Flip the buffer so the data just read can be written back.
        bb.flip();
        tp.execute(new HandleMsg(sk, bb));
    }

    // Called when a SelectionKey is ready for writing.
    private void doWrite(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        EchoClient echoClient = (EchoClient) sk.attachment();
        LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();

        ByteBuffer bb = outq.getLast();
        try {
            int len = channel.write(bb);
            if (len == -1) {
                disconnect(sk);
                return;
            }

            if (bb.remaining() == 0) {
                // The buffer was completely written, remove it.
                outq.removeLast();
            }
        } catch (Exception e) {
            System.out.println("Failed to write to client.");
            e.printStackTrace();
            disconnect(sk);
            // The key is cancelled now; do not touch its interest set.
            return;
        }

        if (outq.size() == 0) {
            // Nothing left to write: go back to listening for reads only.
            sk.interestOps(SelectionKey.OP_READ);
        }
    }

    // Cancel the key and close the channel so the selector stops reporting it.
    private void disconnect(SelectionKey sk) {
        SocketChannel channel = (SocketChannel) sk.channel();
        geym_time_stat.remove(channel.socket());
        sk.cancel();
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void startServer() throws Exception {
        selector = SelectorProvider.provider().openSelector();

        // Create non-blocking server socket.
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        InetSocketAddress isa = new InetSocketAddress(7788);
        ssc.socket().bind(isa);

        // Register the socket for select events.
        SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);

        // Loop forever.
        for (;;) {
            selector.select();
//			if(selector.selectNow()==0){
//				continue;
//			}
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> i = readyKeys.iterator();
            long e = 0;
            while (i.hasNext()) {
                SelectionKey sk = i.next();
                // Remove the key, otherwise it would be processed again on the next pass.
                i.remove();

                if (sk.isAcceptable()) {
                    doAccept(sk);
                }
                else if (sk.isValid() && sk.isReadable()) {
                    if (!geym_time_stat.containsKey(((SocketChannel)sk.channel()).socket()))
                        geym_time_stat.put(((SocketChannel)sk.channel()).socket(),
                                System.currentTimeMillis());
                    doRead(sk);
                }
                else if (sk.isValid() && sk.isWritable()) {
                    doWrite(sk);
                    e = System.currentTimeMillis();
                    // The entry may already be gone (second write or disconnect), so avoid unboxing null.
                    Long b = geym_time_stat.remove(((SocketChannel)sk.channel()).socket());
                    if (b != null)
                        System.out.println("spend:" + (e - b) + "ms");
                }
            }
        }
    }

    // Main entry point.
    public static void main(String args[]) {
        MultiThreadNIOEchoServer echoServer = new MultiThreadNIOEchoServer();
        try {
            echoServer.startServer();
        } catch (Exception e) {
            System.out.println("Exception caught, program exiting..");
            e.printStackTrace();
        }
    }
}
Server output:

Accepted connection from 127.0.0.1.

Accepted connection from 127.0.0.1.

Accepted connection from 127.0.0.1.

Accepted connection from 127.0.0.1.

Accepted connection from 127.0.0.1.

(there is a pause here)

spend:0ms

spend:1ms

spend:2ms

spend:1ms

spend:1ms

Client output:

from server: Hello!

from server: Hello!

from server: Hello!

from server: Hello!

from server: Hello!

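The main flow in the code is:

1. Open a ServerSocketChannel, call configureBlocking(false), and bind the host and port

2. Open a Selector

3. Register the ServerSocketChannel on the Selector

4. When a client-connection event occurs, the selector reports it and the ServerSocketChannel accepts the connection

Summary of this model:

NIO itself is built on the event-driven idea.

NIO is based on the Selector: when an event of interest occurs, the corresponding handler is notified to process it; if there is none, nothing is done. When a socket has data to read or can be written to, the operating system notifies the application, which then reads the data into a buffer or writes it to the operating system. A single thread polling the selector is therefore enough.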
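The idea that one polling thread is enough can be tried out in a few lines. The sketch below is an assumption-laden miniature, not the server from this post: the class name SingleThreadSelectorDemo, the loopback address, and the ephemeral port (port 0) are chosen for illustration, and the client lives in the same thread so that the example is self-contained.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SingleThreadSelectorDemo {
    /** Sends msg through a selector-driven echo server on loopback and returns the echo. */
    public static String echoOnce(String msg) {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()
                     .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // any free port
             SocketChannel client = SocketChannel.open(server.getLocalAddress())) {     // blocking client
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
            client.write(ByteBuffer.wrap(payload));

            int echoed = 0;
            while (echoed < payload.length) {
                selector.select(); // blocks until at least one registered event is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel accepted = server.accept();
                        accepted.configureBlocking(false);
                        accepted.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        SocketChannel ch = (SocketChannel) key.channel();
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        int n = ch.read(buf);
                        if (n < 0) throw new IOException("client closed early");
                        buf.flip();
                        while (buf.hasRemaining()) ch.write(buf); // echo back what was read
                        echoed += n;
                        if (echoed >= payload.length) ch.close(); // closing also cancels the key
                    }
                }
            }
            // Read the echo on the client side until the expected number of bytes has arrived.
            ByteBuffer reply = ByteBuffer.allocate(payload.length);
            while (reply.hasRemaining() && client.read(reply) >= 0) { }
            return new String(reply.array(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("from server: " + echoOnce("Hello!"));
    }
}
```

Unlike the server in this post, the sketch writes the echo directly in the read branch instead of switching the key to OP_WRITE; that shortcut is reasonable only because the payload is tiny.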

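Buffer is another new feature of NIO: data is read and written in blocks, which can be considerably more efficient than byte-at-a-time stream I/O.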
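A minimal sketch of the block-style access a Buffer offers is shown here; the class name BufferFlipDemo and the 64-byte capacity are assumptions for illustration. It fills a ByteBuffer, flips it from writing to reading, and drains it in one bulk call, which is the same put/flip/get cycle the NIO server uses around channel.read() and channel.write().

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferFlipDemo {
    /** Writes text into a buffer, flips it, and reads the bytes back in one bulk get. */
    public static String roundTrip(String text) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(text.getBytes(StandardCharsets.UTF_8)); // writing: position advances
        buf.flip();                                     // limit = old position, position = 0
        byte[] out = new byte[buf.remaining()];
        buf.get(out);                                   // bulk read of everything written
        return new String(out, StandardCharsets.UTF_8);
    }

    /** Returns {position, limit, capacity} right after flip(). */
    public static int[] stateAfterFlip(String text) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(text.getBytes(StandardCharsets.UTF_8));
        buf.flip();
        return new int[] { buf.position(), buf.limit(), buf.capacity() };
    }

    public static void main(String[] args) {
        int[] s = stateAfterFlip("Hello!");
        System.out.println(roundTrip("Hello!") + " position=" + s[0] + " limit=" + s[1] + " capacity=" + s[2]);
    }
}
```

For the six-byte string "Hello!" the state after flip() is position 0, limit 6, capacity 64: the limit marks where the written data ends, so a subsequent read or channel write never runs past it.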

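AIO: asynchronous non-blocking I/O, since JDK 1.7

AIO stands for Asynchronous I/O. It was added in JDK 1.7 (as part of NIO.2) and works in an asynchronous, non-blocking way.

The main AIO workflow is:

The client issues an I/O call.

The server starts the I/O operation and returns at once; when the operation completes, a callback is invoked with the result. The main flow is not blocked and goes on to accept the next request.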

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsynchronousIOServer {
    private static Charset charset = Charset.forName("UTF-8");

    public static void main(String[] args) {
        // Note: the TCP client at the top of this post connects to port 7788.
        int port = 9999;

        int processors = Runtime.getRuntime().availableProcessors();
        ExecutorService threadPool = Executors.newFixedThreadPool(processors);

        try {
            // Completion handlers run on the threads of this group.
            AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool);
            AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);
            server.bind(new InetSocketAddress(port));

            doAccept(server);

            // Keep the main thread alive while the group is running.
            group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        } catch (Exception e) {
            // The original article wrote: catch (IOException | InterruptedException e)
            // That reported an error when I imported it (multi-catch needs source level 7 or
            // higher); I will switch to JDK 1.8 and see whether the error remains.
            e.printStackTrace();
            System.out.println("close server");
            System.exit(0);
        }
    }

    private static void doAccept(final AsynchronousServerSocketChannel server) {
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel client, Void attachment) {
                server.accept(null, this);// accept next client connect

                doRead(client, attachment);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });

    }

    private static void doRead(final AsynchronousSocketChannel client, Void attachment) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result <= 0) {
                    // A result of -1 means the client closed the connection.
                    try {
                        System.out.println("Client disconnected: " + client.getRemoteAddress().toString());
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return;
                }

                attachment.flip();
                String req = charset.decode(attachment).toString();
                attachment.compact();

                client.read(attachment, attachment, this);// next client read

                /** do service code **/
                System.out.println(req);

                // Encode with the same charset that was used for decoding.
                ByteBuffer resBuffer = ByteBuffer.wrap(("response:" + req).getBytes(charset));
                doWrite(client, resBuffer);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }

        });
    }

    private static void doWrite(final AsynchronousSocketChannel client, ByteBuffer resBuffer) {
        client.write(resBuffer, resBuffer, new CompletionHandler<Integer, ByteBuffer>() {

            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                // A single write may send only part of the buffer; continue until it is empty.
                if (attachment.hasRemaining()) {
                    client.write(attachment, attachment, this);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                // Write errors, including a dropped connection, are reported here.
                exc.printStackTrace();
            }
        });
    }
}
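The server above uses CompletionHandler callbacks. The same channels also expose a Future-based form of accept, connect, read, and write, which can be easier to follow when experimenting. The sketch below is only an illustration under stated assumptions (the class name AioFutureDemo, loopback address, ephemeral port, and both endpoints in one process are all made up for the example): each call returns immediately with a Future, and get() is where the caller chooses to wait.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

public class AioFutureDemo {
    /** Sends msg from a client channel to an accepted peer channel and returns what the peer read. */
    public static String roundTrip(String msg) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()
                     .bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // any free port
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {

            // accept() does not block: it returns a Future that completes once a client connects.
            Future<AsynchronousSocketChannel> pending = server.accept();
            client.connect(server.getLocalAddress()).get(); // wait for the connect to finish

            try (AsynchronousSocketChannel peer = pending.get()) {
                byte[] payload = msg.getBytes(StandardCharsets.UTF_8);
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    client.write(out).get(); // one write may be partial, so loop
                }
                ByteBuffer in = ByteBuffer.allocate(payload.length);
                while (in.hasRemaining() && peer.read(in).get() >= 0) {
                    // keep reading until the whole payload has arrived or the stream ends
                }
                return new String(in.array(), StandardCharsets.UTF_8);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("peer read: " + roundTrip("Hello!"));
    }
}
```

Calling get() right away gives up most of the benefit of asynchronous I/O; in a real server the callback style shown above, or deferring get() until the result is needed, keeps threads free.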

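Note: I later wanted to use AIO to send and receive UDP, but could not get it to work. I posted a thread about it at http://bbs.csdn.net/topics/392282605; sadly nobody answered, so I had to answer my own question...

Posts online say that AsynchronousDatagramChannel can do this, but I could not find that class at all in jdk1.7.0_25. Then I came across this link: http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6993126. As I understand it, AsynchronousDatagramChannel was removed from JDK 1.7, with the decision deferred until the situation in JDK 1.8 could be observed. I looked in JDK 1.8 and could not find the class there either; was it removed there as well? I am not sure whether my understanding is correct.

Addendum: the client simulated at the beginning sends TCP data. I later tried to change that code to send UDP data, but could not get the same effect: PrintWriter and BufferedReader cannot be used to write and read the data, only the send and receive methods of DatagramSocket, so I could not send part of the data, sleep for a while, and then send more. I do not know whether my understanding or my approach is at fault; if anyone knows, let's discuss. Here is my modified code: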
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HeavyUDPClient {

    private static ExecutorService tp = Executors.newCachedThreadPool();
    final private static String TAG = "SocketUdp: ";

    public static class EchoClient implements Runnable {
        public void run() {
            DatagramSocket getSocket = null;
            try {
                getSocket = new DatagramSocket();
                InetSocketAddress toAddress = new InetSocketAddress(InetAddress.getLocalHost(), 7777);

                // The whole message travels as a single datagram.
                byte[] buf = "Hello!".getBytes();
                DatagramPacket datapacket = new DatagramPacket(buf, buf.length);
                datapacket.setSocketAddress(toAddress);
                getSocket.send(datapacket);
                System.out.println("==message sent");

                System.out.println("==block for receive messages...");
                // Receive into a separate, larger buffer so a longer reply is not cut off at 6 bytes.
                DatagramPacket reply = new DatagramPacket(new byte[1024], 1024);
                getSocket.receive(reply);
                // Only the first getLength() bytes of the buffer belong to the reply.
                System.out.println("from server: " + new String(reply.getData(), 0, reply.getLength()));
            } catch (IOException e) {
                // SocketException and UnknownHostException are both subclasses of IOException.
                System.out.println(TAG + e.getMessage());
                e.printStackTrace();
            } finally {
                // Close the socket even when send or receive failed.
                if (getSocket != null)
                    getSocket.close();
            }
        }
    }

    public static void main(String args[]) throws IOException {
        EchoClient ec = new EchoClient();
//		for(int i=0;i<15000;i++)
        for (int i = 0; i < 5; i++)
            tp.execute(ec);
    }
}
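One possible way to approximate the "send a piece, sleep, send another piece" behaviour with UDP is sketched below. UDP has no byte stream, so the assumption here is that each piece becomes its own datagram: the receiver then sees six separate messages instead of one line. The class name UdpPiecesDemo, the loopback receiver on an ephemeral port, and the two-second timeout are all made up for illustration; this is not a UDP server matching the client above.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class UdpPiecesDemo {
    /** Sends each character of msg as its own datagram; returns what the receiver saw, joined by '|'. */
    public static String sendInPieces(String msg, long pauseMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(new InetSocketAddress(loopback, 0)); // any free port
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2000); // do not hang forever if a datagram is lost
            StringBuilder seen = new StringBuilder();
            for (int i = 0; i < msg.length(); i++) {
                byte[] piece = msg.substring(i, i + 1).getBytes(StandardCharsets.UTF_8);
                // One send() call produces exactly one datagram.
                sender.send(new DatagramPacket(piece, piece.length, loopback, receiver.getLocalPort()));

                DatagramPacket in = new DatagramPacket(new byte[64], 64);
                receiver.receive(in); // one receive() call consumes exactly one datagram
                if (seen.length() > 0) seen.append('|');
                seen.append(new String(in.getData(), 0, in.getLength(), StandardCharsets.UTF_8));

                Thread.sleep(pauseMillis); // the pause between pieces
            }
            return seen.toString();
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendInPieces("Hello!", 1000));
    }
}
```

Message boundaries are preserved, so a receiver that wants the TCP-like "Hello!" line would have to reassemble the pieces itself, and on a real network it would also have to cope with lost or reordered datagrams.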


Reference: http://blog.csdn.net/wanghang88/article/details/51922117

Recommended: have a look at this blogger's column at http://blog.csdn.net/column/details/sys-communication.html?&page=3; articles 1 to 5 of its series on I/O communication models are very well written.