您的位置:首页 > 运维架构 > Apache

Apache Mina----基础入门

2016-07-17 13:34 597 查看

入门服务器端程序需要做什么?


1.需要NioSocketAcceptor

2.设置编码解码过滤器

3.设置一些session的属性

4.绑定一个端口

public class MinaServer {

private static final int PORT = 7080;
private static IoAcceptor accept = null;

public static void main(String[] args) {
try {
accept = new NioSocketAcceptor();
accept.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
accept.getFilterChain().addFirst("filter", new MyServerFilter());
accept.getSessionConfig().setReadBufferSize(1024);
accept.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
//自定义实现的handler类
accept.setHandler(new MyServerHandler());
accept.bind(new InetSocketAddress(PORT));
System.out.println("Server --- >> " + PORT);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class MyServerHandler extends IoHandlerAdapter {

@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println("sessionCreated");
}

@Override
public void sessionOpened(IoSession session) throws Exception {
System.out.println("sessionOpened");
}

@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println("sessionClosed");
}

@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
System.out.println("sessionIdle");
}

@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
System.out.println("exceptionCaught");
}

@Override
public void messageReceived(IoSession session, Object message) throws Exception {
String msg = (String)message;
System.out.println("服务器端接受到数据" + msg);
//      if(msg.equals("quit")){
//          session.close();
//      }
Date date = new Date();
session.write(date);
}

@Override
public void messageSent(IoSession session, Object message) throws Exception {
System.out.println("messageSent");
session.close();
}

}


入门级客户端程序需要什么?


1.需要NioSocketConnector

2.设置编码解码过滤器

3.连接一个远程的服务器端程序,并发送数据

public class MinaClient {

private static IoSession session = null;
private static final String host = "127.0.0.1";
private static final int port = 7080;

public static void main(String[] args) {
NioSocketConnector connector = new NioSocketConnector();
connector.setConnectTimeoutMillis(3000);
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"),
LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
connector.getFilterChain().addFirst("filter", new MyClientFilter());
connector.setHandler(new MyClientHandler());
ConnectFuture future = connector.connect(new InetSocketAddress(host, port));
future.awaitUninterruptibly();//一直等待连接
session = future.getSession();
session.write("你好");
session.getCloseFuture().awaitUninterruptibly();//等待关闭连接
connector.dispose();
}
}
public class MyClientHandler extends IoHandlerAdapter {

@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
System.out.println("exceptionCaught");
}

@Override
public void messageReceived(IoSession session, Object message) throws Exception {
String msg = (String)message;
System.out.println("客户端收到数据" + msg);
}

}


Mina类图


长连接和短连接:

可以通过在自定义的业务处理handler的类的messagesend方法中加入session.close()方法,使得原本的长连接方式变成短连接方式

Mina常用的类和接口:

IOService接口:用于描述客户端和服务端接口,其常用子类是IoAcceptor和IoConnector

IoProcessor:多线程环境来处理我们的连接请求流程

IOFilter:提供数据的过滤工作:包括编解码,日志等信息的过滤

IOHandler:业务处理,自定义的handler需要继承IoHandlerAdapter

IOSession:客户端和服务端连接的描述,常常用于接受和发送数据

IOService的作用:


IOFilter:


10293
应用程序和网络这块的传输,就是二进制数据和对象之间的相互转换,有相应的编码和解码器,这也是我们的过滤器的一种,我们对过滤器还可以做日志,消息确认等功能

IOFilter类图:是在应用层和业务层之间的过滤层

完成自定义过滤器:就是在handler处理之前,需要调用相应的过滤器进行过滤

IOSession:

主要描述我们的网络通信双方所建立的连接之间的描述。

主要作用:可以完成对于连接的一些管理,可以发送或者读取数据,并且可以设置会话的上下文信息。

IOSessionConfig:

提供对于连接的配置信息的描述,比如读缓冲区的设置等等 。设置读写缓冲区的一些信息,读写的空闲时间,以及设置读写超时时间

IOProcessor:

以NIO为基础来实现的以多线程的方式来完成我们的读写操作

IOProcessor的作用:为filter读写原始数据提供了多线程的环境,如果使用NIO来实现,需要自己写一个非阻塞读写的多少线程环境

通过NioSocketAcceptor/NioSocketConnector(int processorCount)构造函数可以指定多线程的个数

IOBuffer:

基于javaNIO中的ByteBuffer做了封装,用户操作缓冲区中的数据,包括基本数据类

型以及字节数组和一些对象,其本质就是一个可动态扩展的byte数组

IOBuffer的索引属性:

Capacity:代表当前缓冲区的大小

Position:理解成当前读写位置,也可以理解成下一个可读数据单元的位置,一般Position<=Capacity的时候,可以完成数据的读写操作

Limit:就是下一个不可以被读写的缓冲区单元的位置,一般情况下,Limit<=Capacity

注意:filp():就是懒limit=position,position=0,为读取缓冲区中的数据做好准备,因为有时候,limit!=position,一般在发送数据之前调用

自定义协议

1.自定义的编解码工厂:要实现编解码工厂就要实现ProtocolCodecFactory接口

2.实现自定义的编解码器:解码器要实现ProtocolDecoder接口,编码器要实现ProtocolEncoder接口

3.为什么要使用自定义的编码器:因为在工作中往往不是通过一个字符串就可以传输所有的信息。我们传输的通常是自定义的协议包,并且能在应用程序和网络通信中存在对象和二进制流之间的转换关系,所有我们需要结合业务来编写自定义的编解码器

4.常用的自定义协议的方法:

1.定长的方式:比如AA,BB,CC,DD等。

2.定界符:比如helloworld|youxigu|….|…这种通过“|”特殊字符来区别消息的形式,这种方式会出现粘包,半包的情况,比如第一个包是hello,第二个包是world|youxigu,那么第二个包“|”前面的东西是不是一个完整的包就不能确定了。

3.自定义协议包的形式:分为包头和包体,包头中包括数据包的版本号,以及整个数据包(包头+包体)的长度,而包体就是实际的数据

实例:

1.采用的自定义协议:包头(length,flag),包体(content)

2.完成通过客户端不断的发送指定数目的自定义数据包,然后在服务器端解析,这个过程中需要解决半包的问题

实例实现:

自定义协议包

public class MyProtocolPack {

private int length;
private byte flag;
private String content;

public MyProtocolPack(byte flag, String content) {
this.flag = flag;
this.content = content;
//这里5就是包头中的length的4个字节和flag的1个字节的长度
this.length = 5 + content == null ? 0 : content.getBytes().length;
}

public int getLength() {
return length;
}

public void setLength(int length) {
this.length = length;
}

public byte getFlag() {
return flag;
}

public void setFlag(byte flag) {
this.flag = flag;
}

public String getContent() {
return content;
}

public void setContent(String content) {
this.content = content;
}

@Override
public String toString() {
return "MyProtocolPack [length=" + length + ", flag=" + flag + ", content=" + content + "]";
}
}


自定义编码器,解码器,编解码器工厂

public class MyProtocolEncoder extends ProtocolEncoderAdapter {

private final Charset charset;

public MyProtocolEncoder(Charset charset){
this.charset = charset;
}

public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
MyProtocolPack msg = (MyProtocolPack)message;
IoBuffer buffer = IoBuffer.allocate(msg.getLength());
buffer.setAutoExpand(true);
buffer.putInt(msg.getLength());
buffer.put(msg.getFlag());
if(msg.getContent() != null){
buffer.put(msg.getContent().getBytes());
}
buffer.flip();
out.write(buffer);
}

}
public class MyProtocolDecoder implements ProtocolDecoder {

private final AttributeKey CONTEXT = new AttributeKey(this.getClass(), "context");
private final Charset charset;
private int maxPackLength = 100;

public MyProtocolDecoder() {
this(Charset.defaultCharset());
}

public MyProtocolDecoder(Charset charset) {
this.charset = charset;
}

public int getMaxPackLength() {
return maxPackLength;
}

public void setMaxPackLength(int maxPackLength) {
if (maxPackLength < 0) {
throw new IllegalArgumentException("maxPackLength参数:" + maxPackLength);
}
this.maxPackLength = maxPackLength;
}

public Context getContext(IoSession session) {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx == null) {
ctx = new Context();
session.setAttribute(CONTEXT, ctx);
}
return ctx;
}

public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
final int packHeadLength = 5;
Context ctx = this.getContext(session);
ctx.append(in);
IoBuffer buffer = ctx.getBuffer();
buffer.flip();
while (buffer.remaining() >= packHeadLength) {
buffer.mark();
int length = buffer.getInt();
byte flag = buffer.get();
if (length < 0 || length > maxPackLength) {
buffer.reset();
break;
} else if (length >= packHeadLength && length - packHeadLength <= buffer.remaining()) {
int oldLimit = buffer.limit();
buffer.limit(buffer.position() + length - packHeadLength);
String content = buffer.getString(ctx.getDecoder());
buffer.limit(oldLimit);
MyProtocolPack pack = new MyProtocolPack(flag, content);
out.write(pack);
} else {
// 这种情况下说明接受到的是一个半包
buffer.clear();
break;
}
}
if (buffer.hasRemaining()) {
IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
temp.put(buffer);
temp.flip();
buffer.reset();
buffer.put(temp);
}else{
buffer.reset();
}
}

public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {

}

public void dispose(IoSession session) throws Exception {
Context ctx = (Context) session.getAttribute(CONTEXT);
if (ctx != null) {
session.removeAttribute(CONTEXT);
}
}

/**
* 这个类出现是为了可以检查到半包的情况
*/
private class Context {
private final CharsetDecoder decoder;
private IoBuffer buffer;

public Context() {
decoder = charset.newDecoder();
buffer = IoBuffer.allocate(80).setAutoExpand(true);
}

public void append(IoBuffer buffer) {
this.getBuffer().put(buffer);
}

public void reset() {
decoder.reset();
}

public IoBuffer getBuffer() {
return buffer;
}

public void setBuffer(IoBuffer buffer) {
this.buffer = buffer;
}

public CharsetDecoder getDecoder() {
return decoder;
}

}
}

public class MyPtotocolFactory implements ProtocolCodecFactory {

private final MyProtocolDecoder decoder;
private final MyProtocolEncoder encoder;

public MyPtotocolFactory(Charset charset){
decoder = new MyProtocolDecoder(charset);
encoder = new MyProtocolEncoder(charset);
}

public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return encoder;
}

public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return decoder;
}

}


服务端实现

public class MinaProtocolServer {

private static final int PORT = 7080;

public static void main(String[] args) throws Exception {
NioSocketAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new MyPtotocolFactory(Charset.forName("UTF-8"))));
acceptor.getSessionConfig().setReadBufferSize(1024);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
acceptor.setHandler(new MyHandler());
acceptor.bind(new InetSocketAddress(PORT));
System.out.println("MinaProtocolServer start");
}
}

class MyHandler extends IoHandlerAdapter {

@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
System.out.println("server--->sessionIdle");
}

@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
System.out.println("server--->exceptionCaught");
}

@Override
public void messageReceived(IoSession session, Object message) throws Exception {
MyProtocolPack pack = (MyProtocolPack) message;
System.out.println("服务器端接受数据" + pack);
}

}


客户端实现

public class MinaProtocolClient {

private static final String HOST = "127.0.0.1";
private static final int PORT = 7080;
private static long counter = 0;
private static final int fil = 100;
private static long start = 0;

public static void main(String[] args) {
start = System.currentTimeMillis();
IoConnector connector = new NioSocketConnector();
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new MyPtotocolFactory(Charset.forName("UTF-8"))));
connector.getSessionConfig().setReadBufferSize(1024);
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
connector.setHandler(new MyHandler());
ConnectFuture future = connector.connect(new InetSocketAddress(HOST, PORT));
future.addListener(new IoFutureListener<ConnectFuture>() {

public void operationComplete(ConnectFuture future) {
if (future.isConnected()) {
IoSession session = future.getSession();
sendData(session);
}
}

});
}

private static void sendData(IoSession session) {
for (int i = 0; i < fil; i++) {
String content = "helloworld" + i;
MyProtocolPack pack = new MyProtocolPack((byte) i, content);
session.write(pack);
System.out.println("客户端发送数据:" + pack);
}
}
}

class MyHandler extends IoHandlerAdapter {

@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
if (status == IdleStatus.READER_IDLE) {
session.close(true);
}
}

@Override
public void messageReceived(IoSession session, Object message) throws Exception {
MyProtocolPack pack = (MyProtocolPack) message;
System.out.println("client --- > " + pack);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  apache 服务器