您的位置:首页 > 编程语言 > Java开发

使用Java实现简易RPC框架

2017-05-14 23:24 507 查看
RPC其全程为Remote Process Call,即为远程过程调用。RPC将传统的本地调用转换为调用远端的服务器的方法,给系统的处理能力和吞吐量带来了极大的提升。

随着项目的发展,业务越来越复杂,单个项目的话,会非常复杂,且不易维护,如果单个项目挂了就会导致整个系统都无法使用,但是把复杂的业务拆分为多个小业务就方便系统的维护与开发。这种情况下就会有系统之间的相互调用。当日调用量不大的时候我们可以直接使用http(使用的最多的是appache的HttpClient),调用不同应用之间的服务,但是当调用量大的时候,因为使用http的调用一般使用json数据格式,所以数据传输量大,这种调用方法是基于http协议的,并且http的连接与释放需要占用大量的宝贵资源,并且不好做服务路由与负载均衡。所以出现了RPC框架,比如Dubbo。一般RPC框架都是基于TCP协议的长连接,传输的数据系列化,大大的减少了数据的传输量。



RPC远程调用,即不同应用之间的调用,必然涉及到ipc进程之间的通讯,进程之间的通讯有管道方式,共享内存方式,文件共享方式,管道方式,套接字方式。进程间的通讯可以参考这篇文章

这里RPC框架,进程间的通讯是使用java的套接字socket。这里实现的简易RPC框架,并没有处理负载均衡,服务的路由,服务的监控等,服务的注册等等,仅仅是实现服务的调用。

代码是参考这篇博客写的,博主是Dubbo的源码提供者。

消费者:

package com.xfl.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;

/**
* Created by XFL
* time on 2017/5/14 22:12
* description:消费者,调用服务
* http://javatar.iteye.com/blog/1123915 */
public class Consumer {
public static <T> T refer(final Class<T> interfaceClass,
final String host, final int port) throws Exception {
if (interfaceClass == null)
throw new IllegalArgumentException("Interface class == null");
if (!interfaceClass.isInterface())
throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
if (host == null || host.length() == 0)
throw new IllegalArgumentException("Host == null!");
if (port <= 0 || port > 65535)
throw new IllegalArgumentException("Invalid port " + port);
System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
//生成动态代理对象
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
Socket socket = new Socket(host, port);
try {
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(arguments);
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
}
});
}

public static void main(String[] args) throws Exception {
//此处返回的是动态代理对象
HelloService service = refer(HelloService.class, "127.0.0.1", 1234);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
//调用hello方法时会调用代理对象的invoke方法
String hello = service.hello("World" + i);
System.out.println(hello);
Thread.sleep(1000);
}
}
}


生产者:

package com.xfl.rpc;

import java.net.ServerSocket;
import java.net.Socket;

/**
* Created by XFL
* time on 2017/5/14 21:22
* description:生产者暴露服务
* http://javatar.iteye.com/blog/1123915 */
public class Provider {
public static void export(final Object service, int port) throws Exception {
//服务校验
if (service == null) {
throw new IllegalArgumentException("service must not be null");
}
//端口校验
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port:" + port + "a valid port must between 0 and 65535");
}
//向操作系统注册服务
ServerSocket serverSocket = new ServerSocket(port);
//循环启动监听
while (true) {
Socket socket = serverSocket.accept();
//
c138
开启独立的线程处理服务调用
new ServerThread(socket, service).start();
}
}

public static void main(String[] args) throws Exception {
HelloService service = new HelloServiceImpl();
export(service, 1234);
}
}


package com.xfl.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.Socket;

/**
* Created by XFL
* time on 2017/5/14 21:51
* description: 单独处理调用的服务
*/
public class ServerThread extends Thread {
private Socket socket;
private Object service;

public ServerThread(Socket socket, Object service) {
this.socket = socket;
this.service = service;
}

@Override
public void run() {
try {
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
//获取服务调用的方法
String methodName = input.readUTF();
//获取服务调用的参数类型
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
//获取服务调用的参数
Object[] arguments = (Object[]) input.readObject();
//获取输出响应结果流
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
Method method;
//利用反射调用服务
method = service.getClass().getMethod(methodName, parameterTypes);
Object result = method.invoke(service, arguments);
//返回调用结果
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
}

input.close();
} catch (IOException e) {

} catch (ClassNotFoundException e) {
e.printStackTrace();
} finally {
//关闭连接
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}


接口

package com.xfl.rpc;

/**
* Created by XFL
* time on 2017/5/14 21:16
* description: 接口定义
*/
public interface HelloService {
String hello(String name);
}


package com.xfl.rpc;

/**
* Created by XFL
* time on 2017/5/14 21:18
* description:接口实现
*/
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String name) {
return "hello:" + name;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java RPC