您的位置:首页 > 其它

rpc系列-rpc02

2019-06-10 18:03 1796 查看
版权声明:本文为博主(李孟lm)原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_19968255/article/details/82896520

承接rpc系列-rpc01:https://blog.csdn.net/qq_19968255/article/details/82894381

示例

1.结构

2.代码

客户端:

rpc-client

[code]/**
* 框架的RPC 客户端(用于发送 RPC 请求)
*/
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {

private static final Logger logger = LoggerFactory
.getLogger(RpcClient.class);

private String host;
private int port;

private RpcResponse response;

private final Object obj = new Object();

public RpcClient(String host, int port) {
this.host = host;
this.port = port;
}
/**
* 链接服务端,发送消息
*/
public RpcResponse send(RpcRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel)
throws Exception {
// 向pipeline中添加编码、解码、业务处理的handler
channel.pipeline()
.addLast(new RpcEncoder(RpcRequest.class))  //out-1
.addLast(new RpcDecoder(RpcResponse.class)) //in-1
.addLast(RpcClient.this);                   //in-2
}
}).option(ChannelOption.SO_KEEPALIVE, true);
// 链接服务器
ChannelFuture future = bootstrap.connect(host, port).sync();
//将request对象写入outbundle处理后发出(即RpcEncoder编码器)
future.channel().writeAndFlush(request).sync();

// 用线程等待的方式决定是否关闭连接
// 其意义是:先在此阻塞,等待获取到服务端的返回后,被唤醒,从而关闭网络连接
synchronized (obj) {
obj.wait();
}
if (response != null) {
future.channel().closeFuture().sync();
}
return response;
} finally {
group.shutdownGracefully();
}
}

/**
* 读取服务端的返回结果
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response)
throws Exception {
this.response = response;

synchronized (obj) {
obj.notifyAll();
}
}

/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
logger.error("client caught exception", cause);
ctx.close();
}
}
[code]/**
* RPC 代理(用于创建 RPC 服务代理)
*/
public class RpcProxy {
//服务地址
private String serverAddress;
//自动加载,查找服务
private ServiceDiscovery serviceDiscovery;

public RpcProxy(String serverAddress) {
this.serverAddress = serverAddress;
}

public RpcProxy(ServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}

/**
* 创建代理
*
* @param interfaceClass
* @return
*/
@SuppressWarnings("unchecked")
public <T> T create(Class<?> interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[] { interfaceClass }, new InvocationHandler() {
public Object invoke(Object proxy, Method method,
Object[] args) throws Throwable {
//创建RpcRequest,封装被代理类的属性
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
//拿到声明这个方法的业务接口名称
request.setClassName(method.getDeclaringClass()
.getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
//查找服务
if (serviceDiscovery != null) {
serverAddress = serviceDiscovery.discover();
}
//随机获取服务的地址
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
//创建Netty实现的RpcClient,链接服务端
RpcClient client = new RpcClient(host, port);
//通过netty向服务端发送请求
RpcResponse response = client.send(request);
//返回信息
if (response.isError()) {
throw response.getError();
} else {
return response.getResult();
}
}
});
}
}

 

服务端:

rpc-server

[code]/**
* 处理具体的业务调用
* 通过构造时传入的“业务接口及实现”handlerMap,来调用客户端所请求的业务方法
* 并将业务方法返回值封装成response对象写入下一个handler(即编码handler——RpcEncoder)
*/
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

private static final Logger logger = LoggerFactory
.getLogger(RpcHandler.class);

private final Map<String, Object> handlerMap;

RpcHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}

/**
* 接收消息,处理消息,返回结果,
*/
@Override
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request)
throws Exception {
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try {
//根据request来处理具体的业务调用
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t);
}
//写入 outbundle(即RpcEncoder)进行下一步处理(即编码)后发送到channel中给客户端
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

/**
* 根据request来处理具体的业务调用
* 调用是通过反射的方式来完成
*/
private Object handle(RpcRequest request) throws Throwable {
String className = request.getClassName();

//拿到实现类对象
Object serviceBean = handlerMap.get(className);

//拿到要调用的方法名、参数类型、参数值
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();

//拿到接口类
Class<?> forName = Class.forName(className);

System.out.println(serviceBean.toString()+"  "+ Arrays.toString(parameters));
//调用实现类对象的指定方法并返回结果
Method method = forName.getMethod(methodName, parameterTypes);
return method.invoke(serviceBean, parameters);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error(cause.getMessage());
ctx.close();
}
}
[code]/**
* 框架的RPC 服务器(用于将用户系统的业务类发布为 RPC 服务)
* 使用时可由用户通过spring-bean的方式注入到用户的业务系统中
* 由于本类实现了ApplicationContextAware InitializingBean
* spring构造本对象时会调用setApplicationContext()方法,从而可以在方法中通过自定义注解获得用户的业务接口和实现
* 还会调用afterPropertiesSet()方法,在方法中启动netty服务器
* 顺序
* 1.setApplicationContext 报错会中间截断
* 2.afterPropertiesSet
*/
public class RpcServer implements ApplicationContextAware, InitializingBean {

private static final Logger logger = LoggerFactory
.getLogger(RpcServer.class);

private String serverAddress;
private ServiceRegistry serviceRegistry;

//用于存储业务接口和实现类的实例对象(由spring所构造)
private Map<String, Object> handlerMap = new HashMap<String, Object>();

public RpcServer(String serverAddress) {
this.serverAddress = serverAddress;
}

//服务器绑定的地址和端口由spring在构造本类时从配置文件中传入
public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
this.serverAddress = serverAddress;
//用于向zookeeper注册名称服务的工具类
this.serviceRegistry = serviceRegistry;
}

/**
* 通过注解,获取标注了rpc服务注解的业务类的----接口及impl对象,将它放到handlerMap中
*/
public void setApplicationContext(ApplicationContext ctx)
throws BeansException {
Map<String, Object> serviceBeanMap = ctx
.getBeansWithAnnotation(RpcService.class);
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
//从业务实现类上的自定义注解中获取到value,从来获取到业务接口的全名
String interfaceName = serviceBean.getClass()
.getAnnotation(RpcService.class).value().getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}

/**
* 在此启动netty服务,绑定handle流水线:
* 1、接收请求数据进行反序列化得到request对象
* 2、根据request中的参数,让RpcHandler从handlerMap中找到对应的业务imple,调用指定方法,获取返回结果
* 3、将业务调用结果封装到response并序列化后发往客户端
*/
public void afterPropertiesSet() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel)
throws Exception {
channel.pipeline()
.addLast(new RpcDecoder(RpcRequest.class))// in-1
.addLast(new RpcEncoder(RpcResponse.class))// out-1
.addLast(new RpcHandler(handlerMap));// in-2
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);

ChannelFuture future = bootstrap.bind(host, port).sync();
logger.debug("server started on port {}", port);

if (serviceRegistry != null) {
serviceRegistry.register(serverAddress);
}

future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
[code]/**
* RPC 请求注解(标注在服务实现类上)
*/
@Target({ ElementType.TYPE })//注解用在接口上
@Retention(RetentionPolicy.RUNTIME)//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息
@Component
public @interface RpcService {

Class<?> value();
}

运行结果:

zkCli数据:

服务端

sample—server调用服务端接口实现:
Hello! World

客户端

服务端返回结果:
Hello! World

代码运行路径:

 

代码下载

地址:https://download.csdn.net/download/qq_19968255/10696211

 

 

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: