您的位置:首页 > 移动开发 > IOS开发

[MINA2.0源码](二)客户端发起连接请求——NioSocketConnector

2013-08-09 16:05 141 查看
NioSocketConnector负责客户端向服务端发起连接请求,会创建一个NioSocketSession来负责读写操作。功能与服务端的NioSocketAcceptor相对应,他们都是IoServer的子类,功能类似。

MINA客户端代码:

package mina;

import java.net.InetSocketAddress;
import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.ReadFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class TestMinaClient {
private IoConnector connector;
private String host = "localhost";
private int port = 12340;
private InetSocketAddress address = new InetSocketAddress(host, port);
private IoSession session;
private IoFilter filter = new ProtocolCodecFilter(new TextLineCodecFactory());

public void connectServer() {
connector = new NioSocketConnector(1);

// 同步调用,即请求后需等待服务端返回消息
connector.getSessionConfig().setUseReadOperation(true);

// 编码过滤器
connector.getFilterChain().addLast("codec", filter);

ConnectFuture future = connector.connect(address);
//等待连接完成
future.awaitUninterruptibly();

session = future.getSession();
}

public void writeAndRead(){
//写数据
session.write("Hello,Server!I'm Client.");
//读数据
ReadFuture rf = session.read().awaitUninterruptibly();
System.out.println(rf.getMessage().toString());
session.close(true);
}

public static void main(String[] args){
TestMinaClient client = new TestMinaClient();
//建立连接
client.connectServer();
//读写操作
client.writeAndRead();
}
}
一、connector = new NioSocketConnector(1);
和NioSocketAcceptor一样,也会创建SimpleIoProcessorPool池存放NioProcessor对象,会初始化Selector对象。

二、ConnectFuture future = connector.connect(address);

开始连接服务端

此方法最终会调用到AbstractPollingIoConnector.connect0()

protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
H handle = null;
boolean success = false;
try {
handle = newHandle(localAddress);
if (connect(handle, remoteAddress)) {
ConnectFuture future = new DefaultConnectFuture();
T session = newSession(processor, handle);
initSession(session, future, sessionInitializer);
// Forward the remaining process to the IoProcessor.
session.getProcessor().add(session);
success = true;
return future;
}

success = true;
} catch (Exception e) {
return DefaultConnectFuture.newFailedFuture(e);
} finally {
if (!success && handle != null) {
try {
close(handle);
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
}
}
}

ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
connectQueue.add(request);
startupWorker();
wakeup();

return request;
}

(一)handle = newHandle(localAddress);方法实现在NioSocketConnector中

protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
SocketChannel ch = SocketChannel.open();

int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
if (receiveBufferSize > 65535) {
ch.socket().setReceiveBufferSize(receiveBufferSize);
}

if (localAddress != null) {
ch.socket().bind(localAddress);
}
ch.configureBlocking(false);
return ch;
}


该方法返回客户端通道对象:SocketChannel ch = SocketChannel.open();

(二)connect(handle, remoteAddress);方法实现在NioSocketConnector中

只有一行:return handle.connect(remoteAddress);也就是SocketChannel.connect();

因为开头设置了同步的方式connector.getSessionConfig().setUseReadOperation(true);所以会等请求建立后返回true,

接着初始化session。如果是异步方式,返回的是false,意味着连接还没完全建立,将不会执行之后的session初始化。

不过这个配置有时起不到作用,是MINA的bug。接着看看已非阻塞方式如何完成连接的建立。

(三)startupWorker();处理非阻塞的方式建立连接。

private void startupWorker() {
if (!selectable) {
connectQueue.clear();
cancelQueue.clear();
}

Connector connector = connectorRef.get();

if (connector == null) {
connector = new Connector();

if (connectorRef.compareAndSet(null, connector)) {
executeWorker(connector);
}
}
}
创建NioSocketConnector父类AbstractPollingIoConnector的多线程内部类Connector并启动。

public void run() {
assert (connectorRef.get() == this);

int nHandles = 0;

while (selectable) {
try {
// the timeout for select shall be smaller of the connect
// timeout or 1 second...
int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
int selected = select(timeout);

nHandles += registerNew();

// get a chance to get out of the connector loop, if we don't have any more handles
if (nHandles == 0) {
connectorRef.set(null);

if (connectQueue.isEmpty()) {
assert (connectorRef.get() != this);
break;
}

if (!connectorRef.compareAndSet(null, this)) {
assert (connectorRef.get() != this);
break;
}

assert (connectorRef.get() == this);
}

if (selected > 0) {
nHandles -= processConnections(selectedHandles());
}

processTimedOutSessions(allHandles());

nHandles -= cancelKeys();
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
break;
} catch (Throwable e) {
ExceptionMonitor.getInstance().exceptionCaught(e);

try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
ExceptionMonitor.getInstance().exceptionCaught(e1);
}
}
}

if (selectable && isDisposing()) {
selectable = false;
try {
if (createdProcessor) {
processor.dispose();
}
} finally {
try {
synchronized (disposalLock) {
if (isDisposing()) {
destroy();
}
}
} catch (Exception e) {
ExceptionMonitor.getInstance().exceptionCaught(e);
} finally {
disposalFuture.setDone();
}
}
}
}

阻塞在int selected = select(timeout);接着被AbstractPollingIoConnector.wakeup()唤醒后,继续执行。

和NioSocketAcceptor的父类AbstractPollingIoAcceptor中处理Acceptor一样的方式。

private int registerNew() {
int nHandles = 0;
for (;;) {
ConnectionRequest req = connectQueue.poll();
if (req == null) {
break;
}

H handle = req.handle;
try {
register(handle, req);
nHandles++;
} catch (Exception e) {
req.setException(e);
try {
close(handle);
} catch (Exception e2) {
ExceptionMonitor.getInstance().exceptionCaught(e2);
}
}
}
return nHandles;
}
register方法将在通道上绑定对OP_CONNECT的监听。

      protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
handle.register(selector, SelectionKey.OP_CONNECT, request);
}
等初始化完成后又阻塞在int selected = select(timeout);等服务端SocketServerChannel.accept();完成建立连接。

而主程序TestMinaClient.connectServer()中会阻塞在future.awaitUninterruptibly();等待连接建立。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: