您的位置:首页 > 编程语言 > Java开发

Java 时间服务器demo之NIO实现

2016-01-07 22:49 543 查看

0.前文

Java 时间服务器demo之线程池

/article/9065257.html

1.NIO主要类库

缓冲区Buffer

通道Channel

多路复用器Selector

2.Java IO与NIO比较

面向流与面向缓冲

Java NIO和IO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

阻塞与非阻塞IO

Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

3.Java IO与NIO的使用场景比较

如果需要管理同时打开的成千上万个连接,这些连接每次只是发送少量的数据,例如聊天服务器,实现NIO的服务器可能是一个优势。同样,如果你需要维持许多打开的连接到其他计算机上,如P2P网络中,使用一个单独的线程来管理你所有出站连接,可能是一个优势。

如果你有少量的连接使用非常高的带宽,一次发送大量的数据,也许典型的IO服务器实现可能非常契合。

4.时间服务器demo

代码来自 李林峰《Netty权威指南》

服务端:

TimeServer.java

package com.ccy.IO.nio;

import java.io.IOException;

public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
if(args!=null && args.length>0){
port = Integer.valueOf(args[0]);
}

MultTimeServer server = new MultTimeServer(port);
new Thread(server).start();
}
}


MultTimeServer.java

package com.ccy.IO.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.Set;

public class MultTimeServer implements Runnable{
private Selector selector;
private ServerSocketChannel channel;
private volatile boolean stop;

public MultTimeServer(int port){
try {
selector = Selector.open();
channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress(port), 1024);
channel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("TIME SERVER IS LISTENING!!!");
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}

public void stop(){
this.stop =true;
}

@Override
public void run() {
while(!stop){
try {
//selector每一秒被唤醒一次
selector.select(1000);
//还回就绪状态的chanel的selectedKeys
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key = null;
while(iterator.hasNext()){
key = iterator.next();
iterator.remove();
try{
handleInput(key);
}catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
if(selector!=null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

public void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
if(key.isAcceptable()){
//通过ServerSocketChannel的accept()操作接收客户端的请求并创立SocketChannel连接,相当于完成TCP三次握手操作
ServerSocketChannel schannel = (ServerSocketChannel) key.channel();
SocketChannel accept = schannel.accept();
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
//开辟缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//非阻塞读
int size = sc.read(buffer);
//根据还回结果做判断
if(size>0){
//设置当前读取位置
buffer.flip();
byte[] arr = new byte[buffer.remaining()];
buffer.get(arr);
String body = new String(arr,"UTF-8");
System.out.println(body);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
doWriter(sc,format.format(new java.util.Date()));
}else if(size<0){
key.cancel();
sc.close();
}
}

}
}

private void doWriter(SocketChannel sc, String res) throws IOException {
if(res != null && res.trim().length()>0){
byte[] bytes = res.getBytes();
ByteBuffer buffe = ByteBuffer.allocate(bytes.length);
buffe.put(bytes);
buffe.flip();
sc.write(buffe);
}
}
}
客户端:

TimeClient.java

package com.ccy.IO.nio;

public class TimeClient {
public static void main(String[] args) {
int port = 8080;
if(args!=null && args.length>0){
port = Integer.valueOf(args[0]);
}
new Thread(new TimeClientHandler("127.0.0.1", port)).start();
}
}


TimeClientHandler.java

package com.ccy.IO.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandler implements Runnable {
private int port;
private String host;
private Selector selector;
private SocketChannel channel;
private volatile boolean stop;

public TimeClientHandler(String host,int port){
this.host = host ==null?"127.0.0.1":host;
this.port = port;
try {
selector = Selector.open();
channel = SocketChannel.open();
channel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}

}

@Override
public void run() {
try {
doConnect();
} catch (IOException e1) {
e1.printStackTrace();
System.exit(1);
}
while(!stop){
try {
//selector每一秒被唤醒一次
selector.select(1000);
//还回就绪状态的chanel的selectedKeys
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key = null;
while(iterator.hasNext()){
key = iterator.next();
iterator.remove();
try{
handleInput(key);
}catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
if(selector!=null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
selector =null;
}
}

public void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else{
System.exit(1);// 连接失败,进程退出
}
}
if(key.isReadable()){
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
this.stop = true;
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}

}
}

private void doConnect() throws IOException{
if(channel.connect(new InetSocketAddress(host, port))){
channel.register(selector, SelectionKey.OP_READ);
doWrite(channel);
}else{
channel.register(selector, SelectionKey.OP_CONNECT);
}
}

private void doWrite(SocketChannel schannel) throws IOException {
byte[] bytes = "What time is it now?".getBytes();
ByteBuffer buff = ByteBuffer.allocate(bytes.length);
buff.put(bytes);
buff.flip();
schannel.write(buff);
//判断是否发送完毕
if(!buff.hasRemaining()){
System.out.println("SEND SUCCESS!");
}

}

}


5.NIO版本时间服务器分析

3.1 通过多路复用器selector,客户端发起的链接是异步的,无须等待

3.2 SocketChannel读写操作亦是异步操作,提高了IO

3.3 线程模型优化,Selector通过epoll实现无连接数的限制,这意味着一个Selector可以处理成千上百个链接,而且性能不会随着客户端的数量的增加而下降,非常适合做高性能,高负载的网络服务器、

6.关于NIO的更多知识

http://www.iteye.com/magazines/132-Java-NIO#585

http://ifeve.com/overview/

先模仿在创造吧!

更多精彩内容请继续关注我的博客:http://blog.csdn.net/caicongyang

记录与分享,你我共成长 -from caicongyang
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: