您的位置:首页 > 其它

Mina框架学习笔记(二)

2013-07-09 15:35 232 查看
上一篇只写了一个服务端。这一次来构建一个客户端。



首先,除了引入上一篇中讲到的几个jar包之外,还要引入一个mina-example-2.0.0.jar。



本程序的主要功能是:客户端向服务器发送几个数字,然后服务器给客户端返回累加结果。文字就不多写了,我在代码中都写了注释!



下面是服务端代码:

[java] view
plaincopy

import java.io.IOException;

import java.net.InetSocketAddress;

import org.apache.mina.example.sumup.codec.SumUpProtocolCodecFactory;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;

import org.apache.mina.filter.logging.LoggingFilter;

import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 * Entry point of the sum-up demo server.
 *
 * <p>Creates a NIO socket acceptor, installs a codec filter and a logging
 * filter, registers {@link MinaServerHandler} as the I/O handler and binds
 * to {@link #PORT}.
 */
public class MinaServer {

    /** TCP port the acceptor binds to. */
    private static final int PORT = 8389;

    /** Read buffer size passed to the acceptor's session configuration. */
    private static final int BUF_SIZE = 2048;

    /** Set this to false to use object serialization instead of the custom codec. */
    private static final boolean USE_CUSTOM_CODEC = true;

    /**
     * Configures the acceptor and starts listening.
     *
     * @param args ignored
     * @throws IOException if the acceptor cannot bind to {@link #PORT}
     */
    public static void main(String[] args) throws IOException {
        NioSocketAcceptor acceptor = new NioSocketAcceptor();

        // Add the 'codec' filter.
        if (USE_CUSTOM_CODEC) {
            // The flag selects the server-side codec set (decode AddMessage,
            // encode ResultMessage). Passing false here would install the
            // client-side set and the server could not read client requests.
            acceptor.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new SumUpProtocolCodecFactory(true)));
        } else {
            acceptor.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
        }

        // This filter logs events such as newly created sessions, messages
        // received, messages sent and sessions closed.
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());

        acceptor.setHandler(new MinaServerHandler());

        acceptor.getSessionConfig().setReadBufferSize(BUF_SIZE);
        // The idle timeout is not set here; MinaServerHandler.sessionOpened
        // configures it per session.

        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("Listening on port:" + PORT);
    }
}





它使用到了一个自定义的Handler来管理连接过程中的各个事件,代码如下:

[java] view
plaincopy

import org.apache.mina.core.service.IoHandlerAdapter;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.example.sumup.ServerSessionHandler;

import org.apache.mina.example.sumup.message.AddMessage;

import org.apache.mina.example.sumup.message.ResultMessage;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

/**
 * Server-side handler of the sum-up demo.
 *
 * <p>Keeps a running integer sum as a session attribute, adds the value of
 * every incoming {@link AddMessage} to it and answers each request with a
 * {@link ResultMessage} carrying the same sequence number.
 */
public class MinaServerHandler extends IoHandlerAdapter {

    /** Session attribute key under which the running sum is stored. */
    private static final String SUM_KEY = "sum";

    /** Seconds without read or write activity before a session counts as idle. */
    private static final int IDLE_SECONDS = 60;

    // Bound to this class so log output is attributed to the handler that
    // actually produced it.
    private static final Logger LOGGER = LoggerFactory.getLogger(MinaServerHandler.class);

    /**
     * Logs the failure and closes the session.
     *
     * @param session the session on which the exception was raised
     * @param cause   the exception that was caught
     */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        LOGGER.warn("Unexpected exception, closing session", cause);
        session.close(true);
    }

    /**
     * Adds the received value to the session's running sum and writes back a
     * result. If the addition would leave the {@code int} range, the sum is
     * left untouched and a result with {@code ok == false} is returned.
     *
     * @param session the session the message arrived on
     * @param message the decoded message; expected to be an {@link AddMessage}
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // The client only sends AddMessage; otherwise the type would have to
        // be identified with instanceof.
        AddMessage request = (AddMessage) message;

        int currentSum = ((Integer) session.getAttribute(SUM_KEY)).intValue();
        // Widen to long so overflow can be detected instead of wrapping.
        long expectedSum = (long) currentSum + request.getValue();

        ResultMessage reply = new ResultMessage();
        reply.setSequence(request.getSequence()); // echo the sequence number

        if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {
            // Overflow or underflow: report an error, keep the old sum.
            reply.setOk(false);
        } else {
            int newSum = (int) expectedSum;
            session.setAttribute(SUM_KEY, Integer.valueOf(newSum));
            reply.setOk(true);
            reply.setValue(newSum);
        }

        session.write(reply);
    }

    /**
     * Prints every message once it has been sent.
     */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println("Message sent:" + message);
    }

    /**
     * Disconnects a client whose session has become idle.
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        LOGGER.info("Disconnecting the idle...");
        session.close(true);
    }

    /**
     * Configures the idle timeout and initialises the running sum to zero.
     */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDLE_SECONDS);

        // Initial sum is 0.
        session.setAttribute(SUM_KEY, Integer.valueOf(0));
    }
}





接下去就是客户端了。一样的,先写一个主类,然后再写一个Handler来管理事件。



[java] view
plaincopy

import java.net.InetSocketAddress;

import org.apache.mina.core.RuntimeIoException;

import org.apache.mina.core.future.ConnectFuture;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.example.sumup.codec.SumUpProtocolCodecFactory;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;

import org.apache.mina.filter.logging.LoggingFilter;

import org.apache.mina.transport.socket.nio.NioSocketConnector;

/**
 * Entry point of the sum-up demo client.
 *
 * <p>Parses the command-line arguments as integers, connects to the server
 * and hands the values to a {@link NetCatProtocolHandler}, which sends them
 * once the session is opened. The program then blocks until the session is
 * closed.
 */
public class MinaClient {

    /** Connect timeout in milliseconds (1 minute). */
    private static final long DEF_TIMEOUT = 60 * 1000L;

    /** Pause between two connection attempts, in milliseconds. */
    private static final long RETRY_DELAY = 10 * 1000L;

    /** Set this to false to use object serialization instead of the custom codec. */
    private static final boolean USE_CUSTOM_CODEC = true;

    /** Server port. */
    private static final int PORT = 8389;

    /** Server host. */
    private static final String SERVER = "127.0.0.1";

    /**
     * Runs the client.
     *
     * @param args the integers to sum up; at least one is required
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              between connection attempts
     */
    public static void main(String[] args) throws InterruptedException {
        if (args.length == 0) {
            System.out.println("Please specify the list of any integers");
            return;
        }

        // Prepare the values to sum up.
        int[] values = new int[args.length];
        for (int i = 0; i < args.length; i++) {
            try {
                values[i] = Integer.parseInt(args[i]);
            } catch (NumberFormatException e) {
                // Report the offending argument instead of dying with a stack trace.
                System.err.println("Not a valid integer: " + args[i]);
                return;
            }
        }

        // Create the TCP/IP connector.
        NioSocketConnector connector = new NioSocketConnector();
        try {
            connector.setConnectTimeoutMillis(DEF_TIMEOUT);

            // Add the 'codec' filter; false selects the client-side codec set.
            if (USE_CUSTOM_CODEC) {
                connector.getFilterChain().addLast("codec",
                        new ProtocolCodecFilter(new SumUpProtocolCodecFactory(false)));
            } else {
                connector.getFilterChain().addLast("codec",
                        new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
            }
            connector.getFilterChain().addLast("logger", new LoggingFilter());

            // The handler sends the requests as soon as the session opens.
            connector.setHandler(new NetCatProtocolHandler(values));

            IoSession session = connect(connector);

            // Block until the session has been closed.
            session.getCloseFuture().awaitUninterruptibly();
        } finally {
            // Release the connector even when main exits with an exception.
            connector.dispose();
        }
    }

    /**
     * Connects to the server, retrying every {@link #RETRY_DELAY}
     * milliseconds until an attempt succeeds.
     */
    private static IoSession connect(NioSocketConnector connector) throws InterruptedException {
        while (true) {
            try {
                ConnectFuture future = connector.connect(new InetSocketAddress(SERVER, PORT));
                future.awaitUninterruptibly();
                // A failed attempt surfaces here as RuntimeIoException.
                return future.getSession();
            } catch (RuntimeIoException e) {
                System.err.println("Fail to connect!");
                e.printStackTrace();
                Thread.sleep(RETRY_DELAY);
            }
        }
    }
}





Handler类

[java] view
plaincopy

import org.apache.mina.core.service.IoHandlerAdapter;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.example.sumup.ClientSessionHandler;

import org.apache.mina.example.sumup.message.AddMessage;

import org.apache.mina.example.sumup.message.ResultMessage;

import org.slf4j.LoggerFactory;

/**
 * Client-side handler of the sum-up demo.
 *
 * <p>When the session opens it sends one {@link AddMessage} per value, using
 * the array index as sequence number. It closes the session when the result
 * for the last sequence number arrives, or as soon as the server reports an
 * error.
 */
public class NetCatProtocolHandler extends IoHandlerAdapter {

    // Bound to this class so log output is attributed to the handler that
    // actually produced it.
    private static final org.slf4j.Logger LOGGER =
            LoggerFactory.getLogger(NetCatProtocolHandler.class);

    /** Values to send to the server, in order. */
    private final int[] values;

    // volatile: set from the event callbacks, and isFinished() is offered to
    // other classes that may poll it from a different thread.
    private volatile boolean finished;

    /**
     * @param values the integers to send to the server for summation
     */
    public NetCatProtocolHandler(int[] values) {
        this.values = values;
    }

    /**
     * Lets other classes check whether the exchange is over.
     *
     * @return {@code true} once the final result or a server error was received
     */
    public boolean isFinished() {
        return finished;
    }

    /**
     * Logs the failure and closes the session.
     */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        LOGGER.warn("Unexpected exception, closing session", cause);
        session.close(true);
    }

    /**
     * Handles a result from the server.
     *
     * @param session the session the message arrived on
     * @param message the decoded message; expected to be a {@link ResultMessage}
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // The server only sends ResultMessage; otherwise the type would have
        // to be identified with instanceof.
        ResultMessage result = (ResultMessage) message;

        if (result.isOk()) {
            // The result carrying the last sequence number holds the final
            // sum: print it and disconnect.
            if (result.getSequence() == values.length - 1) {
                LOGGER.info("The sum: {}", result.getValue());
                session.close(true);
                finished = true;
            }
        } else {
            // The server rejected a request (e.g. overflow); no further
            // useful results will arrive, so give up.
            LOGGER.warn("Server error, disconnecting...");
            session.close(true);
            finished = true;
        }
    }

    /**
     * Prints every message once it has been sent. It must not write the
     * message again: doing so would re-trigger this callback endlessly.
     */
    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println("Message sent:" + message);
    }

    /**
     * Prints the number of bytes read during the session.
     */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.err.println("Total " + session.getReadBytes() + " byte(s)");
    }

    /**
     * Closes the session when nothing has been read for the configured idle
     * time. This handler does not configure an idle time itself.
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        if (status == IdleStatus.READER_IDLE) {
            session.close(true);
        }
    }

    /**
     * Sends one summation request per value as soon as the session opens.
     */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        for (int i = 0; i < values.length; i++) {
            AddMessage request = new AddMessage();
            request.setSequence(i);
            request.setValue(values[i]);
            session.write(request);
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: