您的位置:首页 > 编程语言 > Java开发

JAVA多进程服务器与客户机实现收发异步

2016-09-24 18:44 344 查看
核心: 非阻塞NIO的使用,发送和接收使用不同的Buffer

icesongqiang

java

客户端类

/**
*************************************************************
* 1. 客户端采用阻塞模式接入服务器
* 2. 接入后使用NIO事件机制一直查询可以读或者写的事件---主进程
* 3. 写入的数据 = 要传数据,也跟随主线程一直运行,但是写的内容可以由用户自己决定:
* *3.1.  使用send_data(String data)将用户数据写入sendBuffer中
* *3.2.  在主线程检测可以写的时候,调用send(SelectionKey key)函数
*  把sendBuffer中的内容写入该key关联的SocketChannel中
* 4. 读数据 : 跟随主线程一直运行,可以读就直接读
*******************参考java网络编程精解 例程4-4 ***************
************************************** -- [icesongqiang]
*****
*/
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;                // for Set

class EchoClient{
/**
* Selector类, 只要ServerSocketChannel及SocketChannel向
* Selector 注册了特定的事件,Selector类就会监控这些事件是否发生
* register()方法,返回SelectionKey对象
*/
private Selector selector = null;
private SocketChannel socketChannel = null;
private int s_port = 8000;
private String s_IP = "192.168.100.141";
private Charset charset = Charset.forName("GBK");
private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);

public EchoClient(String s_IP, int s_port) throws IOException{
this.s_IP = s_IP;
this.s_port = s_port;
socketChannel = SocketChannel.open();
//InetAddress ia = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(this.s_IP, this.s_port);
// 采用阻塞模式连接服务器
socketChannel.connect(isa);
// 设置为非阻塞模式
socketChannel.configureBlocking(false);
System.out.println("与服务器的连接建立成功");
selector = Selector.open();
}

/*// 接收用户从控制台输入的数据,把它放到sendBuffer中
public void receiveFromUser(){
try{
BufferedReader localReader=new BufferedReader(new InputStreamReader(System.in));
String msg=null;
while((msg=localReader.readLine())!=null){
synchronized(sendBuffer){
sendBuffer.put(encode(msg + "\r\n"));
}
if(msg.equals("bye"))
break;
}
}catch(IOException e){
e.printStackTrace();
}
}*/

// 接收指定的数据,把它放到sendBuffer中
public void send_data(String msg){
if(msg!= null){
synchronized(sendBuffer){
System.out.println("write to sendBuffer...");
sendBuffer.clear();                        // 把极限设为容量,再把位置设为0
sendBuffer.put(encode(msg+"\n"));
}
}
}

// 接收和发送数据
public void talk() throws IOException{
System.out.println(socketChannel.toString());
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
while(selector.select()>0){
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> it = readyKeys.iterator();
while(it.hasNext()){
SelectionKey key = null;
try{
key = it.next();
it.remove();

if(key.isReadable()){receive(key);}
if(key.isWritable()){send(key);}

}catch(IOException e){
e.printStackTrace();
try{
if(key!=null){
key.cancel();
key.channel().close();
}
}catch(IOException ex){ex.printStackTrace();}
}
}
}
}

private void send(SelectionKey key)throws IOException{
// 发送sendBuffer中的数据
//System.out.println("sending to server...");
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized(sendBuffer){
sendBuffer.flip();                    // 把极限设为位置,再把位置设为0
socketChannel.write(sendBuffer);
sendBuffer.compact();                 // 删除已经发送的数据
}
}

private void receive(SelectionKey key)throws IOException{
// 接收EchoServer发送的数据,把它放到receiveBuffer中
// 如果receiveBuffer中有一行数据,就打印这行数据,然后把它从recviveBuffer中删除
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.read(receiveBuffer);
receiveBuffer.flip();
String receiveData = decode(receiveBuffer);
if(receiveData.indexOf("\n")==-1) return;

String outputData = receiveData.substring(0, receiveData.indexOf("\n")+1);
System.out.print(outputData);

if(outputData.equals("echo:bye\r\n")){
key.cancel();
socketChannel.close();
System.out.println("关闭与服务器的连接");
selector.close();
System.exit(0);
}

ByteBuffer temp = encode(outputData);
receiveBuffer.position(temp.limit());
receiveBuffer.compact();                    // 删除已经打印的数据
}

/**解码*/
private String decode(ByteBuffer buffer){
CharBuffer charbuff = charset.decode(buffer);
return charbuff.toString();
}

/**编码*/
private ByteBuffer encode(String str){
return charset.encode(str);
}

/*public void main(String[] argv)throws IOException{
try {
echoclient = new EchoClient(m_ServerIP, m_ServerPort);
Thread c_talk = new Thread(){
public void run(){
try {
echoclient.talk();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
c_talk.start();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}*/

}


服务器

服务器类

/**
* 1. 参考EchoClient类的实现,只是现在服务器为了接收多个客户端,
* 2. 继承了Thread类,其run()方法就是负责阻塞的accept()SocketChannel,
* 3. 然后根据accept到的每个SocketChannel的不同,建立doEchoServer的类,
*    专门用于处理其中的一个连接,每一个连接就和CLient是相同的用法
* ************************************* --[icesongqiang]
*/

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;                // for Set

class EchoServer extends Thread{
/**
* Selector类, 只要ServerSocketChannel及SocketChannel向
* Selector 注册了特定的事件,Selector类就会监控这些事件是否发生
* register()方法,返回SelectionKey对象
*/

private ServerSocketChannel serverSocketChannel = null;
private int port = 8000;
private int count =0;
private List<DoEchoServer> list_DoEchoServer = new ArrayList<DoEchoServer>();

public EchoServer() throws IOException{
// 静态方法:返回一个ServerSocketChannel对象, 没有与任何端口绑定,并且处于阻塞模式
s
10535
erverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);                  // 复用端口
serverSocketChannel.configureBlocking(true);                         // 等待接入。默认阻塞模式,false 非阻塞
serverSocketChannel.socket().bind(new InetSocketAddress(port));      // 绑定端口
System.out.println("服务器启动");
}

public void run(){
for(;;){
try{
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("接收到客户连接,来自:" +
socketChannel.socket().getInetAddress() +
":" + socketChannel.socket().getPort());
// 连接了一个SocketChannel, 开启一个相应的进程
++ count;
DoEchoServer doEchoServer = new DoEchoServer(socketChannel);
list_DoEchoServer.add(doEchoServer);
Thread s_talk = new Thread(){
public void run(){
try {
doEchoServer.talk();
} catch (IOException ex) {
// TODO Auto-generated catch block
ex.printStackTrace();
}
}
};
s_talk.start();

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

Iterator<DoEchoServer> it = list_DoEchoServer.iterator();
while(it.hasNext()){
DoEchoServer ds = it.next();
if(ds.get_iswork()){ds.send_data("welcome to connect, now " + count + " clients");}
else{
it.remove();
--count;
}
}

}catch(IOException e){e.printStackTrace();}
}
}

public void broadcast(){
while(true){
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

Iterator<DoEchoServer> it = list_DoEchoServer.iterator();
while(it.hasNext()){
DoEchoServer ds = it.next();
if(ds.get_iswork()){ds.send_data("BroadCast: now " + count + " clients");}
else{
it.remove();
--count;
}
}
}
}

}


服务器分支进程

/**
* 客户相应的执行方法, 主线程利用为每个用户都开了这样的一个进程*/
class DoEchoServer {
private SocketChannel socketChannel = null;
private String clientIP= null;
private Selector selector = null;
private Charset charset = Charset.forName("GBK");
private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
private boolean iswork = true;                                 // 监测当前连接是否正常工作(发送&&接收)

public DoEchoServer(SocketChannel socketChannel) throws IOException {
// TODO Auto-generated constructor stub
this.socketChannel = socketChannel;
clientIP = socketChannel.socket().getInetAddress().toString();
clientIP = clientIP.substring(1);
//System.out.println("DoEchoServer: " + socketChannel.toString());
// 为SocketChannel监控接收连接就绪事件
selector = Selector.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE);
}

public SocketChannel get_SocketChannel(){ return this.socketChannel;}

public boolean get_iswork(){return iswork;}

// 发送和接收数据
public void talk() throws IOException{
while(selector.select()>0){
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> it = readyKeys.iterator();
while(it.hasNext()){
SelectionKey key = null;
try{
key = it.next();
it.remove();
if(key.isWritable()){send(key);}
if(key.isReadable()){receive(key);}
}catch(IOException e){
iswork = false;      // 出现异常,表示该SocketChannel已经断开
e.printStackTrace();
try{
if(key!=null){
key.cancel();
key.channel().close();
}
}catch(IOException ex){ex.printStackTrace();}
}
}
}
}

// 接收指定的数据,把它放到sendBuffer中
public void send_data(String msg){
if(msg!= null){
synchronized(sendBuffer){
sendBuffer.clear();                        // 把极限设为容量,再把位置设为0
sendBuffer.put(encode(msg+"\r\n"));
}
}
}

// 将sendBuffer中的内容发送到与key绑定的信道上
private void send(SelectionKey key)throws IOException{
// 发送sendBuffer中的数据
//System.out.println("sending to server...");
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized(sendBuffer){
sendBuffer.flip();                    // 把极限设为位置,再把位置设为0
socketChannel.write(sendBuffer);
sendBuffer.compact();                 // 删除已经发送的数据
}
}

private void receive(SelectionKey key)throws IOException{
// 接收EchoClient发送的数据,把它放到receiveBuffer中
// 如果receiveBuffer中有一行数据,就打印这行数据,然后把它从recviveBuffer中删除
SocketChannel socketChannel = (SocketChannel) key.channel();
//System.out.println(socketChannel.toString());
socketChannel.read(receiveBuffer);
receiveBuffer.flip();
String receiveData = decode(receiveBuffer);
if(receiveData.indexOf("\n")==-1) return;
String outputData = receiveData.substring(0, receiveData.indexOf("\n")+1);
dosomething(outputData);
/*
if(outputData.equals("echo:bye\r\n")){
key.cancel();
socketChannel.close();
System.out.println("关闭与客户端的连接");
selector.close();
System.exit(0);
}*/

ByteBuffer temp = encode(outputData);
receiveBuffer.position(temp.limit());
receiveBuffer.compact();                    // 删除已经打印的数据
}

/**解码*/
private String decode(ByteBuffer buffer){
CharBuffer charbuff = charset.decode(buffer);
return charbuff.toString();
}

/**编码*/
private ByteBuffer encode(String str){
return charset.encode(str);
}

/**wrete your own code there */
private void dosomething(String str){
// wrete your own code there
// 这里还可以配合进程处理进行其他进程的开关,具体可以参考[java 多进程]
}

}


服务器启动

import java.io.IOException;

public class MyServer {

public static void main(String[] args) {
// TODO Auto-generated method stub
try {
EchoServer echoServer = new EchoServer();
Thread s_accept = new Thread(){
public void run(){
echoServer.run();
}
};
s_accept.start();

echoServer.broadcast();

} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息