您的位置:首页 > 编程语言 > Java开发

Java NIO 流代码

2016-05-05 14:32 423 查看
服务器端代码:

package org.zbus.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * Minimal non-blocking NIO echo server listening on {@code localhost:8888}.
 *
 * <p>Two selector loops are used: {@link #connectionBell} only accepts new
 * connections, and {@link #readBell} serves the accepted connections. Each
 * loop runs on its own thread; the reader thread is started lazily when the
 * first client connects.
 *
 * <p>Every chunk of bytes read from a client is logged (decoded as UTF-8) and
 * written back unchanged after a one second pause. The pause runs on the
 * reader thread, so it delays all clients served by that thread.
 */
public class EchoServer {
    /** Selector loop that is registered for {@code OP_ACCEPT} on the listening channel. */
    public static SelectorLoop connectionBell;
    /** Selector loop on which accepted client channels are registered. */
    public static SelectorLoop readBell;
    /** Whether the thread driving {@link #readBell} has been started; accessed under {@code synchronized (this)}. */
    public boolean isReadBellRunning = false;

    /** Charset used to decode received bytes for logging. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    /** Pause, in milliseconds, between receiving a message and echoing it back. */
    private static final long ECHO_DELAY_MILLIS = 1000L;

    public static void main(String[] args) throws IOException {
        new EchoServer().startServer();
    }

    /**
     * Opens both selectors, binds the listening channel to {@code localhost:8888}
     * and starts the accept loop on a new thread.
     *
     * @throws IOException if a selector or the server channel cannot be opened,
     *         bound or registered; the server channel is closed before rethrowing
     */
    public void startServer() throws IOException {
        // One "bell" rings for new connections, the other for client traffic.
        connectionBell = new SelectorLoop();
        readBell = new SelectorLoop();

        ServerSocketChannel ssc = ServerSocketChannel.open();
        try {
            // Selectors only work with channels in non-blocking mode.
            ssc.configureBlocking(false);

            ServerSocket socket = ssc.socket();
            socket.bind(new InetSocketAddress("localhost", 8888));

            // The connection bell listens for new connections only.
            ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // Do not leak the listening channel when start-up fails half way.
            try {
                ssc.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        new Thread(connectionBell).start();
    }

    /** Closes a client channel, ignoring any failure because the connection is being dropped anyway. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing a dead connection fails.
        }
    }

    /**
     * A selector together with the loop that polls it and dispatches ready keys.
     * Intended to be run on a dedicated thread.
     */
    public class SelectorLoop implements Runnable {
        private final Selector selector;
        /** Scratch buffer reused for every read; only touched by the thread running this loop. */
        private final ByteBuffer temp = ByteBuffer.allocate(1024);
        /**
         * Held by another thread while it registers a channel on this selector.
         * The loop passes through it before each select so that it cannot
         * re-enter {@code select()} while a registration is in flight.
         */
        private final Object registrationGate = new Object();

        public SelectorLoop() throws IOException {
            this.selector = Selector.open();
        }

        public Selector getSelector() {
            return this.selector;
        }

        /**
         * Polls the selector until the thread is interrupted or the selector is
         * closed, dispatching every ready key. A failure while serving one
         * client drops only that client.
         */
        @Override
        public void run() {
            while (this.selector.isOpen() && !Thread.currentThread().isInterrupted()) {
                try {
                    synchronized (this.registrationGate) {
                        // Intentionally empty: waits for a concurrent register() to finish.
                    }
                    // Blocks until at least one registered event fires or wakeup() is called.
                    this.selector.select();

                    Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        // Selected keys must be removed by hand or they are reported again.
                        it.remove();
                        this.dispatchSafely(key);
                    }
                } catch (IOException e) {
                    // Failure of select() itself; per-client failures are handled in dispatchSafely.
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for whoever owns this thread and stop looping.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Dispatches one key and, if serving a client channel fails with an
         * I/O error, cancels the key and closes that channel so the loop does
         * not keep selecting a broken connection. Errors on the listening
         * channel are only logged.
         */
        private void dispatchSafely(SelectionKey key) throws InterruptedException {
            try {
                this.dispatch(key);
            } catch (IOException e) {
                e.printStackTrace();
                if (key.channel() instanceof SocketChannel) {
                    key.cancel();
                    closeQuietly((SocketChannel) key.channel());
                }
            }
        }

        /**
         * Registers a channel on this loop's selector from another thread.
         * The selector is woken up first and the gate is held so the loop
         * thread cannot go back into {@code select()} before the registration
         * has completed.
         */
        private void register(SocketChannel channel, int ops) throws IOException {
            synchronized (this.registrationGate) {
                this.selector.wakeup();
                channel.register(this.selector, ops);
            }
        }

        /**
         * Handles a single ready key.
         *
         * <ul>
         *   <li>acceptable: accepts the connection, registers it for reads on
         *       {@link EchoServer#readBell} and starts the reader thread if needed;</li>
         *   <li>readable: reads once, logs the data and echoes the same bytes back
         *       after {@code ECHO_DELAY_MILLIS}; closes the channel on end of stream;</li>
         *   <li>writable: flushes echo bytes that a previous write could not send.</li>
         * </ul>
         *
         * @param key the ready selection key
         * @throws IOException if accepting, reading or writing fails
         * @throws InterruptedException if the thread is interrupted during the echo delay
         */
        public void dispatch(SelectionKey key) throws IOException, InterruptedException {
            if (!key.isValid()) {
                // The key was cancelled after being selected; the ready-op checks would throw.
                return;
            }
            if (key.isAcceptable()) {
                // Accept events are registered on the ServerSocketChannel.
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                if (sc == null) {
                    // A non-blocking accept returns null when no connection is pending.
                    return;
                }
                System.out.println("服务器启动连接成功!");
                try {
                    sc.configureBlocking(false);
                    // Client traffic is handled by the read bell, not by this loop.
                    readBell.register(sc, SelectionKey.OP_READ);
                } catch (IOException e) {
                    closeQuietly(sc);
                    throw e;
                }

                // Start the reader thread on the first connection only.
                synchronized (EchoServer.this) {
                    if (!EchoServer.this.isReadBellRunning) {
                        EchoServer.this.isReadBellRunning = true;
                        new Thread(readBell).start();
                    }
                }

            } else if (key.isReadable()) {
                // Read events are registered on the client SocketChannel.
                SocketChannel sc = (SocketChannel) key.channel();
                // Reset first so a previous failure cannot leave the buffer in read mode.
                temp.clear();
                int count = sc.read(temp);
                if (count < 0) {
                    // The client closed the connection.
                    key.cancel();
                    sc.close();
                    return;
                }
                if (count == 0) {
                    return;
                }
                // Switch the buffer from filling to draining.
                temp.flip();
                // Decode a duplicate so the original bytes stay available for the echo.
                String msg = UTF_8.decode(temp.duplicate()).toString();
                System.out.println("服務器端收到消息 ["+msg+"] from client address:" + sc.getRemoteAddress());

                Thread.sleep(ECHO_DELAY_MILLIS);
                // Echo the raw bytes; re-encoding the decoded text would corrupt a
                // multi-byte character that was split across two reads.
                sc.write(temp);
                if (temp.hasRemaining()) {
                    // The socket buffer is full: keep the rest and wait for OP_WRITE
                    // instead of dropping it. Reading pauses until it is flushed.
                    ByteBuffer rest = ByteBuffer.allocate(temp.remaining());
                    rest.put(temp);
                    rest.flip();
                    key.attach(rest);
                    key.interestOps(SelectionKey.OP_WRITE);
                }

            } else if (key.isWritable()) {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer pending = (ByteBuffer) key.attachment();
                if (pending != null) {
                    sc.write(pending);
                }
                if (pending == null || !pending.hasRemaining()) {
                    // Everything was sent; go back to waiting for client data.
                    key.attach(null);
                    key.interestOps(SelectionKey.OP_READ);
                }
            }
        }

    }

}


客户端代码:
package org.zbus.test;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * Non-blocking NIO client for the echo server on {@code localhost:8888}.
 *
 * <p>The constructor connects and sends a greeting. {@link #run()} then polls
 * the selector: every message received from the server is logged and written
 * back after a one second pause. If nothing happens for more than
 * {@code MAX_IDLE_ROUNDS} one-second polls, the greeting is sent again as a
 * liveness probe. The loop ends, and the channel and selector are closed, when
 * the server disconnects, an I/O error occurs, or the thread is interrupted.
 *
 * <p>The channel and selector are held in static fields because the public
 * static {@link #sendFirstMsg()} uses them; creating a second instance
 * replaces the first instance's connection.
 */
public class MyClient implements Runnable {

    /** Charset used for the greeting and for decoding received bytes for logging. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    /** Host name the client connects to. */
    private static final String SERVER_HOST = "localhost";
    /** TCP port the client connects to. */
    private static final int SERVER_PORT = 8888;
    /** Number of consecutive idle polls after which the connection is probed. */
    private static final int MAX_IDLE_ROUNDS = 10;
    /** Pause, in milliseconds, between receiving a message and echoing it back. */
    private static final long ECHO_DELAY_MILLIS = 1000L;

    // Counts consecutive polls that returned no event.
    private static int idleCounter = 0;
    private static Selector selector;
    private static SocketChannel socketChannel;
    /** Scratch buffer reused for every read. */
    private final ByteBuffer temp = ByteBuffer.allocate(1024);
    /** Echo bytes a previous write could not send, or {@code null} when nothing is pending. */
    private ByteBuffer pendingEcho;

    /**
     * Connects to the server, switches the channel to non-blocking mode,
     * registers it with the selector and sends the greeting.
     *
     * @throws IOException if connecting, registering or sending fails; the
     *         channel and selector are closed before rethrowing
     */
    public MyClient() throws IOException {
        try {
            socketChannel = SocketChannel.open();
            selector = Selector.open();
            // Connect to the same host name the server binds to. The machine's own
            // host name may resolve to a non-loopback address the server is not bound to.
            boolean isConnected = socketChannel.connect(new InetSocketAddress(SERVER_HOST, SERVER_PORT));
            System.out.println("============================="+isConnected);
            socketChannel.configureBlocking(false);
            SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
            if (isConnected) {
                MyClient.sendFirstMsg();
            } else {
                // The connection is still in progress: wait for the connect event,
                // which run() completes with finishConnect().
                key.interestOps(SelectionKey.OP_CONNECT);
            }
        } catch (IOException e) {
            closeQuietly();
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        new Thread(new MyClient()).start();
    }

    /**
     * Writes the greeting to the server with a single {@code write} call.
     *
     * @throws IOException if the channel is closed or the write fails
     */
    public static void sendFirstMsg() throws IOException {
        String msg = "你好,世界!";
        socketChannel.write(ByteBuffer.wrap(msg.getBytes(UTF_8)));
    }

    /**
     * Event loop: polls with a one second timeout, probes the connection when
     * idle for too long, and handles connect/read/write events. Always closes
     * the channel and selector before returning.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Blocks until an event fires or one second passes; returns the number of ready keys.
                int num = selector.select(1000);
                if (num == 0) {
                    idleCounter++;
                    // Probe only when no partial echo is waiting, so the two messages
                    // cannot interleave on the wire. A failed probe throws and ends the loop.
                    if (idleCounter > MAX_IDLE_ROUNDS && pendingEcho == null) {
                        MyClient.sendFirstMsg();
                    }
                    continue;
                }
                idleCounter = 0;

                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!handleKey(key)) {
                        // The server closed the connection.
                        return;
                    }
                }
            }
        } catch (IOException e) {
            // The connection is unusable after an I/O failure; report it and stop
            // instead of retrying forever on a dead channel.
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            closeQuietly();
        }
    }

    /**
     * Handles one ready key.
     *
     * @return {@code false} if the server closed the connection and the loop
     *         should stop, {@code true} otherwise
     */
    private boolean handleKey(SelectionKey key) throws IOException, InterruptedException {
        SocketChannel sc = (SocketChannel) key.channel();

        if (key.isValid() && key.isConnectable()) {
            if (sc.isConnectionPending() && !sc.finishConnect()) {
                // Not connected yet; keep waiting for the next connect event.
                return true;
            }
            // Connected: from now on wait for data rather than for connect events.
            key.interestOps(SelectionKey.OP_READ);
            MyClient.sendFirstMsg();
        }

        if (key.isValid() && key.isWritable()) {
            flushPendingEcho(key, sc);
        }

        if (key.isValid() && key.isReadable()) {
            temp.clear();
            int count = sc.read(temp);
            if (count < 0) {
                return false;
            }
            if (count == 0) {
                return true;
            }
            // Switch the buffer from filling to draining.
            temp.flip();
            // Decode a duplicate so the original bytes stay available for the echo.
            String msg = UTF_8.decode(temp.duplicate()).toString();
            // NOTE(review): the text says "client sent", but this logs a message
            // received from the server; the string is kept unchanged.
            System.out.println("客户端发出信息 ["+msg+"] from server address:" + sc.getRemoteAddress());

            Thread.sleep(ECHO_DELAY_MILLIS);
            // Echo the raw bytes; re-encoding the decoded text would corrupt a
            // multi-byte character that was split across two reads.
            sc.write(temp);
            if (temp.hasRemaining()) {
                // The socket buffer is full: keep the rest and wait for OP_WRITE
                // instead of dropping it. Reading pauses until it is flushed.
                ByteBuffer rest = ByteBuffer.allocate(temp.remaining());
                rest.put(temp);
                rest.flip();
                pendingEcho = rest;
                key.interestOps(SelectionKey.OP_WRITE);
            }
        }
        return true;
    }

    /** Sends as much of the pending echo as the socket accepts and resumes reading once it is empty. */
    private void flushPendingEcho(SelectionKey key, SocketChannel sc) throws IOException {
        if (pendingEcho != null) {
            sc.write(pendingEcho);
        }
        if (pendingEcho == null || !pendingEcho.hasRemaining()) {
            pendingEcho = null;
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    /** Closes the channel and the selector, ignoring failures because the client is shutting down. */
    private static void closeQuietly() {
        try {
            if (socketChannel != null) {
                socketChannel.close();
            }
        } catch (IOException ignored) {
            // Nothing useful can be done if closing fails during shutdown.
        }
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException ignored) {
            // Nothing useful can be done if closing fails during shutdown.
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: