JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)
2012-05-09 13:04
555 查看
Apache MINA 2 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 2 可以作为开发网络应用程序的一个良好基础。
Apache MINA是非常著名的基于java nio的通信框架,以前都是自己直接使用udp编程,新项目选型中考虑到网络通信可能会用到多种通信方式,因此使用了MINA。
本文结构:
(1)客户端和服务器代码 ;虽然是udp的,但是mina的优美的设计使得所有的通信方式能够以统一的形式使用,perfect。当然注意的是,不同的通信方式,背后的机理和有效的变量、状态是有区别的,所以要精通,那还是需要经验积累和学习的。
(2)超时 和Session的几个实际问题
(3)心跳 ,纠正几个错误
既然是使用,废话少说,直接整个可用的例子。当然了,这些代码也不是直接可用的,我们应用的逻辑有点复杂,不会这么简单使用的。
请参考mina的example包和文档http://mina.apache.org/udp-tutorial.html 。
版本2.0 RC1
1.1 服务器端
view plain
NioDatagramAcceptor acceptor =
new NioDatagramAcceptor();
acceptor.setHandler(new
MyIoHandlerAdapter()); //你的业务处理,最简单的,可以extends IoHandlerAdapter
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
chain.addLast("keep-alive"
, new HachiKeepAliveFilterInMina());
//心跳
chain.addLast("toMessageTyep" ,
new MyMessageEn_Decoder());
//将传输的数据转换成你的业务数据格式。比如下面的是将数据转换成一行行的文本
//acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
chain.addLast("logger" ,
new LoggingFilter());
DatagramSessionConfig dcfg = acceptor.getSessionConfig();
dcfg.setReuseAddress(true );
acceptor.bind(new InetSocketAddress(ClusterContext.getHeartBeatPort()));
1.2 客户端
view plain
NioDatagramConnector connector =
new NioDatagramConnector();
connector.setConnectTimeoutMillis(60000L);
connector.setConnectTimeoutCheckInterval(10000
);
connector.setHandler(handler);
DefaultIoFilterChainBuilder chain = connector.getFilterChain();
chain.addLast("keep-alive"
, new HachiKeepAliveFilterInMina());
//心跳
chain.addLast("toMessageTyep" ,
new MyMessageEn_Decoder());
chain.addLast("logger" ,
new LoggingFilter());
ConnectFuture connFuture = connector.connect(new
InetSocketAddress( "10.1.1.1" ,
8001 ));
connFuture.awaitUninterruptibly();
IoSession session = connFuture.getSession();
//发送消息长整型 1000
IoBuffer buffer = IoBuffer.allocate(8
);
buffer.putLong(1000
);
buffer.flip();
session.write(buffer);
//关闭连接
session.getCloseFuture().awaitUninterruptibly();
connector.dispose();
2. 超时的几个经验总结:
udp session默认是60秒钟超时,此时状态为closing,数据就发不出去了。
Session的接口是IoSession,udp的最终实现是NioSession。如果交互在60秒内不能处理完成,就需要使用Keep-alive机制,即心跳机制。
3. 心跳 机制
在代码中已经使用了心跳机制,是通过mina的filter实现的,mina自身带的心跳机制好处在于,它附加了处理,让心跳消息不会传到业务层,在底层就完成了。
在上面代码实现中的HachiKeepAliveFilterInMina如下:
view plain
public
class HachiKeepAliveFilterInMina extends
KeepAliveFilter {
private
static final
int INTERVAL = 30 ;
//in seconds
private
static final
int TIMEOUT = 10 ;
//in seconds
public HachiKeepAliveFilterInMina(KeepAliveMessageFactory messageFactory) {
super (messageFactory, IdleStatus.BOTH_IDLE,
new ExceptionHandler(), INTERVAL, TIMEOUT);
}
public HachiKeepAliveFilterInMina() {
super (
new KeepAliveMessageFactoryImpl(), IdleStatus.BOTH_IDLE,
new ExceptionHandler(), INTERVAL, TIMEOUT);
this .setForwardEvent(
false ); //此消息不会继续传递,不会被业务层看见
}
}
class ExceptionHandler
implements KeepAliveRequestTimeoutHandler {
public
void keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session)
throws Exception {
System.out.println("Connection lost, session will be closed"
);
session.close(true );
}
}
/**
* 继承于KeepAliveMessageFactory,当心跳机制启动的时候,需要该工厂类来判断和定制心跳消息
* @author Liu Liu
*
*/
class KeepAliveMessageFactoryImpl
implements KeepAliveMessageFactory {
private
static final
byte int_req = - 1 ;
private
static final
byte int_rep = - 2 ;
private
static final IoBuffer KAMSG_REQ = IoBuffer.wrap(
new byte
[]{int_req});
private
static final IoBuffer KAMSG_REP = IoBuffer.wrap(
new byte
[]{int_rep});
public Object getRequest(IoSession session) {
return KAMSG_REQ.duplicate();
}
public Object getResponse(IoSession session, Object request) {
return KAMSG_REP.duplicate();
}
public
boolean isRequest(IoSession session, Object message) {
if (!(message
instanceof IoBuffer))
return
false ;
IoBuffer realMessage = (IoBuffer)message;
if (realMessage.limit() !=
1 )
return
false ;
boolean result = (realMessage.get() == int_req);
realMessage.rewind();
return result;
}
public
boolean isResponse(IoSession session, Object message) {
if (!(message
instanceof IoBuffer))
return
false ;
IoBuffer realMessage = (IoBuffer)message;
if (realMessage.limit() !=
1 )
return
false ;
boolean result = (realMessage.get() == int_rep);
realMessage.rewind();
return result;
}
}
有人说:心跳机制的filter只需要服务器端具有即可——这是错误 的,拍着脑袋想一想,看看factory,你就知道了。心跳需要通信两端的实现 。
另外,版本2.0 RC1中,经过测试,当心跳的时间间隔INTERVAL设置为60s(Session的存活时间)的时候心跳会失效,所以最好需要小于60s的间隔。
更多可参考:
http://www.ibm.com/developerworks/cn/java/j-lo-mina2
Apache MINA是非常著名的基于java nio的通信框架,以前都是自己直接使用udp编程,新项目选型中考虑到网络通信可能会用到多种通信方式,因此使用了MINA。
本文结构:
(1)客户端和服务器代码 ;虽然是udp的,但是mina的优美的设计使得所有的通信方式能够以统一的形式使用,perfect。当然注意的是,不同的通信方式,背后的机理和有效的变量、状态是有区别的,所以要精通,那还是需要经验积累和学习的。
(2)超时 和Session的几个实际问题
(3)心跳 ,纠正几个错误
既然是使用,废话少说,直接整个可用的例子。当然了,这些代码也不是直接可用的,我们应用的逻辑有点复杂,不会这么简单使用的。
请参考mina的example包和文档http://mina.apache.org/udp-tutorial.html 。
版本2.0 RC1
1.1 服务器端
view plain
NioDatagramAcceptor acceptor =
new NioDatagramAcceptor();
acceptor.setHandler(new
MyIoHandlerAdapter()); //你的业务处理,最简单的,可以extends IoHandlerAdapter
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
chain.addLast("keep-alive"
, new HachiKeepAliveFilterInMina());
//心跳
chain.addLast("toMessageTyep" ,
new MyMessageEn_Decoder());
//将传输的数据转换成你的业务数据格式。比如下面的是将数据转换成一行行的文本
//acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
chain.addLast("logger" ,
new LoggingFilter());
DatagramSessionConfig dcfg = acceptor.getSessionConfig();
dcfg.setReuseAddress(true );
acceptor.bind(new InetSocketAddress(ClusterContext.getHeartBeatPort()));
1.2 客户端
view plain
NioDatagramConnector connector =
new NioDatagramConnector();
connector.setConnectTimeoutMillis(60000L);
connector.setConnectTimeoutCheckInterval(10000
);
connector.setHandler(handler);
DefaultIoFilterChainBuilder chain = connector.getFilterChain();
chain.addLast("keep-alive"
, new HachiKeepAliveFilterInMina());
//心跳
chain.addLast("toMessageTyep" ,
new MyMessageEn_Decoder());
chain.addLast("logger" ,
new LoggingFilter());
ConnectFuture connFuture = connector.connect(new
InetSocketAddress( "10.1.1.1" ,
8001 ));
connFuture.awaitUninterruptibly();
IoSession session = connFuture.getSession();
//发送消息长整型 1000
IoBuffer buffer = IoBuffer.allocate(8
);
buffer.putLong(1000
);
buffer.flip();
session.write(buffer);
//关闭连接
session.getCloseFuture().awaitUninterruptibly();
connector.dispose();
2. 超时的几个经验总结:
udp session默认是60秒钟超时,此时状态为closing,数据就发不出去了。
Session的接口是IoSession,udp的最终实现是NioSession。如果交互在60秒内不能处理完成,就需要使用Keep-alive机制,即心跳机制。
3. 心跳 机制
在代码中已经使用了心跳机制,是通过mina的filter实现的,mina自身带的心跳机制好处在于,它附加了处理,让心跳消息不会传到业务层,在底层就完成了。
在上面代码实现中的HachiKeepAliveFilterInMina如下:
view plain
public
class HachiKeepAliveFilterInMina extends
KeepAliveFilter {
private
static final
int INTERVAL = 30 ;
//in seconds
private
static final
int TIMEOUT = 10 ;
//in seconds
public HachiKeepAliveFilterInMina(KeepAliveMessageFactory messageFactory) {
super (messageFactory, IdleStatus.BOTH_IDLE,
new ExceptionHandler(), INTERVAL, TIMEOUT);
}
public HachiKeepAliveFilterInMina() {
super (
new KeepAliveMessageFactoryImpl(), IdleStatus.BOTH_IDLE,
new ExceptionHandler(), INTERVAL, TIMEOUT);
this .setForwardEvent(
false ); //此消息不会继续传递,不会被业务层看见
}
}
class ExceptionHandler
implements KeepAliveRequestTimeoutHandler {
public
void keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session)
throws Exception {
System.out.println("Connection lost, session will be closed"
);
session.close(true );
}
}
/**
* 继承于KeepAliveMessageFactory,当心跳机制启动的时候,需要该工厂类来判断和定制心跳消息
* @author Liu Liu
*
*/
class KeepAliveMessageFactoryImpl
implements KeepAliveMessageFactory {
private
static final
byte int_req = - 1 ;
private
static final
byte int_rep = - 2 ;
private
static final IoBuffer KAMSG_REQ = IoBuffer.wrap(
new byte
[]{int_req});
private
static final IoBuffer KAMSG_REP = IoBuffer.wrap(
new byte
[]{int_rep});
public Object getRequest(IoSession session) {
return KAMSG_REQ.duplicate();
}
public Object getResponse(IoSession session, Object request) {
return KAMSG_REP.duplicate();
}
public
boolean isRequest(IoSession session, Object message) {
if (!(message
instanceof IoBuffer))
return
false ;
IoBuffer realMessage = (IoBuffer)message;
if (realMessage.limit() !=
1 )
return
false ;
boolean result = (realMessage.get() == int_req);
realMessage.rewind();
return result;
}
public
boolean isResponse(IoSession session, Object message) {
if (!(message
instanceof IoBuffer))
return
false ;
IoBuffer realMessage = (IoBuffer)message;
if (realMessage.limit() !=
1 )
return
false ;
boolean result = (realMessage.get() == int_rep);
realMessage.rewind();
return result;
}
}
有人说:心跳机制的filter只需要服务器端具有即可——这是错误 的,拍着脑袋想一想,看看factory,你就知道了。心跳需要通信两端的实现 。
另外,版本2.0 RC1中,经过测试,当心跳的时间间隔INTERVAL设置为60s(Session的存活时间)的时候心跳会失效,所以最好需要小于60s的间隔。
更多可参考:
http://www.ibm.com/developerworks/cn/java/j-lo-mina2
相关文章推荐
- JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)
- JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)
- JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)
- JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)
- 《Java从入门到放弃》框架入门篇:使用注解的方式配置hibernate映射关系
- 【原创】NIO框架入门(二):服务端基于MINA2的UDP双向通信Demo演示
- MJRefresh框架使用的几个小细节
- Java Struts2 框架入门详解(一)MVC架构详解以及Struts基本概述
- Java使用mina作为网络框架
- java mina框架使用
- java语言基础(75)——集合框架(Set集合的概述及使用)
- java语言基础(71)——集合框架(增强for的概述和使用)
- Java Ehcache缓存框架入门级使用实例
- 【JUnit4】JUnit4——Java单元测试必备工具(第1章概述 第2章入门 第3章 JUnit4使用详解 )
- (转)Java任务调度框架Quartz入门教程指南(二) 使用job、trigger、schedule调用定时任务
- Java基础知识强化之集合框架笔记32:集合之可变参数的概述和使用
- java语言基础(68)——集合框架(泛型概述和使用)
- 【原创】NIO框架入门(四):Android与MINA2、Netty4的跨平台UDP双向通信实战
- Java任务调度框架Quartz入门教程指南(二) 使用job、trigger、schedule调用定时任务
- java之集合框架使用细节及常用方法