Java 非阻塞通信的例子
2016-03-01 22:57
351 查看
package 创建非阻塞的EchoClient;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
/**
 * Non-blocking echo client.
 *
 * <p>One thread ({@link #LocalRead()}) reads lines from standard input and
 * appends them to a shared send buffer; the main thread ({@link #talk()})
 * drives a selector that flushes the send buffer to the server and prints
 * every complete line the server sends back. The session ends when the server
 * echoes {@code bye} or closes the connection.
 */
public class EchoClient1 {
    /** Reply expected for {@code bye}; extracted lines always keep their CRLF. */
    static final String FAREWELL_REPLY = "echo:bye\r\n";

    /** Outgoing bytes; shared by both threads, always accessed under its own monitor. */
    private final ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    /** Incoming bytes that do not yet form a complete line; kept in write mode. */
    private final ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
    private final int port = 8000;
    private SocketChannel socketChannel;
    private Selector selector;
    private final Charset charset = Charset.forName("GBK");

    /**
     * Connects (in blocking mode) to port 8000 on the local host, then switches
     * the channel to non-blocking mode and opens the selector.
     *
     * <p>An {@link IOException} is only printed; the instance is then left
     * without a usable channel/selector.
     */
    public EchoClient1() {
        try {
            socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            socketChannel.configureBlocking(false);
            System.out.println("客户端已经与服务器建立连接!");
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads lines from standard input and queues each one, terminated by CRLF,
     * in the send buffer. Returns after queuing {@code bye} or at end of input.
     *
     * @throws IOException if reading standard input fails
     * @throws java.nio.BufferOverflowException if a line does not fit into the
     *         space currently left in the send buffer
     */
    public void LocalRead() throws IOException {
        BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
        String msg;
        while ((msg = localReader.readLine()) != null) {
            ByteBuffer encoded = charset.encode(msg + "\r\n");
            synchronized (sendBuffer) {
                sendBuffer.put(encoded);
            }
            if (msg.equals("bye")) {
                break;
            }
        }
    }

    /**
     * Runs the selector loop: registers the channel for read and write events
     * and dispatches each ready key to {@link #Receive} / {@link #Send}.
     *
     * <p>NOTE(review): the channel stays registered for OP_WRITE, so
     * {@code select()} presumably returns almost immediately on every pass.
     */
    public void talk() {
        try {
            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            while (selector.select() > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isReadable()) {
                        Receive(key);
                    }
                    // Receive may have cancelled the key; isWritable() would then throw.
                    if (key.isValid() && key.isWritable()) {
                        Send(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes as much of the send buffer as the channel accepts and keeps the
     * unsent remainder for the next call.
     *
     * @param key selection key whose channel is the server connection
     * @throws IOException if the write fails
     */
    void Send(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        synchronized (sendBuffer) {
            sendBuffer.flip();
            channel.write(sendBuffer);
            sendBuffer.compact();
        }
    }

    /**
     * Reads available bytes from the server and prints every complete line.
     * Shuts the client down when the server closes the connection or echoes
     * {@code bye}.
     *
     * @param key selection key whose channel is the server connection
     * @throws IOException if the read or the shutdown fails
     */
    void Receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.read(receiveBuffer) == -1) {
            // End of stream: the server closed the connection.
            disconnect(key);
            return;
        }
        String outputData;
        while ((outputData = takeLine(receiveBuffer, charset)) != null) {
            System.out.println(outputData);
            if (FAREWELL_REPLY.equals(outputData)) {
                disconnect(key);
                return;
            }
        }
    }

    /**
     * Removes and returns the first CRLF-terminated line held in {@code buf}.
     *
     * <p>{@code buf} must be in write mode (position = end of data) and is left
     * in write mode in every case, so the caller can keep appending to it.
     *
     * @param buf accumulated, not yet consumed bytes
     * @param cs  charset used to decode the bytes
     * @return the line including its terminator, or {@code null} when the data
     *         contains no {@code "\r\n"} yet (the buffer content is then unchanged)
     */
    static String takeLine(ByteBuffer buf, Charset cs) {
        int end = buf.position();
        buf.flip();
        String data = cs.decode(buf).toString();
        if (data.indexOf("\r\n") == -1) {
            // Undo the flip so later reads append after the partial line.
            buf.limit(buf.capacity());
            buf.position(end);
            return null;
        }
        String line = data.substring(0, data.indexOf('\n') + 1);
        // Re-encode to learn how many bytes the line occupied; clamp defensively.
        int consumed = Math.min(cs.encode(line).limit(), end);
        buf.position(consumed);
        buf.compact();
        return line;
    }

    /** Cancels the key, closes channel and selector, and terminates the JVM. */
    private void disconnect(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("与服务器断开连接");
        selector.close();
        // The stdin reader thread may still be blocked, so exit explicitly.
        System.exit(0);
    }

    /**
     * Starts the stdin reader on its own thread and runs the selector loop on
     * the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EchoClient1 client = new EchoClient1();
        new Thread(() -> {
            try {
                client.LocalRead();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        client.talk();
    }
}
package 混合模式;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.net.*;
/**
 * Mixed-mode echo server: the main thread receives and sends data through a
 * selector (non-blocking), while a separate thread accepts connections
 * (blocking).
 */
public class EchoServer {
    private Selector selector = null;
    private ServerSocketChannel serverSocketChannel = null;
    private final int port = 8000;
    private final Charset charset = Charset.forName("GBK");
    /** Monitor that lets the accept thread register a channel while the service thread is paused. */
    private final Object gate = new Object();

    /**
     * Opens the selector and binds the (blocking) server channel to port 8000.
     * An {@link IOException} is only printed.
     */
    public EchoServer() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // SO_REUSEADDR must be set before binding; afterwards its effect is undefined.
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("服务器已经启动!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accept loop, meant to run on its own thread: blocks for each new
     * connection, makes it non-blocking and registers it with the selector
     * together with a 1024-byte buffer attachment. Ends when an
     * {@link IOException} occurs (the exception is printed).
     */
    public void accept() {
        try {
            while (true) {
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println(
                        "接受来自:" + socketChannel.socket().getInetAddress() + "端口:" + socketChannel.socket().getPort());
                socketChannel.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                synchronized (gate) {
                    // Kick the service thread out of select(); it then waits on
                    // the gate until this registration has completed.
                    selector.wakeup();
                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Service loop for the main thread: selects ready keys forever and
     * dispatches them to {@link #receive} / {@link #send}. An I/O error on one
     * connection closes only that connection.
     */
    public void service() {
        while (true) {
            try {
                // Even if this thread arrives first it pauses here whenever the
                // accept thread holds the gate, i.e. while a registration is in
                // progress, and resumes once the registration is done.
                synchronized (gate) {
                }
                int n = selector.select();
                if (n == 0) {
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    try {
                        if (key.isReadable()) {
                            receive(key);
                        }
                        // receive may have cancelled the key; isWritable() would then throw.
                        if (key.isValid() && key.isWritable()) {
                            send(key);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Reads up to 32 bytes from the client and appends them to the buffer
     * attached to the key. Closes the connection when the client has closed
     * its side or when the pending data no longer fits into the buffer.
     *
     * @param key selection key of the client connection
     * @throws IOException if the read or the close fails
     */
    public void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(32);
        if (socketChannel.read(readBuffer) == -1) {
            // End of stream: without this the key stays readable forever.
            closeConnection(key);
            return;
        }
        readBuffer.flip();
        buffer.limit(buffer.capacity());
        if (readBuffer.remaining() > buffer.remaining()) {
            // A line longer than the buffer would raise BufferOverflowException.
            closeConnection(key);
            return;
        }
        buffer.put(readBuffer);
    }

    /**
     * If the attached buffer holds a complete line, prints it, echoes it back
     * prefixed with {@code echo:} and removes it from the buffer. Closes the
     * connection after echoing {@code bye}.
     *
     * @param key selection key of the client connection
     * @throws IOException if the write or the close fails
     */
    public void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        String outputData = takeLine(buffer, charset);
        if (outputData == null) {
            return;
        }
        System.out.println(outputData);
        ByteBuffer reply = charset.encode("echo:" + outputData);
        while (reply.hasRemaining()) {
            socketChannel.write(reply);
        }
        if (outputData.equals("bye\r\n")) {
            closeConnection(key);
        }
    }

    /**
     * Removes and returns the first CRLF-terminated line held in {@code buf}.
     *
     * <p>{@code buf} must be in write mode (position = end of data) and is left
     * in write mode in every case.
     *
     * @param buf accumulated, not yet consumed bytes
     * @param cs  charset used to decode the bytes
     * @return the line including its terminator, or {@code null} when the data
     *         contains no {@code "\r\n"} yet (the buffer content is then unchanged)
     */
    static String takeLine(ByteBuffer buf, Charset cs) {
        int end = buf.position();
        buf.flip();
        String data = cs.decode(buf).toString();
        if (data.indexOf("\r\n") == -1) {
            buf.limit(buf.capacity());
            buf.position(end);
            return null;
        }
        String line = data.substring(0, data.indexOf('\n') + 1);
        // Re-encode to learn how many bytes the line occupied; clamp defensively.
        int consumed = Math.min(cs.encode(line).limit(), end);
        buf.position(consumed);
        buf.compact();
        return line;
    }

    /** Cancels the key, closes its channel and reports the disconnect. */
    private void closeConnection(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("关闭与客户端的连接!");
    }

    /**
     * Starts the accept loop on a new thread and runs the service loop on the
     * main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EchoServer server = new EchoServer();
        new Thread(server::accept).start();
        server.service();
    }
}
package non_blocking;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;;
/**
 * Single-threaded non-blocking echo server: one selector loop accepts
 * connections, collects incoming bytes per connection and echoes every
 * complete line back prefixed with {@code echo:}.
 */
public class EchoServer {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private final int port = 8000;
    private final Charset charset = Charset.forName("GBK");

    /**
     * Opens the selector and a non-blocking server channel bound to port 8000.
     * An {@link IOException} is only printed.
     */
    public EchoServer() {
        try {
            selector = Selector.open();
            // open() is static; call it on the type, not through the (null) field.
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            System.out.println("服务端已经启动!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the selector loop: accepts new connections and dispatches readable
     * and writable keys to {@link #receive} / {@link #send}. An I/O error on
     * one key closes only that key's channel; the loop keeps serving the rest.
     */
    public void service() {
        try {
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (selector.select() > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    try {
                        dispatch(key);
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Handles whichever operations are ready on one selected key. */
    private void dispatch(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = ssc.accept();
            // In non-blocking mode accept() returns null when nothing is pending.
            if (socketChannel != null) {
                System.out.println("接受来自" + socketChannel.socket().getInetAddress() + "端口:" + socketChannel.socket().getPort());
                socketChannel.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
            }
        }
        if (key.isValid() && key.isReadable()) {
            receive(key);
        }
        // receive may have cancelled the key; isWritable() would then throw.
        if (key.isValid() && key.isWritable()) {
            send(key);
        }
    }

    /**
     * Reads up to 32 bytes from the client and appends them to the buffer
     * attached to the key. Closes the connection when the client has closed
     * its side or when the pending data no longer fits into the buffer.
     *
     * @param key selection key of the client connection
     * @throws IOException if the read or the close fails
     */
    public void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        ByteBuffer buff = ByteBuffer.allocate(32);
        SocketChannel socketChannel = (SocketChannel) key.channel();
        if (socketChannel.read(buff) == -1) {
            // End of stream: without this the key stays readable forever.
            closeConnection(key);
            return;
        }
        buff.flip();
        buffer.limit(buffer.capacity());
        if (buff.remaining() > buffer.remaining()) {
            // A line longer than the buffer would raise BufferOverflowException.
            closeConnection(key);
            return;
        }
        buffer.put(buff);
    }

    /**
     * If the attached buffer holds a complete line, prints it, echoes it back
     * prefixed with {@code echo:} and removes it from the buffer. Closes the
     * connection after echoing {@code bye}.
     *
     * @param key selection key of the client connection
     * @throws IOException if the write or the close fails
     */
    public void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        String outputData = takeLine(buffer, charset);
        if (outputData == null) {
            return;
        }
        System.out.println(outputData);
        ByteBuffer echoBuffer = charset.encode("echo:" + outputData);
        while (echoBuffer.hasRemaining()) {
            socketChannel.write(echoBuffer);
        }
        if (outputData.equals("bye\r\n")) {
            closeConnection(key);
        }
    }

    /**
     * Removes and returns the first CRLF-terminated line held in {@code buf}.
     *
     * <p>{@code buf} must be in write mode (position = end of data) and is left
     * in write mode in every case.
     *
     * @param buf accumulated, not yet consumed bytes
     * @param cs  charset used to decode the bytes
     * @return the line including its terminator, or {@code null} when the data
     *         contains no {@code "\r\n"} yet (the buffer content is then unchanged)
     */
    static String takeLine(ByteBuffer buf, Charset cs) {
        int end = buf.position();
        buf.flip();
        String data = cs.decode(buf).toString();
        if (data.indexOf("\r\n") == -1) {
            buf.limit(buf.capacity());
            buf.position(end);
            return null;
        }
        String line = data.substring(0, data.indexOf('\n') + 1);
        // Re-encode to learn how many bytes the line occupied; clamp defensively.
        int consumed = Math.min(cs.encode(line).limit(), end);
        buf.position(consumed);
        buf.compact();
        return line;
    }

    /** Cancels the key, closes its channel and reports the disconnect. */
    private void closeConnection(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("关闭服务器之间的连接");
    }

    /**
     * Creates the server and runs its selector loop on the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new EchoServer().service();
    }
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
/**
 * Non-blocking echo client.
 *
 * <p>One thread ({@link #LocalRead()}) reads lines from standard input and
 * appends them to a shared send buffer; the main thread ({@link #talk()})
 * drives a selector that flushes the send buffer to the server and prints
 * every complete line the server sends back. The session ends when the server
 * echoes {@code bye} or closes the connection.
 */
public class EchoClient1 {
    /** Reply expected for {@code bye}; extracted lines always keep their CRLF. */
    static final String FAREWELL_REPLY = "echo:bye\r\n";

    /** Outgoing bytes; shared by both threads, always accessed under its own monitor. */
    private final ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    /** Incoming bytes that do not yet form a complete line; kept in write mode. */
    private final ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
    private final int port = 8000;
    private SocketChannel socketChannel;
    private Selector selector;
    private final Charset charset = Charset.forName("GBK");

    /**
     * Connects (in blocking mode) to port 8000 on the local host, then switches
     * the channel to non-blocking mode and opens the selector.
     *
     * <p>An {@link IOException} is only printed; the instance is then left
     * without a usable channel/selector.
     */
    public EchoClient1() {
        try {
            socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), port));
            socketChannel.configureBlocking(false);
            System.out.println("客户端已经与服务器建立连接!");
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads lines from standard input and queues each one, terminated by CRLF,
     * in the send buffer. Returns after queuing {@code bye} or at end of input.
     *
     * @throws IOException if reading standard input fails
     * @throws java.nio.BufferOverflowException if a line does not fit into the
     *         space currently left in the send buffer
     */
    public void LocalRead() throws IOException {
        BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
        String msg;
        while ((msg = localReader.readLine()) != null) {
            ByteBuffer encoded = charset.encode(msg + "\r\n");
            synchronized (sendBuffer) {
                sendBuffer.put(encoded);
            }
            if (msg.equals("bye")) {
                break;
            }
        }
    }

    /**
     * Runs the selector loop: registers the channel for read and write events
     * and dispatches each ready key to {@link #Receive} / {@link #Send}.
     *
     * <p>NOTE(review): the channel stays registered for OP_WRITE, so
     * {@code select()} presumably returns almost immediately on every pass.
     */
    public void talk() {
        try {
            socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            while (selector.select() > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isReadable()) {
                        Receive(key);
                    }
                    // Receive may have cancelled the key; isWritable() would then throw.
                    if (key.isValid() && key.isWritable()) {
                        Send(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes as much of the send buffer as the channel accepts and keeps the
     * unsent remainder for the next call.
     *
     * @param key selection key whose channel is the server connection
     * @throws IOException if the write fails
     */
    void Send(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        synchronized (sendBuffer) {
            sendBuffer.flip();
            channel.write(sendBuffer);
            sendBuffer.compact();
        }
    }

    /**
     * Reads available bytes from the server and prints every complete line.
     * Shuts the client down when the server closes the connection or echoes
     * {@code bye}.
     *
     * @param key selection key whose channel is the server connection
     * @throws IOException if the read or the shutdown fails
     */
    void Receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.read(receiveBuffer) == -1) {
            // End of stream: the server closed the connection.
            disconnect(key);
            return;
        }
        String outputData;
        while ((outputData = takeLine(receiveBuffer, charset)) != null) {
            System.out.println(outputData);
            if (FAREWELL_REPLY.equals(outputData)) {
                disconnect(key);
                return;
            }
        }
    }

    /**
     * Removes and returns the first CRLF-terminated line held in {@code buf}.
     *
     * <p>{@code buf} must be in write mode (position = end of data) and is left
     * in write mode in every case, so the caller can keep appending to it.
     *
     * @param buf accumulated, not yet consumed bytes
     * @param cs  charset used to decode the bytes
     * @return the line including its terminator, or {@code null} when the data
     *         contains no {@code "\r\n"} yet (the buffer content is then unchanged)
     */
    static String takeLine(ByteBuffer buf, Charset cs) {
        int end = buf.position();
        buf.flip();
        String data = cs.decode(buf).toString();
        if (data.indexOf("\r\n") == -1) {
            // Undo the flip so later reads append after the partial line.
            buf.limit(buf.capacity());
            buf.position(end);
            return null;
        }
        String line = data.substring(0, data.indexOf('\n') + 1);
        // Re-encode to learn how many bytes the line occupied; clamp defensively.
        int consumed = Math.min(cs.encode(line).limit(), end);
        buf.position(consumed);
        buf.compact();
        return line;
    }

    /** Cancels the key, closes channel and selector, and terminates the JVM. */
    private void disconnect(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("与服务器断开连接");
        selector.close();
        // The stdin reader thread may still be blocked, so exit explicitly.
        System.exit(0);
    }

    /**
     * Starts the stdin reader on its own thread and runs the selector loop on
     * the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EchoClient1 client = new EchoClient1();
        new Thread(() -> {
            try {
                client.LocalRead();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        client.talk();
    }
}
package 混合模式;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.net.*;
/**
 * Mixed-mode echo server: the main thread receives and sends data through a
 * selector (non-blocking), while a separate thread accepts connections
 * (blocking).
 */
public class EchoServer {
    private Selector selector = null;
    private ServerSocketChannel serverSocketChannel = null;
    private final int port = 8000;
    private final Charset charset = Charset.forName("GBK");
    /** Monitor that lets the accept thread register a channel while the service thread is paused. */
    private final Object gate = new Object();

    /**
     * Opens the selector and binds the (blocking) server channel to port 8000.
     * An {@link IOException} is only printed.
     */
    public EchoServer() {
        try {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            // SO_REUSEADDR must be set before binding; afterwards its effect is undefined.
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("服务器已经启动!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accept loop, meant to run on its own thread: blocks for each new
     * connection, makes it non-blocking and registers it with the selector
     * together with a 1024-byte buffer attachment. Ends when an
     * {@link IOException} occurs (the exception is printed).
     */
    public void accept() {
        try {
            while (true) {
                SocketChannel socketChannel = serverSocketChannel.accept();
                System.out.println(
                        "接受来自:" + socketChannel.socket().getInetAddress() + "端口:" + socketChannel.socket().getPort());
                socketChannel.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                synchronized (gate) {
                    // Kick the service thread out of select(); it then waits on
                    // the gate until this registration has completed.
                    selector.wakeup();
                    socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Service loop for the main thread: selects ready keys forever and
     * dispatches them to {@link #receive} / {@link #send}. An I/O error on one
     * connection closes only that connection.
     */
    public void service() {
        while (true) {
            try {
                // Even if this thread arrives first it pauses here whenever the
                // accept thread holds the gate, i.e. while a registration is in
                // progress, and resumes once the registration is done.
                synchronized (gate) {
                }
                int n = selector.select();
                if (n == 0) {
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    try {
                        if (key.isReadable()) {
                            receive(key);
                        }
                        // receive may have cancelled the key; isWritable() would then throw.
                        if (key.isValid() && key.isWritable()) {
                            send(key);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Reads up to 32 bytes from the client and appends them to the buffer
     * attached to the key. Closes the connection when the client has closed
     * its side or when the pending data no longer fits into the buffer.
     *
     * @param key selection key of the client connection
     * @throws IOException if the read or the close fails
     */
    public void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(32);
        if (socketChannel.read(readBuffer) == -1) {
            // End of stream: without this the key stays readable forever.
            closeConnection(key);
            return;
        }
        readBuffer.flip();
        buffer.limit(buffer.capacity());
        if (readBuffer.remaining() > buffer.remaining()) {
            // A line longer than the buffer would raise BufferOverflowException.
            closeConnection(key);
            return;
        }
        buffer.put(readBuffer);
    }

    /**
     * If the attached buffer holds a complete line, prints it, echoes it back
     * prefixed with {@code echo:} and removes it from the buffer. Closes the
     * connection after echoing {@code bye}.
     *
     * @param key selection key of the client connection
     * @throws IOException if the write or the close fails
     */
    public void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        String outputData = takeLine(buffer, charset);
        if (outputData == null) {
            return;
        }
        System.out.println(outputData);
        ByteBuffer reply = charset.encode("echo:" + outputData);
        while (reply.hasRemaining()) {
            socketChannel.write(reply);
        }
        if (outputData.equals("bye\r\n")) {
            closeConnection(key);
        }
    }

    /**
     * Removes and returns the first CRLF-terminated line held in {@code buf}.
     *
     * <p>{@code buf} must be in write mode (position = end of data) and is left
     * in write mode in every case.
     *
     * @param buf accumulated, not yet consumed bytes
     * @param cs  charset used to decode the bytes
     * @return the line including its terminator, or {@code null} when the data
     *         contains no {@code "\r\n"} yet (the buffer content is then unchanged)
     */
    static String takeLine(ByteBuffer buf, Charset cs) {
        int end = buf.position();
        buf.flip();
        String data = cs.decode(buf).toString();
        if (data.indexOf("\r\n") == -1) {
            buf.limit(buf.capacity());
            buf.position(end);
            return null;
        }
        String line = data.substring(0, data.indexOf('\n') + 1);
        // Re-encode to learn how many bytes the line occupied; clamp defensively.
        int consumed = Math.min(cs.encode(line).limit(), end);
        buf.position(consumed);
        buf.compact();
        return line;
    }

    /** Cancels the key, closes its channel and reports the disconnect. */
    private void closeConnection(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("关闭与客户端的连接!");
    }

    /**
     * Starts the accept loop on a new thread and runs the service loop on the
     * main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EchoServer server = new EchoServer();
        new Thread(server::accept).start();
        server.service();
    }
}
package non_blocking;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;;
/**
 * Single-threaded non-blocking echo server: one selector loop accepts
 * connections, collects incoming bytes per connection and echoes every
 * complete line back prefixed with {@code echo:}.
 */
public class EchoServer {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private final int port = 8000;
    private final Charset charset = Charset.forName("GBK");

    /**
     * Opens the selector and a non-blocking server channel bound to port 8000.
     * An {@link IOException} is only printed.
     */
    public EchoServer() {
        try {
            selector = Selector.open();
            // open() is static; call it on the type, not through the (null) field.
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().setReuseAddress(true);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            System.out.println("服务端已经启动!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Runs the selector loop: accepts new connections and dispatches readable
     * and writable keys to {@link #receive} / {@link #send}. An I/O error on
     * one key closes only that key's channel; the loop keeps serving the rest.
     */
    public void service() {
        try {
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (selector.select() > 0) {
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    try {
                        dispatch(key);
                    } catch (IOException e) {
                        e.printStackTrace();
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Handles whichever operations are ready on one selected key. */
    private void dispatch(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = ssc.accept();
            // In non-blocking mode accept() returns null when nothing is pending.
            if (socketChannel != null) {
                System.out.println("接受来自" + socketChannel.socket().getInetAddress() + "端口:" + socketChannel.socket().getPort());
                socketChannel.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
            }
        }
        if (key.isValid() && key.isReadable()) {
            receive(key);
        }
        // receive may have cancelled the key; isWritable() would then throw.
        if (key.isValid() && key.isWritable()) {
            send(key);
        }
    }

    /**
     * Reads up to 32 bytes from the client and appends them to the buffer
     * attached to the key. Closes the connection when the client has closed
     * its side or when the pending data no longer fits into the buffer.
     *
     * @param key selection key of the client connection
     * @throws IOException if the read or the close fails
     */
    public void receive(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        ByteBuffer buff = ByteBuffer.allocate(32);
        SocketChannel socketChannel = (SocketChannel) key.channel();
        if (socketChannel.read(buff) == -1) {
            // End of stream: without this the key stays readable forever.
            closeConnection(key);
            return;
        }
        buff.flip();
        buffer.limit(buffer.capacity());
        if (buff.remaining() > buffer.remaining()) {
            // A line longer than the buffer would raise BufferOverflowException.
            closeConnection(key);
            return;
        }
        buffer.put(buff);
    }

    /**
     * If the attached buffer holds a complete line, prints it, echoes it back
     * prefixed with {@code echo:} and removes it from the buffer. Closes the
     * connection after echoing {@code bye}.
     *
     * @param key selection key of the client connection
     * @throws IOException if the write or the close fails
     */
    public void send(SelectionKey key) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        String outputData = takeLine(buffer, charset);
        if (outputData == null) {
            return;
        }
        System.out.println(outputData);
        ByteBuffer echoBuffer = charset.encode("echo:" + outputData);
        while (echoBuffer.hasRemaining()) {
            socketChannel.write(echoBuffer);
        }
        if (outputData.equals("bye\r\n")) {
            closeConnection(key);
        }
    }

    /**
     * Removes and returns the first CRLF-terminated line held in {@code buf}.
     *
     * <p>{@code buf} must be in write mode (position = end of data) and is left
     * in write mode in every case.
     *
     * @param buf accumulated, not yet consumed bytes
     * @param cs  charset used to decode the bytes
     * @return the line including its terminator, or {@code null} when the data
     *         contains no {@code "\r\n"} yet (the buffer content is then unchanged)
     */
    static String takeLine(ByteBuffer buf, Charset cs) {
        int end = buf.position();
        buf.flip();
        String data = cs.decode(buf).toString();
        if (data.indexOf("\r\n") == -1) {
            buf.limit(buf.capacity());
            buf.position(end);
            return null;
        }
        String line = data.substring(0, data.indexOf('\n') + 1);
        // Re-encode to learn how many bytes the line occupied; clamp defensively.
        int consumed = Math.min(cs.encode(line).limit(), end);
        buf.position(consumed);
        buf.compact();
        return line;
    }

    /** Cancels the key, closes its channel and reports the disconnect. */
    private void closeConnection(SelectionKey key) throws IOException {
        key.cancel();
        key.channel().close();
        System.out.println("关闭服务器之间的连接");
    }

    /**
     * Creates the server and runs its selector loop on the main thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new EchoServer().service();
    }
}
相关文章推荐
- Class.forName()、ClassLoader.loadClass()
- Java回调学习笔记
- spring动态数据源配置以及以及利用AOP自动设置
- Struts2的Action与ServletAPI的解耦
- Java学习笔记之面向对象的几个知识点
- Spring进阶之路(5)-Spring创建Bean的三种方式
- 支撑Spring的基础技术:反射,动态代理,cglib等
- windows下配置环境java环境变量
- javaWeb的session实现购物车
- Java多线程之Executor框架(2)
- Spring连接两个以上数据库
- struts(tags)
- Eclipse 使用 maven 构建demo
- struts(result)
- Struts(ognl)
- struts(一)
- STS: Spring Boot and MyBatis
- Spring学习笔记2016-3-1
- Java 集合类详解
- spring 中的@Resource