您的位置:首页 > 编程语言 > Java开发

自定义的RPC的Java实现

2016-06-27 21:35 357 查看
http://jbm3072.iteye.com/blog/1088102

在看hadoop的源代码的时候,看到hadoop实现了一个自定义的RPC,于是有了自己写代码实现RPC的想法。

RPC的全名Remote Process Call,即远程过程调用。使用RPC,可以像使用本地的程序一样使用远程服务器上的程序。下面是一个简单的RPC 调用实例,从中可以看到RPC如何使用以及好处:

 

Java代码  


public class MainClient {  

    public static void main(String[] args) {  

        Echo echo = RPC.getProxy(Echo.class, "127.0.0.1", 20382);     

        System.out.println(echo.echo("hello,hello"));  

    }  

}  

Java代码  


  

 

Java代码  


public interface Echo {  

    public String echo(String string);  

}  

 

 

 

 

使用RPC.getProxy生成接口Echo的代理实现类。然后就可以像使用本地的程序一样来调用Echo中的echo方法。

使用RPC的好处是简化了远程服务访问。提高了开发效率。在分发代码时,只需要将接口分发给客户端使用,在客户端看来只有接口,没有具体类实现。这样保证了代码的可扩展性和安全性。

 

在看了RPCClient如何使用,我们再来定义一个RPC服务器的接口,看看服务器都提供什么操作:

 

 

Java代码  


public interface Server {  

    public void stop();  

    public void start();  

    public void register(Class interfaceDefiner,Class impl);  

    public void call(Invocation invo);  

    public boolean isRunning();  

    public int getPort();  

}  

 

 服务器提供了start和stop方法。使用register注册一个接口和对应的实现类。call方法用于执行Invocation指定的接口的方法名。isRunning返回了服务器的状态,getPort()则返回了服务器使用的端口。

 

来看看Invocation的定义:

 

 

Java代码  


public class Invocation implements Serializable{  

    /** 

     *  

     */  

    private static final long serialVersionUID = 1L;  

      

    private Class interfaces;  

    private Method method;  

    private Object[] params;  

    private Object result;  

      

      

    /** 

     * @return the result 

     */  

    public Object getResult() {  

        return result;  

    }  

    /** 

     * @param result the result to set 

     */  

    public void setResult(Object result) {  

        this.result = result;  

    }  

    /** 

     * @return the interfaces 

     */  

    public Class getInterfaces() {  

        return interfaces;  

    }  

    /** 

     * @param interfaces the interfaces to set 

     */  

    public void setInterfaces(Class interfaces) {  

        this.interfaces = interfaces;  

    }  

    /** 

     * @return the method 

     */  

    public Method getMethod() {  

        return method;  

    }  

    /** 

     * @param method the method to set 

     */  

    public void setMethod(Method method) {  

        this.method = method;  

    }  

    /** 

     * @return the params 

     */  

    public Object[] getParams() {  

        return params;  

    }  

    /** 

     * @param params the params to set 

     */  

    public void setParams(Object[] params) {  

        this.params = params;  

    }  

    @Override  

    public String toString() {  

        return interfaces.getName()+"."+method.getMethodName()+"("+Arrays.toString(params)+")";  

    }  

      

}  

 

 

 

     具体服务器实现类中的call方法是这样使用Invocation的:

 

 

 

Java代码  


@Override  

        public void call(Invocation invo) {  

            Object obj = serviceEngine.get(invo.getInterfaces().getName()); //根据接口名,找到对应的处理类  

            if(obj!=null) {  

                try {  

                    Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());  

                    Object result = m.invoke(obj, invo.getParams());  

                    invo.setResult(result);  

                } catch (Throwable th) {  

                    th.printStackTrace();  

                }  

            } else {  

                throw new IllegalArgumentException("has no these class");  

            }  

        }  

 

 

  下面来看服务器接收连接并处理连接请求的核心代码:

 

 

 

Java代码  


public class Listener extends Thread {  

    private ServerSocket socket;  

    private Server server;  

  

    public Listener(Server server) {  

        this.server = server;  

    }  

  

    @Override  

    public void run() {  

  

        System.out.println("启动服务器中,打开端口" + server.getPort());  

        try {  

            socket = new ServerSocket(server.getPort());  

        } catch (IOException e1) {  

            e1.printStackTrace();  

            return;  

        }  

        while (server.isRunning()) {  

            try {  

                  

                Socket client = socket.accept();  

                ObjectInputStream ois = new ObjectInputStream(client.getInputStream());  

                Invocation invo = (Invocation) ois.readObject();  

                server.call(invo);  

                ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());  

                oos.writeObject(invo);  

                oos.flush();  

                oos.close();  

                ois.close();  

            } catch (Exception e) {  

                // TODO Auto-generated catch block  

                e.printStackTrace();  

            }  

  

        }  

  

        try {  

            if (socket != null && !socket.isClosed())  

                socket.close();  

        } catch (IOException e) {  

            // TODO Auto-generated catch block  

            e.printStackTrace();  

        }  

    }  

}  

 

 

RPC具体的Server类是这样来使用Listener的:

 

Java代码  


public static class RPCServer implements Server{  

        private int port = 20382;  

        private Listener listener;   

        private boolean isRuning = true;  

          

        /** 

         * @param isRuning the isRuning to set 

         */  

        public void setRuning(boolean isRuning) {  

            this.isRuning = isRuning;  

        }  

  

        /** 

         * @return the port 

         */  

        public int getPort() {  

            return port;  

        }  

  

        /** 

         * @param port the port to set 

         */  

        public void setPort(int port) {  

            this.port = port;  

        }  

  

        private Map<String ,Object> serviceEngine = new HashMap<String, Object>();  

          

          

        @Override  

        public void call(Invocation invo) {  

            System.out.println(invo.getClass().getName());  

            Object obj = serviceEngine.get(invo.getInterfaces().getName());  

            if(obj!=null) {  

                try {  

                    Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());  

                    Object result = m.invoke(obj, invo.getParams());  

                    invo.setResult(result);  

                } catch (Throwable th) {  

                    th.printStackTrace();  

                }  

            } else {  

                throw new IllegalArgumentException("has no these class");  

            }  

        }  

  

        @Override  

        public void register(Class interfaceDefiner, Class impl) {  

            try {  

                this.serviceEngine.put(interfaceDefiner.getName(), impl.newInstance());  

                System.out.println(serviceEngine);  

            } catch (Throwable e) {  

                // TODO Auto-generated catch block  

                e.printStackTrace();  

            }   

        }  

  

        @Override  

        public void start() {  

            System.out.println("启动服务器");  

            listener = new Listener(this);  

            this.isRuning = true;  

            listener.start();  

        }  

  

        @Override  

        public void stop() {  

            this.setRuning(false);  

        }  

  

        @Override  

        public boolean isRunning() {  

            return isRuning;  

        }  

          

    }  

 

    服务器端代码搞定后,来看看客户端的代码,先看看我们刚开始使用RPC.getProxy方法:

 

Java代码  


public static <T> T getProxy(final Class<T> clazz,String host,int port) {  

          

        final Client client = new Client(host,port);  

        InvocationHandler handler = new InvocationHandler() {  

              

            @Override  

            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  

                Invocation invo = new Invocation();  

                invo.setInterfaces(clazz);  

                invo.setMethod(new org.jy.rpc.protocal.Method(method.getName(),method.getParameterTypes()));  

                invo.setParams(args);  

                client.invoke(invo);  

                return invo.getResult();  

            }  

        };  

        T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);  

        return t;  

    }  

 

Client类的代码如下:

 

Java代码  


public class Client {  

    private String host;  

    private int port;  

    private Socket socket;  

    private ObjectOutputStream oos;  

    private ObjectInputStream ois;  

  

    public String getHost() {  

        return host;  

    }  

  

  

    public void setHost(String host) {  

        this.host = host;  

    }  

  

    public int getPort() {  

        return port;  

    }  

    public void setPort(int port) {  

        this.port = port;  

    }  

  

    public Client(String host, int port) {  

        this.host = host;  

        this.port = port;  

    }  

  

    public void init() throws UnknownHostException, IOException {  

        socket = new Socket(host, port);  

        oos = new ObjectOutputStream(socket.getOutputStream());  

    }  

  

    public void invoke(Invocation invo) throws UnknownHostException, IOException, ClassNotFoundException {  

        init();  

        System.out.println("写入数据");  

        oos.writeObject(invo);  

        oos.flush();  

        ois = new ObjectInputStream(socket.getInputStream());  

        Invocation result = (Invocation) ois.readObject();  

        invo.setResult(result.getResult());  

    }  

  

}  

 

    至此,RPC的客户端和服务器端代码完成,启动服务器的代码如下:

 

Java代码  


public class Main {  

    public static void main(String[] args) {  

        Server server = new RPC.RPCServer();  

        server.register(Echo.class, RemoteEcho.class);  

        server.start();  

    }  

  

}  

 

   现在先运行服务器端代码,再运行客户端代码,就可以成功运行。

   详细的代码,参考附件的源代码。

 

    在写这个RPC时,没有想太多。在数据串行化上,使用了java的标准io序列化机制,虽然不能跨平台,但是做DEMO还是不错的;另外在处理客户端请求上,使用了ServerSocket,而没有使用ServerSocketChannel这个java nio中的新特性;在动态生成接口的实现类上,使用了java.lang.reflet中的Proxy类。他可以动态创建接口的实现类。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: