自己实现简单RPC功能
2015-08-30 16:49
375 查看
最近对RMI RPC比较感兴趣, 所以自己做了一个简单的实现, 如果有时间,之后会继续完善。
RPC主要分为服务端与客户端。
服务端的实现如下:
[java] view
plaincopy
package com.zf.rpc.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
public class RPCServer {
private static final ExecutorService taskPool = Executors.newFixedThreadPool(50) ;
/**
* 服务接口对象库
* key:接口名 value:接口实现
*/
private static final ConcurrentHashMap<String, Object> serviceTargets =
new ConcurrentHashMap<String, Object>() ;
private static AtomicBoolean run = new AtomicBoolean(false) ;
/**
* 注册服务
* @param service
*/
public void registService(Object service){
Class<?>[] interfaces = service.getClass().getInterfaces() ;
if(interfaces == null){
throw new IllegalArgumentException("服务对象必须实现接口");
}
Class<?> interfacez = interfaces[0] ;
String interfaceName = interfacez.getName() ;
serviceTargets.put(interfaceName, service) ;
}
/**
* 启动Server
* @param port
*/
public void startServer(final int port){
Runnable lifeThread = new Runnable() {
@Override
public void run() {
ServerSocket lifeSocket = null ;
Socket client = null ;
ServiceTask serviceTask = null ;
try {
lifeSocket = new ServerSocket(port) ;
run.set(true) ;
while(run.get()){
client = lifeSocket.accept() ;
serviceTask = new ServiceTask(client);
serviceTask.accept() ;
}
} catch (IOException e) {
e.printStackTrace();
}
}
};
taskPool.execute(lifeThread) ;
System.out.println("服务启动成功...");
}
public void stopServer(){
run.set(false) ;
taskPool.shutdown() ;
}
public static final class ServiceTask implements Runnable{
private Socket client ;
public ServiceTask(Socket client){
this.client = client ;
}
public void accept(){
taskPool.execute(this) ;
}
@Override
public void run() {
InputStream is = null ;
ObjectInput oi = null ;
OutputStream os = null ;
ObjectOutput oo = null ;
try {
is = client.getInputStream() ;
os = client.getOutputStream() ;
oi = new ObjectInputStream(is);
String serviceName = oi.readUTF() ;
String methodName = oi.readUTF();
Class<?>[] paramTypes = (Class[]) oi.readObject() ;
Object[] arguments = (Object[]) oi.readObject() ;
System.out.println("serviceName:" + serviceName + " methodName:" + methodName);
Object targetService = serviceTargets.get(serviceName) ;
if(targetService == null){
throw new ClassNotFoundException(serviceName + "服务未找到!") ;
}
Method targetMethod = targetService.getClass().getMethod(methodName, paramTypes) ;
Object result = targetMethod.invoke(targetService, arguments) ;
oo = new ObjectOutputStream(os) ;
oo.writeObject(result) ;
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SecurityException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}finally{
try {
if(oo != null){
oo.close() ;
}
if(os != null){
os.close() ;
}
if(is != null){
is.close() ;
}
if(oi != null){
oi.close() ;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端如下:
[java] view
plaincopy
package com.zf.rpc.client;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
public class RPCClient {
/**
* 根据接口类型得到代理的接口实现
* @param <T>
* @param host RPC服务器IP
* @param port RPC服务端口
* @param serviceInterface 接口类型
* @return 被代理的接口实现
*/
@SuppressWarnings("unchecked")
public static <T> T findService(final String host , final int port ,final Class<T> serviceInterface){
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
Socket socket = null ;
InputStream is = null ;
OutputStream os = null ;
ObjectInput oi = null ;
ObjectOutput oo = null ;
try {
socket = new Socket(host, port) ;
os = socket.getOutputStream() ;
oo = new ObjectOutputStream(os);
oo.writeUTF(serviceInterface.getName()) ;
oo.writeUTF(method.getName()) ;
oo.writeObject(method.getParameterTypes()) ;
oo.writeObject(args);
is = socket.getInputStream() ;
oi = new ObjectInputStream(is) ;
return oi.readObject() ;
} catch (Exception e) {
System.out.println("调用服务异常...");
return null ;
}finally{
if(is != null){
is.close() ;
}
if(os != null){
is.close() ;
}
if(oi != null){
is.close() ;
}
if(oo != null){
is.close() ;
}
if(socket != null){
is.close() ;
}
}
}
});
}
}
现在写一个接口和一个实现。
[java] view
plaincopy
package com.zf.rpc.test;
public interface IHelloWorld {
String sayHello(String name) ;
}
[java] view
plaincopy
package com.zf.rpc.test;
public class HelloWorld implements IHelloWorld{
@Override
public String sayHello(String name) {
return "hello " + name + "!";
}
}
下面就可以开始测试了。
先写RPC服务端,并启动
[java] view
plaincopy
package com.zf.rpc.test;
import com.zf.rpc.server.RPCServer;
public class RPCServerTest {
public static void main(String[] args) {
RPCServer server = new RPCServer() ;
server.registService(new HelloWorld()) ;
server.startServer(8080) ;
}
}
启动后,会看到输出
服务启动成功...
然后写RPC客户端,并启动
[java] view
plaincopy
package com.zf.rpc.test;
import com.zf.rpc.client.RPCClient;
public class RPCClientTest {
public static void main(String[] args) {
IHelloWorld helloWorld =
RPCClient.findService("127.0.0.1" , 8080 , IHelloWorld.class) ;
String result = helloWorld.sayHello("is_zhoufeng");
System.out.println(result );
}
}
会看到客户端输出:
hello is_zhoufeng!
到此, 一个远程调用就实现了。
RPC主要分为服务端与客户端。
服务端的实现如下:
[java] view
plaincopy
package com.zf.rpc.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
public class RPCServer {
private static final ExecutorService taskPool = Executors.newFixedThreadPool(50) ;
/**
* 服务接口对象库
* key:接口名 value:接口实现
*/
private static final ConcurrentHashMap<String, Object> serviceTargets =
new ConcurrentHashMap<String, Object>() ;
private static AtomicBoolean run = new AtomicBoolean(false) ;
/**
* 注册服务
* @param service
*/
public void registService(Object service){
Class<?>[] interfaces = service.getClass().getInterfaces() ;
if(interfaces == null){
throw new IllegalArgumentException("服务对象必须实现接口");
}
Class<?> interfacez = interfaces[0] ;
String interfaceName = interfacez.getName() ;
serviceTargets.put(interfaceName, service) ;
}
/**
* 启动Server
* @param port
*/
public void startServer(final int port){
Runnable lifeThread = new Runnable() {
@Override
public void run() {
ServerSocket lifeSocket = null ;
Socket client = null ;
ServiceTask serviceTask = null ;
try {
lifeSocket = new ServerSocket(port) ;
run.set(true) ;
while(run.get()){
client = lifeSocket.accept() ;
serviceTask = new ServiceTask(client);
serviceTask.accept() ;
}
} catch (IOException e) {
e.printStackTrace();
}
}
};
taskPool.execute(lifeThread) ;
System.out.println("服务启动成功...");
}
public void stopServer(){
run.set(false) ;
taskPool.shutdown() ;
}
public static final class ServiceTask implements Runnable{
private Socket client ;
public ServiceTask(Socket client){
this.client = client ;
}
public void accept(){
taskPool.execute(this) ;
}
@Override
public void run() {
InputStream is = null ;
ObjectInput oi = null ;
OutputStream os = null ;
ObjectOutput oo = null ;
try {
is = client.getInputStream() ;
os = client.getOutputStream() ;
oi = new ObjectInputStream(is);
String serviceName = oi.readUTF() ;
String methodName = oi.readUTF();
Class<?>[] paramTypes = (Class[]) oi.readObject() ;
Object[] arguments = (Object[]) oi.readObject() ;
System.out.println("serviceName:" + serviceName + " methodName:" + methodName);
Object targetService = serviceTargets.get(serviceName) ;
if(targetService == null){
throw new ClassNotFoundException(serviceName + "服务未找到!") ;
}
Method targetMethod = targetService.getClass().getMethod(methodName, paramTypes) ;
Object result = targetMethod.invoke(targetService, arguments) ;
oo = new ObjectOutputStream(os) ;
oo.writeObject(result) ;
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SecurityException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}finally{
try {
if(oo != null){
oo.close() ;
}
if(os != null){
os.close() ;
}
if(is != null){
is.close() ;
}
if(oi != null){
oi.close() ;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端如下:
[java] view
plaincopy
package com.zf.rpc.client;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
public class RPCClient {
/**
* 根据接口类型得到代理的接口实现
* @param <T>
* @param host RPC服务器IP
* @param port RPC服务端口
* @param serviceInterface 接口类型
* @return 被代理的接口实现
*/
@SuppressWarnings("unchecked")
public static <T> T findService(final String host , final int port ,final Class<T> serviceInterface){
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface}, new InvocationHandler() {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
Socket socket = null ;
InputStream is = null ;
OutputStream os = null ;
ObjectInput oi = null ;
ObjectOutput oo = null ;
try {
socket = new Socket(host, port) ;
os = socket.getOutputStream() ;
oo = new ObjectOutputStream(os);
oo.writeUTF(serviceInterface.getName()) ;
oo.writeUTF(method.getName()) ;
oo.writeObject(method.getParameterTypes()) ;
oo.writeObject(args);
is = socket.getInputStream() ;
oi = new ObjectInputStream(is) ;
return oi.readObject() ;
} catch (Exception e) {
System.out.println("调用服务异常...");
return null ;
}finally{
if(is != null){
is.close() ;
}
if(os != null){
is.close() ;
}
if(oi != null){
is.close() ;
}
if(oo != null){
is.close() ;
}
if(socket != null){
is.close() ;
}
}
}
});
}
}
现在写一个接口和一个实现。
[java] view
plaincopy
package com.zf.rpc.test;
public interface IHelloWorld {
String sayHello(String name) ;
}
[java] view
plaincopy
package com.zf.rpc.test;
public class HelloWorld implements IHelloWorld{
@Override
public String sayHello(String name) {
return "hello " + name + "!";
}
}
下面就可以开始测试了。
先写RPC服务端,并启动
[java] view
plaincopy
package com.zf.rpc.test;
import com.zf.rpc.server.RPCServer;
public class RPCServerTest {
public static void main(String[] args) {
RPCServer server = new RPCServer() ;
server.registService(new HelloWorld()) ;
server.startServer(8080) ;
}
}
启动后,会看到输出
服务启动成功...
然后写RPC客户端,并启动
[java] view
plaincopy
package com.zf.rpc.test;
import com.zf.rpc.client.RPCClient;
public class RPCClientTest {
public static void main(String[] args) {
IHelloWorld helloWorld =
RPCClient.findService("127.0.0.1" , 8080 , IHelloWorld.class) ;
String result = helloWorld.sayHello("is_zhoufeng");
System.out.println(result );
}
}
会看到客户端输出:
hello is_zhoufeng!
到此, 一个远程调用就实现了。
相关文章推荐
- C++ 强制类型转换
- 数据结构中排序算法-交换排序(2)
- 分布式中Redis实现Session终结篇
- 常见问题总结篇 一 、Objective C 方法和 C方法的混合调用
- Merge k Sorted Lists
- GTK+浅谈之一Windows10下QtCreator中GTK+环境搭建
- 富文本Html.fromHtml方法
- 安卓ADT离线安装教程
- IE附图(Image对象)显示内存溢出解决方案
- java.lang.reflect.Invocation TargetException异常
- 删除二叉查找树的节点-LintCode
- 主席树模板
- JAVA变量在内存中的分配
- 用virtualenv管理python3运行环境
- global.asax文件的应用
- 《Unicode之痛》摘抄
- Xcode 注释插件
- C++内存管理
- jquery实现整屏翻屏效果:jquery.mousewheel(一)
- Win8下双系统win7 教程详解