您的位置:首页 > 其它

Dubbo原理简介------自己动手实现简单的RPC调用

2018-02-09 16:28 1141 查看
最近开始学习Dubbo框架.       Dubbo是阿里推出的一个开源的分布式服务框架,致力于提供高性能和透明化的RPC(Remote Procedure Call)远程服务调用方案,以及SOA治理方案. 简单的说,就是个远程服务调用的分布式框架.      RPC即远程过程调用,也就是一台机器上的应用调用另一台机器上的方法.由于两个应用不在同一个内存上,因此不能直接调用,需要借助于网络通信,将方法相关的信息如接类型,方法名,参数类型,参数列表等通过网络通信发送给服务提供者,由服务提供者调用方法,并将方法执行的结果发送给服务调用者.下图是Dubbo的架构图:


主要分为几个部分:1.Provider 服务提供者2.Consumer 调用服务的服务消费者3.Registry 服务注册与发现的注册中心4.统计服务的调用次调和调用时间的监控中心服务调用的过程:1).服务提供者向注册中心注册自己的服务2).服务消费者向注册中心订阅自己所需的服务3).注册中心将服务提供者的地址发送给服务消费者4).服务消费者和服务提供者建立连接,调用服务提供者的服务
为了更好的理解RPC,下面参考网上的信息,简单实现了一个RPC过程. 主要分为以下几个部分:1.ServiceProvider:服务提供者,对外提供服务
2.ServiceConsumer:服务消费者,调用服务提供者的服务3.Registry:注册中心,负责服务的注册和发现      这里主要利用了动态代理和socket通信的方法. 客户端通过动态代理获得代理对象;服务器创建真实对象;当客户端通过代理对象调用方法时, 在服务器和客户端之间建立socket连接, 将接口类型, 方法名, 参数类型, 参数对象发送给服务端. 服务端接收到相关信息后, 调用方法,并将方法返回的结果对象发送给客户端.
注册中心[java] view plain copy/** 
 * 服务注册中心 
 * @author Administrator 
 * */  
public class Registory {  
    private Map<String, ProviderConnection>map=new ConcurrentHashMap<>();  
    private static final int PORT_PROVIDER=12003;  
    private static final int PORT_CONSUMER=12206;  
    /**监听服务提供者的注册请求*/  
    public void register(){  
        ServerSocket server=null;  
        try {  
            server=new ServerSocket(PORT_PROVIDER);  
            System.out.println("开启provider监听");  
            while(true){  
                Socket socket=server.accept();  
                System.out.println("provider建立连接");  
                registService(socket);  
            }  
        } catch (IOException e) {  
            e.printStackTrace();  
        } finally {  
            if(server!=null){  
                try {server.close();} catch (IOException e) {e.printStackTrace();}  
            }  
        }  
    }  
    /**provider连接,将接口名\主机名\端口号发送过来;Registry接收到后将信息保存在map中*/  
    private void registService(Socket socket) {  
        ObjectInputStream reader=null;  
        try{  
            reader=new ObjectInputStream(socket.getInputStream());  
            String interfaceName=reader.readUTF();  
            ProviderConnection provConn=(ProviderConnection) reader.readObject();  
            map.put(interfaceName, provConn);  
        }catch(IOException e){  
            e.printStackTrace();  
        } catch (ClassNotFoundException e) {  
            e.printStackTrace();  
        }finally {  
            if(reader!=null&&!socket.isClosed()){  
                try{reader.close();}catch(IOException e){e.printStackTrace();}  
            }  
            if(socket!=null&&!socket.isClosed()){  
                try{socket.close();}catch(IOException e){e.printStackTrace();}  
            }  
        }  
        System.out.println("服务注册成功");  
    }  
    /**监听服务消费者的请求*/  
    public void subscribe(){  
        ServerSocket server=null;  
        try {  
            server=new ServerSocket(PORT_CONSUMER);  
            System.out.println("开启consumer监听");  
            while(true){    //////////  
                Socket socket=server.accept();  
                System.out.println("消费者建立连接...");  
                subscribeService(socket);  
            }  
        } catch (IOException e) {  
            e.printStackTrace();  
        } finally {  
            if(server!=null){  
                try {server.close();} catch (IOException e) {e.printStackTrace();}  
            }  
        }  
    }  
    /**接收消费者的信息,并将服务提供者对应的信息返回给消费者*/  
    private void subscribeService(Socket socket) {  
        ObjectInputStream reader=null;  
        ObjectOutputStream oos=null;  
        try{  
            reader=new ObjectInputStream(socket.getInputStream());  
            oos=new ObjectOutputStream(socket.getOutputStream());  
            String interfaceName=reader.readUTF();  
            System.out.println("interfaceName="+interfaceName);  
            ProviderConnection provConn=map.get(interfaceName);  
              
            System.out.println("provConn="+provConn);  
            oos.writeUTF(provConn.getHost());  
            System.out.println("host="+provConn.getHost());  
              
            Integer port=provConn.getPort();/////////  
            System.out.println("port="+port);  
            oos.writeInt(port); /////////  
            oos.flush();    /////////  
        }catch(IOException e){  
            e.printStackTrace();  
        }  
    }  
    public static void main(String[] args) {  
        Registory registory=new Registory();  
        Thread thread1=new Thread(new Runnable() {  
            @Override  
            public void run() {  
                registory.register();  
            }  
        });  
        Thread thread2=new Thread(new Runnable() {  
            @Override  
            public void run() {  
                registory.subscribe();  
            }  
        });  
        thread1.start();thread2.start();  
    }  
}  

服务提供者[java] view plain copy<span style="font-size:14px;">/** 
 * 服务提供者,负责对外提供服务 
 * @author Administrator 
 * */  
public class ServiceProvider {  
    private ExecutorService executor=Executors.newFixedThreadPool(10);  
    private boolean flag=false;  
      
    private Map<String,Object> map=new ConcurrentHashMap<>();  
    private static final String REGISTORY_HOST="localhost";  
    private static final String PROVIDER_HOST="localhost";  
    /**注册中心的端口号*/  
    private static final int REGISTOR_PORT=12003;  
    /**本机监听的端口号,用于和客户端建立连接*/  
    private static final int PROVIDER_PORT=12004;  
    public ServiceProvider() {}  
      
    /**将服务保存在本地*/  
    public void regist(Object object){  
        Class<?>[] cls=object.getClass().getInterfaces();  
        String interfaceName=cls[0].getName();  
        map.put(interfaceName, object);  
        System.out.println(interfaceName+"保存...");  
        registService(REGISTORY_HOST,REGISTOR_PORT,interfaceName);  
    }  
    /**将服务的接口名和服务提供者的信息注册到远程的注册中心*/  
    private void registService(String registoryHost, int registoryPort, String interfaceName) {  
        Socket socket=null;  
        ObjectOutputStream writer=null;  
        try {  
            socket=new Socket(registoryHost,registoryPort);  
            writer =new ObjectOutputStream(socket.getOutputStream());  
              
            writer.writeUTF(interfaceName);  
            writer.writeObject(new ProviderConnection(PROVIDER_HOST,PROVIDER_PORT));  
            writer.flush();  
            System.out.println(interfaceName+"注册...");  
        } catch (UnknownHostException e) {  
            e.printStackTrace();  
        } catch (IOException e) {  
            e.printStackTrace();  
        }finally {  
            if(writer!=null){  
                try {writer.close();} catch (IOException e) {e.printStackTrace();}  
            }  
            if(socket!=null){  
                try {socket.close();} catch (IOException e) {e.printStackTrace();}  
            }  
        }  
    }  
    /**启动服务器*/  
    public void start(){  
        ServerSocket server=null;  
        System.out.println("服务器启动");  
        while(!flag){  
            try {  
                server=new ServerSocket(PROVIDER_PORT);  
                Socket socket=server.accept();  
                System.out.println("消费者建立连接");  
                Thread thread=new Thread(new ThreadA(socket));  
                executor.submit(thread);  
            } catch (IOException e) {  
                e.printStackTrace();  
            }finally {  
                if(server!=null){  
                    try{server.close();}catch(IOException e){e.printStackTrace();}  
                }  
            }  
        }  
        System.out.println("服务器关闭成功");  
    }  
    /**关闭服务器*/  
    public void stop(){  
        this.flag=true;  
    }  
    /** 
     * 1.服务消费者连接成功之后,接收消费者发送来的信息: 
     *      1).服务名称:接口名 
     *      2).方法名 
     *      3).参数类型(Class[]) 
     *      4).参数对象(Object[]) 
     * 2.调用方法 
     *      method.invoke(...); 
     * 3.返回: 
     *      方法执行后的result对象. 
     * */  
    class ThreadA implements Runnable{  
        private Socket socket;  
        public ThreadA(Socket socket) {  
            this.socket=socket;  
        }  
        @Override  
        public void run() {  
            ObjectOutputStream oos=null;  
            ObjectInputStream ois=null;  
            try {  
                oos=new ObjectOutputStream(socket.getOutputStream());  
                ois=new ObjectInputStream(socket.getInputStream());  
                  
                String interfaceName=ois.readUTF();  
                String methodName=ois.readUTF();  
                Class<?> argsCls[]=(Class<?>[]) ois.readObject();  
                Object[] args=(Object[]) ois.readObject();  
                  
                Object targetService =map.get(interfaceName);  
                Method method=targetService.getClass().getMethod(methodName, argsCls);  
                Object result=method.invoke(targetService, args);  
                  
                oos.writeObject(result);  
            }catch (Exception e) {  
                e.printStackTrace();  
            } finally {  
                if(ois!=null&&!socket.isClosed()){  
                    try{ois.close();}catch(IOException e){e.printStackTrace();}  
                }  
                if(oos!=null&&!socket.isClosed()){  
                    try{oos.close();}catch(IOException e){e.printStackTrace();}  
                }  
            }  
        }  
    }  
    public static void main(String[] args) {  
        HelloWorld helloWorld=new HelloWorld();  
        ServiceProvider provider=new ServiceProvider();  
        provider.regist(helloWorld);  
        provider.start();  
    }  
}</span><span style="font-size:18px;">  
</span>  

服务消费者[java] view plain copy/** 
 * 服务消费者 
 * @author Administrator 
 * */  
public class ServiceConsumer {  
    /*注册中心的端口号*/  
    private static final int<span style="font-size:12px;"> REGISTER_PORT=12</span>206;  
    /*注册中心的主机名*/  
    private static final String REGISTER_HOST="localhost";  
    /*服务提供者的主机名*/  
    private String host;  
    /*服务提供者的端口号*/  
    private int port;  
    /**从注册中心获取服务提供者的信息: 
     * 1.host服务提供者的主机名 
     * 2.port服务提供者的端口号 
     * */  
    public void getProvider(String interfaceName){  
        ObjectOutputStream oos=null;  
        ObjectInputStream ois=null;  
        Socket socket=null;  
        try {  
            socket=new Socket(REGISTER_HOST,REGISTER_PORT);  
            oos=new ObjectOutputStream(socket.getOutputStream());  
            ois=new ObjectInputStream(socket.getInputStream());  
            oos.writeUTF(interfaceName);  
            oos.flush();  
              
            String host=ois.readUTF();  
            this.host=host;  
            int port=ois.readInt();  
            this.port=port;  
            System.out.println("host="+host+"\nport="+port);  
        } catch (UnknownHostException e) {  
            e.printStackTrace();  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
    /** 
     * 利用动态代理的方式获取服务 
     *  1.服务消费者端持有代理类对象; 
     *  2.当消费者执行代理对象的方法时,建立和服务提供者的连接, 
     *  3.发送相关参数给服务提供者, 
     *  4.获得方法执行后的结果 
     * @param serviceInterface :服务的接口类型 
     * @return 
     */  
    @SuppressWarnings("unchecked")  
    public <T>T findService(Class<T> serviceInterface){  
          
        return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler(){  
            @Override  
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
                Socket socket=new Socket(host,port);  
                ObjectOutputStream oos=null;  
                ObjectInputStream ois=null;  
                try {  
                    oos=new ObjectOutputStream(socket.getOutputStream());  
                    ois=new ObjectInputStream(socket.getInputStream());  
                      
                    String methodName=method.getName();  
                    System.out.println("serviceInterface="+serviceInterface.getName());  
                    oos.writeUTF(serviceInterface.getName());  
                    oos.writeUTF(methodName);  
                    oos.writeObject(method.getParameterTypes());  
                    oos.writeObject(args);  
                    oos.flush();  
                      
                    Object result=ois.readObject();  
                    return result;  
                } catch (UnknownHostException e) {  
                    e.printStackTrace();  
                } catch (IOException e) {  
                    e.printStackTrace();  
                } finally {  
                    if(ois!=null&&!socket.isClosed()){  
                        try{ois.close();}catch(IOException e){e.printStackTrace();}  
                    }  
                    if(oos!=null&&!socket.isClosed()){  
                        try{oos.close();}catch(IOException e){e.printStackTrace();}  
                    }  
                    if(socket!=null){  
                        try{socket.close();}catch(IOException e){e.printStackTrace();}  
                    }  
                }  
                return null;  
            }  
        });  
    }  
    public static void main(String[] args) {  
        ServiceConsumer consumer=new ServiceConsumer();  
        consumer.getProvider("cn.dongchao.dubbo.day02.Hello");  
        Hello hello=consumer.findService(Hello.class);  
        String result=hello.sayHello("zhangsan");  
        System.out.println("result="+result);  
    }  
}  

用于测试的接口:[java] view plain copypublic interface Hello {  
    String sayHello(String message);  
    void sayYes(String msg);  
}  
实现类:[java] view plain copypublic class HelloWorld implements Hello{  
  
    @Override  
    public String sayHello(String message) {  
        System.out.println("hello..."+message+"!");  
        return "hello..."+message+"!";  
    }  
    @Override  
    public void sayYes(String msg) {  
        System.out.println("yes: "+msg+"...");  
          
    }  
}  
分别启动Registry\Provider\Consumer,  结果如下:注册中心:


provider:


Consumer:


可以看到,provider的服务成功注册到注册Registry, 然后Consumer从Registry获得服务提供者的相关信息, Consumer和provider建立连接; Consumer端调用方法,provider端执行方法,并将结果返回给Consumer. Consumer端成功接收到方法执行后的结果.本文章参考了以下博客的内容:http://blog.csdn.net/is_zhoufeng/article/details/21133643http://blog.csdn.net/ichsonx/article/details/39008519
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: