(四)Mina源码解析之IoSession
2016-04-11 00:42
471 查看
本篇文章主要剖析IoSession的在mina中的作用,IoSession的继承结构图如下:
在Mina中所有的连接都被封装成了IoSession(关于Mina是如何将连接封装成IoSession的,请参见Mina源码解析之IoService),所有的Session的信息都存放到一个叫IoServiceListenerSupport的类中,这里是IoSession和IoServiceListener(这是一个监听器,这里我们暂且不用管和监听器有关的东西,后面会有专门的文章来讲解)的大本营,该类的部分代码如下:
一旦连接建立成功后,数据的读写都是在process线程中进行管理的(如果想了解详细的细节可以看Mina源码解析之IoService),在AbstractPollingIoProcessor类中有个read(S)方法,该方法就是专门负责读取数据用的,代码如下:
在Mina中所有的连接都被封装成了IoSession(关于Mina是如何将连接封装成IoSession的,请参见Mina源码解析之IoService),所有的Session的信息都存放到一个叫IoServiceListenerSupport的类中,这里是IoSession和IoServiceListener(这是一个监听器,这里我们暂且不用管和监听器有关的东西,后面会有专门的文章来讲解)的大本营,该类的部分代码如下:
public class IoServiceListenerSupport { /** The {@link IoService} that this instance manages. */ private final IoService service; /** 所有的监听器都存放到这里. */ private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>(); /**所有的session都放到这个map中 */ private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>(); //① /** 只读的session */ private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions); private final AtomicBoolean activated = new AtomicBoolean(); /** listenerSupport最近一次被激活的时间 */ private volatile long activationTime; /** listenerSupport创建以后session连接的峰值 */ private volatile int largestManagedSessionCount = 0; /** A global counter to count the number of sessions managed since the start */ private AtomicLong cumulativeManagedSessionCount = new AtomicLong(0);代码1处存放着Server端管理的所有session,AbstractIoService的属性listeners就是一个IoServiceListenerSupport对象,当创建session时,server端的mina会调用IoServiceListenerSupport的fireSessionCreated方法,该方法的代码如下:
public void fireSessionCreated(IoSession session) { boolean firstSession = false; if (session.getService() instanceof IoConnector) { synchronized (managedSessions) { firstSession = managedSessions.isEmpty(); } } // If already registered, ignore. if (managedSessions.putIfAbsent(session.getId(), session) != null) {//①这里会将新建立的session放到managedSessions属性中 return; } // If the first connector session, fire a virtual service activation event. if (firstSession) { fireServiceActivated(); } // Fire session events. IoFilterChain filterChain = session.getFilterChain(); filterChain.fireSessionCreated(); //这里调用过滤器链的sessionCreated方法 filterChain.fireSessionOpened();//这里调用过滤器链的sessionOpened方法 int managedSessionCount = managedSessions.size(); if (managedSessionCount > largestManagedSessionCount) { largestManagedSessionCount = managedSessionCount;//更新session连接的峰值数 } cumulativeManagedSessionCount.incrementAndGet(); // Fire listener events. for (IoServiceListener l : listeners) { try { l.sessionCreated(session); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } }当session连接断开时会调用IoServiceListenerSupport类的fireSessionDestroyed方法,该方法会将session从managedSessions属性中移除,这里就不列出代码了,请同学们自己查看源代码,session在哪里管理我们已经知道了,但是每个session连接都会有很多信息,比如当前session的ID、从这个session总共接收了多少个字节的信息、session上次读写信息是在什么时候等等,这些信息都保持在哪里呢?答案是在AbstractIoSession中(因为这些属性都是session的固有特性,所以存在这里),由于属性太多就不一一列出了,那么这些属性都是什么时候更新的呢,当然是涉及到它们的特性发生变化的时候了,举例来说,sessionId属性是唯一标示一个session的,那么这个属性一定是创建session的时候设置的,readBytes属性记录读取的字节数,那么这个属性就是接收数据时更新的,这写信息的管理都很简单,这里就不一一列出了,如果感兴趣的同学可以看看AbstractIoSession的源代码。上面说了这么多都是再讲session的管理。session的固有特性,但是对于session来说最重要的都不是这些,对于session来说最重要的是读写数据,下面就一起来看看session中是如何读写数据的
一旦连接建立成功后,数据的读写都是在process线程中进行管理的(如果想了解详细的细节可以看Mina源码解析之IoService),在AbstractPollingIoProcessor类中有个read(S)方法,该方法就是专门负责读取数据用的,代码如下:
private void read(S session) { IoSessionConfig config = session.getConfig(); int bufferSize = config.getReadBufferSize(); IoBuffer buf = IoBuffer.allocate(bufferSize); final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();//对于socket连接来说这个值为true try { int readBytes = 0; int ret; try { if (hasFragmentation) { while ((ret = read(session, buf)) > 0) { readBytes += ret; if (!buf.hasRemaining()) { break; } } } else { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } } finally { buf.flip(); } if (readBytes > 0) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireMessageReceived(buf); buf = null; if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); //① 动态减少数据缓冲区ReadBufferSize } else if (readBytes == config.getReadBufferSize()) { session.increaseReadBufferSize();//② 动态增加数据缓存区ReadBufferSize } } } if (ret < 0) { // scheduleRemove(session); IoFilterChain filterChain = session.getFilterChain(); filterChain.fireInputClosed(); } } catch (Exception e) { if (e instanceof IOException) { if (!(e instanceof PortUnreachableException) || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass()) || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) { scheduleRemove(session); } } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } }其实这就是一段用nio来读取数据的方法,如果对nio还不太熟的同学建议去补习一下nio的知识,这里主要说两个问题,第一如果启动readBufferSize设置的很小或者不设置其实都没有关系,因为在代码1、 2处会根据设置的或者默认的readBufferSize和接收的字节数的大小进行比较来进行动态调整。第二这里接收的数据都是二进制的数据,一般情况下我们的应用都不会直接处理二进制的数据的,所以我们要将接收的数据进行转换,在mina中我们使用编解码器来完成数据的转换,包括数据的粘包、断包等问题的处理都是在编解码器中解决的,关于编辑码器的源码剖析会在下一篇文章中进行。
相关文章推荐
- 使用ruby部署工具mina快速部署nodejs应用教程
- Mina源代码阅读—概述
- Avaya G700 媒体网关
- 常见条形码大全
- Apache Mina 2.0.9 Quick Start Guide
- 基于Google Protobuff和Mina的RPC
- 服务器及中间件:Apache MINA 线程模型配置
- mina 使用手记 1
- mina 使用手记 2
- mina 使用手记3
- mina 使用手记 4
- Mina 服务器启动报错,“SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder" “的解决方案
- 在Java中寻找ConcurrentHashSet吗?
- Apache Mina的SSL连接API
- java的nio框架
- 基于mina框架的语音聊天服务器
- 课程1
- 分享一个自己在用的apache mina的解码器decoder源码
- 从代码实现角度来分析mina