您的位置:首页 > 编程语言 > Java开发

Java实现简易RPC框架(二)

2014-12-03 16:41 106 查看

 

上一遍blog我们已经说过了RPC的相关知识,接下来我们利用socket来实现RPC。

一、利用socket、动态代理实现RPC

参考阿里梁飞同学网上的例子,并调整单服务多线程模式,首先是框架代码

package org.bird.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
/**
* RPC框架
* @author liangjf
*
*/
public class RpcFramework {
static ServerSocket server = null;
static Map<String,Object> serviceMap = new HashMap<String,Object>();
/**
* 暴露服务
* @param <C>
*
* @param service 服务实现
* @param port 服务端口
* @throws Exception
*/
public static <C> void export(final Object service, final int port) throws Exception {
if (service == null) {
throw new IllegalArgumentException("service instance == null");
}
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port);
}
System.out.println("Export service " + service.getClass().getName() + " on port " + port);

if(server == null) {
server = new ServerSocket(port);
}else if(server.getLocalPort() != port) {
server = new ServerSocket(port);
}
Class<Object>[] classs = (Class<Object>[]) service.getClass().getInterfaces();
for(Class<Object> c : classs){
serviceMap.put(c.getName() + "|" + port, service);
}

new Thread(new Runnable(){//创建后台线程进行socket监听
public void run() {
while(true) {
try {
final Socket socket = server.accept();
try {
try {
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
String interfaceName = input.readUTF();//客户端调用的接口
String methodName = input.readUTF();//service是服务器端提供服务的对象,但是,要通过获取到的调用方法的名称,参数类型,以及参数来选择对象的方法,并调用。获得方法的名称
Class<?>[] parameterTypes = (Class<?>[])input.readObject();//获得参数的类型
Object[] arguments = (Object[])input.readObject();//获得参数

Object remoter = RpcFramework.getService(interfaceName + "|" + port);
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
Method method = remoter.getClass().getMethod(methodName, parameterTypes);//通过反射机制获得方法
Object result = method.invoke(remoter, arguments);//通过反射机制获得类的方法,并调用这个方法
output.writeObject(result);//将结果发送
} catch (Throwable t) {
output.writeObject(t);
} finally {
output.close();
}
} finally {
input.close();
}
} finally {
socket.close();
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

}).start();
}

/**
* 引用服务
*
* @param <T> 接口泛型
* @param interfaceClass 接口类型
* @param host 服务器主机名
* @param port 服务器端口
* @return 远程服务
* @throws Exception
*///原理是通过代理,获得服务器端接口的一个“代理”的对象。对这个对象的所有操作都会调用invoke函数,在invoke函数中,是将被调用的函数名,参数列表和参数发送到服务器,并接收服务器处理的结果
@SuppressWarnings("unchecked")
public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
if (interfaceClass == null) {
throw new IllegalArgumentException("Interface class == null");
}
if (! interfaceClass.isInterface()) {
throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
}
if (host == null || host.length() == 0) {
throw new IllegalArgumentException("Host == null!");
}
if (port <= 0 || port > 65535) {
throw new IllegalArgumentException("Invalid port " + port);
}
System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {//jdk动态代理,内部类
public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
Socket socket = new Socket(host, port);
try {
ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
try {
output.writeUTF(interfaceClass.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(arguments);
ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
try {
Object result = input.readObject();
if (result instanceof Throwable) {
throw (Throwable) result;
}
return result;
} finally {
input.close();
}
} finally {
output.close();
}
} finally {
socket.close();
}
}
});
}

private static Object getService(String name) {
return serviceMap.get(name);
}
}

 接着写两个需要暴露的服务接口

package org.bird.rpc;

/**
* 服务接口
* @author liangjf
*
*/
public interface HelloService {

String hello(String name);
}

 

package org.bird.rpc;
/**
* 服务接口
* @author liangjf
*
*/
public interface GoodByeService {

public String sayGoodBye(String name);
}

 实现服务类

package org.bird.rpc.impl;

import org.bird.rpc.HelloService;

public class HelloServiceImpl implements HelloService {

public String hello(String name) {
return "Hello " + name;
}
}

 

package org.bird.rpc.impl;

import org.bird.rpc.GoodByeService;

public class GoodByeServiceImpl implements GoodByeService {

public String sayGoodBye(String name) {
return "Good Bye " + name;
}
}

 接着写一个服务器测试类

package org.bird.rpc;

import org.bird.rpc.impl.GoodByeServiceImpl;
import org.bird.rpc.impl.HelloServiceImpl;
/**
* 服务器
* @author liangjf
*
*/
public class Service {

public  static void main(String[] args) throws Exception {
HelloService helloService = new HelloServiceImpl();
GoodByeService goodbyeService = new GoodByeServic
2ac90
eImpl();
RpcFramework.export(helloService, Util.SP);
RpcFramework.export(goodbyeService, Util.SP);
}
}

 最后写一个远程客户端测试类

package org.bird.rpc;

/**
* 客户端
* @author liangjf
*
*/
public class Client {

/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
HelloService service = RpcFramework.refer(HelloService.class,Util.SIP, Util.SP);
for (int i = 0; i < 5; i++) {
String hello = service.hello("World" + i);
System.out.println(hello);
Thread.sleep(2000);
}
GoodByeService goodbyeService = RpcFramework.refer(GoodByeService.class, Util.SIP, Util.SP);
for(int i = 0 ; i < 5; i++) {
String goodbye = goodbyeService.sayGoodBye("ljf" + i);
System.out.println(goodbye);
Thread.sleep(2000);
}
}

}

 IP跟端口辅助类

package org.bird.rpc;

public class Util {
static String SIP = "127.0.0.1";
static int SP = 8888;
}

 1、首先将RpcFramework、HelloService、GoodByeService、HelloServiceImpl、GoodByeServiceImpl、Service、Util这些类一起打包成可运行的server.jar;将RpcFramework、HelloService、GoodByeService、Client、Util这些类一起打包成两个可运行的client1.jar、client2.jar。

 

2、然后在命令行模式下通过java -jar server.jar命令启动server端

 3、再打开一个命令行窗口启动客户端client1


 我们再看看server端的变化

 4、另打开一个命令行窗口启动客户端client2

 再看看服务器端

从上面的测试结果来看我们只需要将接口暴露给远程客户端就可以进行远程RPC调用了,就像本地调用一样透明简单。主要实现原理是通过JDK动态代理动态生成代理实例。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: