[MINA2.0源码](二)客户端发起连接请求——NioSocketConnector
2013-08-09 16:05
141 查看
NioSocketConnector负责客户端向服务端发起连接请求,会创建一个NioSocketSession来负责读写操作。功能与服务端的NioSocketAcceptor相对应,他们都是IoServer的子类,功能类似。
MINA客户端代码:
和NioSocketAcceptor一样,也会创建SimpleIoProcessorPool池存放NioProcessor对象,会初始化Selector对象。
二、ConnectFuture future = connector.connect(address);
开始连接服务端
此方法最终会调用到AbstractPollingIoConnector.connect0()
(一)handle = newHandle(localAddress);方法实现在NioSocketConnector中
该方法返回客户端通道对象:SocketChannel ch = SocketChannel.open();
(二)connect(handle, remoteAddress);方法实现在NioSocketConnector中
只有一行:return handle.connect(remoteAddress);也就是SocketChannel.connect();
因为开头设置了同步的方式connector.getSessionConfig().setUseReadOperation(true);所以会等请求建立后返回true,
接着初始化session。如果是异步方式,返回的是false,意味着连接还没完全建立,将不会执行之后的session初始化。
不过这个配置有时起不到作用,是MINA的bug。接着看看已非阻塞方式如何完成连接的建立。
(三)startupWorker();处理非阻塞的方式建立连接。
阻塞在int selected = select(timeout);接着被AbstractPollingIoConnector.wakeup()唤醒后,继续执行。
和NioSocketAcceptor的父类AbstractPollingIoAcceptor中处理Acceptor一样的方式。
而主程序TestMinaClient.connectServer()中会阻塞在future.awaitUninterruptibly();等待连接建立。
MINA客户端代码:
package mina; import java.net.InetSocketAddress; import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.ReadFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class TestMinaClient { private IoConnector connector; private String host = "localhost"; private int port = 12340; private InetSocketAddress address = new InetSocketAddress(host, port); private IoSession session; private IoFilter filter = new ProtocolCodecFilter(new TextLineCodecFactory()); public void connectServer() { connector = new NioSocketConnector(1); // 同步调用,即请求后需等待服务端返回消息 connector.getSessionConfig().setUseReadOperation(true); // 编码过滤器 connector.getFilterChain().addLast("codec", filter); ConnectFuture future = connector.connect(address); //等待连接完成 future.awaitUninterruptibly(); session = future.getSession(); } public void writeAndRead(){ //写数据 session.write("Hello,Server!I'm Client."); //读数据 ReadFuture rf = session.read().awaitUninterruptibly(); System.out.println(rf.getMessage().toString()); session.close(true); } public static void main(String[] args){ TestMinaClient client = new TestMinaClient(); //建立连接 client.connectServer(); //读写操作 client.writeAndRead(); } }一、connector = new NioSocketConnector(1);
和NioSocketAcceptor一样,也会创建SimpleIoProcessorPool池存放NioProcessor对象,会初始化Selector对象。
二、ConnectFuture future = connector.connect(address);
开始连接服务端
此方法最终会调用到AbstractPollingIoConnector.connect0()
protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer) { H handle = null; boolean success = false; try { handle = newHandle(localAddress); if (connect(handle, remoteAddress)) { ConnectFuture future = new DefaultConnectFuture(); T session = newSession(processor, handle); initSession(session, future, sessionInitializer); // Forward the remaining process to the IoProcessor. session.getProcessor().add(session); success = true; return future; } success = true; } catch (Exception e) { return DefaultConnectFuture.newFailedFuture(e); } finally { if (!success && handle != null) { try { close(handle); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer); connectQueue.add(request); startupWorker(); wakeup(); return request; }
(一)handle = newHandle(localAddress);方法实现在NioSocketConnector中
protected SocketChannel newHandle(SocketAddress localAddress) throws Exception { SocketChannel ch = SocketChannel.open(); int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize(); if (receiveBufferSize > 65535) { ch.socket().setReceiveBufferSize(receiveBufferSize); } if (localAddress != null) { ch.socket().bind(localAddress); } ch.configureBlocking(false); return ch; }
该方法返回客户端通道对象:SocketChannel ch = SocketChannel.open();
(二)connect(handle, remoteAddress);方法实现在NioSocketConnector中
只有一行:return handle.connect(remoteAddress);也就是SocketChannel.connect();
因为开头设置了同步的方式connector.getSessionConfig().setUseReadOperation(true);所以会等请求建立后返回true,
接着初始化session。如果是异步方式,返回的是false,意味着连接还没完全建立,将不会执行之后的session初始化。
不过这个配置有时起不到作用,是MINA的bug。接着看看已非阻塞方式如何完成连接的建立。
(三)startupWorker();处理非阻塞的方式建立连接。
private void startupWorker() { if (!selectable) { connectQueue.clear(); cancelQueue.clear(); } Connector connector = connectorRef.get(); if (connector == null) { connector = new Connector(); if (connectorRef.compareAndSet(null, connector)) { executeWorker(connector); } } }创建NioSocketConnector父类AbstractPollingIoConnector的多线程内部类Connector并启动。
public void run() { assert (connectorRef.get() == this); int nHandles = 0; while (selectable) { try { // the timeout for select shall be smaller of the connect // timeout or 1 second... int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L); int selected = select(timeout); nHandles += registerNew(); // get a chance to get out of the connector loop, if we don't have any more handles if (nHandles == 0) { connectorRef.set(null); if (connectQueue.isEmpty()) { assert (connectorRef.get() != this); break; } if (!connectorRef.compareAndSet(null, this)) { assert (connectorRef.get() != this); break; } assert (connectorRef.get() == this); } if (selected > 0) { nHandles -= processConnections(selectedHandles()); } processTimedOutSessions(allHandles()); nHandles -= cancelKeys(); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop break; } catch (Throwable e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } if (selectable && isDisposing()) { selectable = false; try { if (createdProcessor) { processor.dispose(); } } finally { try { synchronized (disposalLock) { if (isDisposing()) { destroy(); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setDone(); } } } }
阻塞在int selected = select(timeout);接着被AbstractPollingIoConnector.wakeup()唤醒后,继续执行。
和NioSocketAcceptor的父类AbstractPollingIoAcceptor中处理Acceptor一样的方式。
private int registerNew() { int nHandles = 0; for (;;) { ConnectionRequest req = connectQueue.poll(); if (req == null) { break; } H handle = req.handle; try { register(handle, req); nHandles++; } catch (Exception e) { req.setException(e); try { close(handle); } catch (Exception e2) { ExceptionMonitor.getInstance().exceptionCaught(e2); } } } return nHandles; }register方法将在通道上绑定对OP_CONNECT的监听。
protected void register(SocketChannel handle, ConnectionRequest request) throws Exception { handle.register(selector, SelectionKey.OP_CONNECT, request); }等初始化完成后又阻塞在int selected = select(timeout);等服务端SocketServerChannel.accept();完成建立连接。
而主程序TestMinaClient.connectServer()中会阻塞在future.awaitUninterruptibly();等待连接建立。
相关文章推荐
- [MINA2.0源码](三)服务端建立连接——NioSocketSession
- [MINA2.0源码](一)服务端建立监听——NioSocketAccepter
- mina2.0 源码分析--- 基于nio的服务端socket监听过程
- [MINA2.0源码](四)客户端完成建立连接
- 【深入剖析Linux协议栈】socket connect 发起连接请求
- MINA源码分析---对客户端设置连接间隔时间的过滤器
- 9.1 客户端发起请求源码
- Android Bluetooth蓝牙客户端发起对服务端连接建立请求过程(高版本Android兼容)
- [MINA2.0源码](五)客户端写数据——FilterChain
- Android-Mina-Java.lang.NoclassDefFoundError:org.apache.mina.transport.socket.NioSocketConnector
- android mina客户端连接抛java.net.SocketException: Tra...
- mina框架-------NioSocketAcceptor和NioSocketConnector对象
- mycat2.0源码分析01-接受客户端连接并发送握手报文
- java.lang.NoClassDefFoundError: org.apache.mina.transport.socket.nio.NioSocketConnector问题解决?
- Android Bluetooth蓝牙客户端发起对服务端连接建立请求过程(高版本Android兼容)
- Android 使用Socket实现服务器与手机客户端的长连接五:使用队列封装请求
- tcp socket客户端发送请求连接http服务
- Java NIO SocketChannel客户端例子(支持连接失败后自动重连)
- Linux网络协议栈 -- socket connect 发起连接请求
- mina的NioSocketAcceptor和NioSocketConnector