您的位置:首页 > 编程语言 > Java开发

使用javaNIO实现C/S模式的通信

2017-08-07 00:17 801 查看
NIO使用非阻塞IO的方式实现,服务器与客户端的交流,适用于大量连接,而数据量少的情况。通过一个线程轮询所有的通道,处理注册的事件,而主线程可以继续干其他的事情。这样所有的I/O都交给一个线程处理,减少了线程IO的切换。如果具体学习NIO的架构和原理请点击下面的连接

点击打开链接 http://ifeve.com/selectors/
以下为一个使用NIO实现的C/S通信模式,对应简单例子学习,可以更容易入手。

服务端代码如下:

主线程类:MainChannel

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class MainChannel {

public static void main(String[] args) {
// TODO Auto-generated method stub
// 声明服务器监听通道 和select选择器
ServerSocketChannel serverChannel = null;
Selector selector = null;

try {
// 实例化selector 此种实例化模式说明 selector 是单例的
selector = Selector.open();
// 实例化服务器监听端口
serverChannel = ServerSocketChannel.open();
// 绑定监听地址。
serverChannel.socket().bind(new InetSocketAddress(8881));
// 设置channel为非阻塞模式,一定要非阻塞模式才能注册到selector中
serverChannel.configureBlocking(false);

// 把监听通道注册到选择器中, 监听此通道的连接事件。SelectionKey.OP_ACCEPT 指定为连接事件。
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//所有通道,交给通道管理器,统一管理
ChannelManager.addChannel(serverChannel);
// 开启一个线程负责管理selector,并轮询是否有注册监听的事件就绪。
Thread thread = new Thread(new ServerNIO(selector));
thread.start();

// 然后主线程 就可以干其他的事情了。不管客户端连接 还是I/O
// 都不会阻塞此线程,只会阻塞selector管理线程,且只在等待事件发生时阻塞。
} catch (IOException e) {
// TODO Auto-generated catch block
System.out.println("服务器监听发生异常");
}
}
}管理selector的线程类:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ServerNIO implements Runnable {
// 客户端通道,用于给某个客户端收发数据
private SocketChannel socketChannel;

// 缓冲区,用户收发数据,面向通道。
private ByteBuffer buf = ByteBuffer.allocate(1024);

// 选择器,从主线程注入
private Selector selector;

// 指定线程是否轮询
private boolean flag = true;

// 构造器中注入selector
public ServerNIO(Selector selector) {
// TODO Auto-generated constructor stub
this.selector = selector;
}

// 开启线程等待事件的发生,轮询通道。
@Override
public void run() {
// TODO Auto-generated method stub
try {
while (flag) {

/**
* 获取等待的事件就绪的通道,如果没有通道有事件就绪有三种情况
* 1. 直接返回:selectNow();
* 2. 超时返回:select(int timeout);
* 3. 阻 塞: select();
*/
int nums = selector.select(); // 阻塞模式
if (nums > 0) { // 阻塞模式下 此次判定多余
// 当事件发生了,获取发生事件通道的key集合;
Set<SelectionKey> selectKeys = selector.selectedKeys();

// 迭代遍历这个keys集合
Iterator<SelectionKey> iter = selectKeys.iterator();

while (iter.hasNext()) {
// 获取单个通道的key
SelectionKey key = iter.next();
// 如果是读取事件就绪。说明有客户端向服务器发送数据了。
if (key.isReadable()) {
// 先获取到客户端channel
socketChannel = (SocketChannel) key.channel();
buf.clear();
// 利用buffer读取数据。
int len = socketChannel.read(buf);
if (len > 0) {
byte[] str = new byte[len];
buf.rewind();
buf.get(str, 0, len);
buf.clear();
System.out.println("获取客户端数据:" + new String(str));

// 给客户端回复数据
String temp = "服务器回复: 已经收到您发送的数据,祝您一路平安!";
buf.clear();
buf.put(temp.getBytes());
buf.flip();
socketChannel.write(buf);
System.out.println("已经向客户端回复");
//此处可以利用ChannelManager向其他所有通道广播数据。只要在ChannelManager中写一个广播方法即可
}
} else if (key.isAcceptable()) { // 如果是接受客户端连接就绪
// 从key中获取对应的通道
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
// 接受这个连接。
SocketChannel socketChannel = serverChannel.accept();
// 如果连接不为空,则控制台打印 连接的客户端地址。
if (socketChannel != null) {
// 由于关闭selector的时候,并不会关闭通道,最好使用一个容器,将通道都保存起来
//然后开启心跳连接,如果通道出现异常则关闭此通道,当应用程序关闭的时候,关闭所有的通道。
ChannelManager.addChannel(socketChannel);

System.out.println(String.format("接收到客户端的连接:IP[%s]-端口[%s]",
socketChannel.socket().getPort(), socketChannel.socket().getInetAddress()));
// 把这个通道设置为非阻塞模式,然后又注册到selector中,等待事件发生
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
} //写事件在缓冲区直接达到阈值时候出发,一般不注册写事件。
// 通道处理完毕后 将此通道删除。如果下次又有此时间,会有一个新的key,所以记得删除处理过的key。
iter.remove();
}

}
}
} catch (IOException e) {
System.out.println("服务器断开连接");
}

try {
// 注意此处只会关闭,selector使注册到琪上面的selectionKeys无效,通道本身不会关闭。
selector.close();
ChannelManager.closeChannles();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}通道管理类:
import java.io.IOException;
import java.nio.channels.Channel;
import java.util.LinkedList;

public class ChannelManager {
private static LinkedList<Channel> list = new LinkedList<>();

private static Thread thread; //用于开启心跳测试通道连接,实现省略

public static void addChannel(Channel channel) {
list.add(channel);
}

//关闭所有的通道连接
public static void closeChannles() {
for (Channel channel : list) {
try {
channel.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("关闭通道失败");
}
list.remove();
}
}
}

一个简单的客户端类: 这个脚本可以直接通过cmd 编译运行。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

public class MainChannel {

public static void main(String[] args) {
// TODO Auto-generated method stub
SocketChannel socketChannel = null;
boolean flag = true;
Scanner in = new Scanner(System.in);
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
socketChannel = SocketChannel.open();

// 设置连接为非阻塞模式,可能未连接完成就返回
// socketChannel.configureBlocking(false);
socketChannel.socket().connect(new InetSocketAddress("192.168.1.100", 8881));

// 判断是否连接成功,等待连接成功
while (!socketChannel.finishConnect()) {
}

while (flag) {

String temp = in.nextLine();

buffer.clear();

buffer.put(temp.getBytes());

// limit指针 移动到 position位置
buffer.flip();

// 当buffer中有足够空间,则写到buffer中
while (buffer.hasRemaining())
socketChannel.write(buffer);

if ("exit".equals(temp))
flag = false;
}

} catch (IOException e) {
// TODO Auto-generated catch block
System.out.println("与服务断开连接");
}

if (socketChannel != null) {
try {
socketChannel.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

}

上述代码,可以实现客户端向服务器发送消息,然后服务器接受到消息,在自己的控制台打印输出。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息