您的位置:首页 > 其它

使用Mina实现RPC调用,消息通知,广播。

2013-12-12 23:12 435 查看
在上一篇的基础上,增强功能。

common包,定义通用接口,和传递中使用的对象,对象需要实现序列化接口。

接口:

package mina.common;
public interface RpcInterface {
    public String getStringValue(String arg0,int arg1,Apple arg2);
    public int getIntValue();
    public void printPrice();
}

bean类,定义了一只苹果。

package mina.common;

import java.awt.Color;
import java.io.Serializable;
import java.util.Date;

public class Apple implements Serializable{

    private Color color = Color.BLACK;
    private double weight = 1.1;
    private double dia = 2.33;
    
    private int num = 3;
    private String name = "aaapple";
    
    private Date pdate = new Date();
    
    public Date getPdate() {
        return pdate;
    }
    public void setPdate(Date pdate) {
        this.pdate = pdate;
    }
    public Color getColor() {
        return color;
    }
    public void setColor(Color color) {
        this.color = color;
    }
    public double getDia() {
        return dia;
    }
    public void setDia(double dia) {
        this.dia = dia;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getNum() {
        return num;
    }
    public void setNum(int num) {
        this.num = num;
    }
    public double getWeight() {
        return weight;
    }
    public void setWeight(double weight) {
        this.weight = weight;
    }
}

server包,

Server类,启动监听,

package mina.server;

import java.net.InetSocketAddress;

import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;

public class Server {

    /** Choose your favorite port number. */
    private static final int PORT = 8080;

    private static final boolean USE_SSL = false;

    public static void main(String[] args) throws Exception {
        IoAcceptor acceptor = new SocketAcceptor();
        IoAcceptorConfig config = new SocketAcceptorConfig();
        DefaultIoFilterChainBuilder chain = config.getFilterChain();

        ((SocketSessionConfig) config.getSessionConfig()).setReuseAddress(true);
        // Add SSL filter if SSL is enabled.
        if (USE_SSL) {
            // addSSLSupport(chain);
        }

        // Bind
        acceptor.bind(new InetSocketAddress(PORT), new ServerHandler(), config);

        System.out.println("Listening on port " + PORT);
    }
}

RpcServerImpl实现类,实现common包中定义的接口,实际的处理的地方。

package mina.server;

import mina.common.Apple;
import mina.common.RpcInterface;

public class RpcServerImpl implements RpcInterface {

    public String getStringValue(String arg0,int arg1,Apple arg2) {
        System.out.println("apple time is "+arg2.getPdate());
        return "this is sign from server.";
    }

    public int getIntValue() {
        return 3;
    }

    public void printPrice() {
        System.out.println("******price**********");
        System.out.println("$10000000000000000000");
        System.out.println("*********************");
    }
}

ServerHandler类,处理客户端连接事务,和客户端通信处理的类。

package mina.server;

import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.handler.StreamIoHandler;
import org.apache.mina.integration.jmx.IoSessionManager;

public class ServerHandler extends StreamIoHandler {

    Hashtable sessionMgr = new Hashtable();
    List serverList = new ArrayList();
    ServerHandler() {
        serverList.add(new RpcServerImpl());
    }

    public void messageReceived(IoSession session, Object buf) {
        SocketAddress adr = session.getRemoteAddress();
        System.out.println("remote address is =" + adr.toString());
        System.out.println("buf=" + buf.toString());
        if (buf instanceof ByteBuffer) {
            ByteBuffer bb = (ByteBuffer) buf;
            System.out.println("bbb===" + bb);
            try {
                Properties prop = (Properties) bb.getObject();
                System.out.println("prop==" + prop);

                String interfaceName = (String) prop.get("interface");
                Iterator it = serverList.iterator();
                while (it.hasNext()) {// 查找实例
                    Object serobj = it.next();
                    Class[] clazz = serobj.getClass().getInterfaces();
                    if (isContains(clazz, interfaceName)) {// 找到相应实例
                        System.out.println("find.." + interfaceName);
                        int argc = Integer.parseInt(String.valueOf(prop
                                .get("argc")));
                        Class[] types = null;// 参数类型数组
                        Object[] args = null;// 参数对象数组

//                       不为最大值时表示有正常参数,否则为无参数。
                        if (argc != Integer.MAX_VALUE) {
                            // 重组参数列表
                            types = new Class[argc];
                            args = new Object[argc];
                            List typeList = (List) prop.get("types");
                            List argList = (List) prop.get("args");
                            for (int i = 0; i < argc; i++) {
                                args[i] = argList.get(i);
                                types[i] = (Class) typeList.get(i);
                                System.out.println("arg###=" + args[i]
                                        + "   type@@@=" + types[i]);
                            }
                        }
                        String methodName = prop.getProperty("method");

                        Method method = serobj.getClass().getMethod(methodName,
                                types);
                        //调用
                        Object resultObject = method.invoke(serobj, args);

                        //回复
                        replyCall(session.getRemoteAddress().toString(),resultObject);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    //广播操作,广播消息或事件.标记为"BROADCAST" 
    public void brocastMessage(Object obj){
        Enumeration enu = sessionMgr.keys();
        while (enu.hasMoreElements()) {
            String addr = (String) enu.nextElement();
            sendMessage("BROADCAST",addr,obj);
        }
    }
    
    //通知操作,通知消息或事件。标记为"NOTIFY"
    public void notify(String addr,Object obj){
        sendMessage("NOTIFY",addr,obj);
    }
    
    //响应RPC调用。
    public void replyCall(String addr,Object obj){
        sendMessage("REPLYCALL",addr,obj);
    }
    
    private void sendMessage(String mark,String addr,Object obj){
        Object sessionObj = sessionMgr.get(addr);
        if(sessionObj!=null){
            IoSession session = (IoSession)sessionObj;
            if(!session.isConnected()){
                return;
            }
            Properties resultProp = new Properties();
            resultProp.setProperty("mark",mark);
            if(obj!=null) //对于无返回的情况。
                resultProp.put("Object",obj);
            ByteBuffer bb = ByteBuffer.allocate(16);
            bb.setAutoExpand(true);
            bb.putObject(resultProp);
            bb.flip();
            session.write(bb);
        }else{
            System.out.println("session null.addr="+addr);
        }
    }
    
    protected void processStreamIo(IoSession session, InputStream ins,
            OutputStream ous) {
        System.out.println("processStreamIo is called.");
    }

    private boolean isContains(Class[] clazz, String ifName) {
        for (int i = 0; i < clazz.length; i++) {
            if (clazz[i].getName().equals(ifName))
                return true;
        }
        return false;
    }

    public void sessionOpened(IoSession ssn) {
        System.out.println("session open for " + ssn.getRemoteAddress());
        sessionMgr.put(ssn.getRemoteAddress().toString(),ssn);
    }

    public void exceptionCaught(IoSession ssn, Throwable cause) {
        cause.printStackTrace();
        sessionMgr.remove(ssn.getRemoteAddress().toString());
        ssn.close();
    }

    public void sessionClosed(IoSession ssn) throws Exception {
        System.out.println("session closed from " + ssn.getRemoteAddress());
        sessionMgr.remove(ssn.getRemoteAddress().toString());
    }

}

client包,实现client功能 。连接服务端调用common接口提供的方法。

Client类,连接服务端,调用客户端的方法。

package mina.client;

import java.net.InetSocketAddress;

import mina.common.Apple;

import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketConnector;

public class Client {
    public static void main(String[] args) throws Exception {

        // Create TCP/IP connector.
        SocketConnector connector = new SocketConnector();

        // Set connect timeout.
        ((IoConnectorConfig) connector.getDefaultConfig())
                .setConnectTimeout(15);

        ClientIoHandler handler = new ClientIoHandler();

        // Start communication.
        ConnectFuture cf = connector.connect(new InetSocketAddress("localhost",
                8080), handler);

        // Wait for the connection attempt to be finished.
        System.out.println("start to join");
        cf.join();
        cf.getSession();

        System.out.println("test get value="
                + RpcClientImpl.getInstance().getStringValue("adsf", 222,
                        new Apple()));

        System.out.println("test get int value="
                + RpcClientImpl.getInstance().getIntValue());

        System.out.println("test call process start.");
        RpcClientImpl.getInstance().printPrice();
        System.out.println("test call process end.");
    }
}

AbstractClientImpl类,为client实现接口时使用的超类。

package mina.client;

import org.apache.mina.common.IoSession;

public abstract class AbstractClientImpl {
    
    IoSession session = null;
    ClientIoHandler handler = null;
    public IoSession getSession() {
        return session;
    }

    public void setSession(IoSession session) {
        this.session = session;
    }

    public ClientIoHandler getHandler() {
        return handler;
    }

    public void setHandler(ClientIoHandler handler) {
        this.handler = handler;
    }
}

RpcClientImpl类,客户端实现common中接口的类,实际上是一种伪实现,在各个方法中组装调用的接品,方法,参数等信息,传到服务端由服务端的通信处理类处理信息,实现服务端调用并返回结果。

package mina.client;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Properties;

import mina.common.Apple;
import mina.common.RpcInterface;

import org.apache.mina.common.ByteBuffer;

public class RpcClientImpl extends AbstractClientImpl implements RpcInterface {

    private static String IFNAME = RpcInterface.class.getName();
    
    private static RpcClientImpl instance = null;
    
    public static RpcClientImpl getInstance(){
        if(instance == null){
            instance = new RpcClientImpl();
        }
        return instance;
    }
    
    public String getStringValue(String arg0,int arg1,Apple arg2) {
        if(session!=null){
            Properties prop = new Properties();
            prop.setProperty("interface",IFNAME);
            prop.setProperty("method","getStringValue");
            prop.put("argc","3");//参数个数.
        
            //生成参数类型链表
            List typeList = new ArrayList();
            typeList.add(String.class);
            typeList.add(Integer.TYPE);
            typeList.add(Apple.class);
            
            //生成参数对象链表
            List argList = new ArrayList();
            argList.add(arg0);
            argList.add(arg1);
            argList.add(arg2);
            
            prop.put("types",typeList);
            prop.put("args",argList);
            
            ByteBuffer bb = ByteBuffer.allocate(16);
            bb.setAutoExpand( true );
            bb.putObject(prop);
            bb.flip();
            System.out.println("bbb==="+bb.toString());
            session.write(bb);
        }
        
        try {
            System.out.println("handler="+handler);
            System.out.println("lock="+handler.lock);
            printStamp("1");
            synchronized(handler.lock){
                printStamp("2");
                handler.lock.wait();
                printStamp("3");
                if(handler.resultObject!=null){
                    return String.valueOf(handler.resultObject);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        return null;
    }

    public int getIntValue() {
        if(session!=null){
            Properties prop = new Properties();
            prop.setProperty("interface",IFNAME);
            prop.setProperty("method","getIntValue");
            prop.put("argc",Integer.MAX_VALUE);
            //prop.put("object",new Object());
            ByteBuffer bb = ByteBuffer.allocate(16);
            bb.setAutoExpand( true );
            bb.putObject(prop);
            bb.flip();
            System.out.println("bbb==="+bb.toString());
            session.write(bb);
        }
        try {
            System.out.println("handler="+handler);
            System.out.println("lock="+handler.lock);
            printStamp("1");
            synchronized(handler.lock){
                printStamp("2");
                handler.lock.wait();
                printStamp("3");
                if(handler.resultObject!=null){
                    return Integer.parseInt(String.valueOf(handler.resultObject));
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        return Integer.MIN_VALUE;
    }

    
    public void printStamp(String str){
        System.out.println((new Date().toString())+" "+str);
    }

    public void printPrice() {
        if(session!=null){
            Properties prop = new Properties();
            prop.setProperty("interface",IFNAME);
            prop.setProperty("method","printPrice");
            prop.put("argc",Integer.MAX_VALUE);//参数个数
            
            ByteBuffer bb = ByteBuffer.allocate(16);
            bb.setAutoExpand( true );
            bb.putObject(prop);
            bb.flip();
            System.out.println("bbb==="+bb.toString());
            session.write(bb);
        }
        
        try {
            System.out.println("handler="+handler);
            System.out.println("lock="+handler.lock);
            printStamp("1");
            synchronized(handler.lock){
                printStamp("2");
                handler.lock.wait();
                printStamp("3");
                return;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return;
    }
}

ClientIoHandler类,处理服务端通信的类,用于和服务端建立通信,处理由服务端发送过来的消息。

package mina.client;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.handler.StreamIoHandler;

public class ClientIoHandler extends StreamIoHandler{

    IoSession session = null;
    
    List implList = new ArrayList();
    
    Object resultObject;
    Object lock = new Object();
    
    public ClientIoHandler(){
        //init impls.
        RpcClientImpl impl = RpcClientImpl.getInstance();
        implList.add(impl);
        //impl.setHandler(this);
    }
    
    private void initImpls(){
        Iterator it = implList.iterator();
        while(it.hasNext())
        {
            Object obj = it.next();
            if(obj instanceof AbstractClientImpl){
                AbstractClientImpl impl = (AbstractClientImpl)obj;
                impl.setSession(session);
                System.out.println("add this handler");
                impl.setHandler(this);
            }
        }
    }
    
    //收到服务端消息后的处理,框架内部为异步。在应用中改为同步。
    public void messageReceived(IoSession session, Object buf) {
        System.out.println("receive message.");
        
        System.out.println(buf.toString());
        try{
            if(buf instanceof ByteBuffer){
                Properties prop = (Properties)(((ByteBuffer)buf).getObject());
                System.out.println("received prop="+prop);
                String mark = prop.getProperty("mark");
                //根据mark类型,选择不同处理。
                if(mark.equals("REPLYCALL")){
                    Object obj = prop.get("Object");
                    synchronized(lock){
                        System.out.println("result="+obj);
                        resultObject = obj;
                        lock.notify();
                    }
                }else if(mark.equals("BROADCAST")){
                    Object obj = prop.get("Object");
                    System.out.println("BROADCAST obj="+obj);
                }else if(mark.equals("NOTIFY")){
                    Object obj = prop.get("Object");
                    System.out.println("NOTIFY obj="+obj);
                }else{
                    System.out.println("unknow mark.mark="+mark);
                }
            }else{
                System.out.println("class type error.");
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    
    protected void processStreamIo(IoSession session, InputStream is, OutputStream os) {
        System.out.println("process stream info,"+session.getRemoteAddress());
    }

    public void sessionOpened(IoSession session) {
        // Set reader idle time to 10 seconds.
        // sessionIdle(...) method will be invoked when no data is read
        // for 10 seconds.
       System.out.println("open session..");
        this.session = session;
       initImpls();
       session.setIdleTime(IdleStatus.READER_IDLE, 10);
    }

    public void sessionClosed(IoSession session) {
        // Print out total number of bytes read from the remote peer.
        System.err.println("Total " + session.getReadBytes() + " byte(s)");
    }

    public void sessionIdle(IoSession session, IdleStatus status) {
        // Close the connection if reader is idle.
        if (status == IdleStatus.READER_IDLE)
            session.close();
    }
}

需要改进的地方:

    在实际应用中,还需要精简客户端的结构,实现多个impl,一个handler,即实现一个将消息dispatch方法。服务端对客户端的主动消息还需要另外定义格式,以满足不同条件下的需要。或者需要定义内容比较详细的事件。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: