健壮的、便捷的、异步的SocketChannel实现
2014-12-01 20:02
260 查看
http://www.oschina.net/question/54100_33530
Socket通信比较常见的问题有如下几种:
1、设置收发超时;
2、正确的每一个bit的收发;
3、物理线路故障的保护;
4、始终能正常工作;
5、尽量少占系统资源;
n、……
而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是: 通信。
为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。
本方案在正常的局域网连接中测试过几百万次没什么问题。缺乏更艰苦的环境,所以如果使用这些代码发生任何风险的话……
(TcpChannel代码为Brian Wellington所做,原名为TCPClient,经本人稍作改动)
// Copyright (c) 2005 Brian Wellington (bwelling@xbill.org)
package asynchronizedchannel;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
final class TcpChannel
{
private long endTime;
private SelectionKey key;
public TcpChannel(SelectableChannel channel, long endTime, int op) throws IOException
{
boolean done = false;
Selector selector = null;
this.endTime = endTime;
try {
selector = Selector.open();
channel.configureBlocking(false);
key = channel.register(selector, op);
done = true;
} finally {
if (!done && selector != null) {
selector.close();
}
if (!done) {
channel.close();
}
}
}
static void blockUntil(SelectionKey key, long endTime) throws IOException
{
long timeout = endTime - System.currentTimeMillis();
int nkeys = 0;
if (timeout > 0) {
nkeys = key.selector().select(timeout);
} else if (timeout == 0) {
nkeys = key.selector().selectNow();
}
if (nkeys == 0) {
throw new SocketTimeoutException();
}
}
void cleanup()
{
try {
key.selector().close();
key.channel().close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
void bind(SocketAddress addr) throws IOException
{
SocketChannel channel = (SocketChannel) key.channel();
channel.socket().bind(addr);
}
void connect(SocketAddress addr) throws IOException
{
SocketChannel channel = (SocketChannel) key.channel();
key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
try {
if (!key.isConnectable()) {
blockUntil(key, endTime);
}
if (!channel.connect(addr) && !channel.finishConnect()) {
throw new ConnectException();
}
} finally {
if (key.isValid()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
}
}
}
void send(ByteBuffer buffer) throws IOException
{
Send.operate(key, buffer, endTime);
}
void recv(ByteBuffer buffer) throws IOException
{
Recv.operate(key, buffer, endTime);
}
}
interface Operator
{
class Operation
{
static void operate(final int op, final SelectionKey key, final ByteBuffer buffer, final long endTime, final Operator optr) throws IOException
{
final SocketChannel channel = (SocketChannel) key.channel();
final int total = buffer.capacity();
key.interestOps(op);
try {
while (buffer.position() < total) {
if (System.currentTimeMillis() > endTime) {
throw new SocketTimeoutException();
}
if ((key.readyOps() & op) != 0) {
if (optr.io(channel, buffer) < 0) {
throw new EOFException();
}
} else {
TcpChannel.blockUntil(key, endTime);
}
}
} finally {
if (key.isValid()) {
key.interestOps(0);
}
}
}
}
int io(SocketChannel channel, ByteBuffer buffer) throws IOException;
}
class Send implements Operator
{
public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
{
return channel.write(buffer);
}
public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
{
Operation.operate(SelectionKey.OP_WRITE, key, buffer, endTime, operator);
}
public static final Send operator = new Send();
}
class Recv implements Operator
{
public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
{
return channel.read(buffer);
}
public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
{
Operation.operate(SelectionKey.OP_READ, key, buffer, endTime, operator);
}
public static final Recv operator = new Recv();
}
使用演示见以下代码。
大致说明一下,Server端开5656侦听,Client端开若干线程测试Socket通信。每次发送240字节信息+16字节MD5校验。服务端收到信息之后做MD5检查,正确的,发送“.xxxx”表示认可,否则发送“?xxxx”表示故障。
正式应用中可以再设置tryout尝试n次。
Server端,代码演示:
Client端,代码演示:
重点说明:
发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。
Socket通信比较常见的问题有如下几种:
1、设置收发超时;
2、正确的每一个bit的收发;
3、物理线路故障的保护;
4、始终能正常工作;
5、尽量少占系统资源;
n、……
而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是: 通信。
为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。
本方案在正常的局域网连接中测试过几百万次没什么问题。缺乏更艰苦的环境,所以如果使用这些代码发生任何风险的话……
(TcpChannel代码为Brian Wellington所做,原名为TCPClient,经本人稍作改动)
// Copyright (c) 2005 Brian Wellington (bwelling@xbill.org)
package asynchronizedchannel;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
final class TcpChannel
{
private long endTime;
private SelectionKey key;
public TcpChannel(SelectableChannel channel, long endTime, int op) throws IOException
{
boolean done = false;
Selector selector = null;
this.endTime = endTime;
try {
selector = Selector.open();
channel.configureBlocking(false);
key = channel.register(selector, op);
done = true;
} finally {
if (!done && selector != null) {
selector.close();
}
if (!done) {
channel.close();
}
}
}
static void blockUntil(SelectionKey key, long endTime) throws IOException
{
long timeout = endTime - System.currentTimeMillis();
int nkeys = 0;
if (timeout > 0) {
nkeys = key.selector().select(timeout);
} else if (timeout == 0) {
nkeys = key.selector().selectNow();
}
if (nkeys == 0) {
throw new SocketTimeoutException();
}
}
void cleanup()
{
try {
key.selector().close();
key.channel().close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
void bind(SocketAddress addr) throws IOException
{
SocketChannel channel = (SocketChannel) key.channel();
channel.socket().bind(addr);
}
void connect(SocketAddress addr) throws IOException
{
SocketChannel channel = (SocketChannel) key.channel();
key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
try {
if (!key.isConnectable()) {
blockUntil(key, endTime);
}
if (!channel.connect(addr) && !channel.finishConnect()) {
throw new ConnectException();
}
} finally {
if (key.isValid()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
}
}
}
void send(ByteBuffer buffer) throws IOException
{
Send.operate(key, buffer, endTime);
}
void recv(ByteBuffer buffer) throws IOException
{
Recv.operate(key, buffer, endTime);
}
}
interface Operator
{
class Operation
{
static void operate(final int op, final SelectionKey key, final ByteBuffer buffer, final long endTime, final Operator optr) throws IOException
{
final SocketChannel channel = (SocketChannel) key.channel();
final int total = buffer.capacity();
key.interestOps(op);
try {
while (buffer.position() < total) {
if (System.currentTimeMillis() > endTime) {
throw new SocketTimeoutException();
}
if ((key.readyOps() & op) != 0) {
if (optr.io(channel, buffer) < 0) {
throw new EOFException();
}
} else {
TcpChannel.blockUntil(key, endTime);
}
}
} finally {
if (key.isValid()) {
key.interestOps(0);
}
}
}
}
int io(SocketChannel channel, ByteBuffer buffer) throws IOException;
}
class Send implements Operator
{
public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
{
return channel.write(buffer);
}
public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
{
Operation.operate(SelectionKey.OP_WRITE, key, buffer, endTime, operator);
}
public static final Send operator = new Send();
}
class Recv implements Operator
{
public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
{
return channel.read(buffer);
}
public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
{
Operation.operate(SelectionKey.OP_READ, key, buffer, endTime, operator);
}
public static final Recv operator = new Recv();
}
使用演示见以下代码。
大致说明一下,Server端开5656侦听,Client端开若干线程测试Socket通信。每次发送240字节信息+16字节MD5校验。服务端收到信息之后做MD5检查,正确的,发送“.xxxx”表示认可,否则发送“?xxxx”表示故障。
正式应用中可以再设置tryout尝试n次。
Server端,代码演示:
package asynchronizedchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.security.MessageDigest; import java.util.Iterator; public class Server { /** * 服务端通信范例程序主函数 * * @param args * @throws IOException */ public static void main(String[] args) throws IOException { // Create the selector final Selector selector = Selector.open(); final ServerSocketChannel server = ServerSocketChannel.open(); server.configureBlocking(false); server.socket().bind(new InetSocketAddress("xx.xx.xx.xx", 5656), 5); // Register both channels with selector server.register(selector, SelectionKey.OP_ACCEPT); new Thread(new Daemon(selector)).start(); } } class Daemon implements Runnable { private final Selector selector; Daemon(Selector selector) { this.selector = selector; } public void run() { while (true) { try { // Wait for an event selector.select(); // Get list of selection keys with pending events Iterator<SelectionKey> it = selector.selectedKeys().iterator(); // Process each key while (it.hasNext()) { // Get the selection key SelectionKey selKey = it.next(); // Remove it from the list to indicate that it is being processed it.remove(); // Check if it's a connection request if (selKey.isAcceptable()) { // Get channel with connection request ServerSocketChannel server = (ServerSocketChannel) selKey.channel(); // Accept the connection request. // If serverSocketChannel is blocking, this method blocks. // The returned channel is in blocking mode. SocketChannel channel = server.accept(); // If serverSocketChannel is non-blocking, sChannel may be null if (channel != null) { // Use the socket channel to communicate with the client new Thread(new ServerHandler(channel)).start(); } else { System.out.println("---No Connection---"); // There were no pending connection requests; try again later. // To be notified of connection requests, } } } } catch (Exception ex) { ex.printStackTrace(); } } } } class ServerHandler implements Runnable { private static final long timeout = 30 * 1000; // 设置超时时间为30秒 private static int counter = 0; private final TcpChannel channel; private final MessageDigest md; ServerHandler(SocketChannel channel) throws Exception { this.channel = new TcpChannel(channel, System.currentTimeMillis() + timeout, SelectionKey.OP_READ); md = MessageDigest.getInstance("md5"); } public void run() { try { while (true) { work(); synchronized (ServerHandler.class) { if ((++counter & 65535) == 0) { System.out.println(counter); } } } } catch (Exception e) { e.printStackTrace(); } finally { channel.cleanup(); } } private void work() throws IOException { // 模拟工作流程 byte[] cache = new byte[256], reply = new byte[5]; read(cache, reply); } private void read(byte[] cache, byte[] reply) throws IOException { // 从套接字读入数据 channel.recv(ByteBuffer.wrap(cache)); md.reset(); md.update(cache, 0, 240); byte[] md5 = md.digest(); // 使用前240字节产生MD5校验码 if (!ExtArrays.partialEquals(md5, 0, cache, 240, 16)) { // 与后16字节比较 reply[0] = '?'; System.out.println("MISMATCH!"); } else { reply[0] = '.'; } channel.send(ByteBuffer.wrap(reply)); // 返回接收结果 } } final class ExtArrays { private ExtArrays() { } public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len) { // 字节数组的部分比较 if (a == null || b == null) { return false; } if (offset_a + len > a.length || offset_b + len > b.length) { return false; } for (int i = offset_a, j = offset_b, k = len; k > 0; i++, j++, k--) { if (a[i] != b[j]) { return false; } } return true; } }
Client端,代码演示:
package asynchronizedchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.security.DigestException; import java.security.MessageDigest; import java.util.Random; public class Client { private static int id = 0; /** * 客户端通信范例程序主函数 * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); } } class ClientHandler implements Runnable { private static final long timeout = 30 * 1000; // 设置超时时间为30秒 private final TcpChannel channel; private final int id; private final MessageDigest md; private final Random rand; ClientHandler(int id) throws Exception { this.id = id; channel = new TcpChannel(SocketChannel.open(), System.currentTimeMillis() + timeout, SelectionKey.OP_WRITE); md = MessageDigest.getInstance("md5"); rand = new Random(); } @Override public void run() { try { channel.connect(new InetSocketAddress("xx.xx.xx.xx", 5656)); int i = 0; while (true) { work(); if ((++i & 16383) == 0) { System.out.println(String.format("client(%1$d): %2$d", id, i)); } Thread.yield(); } } catch (Exception e) { e.printStackTrace(); } finally { channel.cleanup(); } } private void work() throws IOException, DigestException { byte[] cache = new byte[256], reply = new byte[5]; write(cache, reply); } private void write(byte[] cache, byte[] reply) throws DigestException, IOException { rand.nextBytes(cache); // 只用前面的240字节 md.reset(); md.update(cache, 0, 240); md.digest(cache, 240, 16); // MD5校验码占后面16字节 ByteBuffer buffer = ByteBuffer.wrap(cache); channel.send(buffer); buffer = ByteBuffer.wrap(reply); channel.recv(buffer); if (reply[0] != '.') { // 若接收的结果不正确,可以考虑尝试再次发送 System.out.println("MISMATCH!"); } } }
重点说明:
发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。
相关文章推荐
- 通过Socket实现进程间异步通讯(一)
- 通过Socket实现进程间异步通讯(一)
- 文件异步操作的实现
- 在C#中使用异步Socket编程实现TCP网络服务的C/S的通讯构架(一)----基础类库部分
- 用jms实现异步web服务操作
- 在C#中使用异步Socket编程实现TCP网络服务的C/S的通讯构架(一)----基础类库部分
- 使用XmlHttp结合ASP实现网页的异步调用
- 在C#中使用异步Socket编程实现TCP网络服务的C/S的通讯构架(一)----基础类库部分
- 动态网页中javascript的异步提交实现
- 通过Socket实现进程间异步通讯(二)
- 通过Socket实现进程间异步通讯(四)
- 实现支持文件分块多点异步上传的 Web Services 及其客户端(非Web)应用程序调用相关异步执行的 Web Method
- 文件异步操作的实现
- 异步方法的实现
- 通过Socket实现进程间异步通讯(三)
- c#中异步基于消息通信的完成端口的TCP/IP协议的组件实现(源代码)
- 通过Socket实现进程间异步通讯(三)
- 使用XmlHttp结合ASP实现网页的异步调用
- 一个COM异步实现接口列集(Marshal)源代码实例
- Windows Sockets API实现网络异步通讯