您的位置:首页 > 编程语言 > Java开发

简单的分布式RPC框架 《blackRpc》 五(客户端启动及调用)

2018-01-23 11:38 369 查看
警告:本文仅仅适合初探RPC的小伙伴,对于业界老鸟的话,就不建议吐槽了。。。。

上节说到了服务端启动流程,本节说下客户端启动流程。直接上代码。

netty客户端:

package com.black.blackrpc.communication.netty.tcp.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.util.concurrent.Future;
/**
 * Netty TCP client that owns one connection to a single {@code host:port}.
 *
 * <p>The connection is attempted once, in the constructor. If it fails, the
 * instance is still created but {@link #sendMsg(Object)} returns {@code false}.
 *
 * @author v_wangshiyu
 */
public class NettyTcpClient {
    private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class);
    private String host;
    private int port;
    private Bootstrap bootstrap;
    private Channel channel;
    private EventLoopGroup group;

    /**
     * Builds the bootstrap and immediately tries to connect.
     *
     * @param host remote host name or IP
     * @param port remote TCP port
     */
    public NettyTcpClient(String host, int port) {
        this.host = host;
        this.port = port;
        bootstrap = getBootstrap();
        channel = getChannel(host, port);
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    /**
     * Creates and configures a new {@link Bootstrap}.
     *
     * <p>Note: every call allocates a fresh {@link NioEventLoopGroup} and stores
     * it in this instance, replacing the previous one.
     *
     * @return the configured bootstrap
     */
    public final Bootstrap getBootstrap() {
        group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class);
        b.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Inbound: read a 4-byte length prefix, strip it, hand on the frame.
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                // Outbound: prepend a 4-byte length prefix to every frame.
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                pipeline.addLast("decoder", new ByteArrayDecoder());
                pipeline.addLast("encoder", new ByteArrayEncoder());

                pipeline.addLast("handler", new TcpClientHandler());
            }
        });
        b.option(ChannelOption.SO_KEEPALIVE, true);
        return b;
    }

    /**
     * Connects to the given address and waits for the connection to complete.
     *
     * @param host remote host name or IP
     * @param port remote TCP port
     * @return the connected channel, or {@code null} if the connection failed
     *         or the calling thread was interrupted while waiting
     */
    public final Channel getChannel(String host, int port) {
        try {
            return bootstrap.connect(host, port).sync().channel();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently eating it.
            Thread.currentThread().interrupt();
            log.warn(String.format("connect Server(IP[%s],PORT[%s]) fail!", host, port), e);
            return null;
        } catch (Exception e) {
            // Log the cause as well; the bare message gives no hint why the connect failed.
            log.warn(String.format("connect Server(IP[%s],PORT[%s]) fail!", host, port), e);
            return null;
        }
    }

    /**
     * Writes and flushes a message, blocking until the write completes.
     *
     * @param msg the message to send
     * @return {@code true} if the message was written, {@code false} if there
     *         is no channel because the connection was never established
     * @throws Exception if the write fails or the wait is interrupted
     */
    public boolean sendMsg(Object msg) throws Exception {
        if (channel == null) {
            log.debug("msg flush fail,connect is null");
            return false;
        }
        channel.writeAndFlush(msg).sync();
        log.debug("msg flush success");
        return true;
    }

    /**
     * Disconnects and releases resources by shutting down the event loop group,
     * which also closes the channels that are using it.
     *
     * @return always {@code true}
     */
    public boolean disconnectConnect() {
        Future<?> future = group.shutdownGracefully();
        future.syncUninterruptibly();
        return true;
    }
}


客户端Handler

package com.black.blackrpc.communication.netty.tcp.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.black.blackrpc.code.base.entity.RpcResponse;
import com.black.blackrpc.code.cache.SyncFutureCatch;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.code.synchronization.SyncFuture;
import com.black.blackrpc.communication.message.HeadAnalysis;
import com.black.blackrpc.serialization.SerializationIntegrate;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Inbound handler of the TCP client: decodes a response frame and hands the
 * {@link RpcResponse} to the {@link SyncFuture} registered under its request id.
 */
public class TcpClientHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(TcpClientHandler.class);

    /**
     * Handles one received frame. The frame is expected to be a byte array made
     * of an 8-byte head followed by the serialized response body.
     *
     * @param ctx channel context (unused)
     * @param msg the received frame, a {@code byte[]}
     * @throws Exception if the frame length does not match the length declared in the head
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        byte[] head_data = (byte[]) msg;
        HeadAnalysis headAnalysis = new HeadAnalysis(head_data);
        if (head_data.length != headAnalysis.getLength() + 8) {
            throw new Exception("TcpClien Receive Data Length is not Agreement!!!");
        }
        // Copy out the body that follows the 8-byte head.
        byte[] data = new byte[headAnalysis.getLength()];
        System.arraycopy(head_data, 8, data, 0, data.length);

        RpcResponse rpcResponse = null;
        try {
            rpcResponse = SerializationIntegrate.deserialize(data, RpcResponse.class,
                    SerializationTypeEnum.getSerializationTypeEnum(headAnalysis.getSerializationType()));
        } catch (Exception e) {
            log.error("Tcp Client failed to deserialize response, head:" + headAnalysis, e);
        }
        if (rpcResponse == null) {
            // Without a response there is no request id to look up; the caller will time out.
            // Previously this fell through and threw a NullPointerException below.
            return;
        }
        log.debug("Tcp Client receive head:{}Tcp Client receive data:{}", headAnalysis, rpcResponse);

        SyncFuture<RpcResponse> syncFuture = SyncFutureCatch.syncFutureMap.get(rpcResponse.getRequestId());
        if (syncFuture != null) {
            // Publishes the result and wakes the thread blocked in the remote call.
            syncFuture.setResponse(rpcResponse);
        }
    }
}


Spring Bean预先初始化

package com.black.blackrpc.code.spring.beanStructure;

import java.lang.reflect.Field;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

import com.black.blackrpc.code.annotation.InvokingService;
import com.black.blackrpc.code.annotation.LoadBalanceStrategy;
import com.black.blackrpc.code.annotation.SerializationType;
import com.black.blackrpc.code.annotation.TimeOut;
import com.black.blackrpc.code.cache.BeanProxyCache;
import com.black.blackrpc.code.enums.LoadBalanceStrategyEnum;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.code.proxy.JdkProxy;
import com.black.blackrpc.code.service.ServiceCodeInit;
import com.black.blackrpc.common.configure.BreakRpcConfigure;
import com.black.blackrpc.common.constant.SyncFutureConstant;
import com.black.blackrpc.common.constant.ZkConstant;
import com.black.blackrpc.common.util.ListUtil;
import com.black.blackrpc.common.util.MapUtil;
/**
* BeanPostProcessor 实现
* @author v_wangshiyu
*
*/
@Component
public class MyBeanPostProcessor implements BeanPostProcessor {
@Autowired
private BreakRpcConfigure breakRpcConfigure;

private static final Logger log = LoggerFactory.getLogger(ServiceCodeInit.class);
public MyBeanPostProcessor() {
super();
/*********初始化代理类缓存********/
if(MapUtil.isEmpty(BeanProxyCache.beanProxyMap)){
BeanProxyCache.beanProxyMapInit();
}
/*********初始化代理类缓存********/
log.info("BeanPostProcessor实现类构造器初始化完成!");
}

// Bean 实例化之前进行的处理
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {

return bean;
}

// Bean 实例化之后进行的处理
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
Map<String,Object> beanProxyMap= BeanProxyCache.beanProxyMap;
if(breakRpcConfigure!=null&&breakRpcConfigure.getClientOpen()){
Field[] fields= bean.getClass().getDeclaredFields();//获取所有属性
if(ListUtil.isNotEmpty(fields)){
for(Field field:fields){
InvokingService invokingService=field.getAnnotation(InvokingService.class);
if(invokingService!=null){
Object jdkMethodProxy= beanProxyMap.get(beanName+ZkConstant.DELIMITED_MARKER+field.getName());
if(jdkMethodProxy!=null){
try {
field.setAccessible(true);
field.set(bean,jdkMethodProxy);//注入动态代理
field.setAccessible(false);
} catch (Exception e) {
e.printStackTrace();
}
}else{
SerializationType serializationType = field.getAnnotation(SerializationType.class);
TimeOut timeOut = field.getAnnotation(TimeOut.class);
LoadBalanceStrategy loadBalanceStrategy = field.getAnnotation(LoadBalanceStrategy.class);
String serviceName ="".equals(invokingService.value())?field.getType().getName():invokingService.value();
SerializationTypeEnum serializationType_=serializationType==null?SerializationTypeEnum.Protostuff:serializationType.value();
long timeOut_=timeOut==null?SyncFutureConstant.TimeOut:timeOut.value();
if(timeOut!=null){
if(timeOut.value()>SyncFutureConstant.maxTimeOut){//修改系统内最大的超时时间
SyncFutureConstant.maxTimeOut=timeOut.value();
}
}
LoadBalanceStrategyEnum loadBalanceStrategy_=loadBalanceStrategy==null?LoadBalanceStrategyEnum.Polling:loadBalanceStrategy.value();

Object jdkMethodProxy_= JdkProxy.getInstance(field.getType(),serviceName,serializationType_,timeOut_,loadBalanceStrategy_);
try {
field.setAccessible(true);
field.set(bean,jdkMethodProxy_);//注入动态代理
field.setAccessible(false);
} catch (Exception e) {
e.printStackTrace();
}
beanProxyMap.put(beanName+ZkConstant.DELIMITED_MARKER+field.getName(), jdkMethodProxy_);
}
}
}
}
}else if(breakRpcConfigure!=null&&!breakRpcConfigure.getClientOpen()){//回收beanProxyMap
BeanProxyCache.beanProxyMap=null;
}
return bean;
}
}


JDK动态代理Proxy

package com.black.blackrpc.code.proxy;

import java.lang.reflect.Proxy;

import com.black.blackrpc.code.enums.LoadBalanceStrategyEnum;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
/**
 * Factory for client-side proxies built with the JDK's own dynamic proxy support.
 *
 * @author wangshiyu
 */
public class JdkProxy {

    /**
     * Creates a proxy implementing {@code cls} whose calls are handled by a
     * freshly configured {@link JdkMethodProxy}.
     *
     * @param cls the interface the proxy implements
     * @param serviceName the service name stored on the handler
     * @param serializationType the serialization type stored on the handler
     * @param timeOut the timeout stored on the handler
     * @param LoadBalanceStrategy the load-balance strategy stored on the handler
     * @return the proxy instance
     */
    public static Object getInstance(Class<?> cls, String serviceName, SerializationTypeEnum serializationType, long timeOut, LoadBalanceStrategyEnum LoadBalanceStrategy) {
        final JdkMethodProxy handler = new JdkMethodProxy();
        handler.setServiceName(serviceName);
        handler.setSerializationType(serializationType);
        handler.setTimeOut(timeOut);
        handler.setLoadBalanceStrategy(LoadBalanceStrategy);

        final ClassLoader loader = cls.getClassLoader();
        final Class<?>[] proxiedInterfaces = { cls };
        return Proxy.newProxyInstance(loader, proxiedInterfaces, handler);
    }
}


package com.black.blackrpc.code.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

import com.black.blackrpc.code.enums.LoadBalanceStrategyEnum;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.code.invoking.RemoteInvoking;

/**
 * Invocation handler behind every RPC client proxy. Interface calls are
 * forwarded to {@link RemoteInvoking}; methods declared by {@link Object}
 * are answered locally by this handler.
 */
public class JdkMethodProxy implements InvocationHandler {
    /** Service name. */
    private String serviceName;
    /** Serialization type. */
    private SerializationTypeEnum serializationType;
    /** Timeout. */
    private long timeOut;
    /** Load-balance strategy. */
    private LoadBalanceStrategyEnum loadBalanceStrategy;

    /**
     * Dispatches a call made on the proxy.
     *
     * @param proxy the proxy instance the method was invoked on
     * @param method the invoked method
     * @param parameters the call arguments, may be {@code null}
     * @return the local result for {@code Object} methods, otherwise the remote result
     * @throws Throwable whatever the local method or the remote invocation throws
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] parameters) throws Throwable {
        // Methods declared on Object (hashCode/equals/toString) run against this
        // handler, not over the network.
        if (Object.class.equals(method.getDeclaringClass())) {
            try {
                return method.invoke(this, parameters);
            } catch (java.lang.reflect.InvocationTargetException e) {
                // Surface the real failure. Swallowing it and returning null would
                // become a NullPointerException in the proxy for primitive returns.
                Throwable cause = e.getCause();
                throw cause != null ? cause : e;
            }
        }
        // Everything else is an interface method: perform the remote call.
        return RemoteInvoking.invoking(serviceName, serializationType, timeOut, loadBalanceStrategy, method, parameters);
    }

    public long getTimeOut() {
        return timeOut;
    }

    public void setTimeOut(long timeOut) {
        this.timeOut = timeOut;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public SerializationTypeEnum getSerializationType() {
        return serializationType;
    }

    public void setSerializationType(SerializationTypeEnum serializationType) {
        this.serializationType = serializationType;
    }

    public LoadBalanceStrategyEnum getLoadBalanceStrategy() {
        return loadBalanceStrategy;
    }

    public void setLoadBalanceStrategy(LoadBalanceStrategyEnum loadBalanceStrategy) {
        this.loadBalanceStrategy = loadBalanceStrategy;
    }

}


客户端核心ClientCodeInit

package com.black.blackrpc.code.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import com.black.blackrpc.code.cache.NettyConnectCache;
import com.black.blackrpc.code.cache.ObjectCache;
import com.black.blackrpc.code.cache.SyncFutureCatch;
import com.black.blackrpc.common.configure.BreakRpcConfigure;
import com.black.blackrpc.common.util.StringUtil;
import com.black.blackrpc.zk.ZooKeeperOperation;
/**
 * Core initialization of the RPC client side.
 *
 * @author v_wangshiyu
 */
@Component
@Order(3) // intended to run after the context listener and after service registration
public class ClientCodeInit implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(ClientCodeInit.class);

    @Autowired
    private BreakRpcConfigure breakRpcConfigure;

    /**
     * Runs on context refresh. If the client is enabled it connects to
     * ZooKeeper (when no instance is cached yet), synchronizes the nodes, and
     * then initializes the Netty connection cache and the sync-future cache.
     *
     * @param event the context-refreshed event (unused)
     * @throws RuntimeException if no ZooKeeper address is configured
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!breakRpcConfigure.getClientOpen()) {
            return;
        }
        log.info("Client Code Init...");

        connectZooKeeperAndSync();

        // Cache of Netty connections.
        NettyConnectCache.tcpConnectCacheInit();
        // Cache of pending synchronous results.
        SyncFutureCatch.syncFutureMapInit();

        log.info("Client Code Init Success!");
    }

    /** Ensures a connected ZooKeeper operation object is cached, then syncs the nodes. */
    private void connectZooKeeperAndSync() {
        final String address = breakRpcConfigure.getZkAddress();
        if (!StringUtil.isNotEmpty(address)) {
            throw new RuntimeException("zookeeper address is null!");
        }
        if (ObjectCache.zooKeeperOperation == null) {
            final ZooKeeperOperation operation = new ZooKeeperOperation(address);
            operation.connectServer();
            ObjectCache.zooKeeperOperation = operation;
        }
        ObjectCache.zooKeeperOperation.syncNodes();
    }
}


客户端启动之后,会先检查缓存中是否已经存在zookeeper操作实例,如果没有,则尝试连接zookeeper并将该实例缓存,随后同步节点信息。然后初始化NettyConnectCache和SyncFutureCatch 两个缓存。NettyConnectCache用于存储当前的客户端连接,SyncFutureCatch 用于同步请求结果。

调用

举个栗子:

package com.black.blackrpc.test;

import java.util.List;

/**
 * Sample service interface used to demonstrate a remote call.
 */
public interface HelloWord {

    /** No-argument call returning a string. */
    String han();

    /** No-argument call with no return value. */
    void zijihan();

    /** Overload taking an {@code int} as second argument. */
    String chang(String a, int b, List<String> c);

    /** Overload taking a {@code double} as second argument. */
    String chang(String a, double b, List<String> c);
}


package com.black.blackrpc.test.invoking;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import com.black.blackrpc.code.annotation.InvokingService;
import com.black.blackrpc.code.annotation.SerializationType;
import com.black.blackrpc.code.annotation.TimeOut;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.test.HelloWord;

/**
 * Sample consumer bean: its {@link InvokingService} fields are meant to be
 * filled with remote proxies rather than local implementations.
 */
@Component
@Scope("prototype")
public class HelloWordInvoking {

    /** Remote reference, 60000 timeout, Protostuff serialization. */
    @InvokingService
    @TimeOut(60000)
    @SerializationType(SerializationTypeEnum.Protostuff)
    private HelloWord helloWord;

    /** Second remote reference with the same settings. */
    @InvokingService
    @TimeOut(60000)
    @SerializationType(SerializationTypeEnum.Protostuff)
    private HelloWord helloWord1;

    // Not marked with @InvokingService (the annotation is disabled), so this
    // field is not used by run().
    @TimeOut(60000)
    private HelloWord helloWord2;

    /** Calls the void method on the first reference, then prints the reply of the second. */
    public void run() {
        helloWord.zijihan();
        final String reply = helloWord1.han();
        System.err.println("helloWord1:" + reply);
    }

}


一个HelloWord 接口类和一个HelloWordInvoking 类,HelloWordInvoking对 HelloWord 进行了引用。这里因为HelloWord 没有具体的实现,所以没有对其用@Autowired注解标记注入,更重要的是另外一个问题,我们是一个远程调用,存在本地实现的话,还远程调用个毛。

这里当HelloWordInvoking 构建的时候,会进入MyBeanPostProcessor的bean 初始化的后置处理postProcessAfterInitialization 方法。该方法会遍历当前bean的所有属性,检查是否存在InvokingService标记的属性。如果存在,会去BeanProxyCache中查看是否存在对应的代理实例,如果有,反射设置给当前属性。如果不存在,构建动态代理,设置给当前属性,并存储到BeanProxyCache缓存中。

当调用
helloWord.zijihan();
这个接口的方法的时候。会进行如下操作。代理类会去调用
RemoteInvoking.invoking()
方法。

package com.black.blackrpc.code.invoking;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.black.blackrpc.code.base.entity.RpcRequest;
import com.black.blackrpc.code.base.entity.RpcResponse;
import com.black.blackrpc.code.cache.SyncFutureCatch;
import com.black.blackrpc.code.enums.LoadBalanceStrategyEnum;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.code.synchronization.SyncFuture;
import com.black.blackrpc.common.util.ListUtil;
import com.black.blackrpc.common.util.UUIDGenerator;
import com.black.blackrpc.communication.message.Head;
import com.black.blackrpc.communication.netty.tcp.client.NettyTcpClient;
import com.black.blackrpc.communication.netty.tcp.service.TcpServerHandler;
import com.black.blackrpc.lb.LoadBalance;
import com.black.blackrpc.serialization.SerializationIntegrate;

/**
 * Performs a synchronous remote call over a TCP connection chosen by the load balancer.
 *
 * @author v_wangshiyu
 */
public class RemoteInvoking {
    private static final Logger log = LoggerFactory.getLogger(RemoteInvoking.class);

    /**
     * Builds the request, sends it and blocks until the response arrives or the
     * timeout elapses.
     *
     * @param serviceName name of the remote service
     * @param serializationType serialization used for the request body
     * @param timeOut maximum wait for the response, in milliseconds
     * @param loadBalanceStrategy strategy used to pick the connection
     * @param method the interface method being invoked
     * @param parameters the call arguments, may be {@code null}
     * @return the result carried by the response
     * @throws IllegalStateException if no connection is available or the request could not be sent
     * @throws java.util.concurrent.TimeoutException if no response arrives within {@code timeOut}
     * @throws Exception on serialization or transport failures
     */
    public static Object invoking(String serviceName, SerializationTypeEnum serializationType, long timeOut, LoadBalanceStrategyEnum loadBalanceStrategy, Method method, Object[] parameters) throws Exception {
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setServiceName(serviceName);
        rpcRequest.setMethodName(method.getName());
        // Pass null instead of an empty array: some serializers cannot handle empty arrays.
        rpcRequest.setParameterTypes(ListUtil.isEmpty(method.getParameterTypes()) ? null : method.getParameterTypes());
        rpcRequest.setParameters(parameters);
        rpcRequest.setRequestId(UUIDGenerator.generate());

        // Assemble the frame: head first, serialized body copied in at offset 8.
        byte[] data = SerializationIntegrate.serialize(rpcRequest, serializationType);
        Head head = new Head(data.length, 0, 0, serializationType.getValue());
        byte[] head_data = head.getHeadData();
        System.arraycopy(data, 0, head_data, 8, data.length);

        NettyTcpClient nettyTcpClient = LoadBalance.getTcpConnect(serviceName, loadBalanceStrategy);
        if (nettyTcpClient == null) {
            throw new IllegalStateException("no available connection for service: " + serviceName);
        }

        // Register the future BEFORE sending. Otherwise a fast response can reach the
        // handler while the map has no entry yet, and the call would time out.
        SyncFuture<RpcResponse> syncFuture = new SyncFuture<RpcResponse>();
        SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(), syncFuture);
        try {
            if (!nettyTcpClient.sendMsg(head_data)) {
                throw new IllegalStateException("send request fail, connection is not established, service: " + serviceName);
            }
            log.debug("Tcp Client send head:{}Tcp Client send data:{}", head, rpcRequest);

            RpcResponse rpcResponse = syncFuture.get(timeOut, TimeUnit.MILLISECONDS);
            if (rpcResponse == null) {
                // The future yields null when the wait times out.
                throw new java.util.concurrent.TimeoutException(
                        "remote invoking time out after " + timeOut + "ms, service: " + serviceName
                                + ", method: " + method.getName());
            }
            return rpcResponse.getResult();
        } finally {
            // Always drop the entry so failed or timed-out calls do not leak futures.
            SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());
        }
    }
}


package com.black.blackrpc.code.synchronization;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * A one-shot future used to hand a response from the receiving thread to the
 * thread that is blocked waiting for it.
 *
 * @author Administrator
 *
 * @param <T> type of the response
 */
public class SyncFuture<T> implements Future<T> {
    // One request maps to exactly one response, hence a latch with a count of 1.
    private final CountDownLatch latch = new CountDownLatch(1);
    // Result set by the responding thread; volatile so unsynchronized readers see it.
    private volatile T response;
    // Creation time of this future, available for timeout bookkeeping.
    private final long beginTime = System.currentTimeMillis();

    public SyncFuture() {
    }

    /** Cancellation is not supported; always returns {@code false}. */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    /** Cancellation is not supported; always returns {@code false}. */
    @Override
    public boolean isCancelled() {
        return false;
    }

    /**
     * Tells whether {@link #setResponse(Object)} has been called.
     *
     * <p>Derived from the latch rather than from the response field, so it is
     * correct across threads and also when the response itself is {@code null}.
     */
    @Override
    public boolean isDone() {
        return latch.getCount() == 0;
    }

    /**
     * Blocks until a response has been set.
     *
     * @return the response
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public T get() throws InterruptedException {
        latch.await();
        return this.response;
    }

    /**
     * Blocks until a response has been set or the timeout elapses.
     *
     * @param timeOut maximum time to wait
     * @param unit unit of {@code timeOut}
     * @return the response, or {@code null} if the timeout elapsed first
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public T get(long timeOut, TimeUnit unit) throws InterruptedException {
        if (latch.await(timeOut, unit)) {
            return this.response;
        }
        return null;
    }

    /**
     * Stores the response and releases every thread waiting in {@code get}.
     *
     * @param response the response to publish
     */
    public void setResponse(T response) {
        this.response = response;
        latch.countDown();
    }

    /** @return the creation time of this future in epoch milliseconds */
    public long getBeginTime() {
        return beginTime;
    }
}


RemoteInvoking.invoking()
会做如下操作,构建请求报文,然后调用LB获得一个NettyTcpClient 实例,同时生成一个唯一的RequestId,构建一个SyncFuture对象,以RequestId为key、这个对象为value存储到SyncFutureCatch缓存,同时将数据发送给服务端,然后当前线程被阻塞。如果超过指定时间,还未获得返回结果,反馈一个超时。指定时间内反馈了结果,客户端会根据回传的报文数据中的RequestId,获取对应SyncFuture实例,设置结果,同时会激活被阻塞的线程。
RemoteInvoking.invoking()
获取到对应结果反馈给调用者
helloWord.zijihan();
。至此一个请求过程结束。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息