A Simple Distributed RPC Framework, blackRpc, Part 5: Client Startup and Invocation
2018-01-23 11:38
Warning: this article is only meant for readers who are new to RPC. Industry veterans may want to skip the roasting.
The previous part covered the server startup flow. This part covers the client startup flow. Straight to the code.
Netty client:

package com.black.blackrpc.communication.netty.tcp.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.util.concurrent.Future;

/**
 * Netty TCP client.
 * @author v_wangshiyu
 */
public class NettyTcpClient {
    private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class);
    private String host;
    private int port;
    private Bootstrap bootstrap;
    private Channel channel;
    private EventLoopGroup group;

    public NettyTcpClient(String host, int port) {
        bootstrap = getBootstrap();
        channel = getChannel(host, port);
        this.host = host;
        this.port = port;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    /**
     * Initializes the Bootstrap.
     */
    public final Bootstrap getBootstrap() {
        group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class);
        b.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // pipeline.addLast(new Encoder());
                // pipeline.addLast(new Decoder());
                // Inbound: strip the 4-byte length prefix. Outbound: prepend it.
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                pipeline.addLast("decoder", new ByteArrayDecoder());
                pipeline.addLast("encoder", new ByteArrayEncoder());
                pipeline.addLast("handler", new TcpClientHandler());
            }
        });
        b.option(ChannelOption.SO_KEEPALIVE, true);
        return b;
    }

    /**
     * Connects and returns the Channel, or null if the connection fails.
     */
    public final Channel getChannel(String host, int port) {
        Channel channel = null;
        try {
            channel = bootstrap.connect(host, port).sync().channel();
            return channel;
        } catch (Exception e) {
            log.info(String.format("connect Server(IP[%s],PORT[%s]) fail!", host, port));
            return null;
        }
    }

    /**
     * Sends a message. Returns false if there is no connection.
     */
    public boolean sendMsg(Object msg) throws Exception {
        if (channel != null) {
            channel.writeAndFlush(msg).sync();
            log.debug("msg flush success");
            return true;
        } else {
            log.debug("msg flush fail,connect is null");
            return false;
        }
    }

    /**
     * Disconnects and releases resources.
     */
    public boolean disconnectConnect() {
        // channel.close().awaitUninterruptibly();
        // shutdownGracefully releases all resources and closes every channel currently in use.
        Future<?> future = group.shutdownGracefully();
        future.syncUninterruptibly();
        return true;
    }
}
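The pipeline above relies on LengthFieldPrepender(4) and LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4) to cut the TCP byte stream into messages. As a rough illustration of what that framing looks like on the wire, here is a minimal sketch using only the JDK. The class LengthFramingSketch and its methods are hypothetical names made up for this example and are not part of blackRpc or Netty; the sketch assumes a 4-byte big-endian length prefix that is stripped on decode.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of blackRpc or Netty: it only imitates
// 4-byte length-prefixed framing so the idea can be tried without a network.
class LengthFramingSketch {

    // Outbound side: prepend a 4-byte big-endian length to the payload.
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Inbound side: split a byte stream into payloads with the length
    // prefix stripped. An incomplete trailing frame is left undecoded.
    static List<byte[]> unframe(byte[] stream) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(stream);
        while (buf.remaining() >= 4) {
            buf.mark();
            int length = buf.getInt();
            if (length < 0 || buf.remaining() < length) {
                buf.reset(); // not enough bytes yet, wait for more data
                break;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] b = frame("rpc".getBytes(StandardCharsets.UTF_8));
        // Simulate two messages arriving back to back in one TCP read.
        byte[] stream = ByteBuffer.allocate(a.length + b.length).put(a).put(b).array();
        for (byte[] payload : unframe(stream)) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

With this kind of framing the handler that comes after the decoder always sees one complete message at a time, which is why TcpClientHandler can treat each inbound byte[] as a whole header plus body.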
Client handler

package com.black.blackrpc.communication.netty.tcp.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.black.blackrpc.code.base.entity.RpcResponse;
import com.black.blackrpc.code.cache.SyncFutureCatch;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.code.synchronization.SyncFuture;
import com.black.blackrpc.communication.message.HeadAnalysis;
import com.black.blackrpc.serialization.SerializationIntegrate;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class TcpClientHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(TcpClientHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The message is an 8-byte header followed by the serialized body.
        byte[] head_data = (byte[]) msg;
        HeadAnalysis headAnalysis = new HeadAnalysis(head_data);
        if (head_data.length != headAnalysis.getLength() + 8) {
            throw new Exception("TcpClient received data length does not match the header!");
        }
        byte[] data = new byte[headAnalysis.getLength()];
        System.arraycopy(head_data, 8, data, 0, data.length);

        RpcResponse rpcResponse = null;
        try {
            rpcResponse = SerializationIntegrate.deserialize(data, RpcResponse.class,
                    SerializationTypeEnum.getSerializationTypeEnum(headAnalysis.getSerializationType()));
        } catch (Exception e) {
            // Without a response there is no request id to look up, so stop here
            // instead of hitting a NullPointerException below.
            log.error("Tcp Client failed to deserialize response, head:" + headAnalysis, e);
            return;
        }
        log.debug("Tcp Client receive head:" + headAnalysis + "Tcp Client receive data:" + rpcResponse);

        // Hand the response to the thread waiting on the matching request id.
        SyncFuture<RpcResponse> syncFuture = SyncFutureCatch.syncFutureMap.get(rpcResponse.getRequestId());
        if (syncFuture != null) {
            syncFuture.setResponse(rpcResponse);
        }
    }
}
Spring bean pre-initialization

package com.black.blackrpc.code.spring.beanStructure;

import java.lang.reflect.Field;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

import com.black.blackrpc.code.annotation.InvokingService;
import com.black.blackrpc.code.annotation.LoadBalanceStrategy;
import com.black.blackrpc.code.annotation.SerializationType;
import com.black.blackrpc.code.annotation.TimeOut;
import com.black.blackrpc.code.cache.BeanProxyCache;
import com.black.blackrpc.code.enums.LoadBalanceStrategyEnum;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.code.proxy.JdkProxy;
import com.black.blackrpc.common.configure.BreakRpcConfigure;
import com.black.blackrpc.common.constant.SyncFutureConstant;
import com.black.blackrpc.common.constant.ZkConstant;
import com.black.blackrpc.common.util.ListUtil;
import com.black.blackrpc.common.util.MapUtil;

/**
 * BeanPostProcessor implementation.
 * @author v_wangshiyu
 */
@Component
public class MyBeanPostProcessor implements BeanPostProcessor {
    @Autowired
    private BreakRpcConfigure breakRpcConfigure;

    private static final Logger log = LoggerFactory.getLogger(MyBeanPostProcessor.class);

    public MyBeanPostProcessor() {
        super();
        // Initialize the proxy cache.
        if (MapUtil.isEmpty(BeanProxyCache.beanProxyMap)) {
            BeanProxyCache.beanProxyMapInit();
        }
        log.info("BeanPostProcessor implementation constructed!");
    }

    // Runs before the bean's initialization callbacks.
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    // Runs after the bean's initialization callbacks.
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Map<String, Object> beanProxyMap = BeanProxyCache.beanProxyMap;
        if (breakRpcConfigure != null && breakRpcConfigure.getClientOpen()) {
            Field[] fields = bean.getClass().getDeclaredFields(); // all declared fields
            if (ListUtil.isNotEmpty(fields)) {
                for (Field field : fields) {
                    InvokingService invokingService = field.getAnnotation(InvokingService.class);
                    if (invokingService != null) {
                        Object jdkMethodProxy = beanProxyMap.get(beanName + ZkConstant.DELIMITED_MARKER + field.getName());
                        if (jdkMethodProxy != null) {
                            try {
                                field.setAccessible(true);
                                field.set(bean, jdkMethodProxy); // inject the cached dynamic proxy
                                field.setAccessible(false);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        } else {
                            SerializationType serializationType = field.getAnnotation(SerializationType.class);
                            TimeOut timeOut = field.getAnnotation(TimeOut.class);
                            LoadBalanceStrategy loadBalanceStrategy = field.getAnnotation(LoadBalanceStrategy.class);
                            String serviceName = "".equals(invokingService.value()) ? field.getType().getName() : invokingService.value();
                            SerializationTypeEnum serializationType_ = serializationType == null ? SerializationTypeEnum.Protostuff : serializationType.value();
                            long timeOut_ = timeOut == null ? SyncFutureConstant.TimeOut : timeOut.value();
                            if (timeOut != null) {
                                if (timeOut.value() > SyncFutureConstant.maxTimeOut) { // raise the system-wide maximum timeout
                                    SyncFutureConstant.maxTimeOut = timeOut.value();
                                }
                            }
                            LoadBalanceStrategyEnum loadBalanceStrategy_ = loadBalanceStrategy == null ? LoadBalanceStrategyEnum.Polling : loadBalanceStrategy.value();
                            Object jdkMethodProxy_ = JdkProxy.getInstance(field.getType(), serviceName, serializationType_, timeOut_, loadBalanceStrategy_);
                            try {
                                field.setAccessible(true);
                                field.set(bean, jdkMethodProxy_); // inject the new dynamic proxy
                                field.setAccessible(false);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            beanProxyMap.put(beanName + ZkConstant.DELIMITED_MARKER + field.getName(), jdkMethodProxy_);
                        }
                    }
                }
            }
        } else if (breakRpcConfigure != null && !breakRpcConfigure.getClientOpen()) { // client disabled: release beanProxyMap
            BeanProxyCache.beanProxyMap = null;
        }
        return bean;
    }
}
JDK dynamic proxy

package com.black.blackrpc.code.proxy;

import java.lang.reflect.Proxy;

import com.black.blackrpc.code.enums.LoadBalanceStrategyEnum;
import com.black.blackrpc.code.enums.SerializationTypeEnum;

/**
 * Creates proxies with the JDK's built-in dynamic proxy mechanism.
 * @author wangshiyu
 */
public class JdkProxy {
    public static Object getInstance(Class<?> cls, String serviceName, SerializationTypeEnum serializationType,
            long timeOut, LoadBalanceStrategyEnum loadBalanceStrategy) {
        JdkMethodProxy invocationHandler = new JdkMethodProxy();
        invocationHandler.setServiceName(serviceName);
        invocationHandler.setSerializationType(serializationType);
        invocationHandler.setTimeOut(timeOut);
        invocationHandler.setLoadBalanceStrategy(loadBalanceStrategy);
        return Proxy.newProxyInstance(cls.getClassLoader(), new Class[] { cls }, invocationHandler);
    }
}

package com.black.blackrpc.code.proxy;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

import com.black.blackrpc.code.enums.LoadBalanceStrategyEnum;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.code.invoking.RemoteInvoking;

public class JdkMethodProxy implements InvocationHandler {
    /** Service name. */
    private String serviceName;
    /** Serialization type. */
    private SerializationTypeEnum serializationType;
    /** Timeout in milliseconds. */
    private long timeOut;
    /** Load-balancing strategy. */
    private LoadBalanceStrategyEnum loadBalanceStrategy;

    @Override
    public Object invoke(Object proxy, Method method, Object[] parameters) throws Throwable {
        if (Object.class.equals(method.getDeclaringClass())) {
            // Methods declared on Object (toString, hashCode, equals) run locally on this handler.
            try {
                return method.invoke(this, parameters);
            } catch (Throwable t) {
                t.printStackTrace();
            }
        } else {
            // Interface methods are the core case: forward them as a remote call.
            return RemoteInvoking.invoking(serviceName, serializationType, timeOut, loadBalanceStrategy, method, parameters);
        }
        return null;
    }

    public long getTimeOut() {
        return timeOut;
    }

    public void setTimeOut(long timeOut) {
        this.timeOut = timeOut;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public SerializationTypeEnum getSerializationType() {
        return serializationType;
    }

    public void setSerializationType(SerializationTypeEnum serializationType) {
        this.serializationType = serializationType;
    }

    public LoadBalanceStrategyEnum getLoadBalanceStrategy() {
        return loadBalanceStrategy;
    }

    public void setLoadBalanceStrategy(LoadBalanceStrategyEnum loadBalanceStrategy) {
        this.loadBalanceStrategy = loadBalanceStrategy;
    }
}
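To try the proxy idea in isolation, here is a small self-contained sketch of a JDK dynamic proxy whose handler runs Object methods locally and routes interface methods elsewhere. Everything in it (ProxySketch, Greeter, StubHandler, create) is a hypothetical name invented for this example, and the "remote call" is replaced by a string that just describes the call, so it is not how blackRpc actually dispatches.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical demo, not part of blackRpc.
class ProxySketch {

    // An interface with no local implementation, like a remote service contract.
    interface Greeter {
        String greet(String name);
    }

    static class StubHandler implements InvocationHandler {
        private final String serviceName;

        StubHandler(String serviceName) {
            this.serviceName = serviceName;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (Object.class.equals(method.getDeclaringClass())) {
                // toString/hashCode/equals are answered by the handler itself.
                return method.invoke(this, args);
            }
            // Stand-in for the remote call: describe what would be sent.
            int argCount = (args == null) ? 0 : args.length;
            return serviceName + "#" + method.getName() + "(" + argCount + " args)";
        }
    }

    // Builds a proxy that implements the given interface.
    static <T> T create(Class<T> iface, String serviceName) {
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] { iface }, new StubHandler(serviceName));
        return iface.cast(proxy);
    }

    public static void main(String[] args) {
        Greeter greeter = create(Greeter.class, "demo");
        System.out.println(greeter.greet("world"));
    }
}
```

One side effect of delegating Object methods to the handler is that equals compares the handler with the argument rather than the proxy with itself, so a proxy built this way does not compare equal to itself. That may or may not matter for a given use.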
Client core: ClientCodeInit

package com.black.blackrpc.code.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import com.black.blackrpc.code.cache.NettyConnectCache;
import com.black.blackrpc.code.cache.ObjectCache;
import com.black.blackrpc.code.cache.SyncFutureCatch;
import com.black.blackrpc.common.configure.BreakRpcConfigure;
import com.black.blackrpc.common.util.StringUtil;
import com.black.blackrpc.zk.ZooKeeperOperation;

/**
 * Client core initialization.
 * @author v_wangshiyu
 */
@Component
@Order(3) // load after the context listener and after service registration
public class ClientCodeInit implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(ClientCodeInit.class);

    @Autowired
    private BreakRpcConfigure breakRpcConfigure;

    /**
     * Initialization covers:
     * connecting to ZooKeeper,
     * discovering remote services,
     * creating proxies.
     * Runs after the context has been refreshed.
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!breakRpcConfigure.getClientOpen()) return;
        log.info("Client Code Init...");

        // Connect to ZooKeeper and sync the node records.
        String zkAddress = breakRpcConfigure.getZkAddress();
        if (StringUtil.isNotEmpty(zkAddress)) {
            if (ObjectCache.zooKeeperOperation == null) {
                ZooKeeperOperation zo = new ZooKeeperOperation(zkAddress);
                zo.connectServer();
                ObjectCache.zooKeeperOperation = zo;
            }
            ObjectCache.zooKeeperOperation.syncNodes(); // sync nodes
        } else {
            throw new RuntimeException("zookeeper address is null!");
        }

        // Initialize the Netty connection cache.
        NettyConnectCache.tcpConnectCacheInit();

        // Initialize the cache of pending synchronous results.
        SyncFutureCatch.syncFutureMapInit();

        log.info("Client Code Init Success!");
    }
}

After the client starts, it first checks whether a ZooKeeper instance is already cached. If not, it connects to ZooKeeper and caches the instance, then syncs the nodes. It then initializes two caches, NettyConnectCache and SyncFutureCatch. NettyConnectCache stores the current client connections, and SyncFutureCatch is used to hand request results back to the waiting caller.
Invocation
An example:

package com.black.blackrpc.test;

import java.util.List;

public interface HelloWord {
    public String han();
    public void zijihan();
    public String chang(String a, int b, List<String> c);
    public String chang(String a, double b, List<String> c);
}

package com.black.blackrpc.test.invoking;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import com.black.blackrpc.code.annotation.InvokingService;
import com.black.blackrpc.code.annotation.SerializationType;
import com.black.blackrpc.code.annotation.TimeOut;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.test.HelloWord;

@Component
@Scope("prototype")
public class HelloWordInvoking {
    @InvokingService
    @TimeOut(60000)
    @SerializationType(SerializationTypeEnum.Protostuff)
    private HelloWord helloWord;

    @InvokingService
    @TimeOut(60000)
    @SerializationType(SerializationTypeEnum.Protostuff)
    private HelloWord helloWord1;

    //@InvokingService("helloWord2")
    @TimeOut(60000)
    private HelloWord helloWord2;

    public void run() {
        helloWord.zijihan();
        System.err.println("helloWord1:" + helloWord1.han());
        //System.err.println("helloWord2:"+helloWord2.han());
    }
}

There is a HelloWord interface and a HelloWordInvoking class that holds references to HelloWord. Because HelloWord has no concrete implementation on the client, the fields are not marked with @Autowired for injection. More importantly, this is a remote call: if a local implementation existed, there would be no point in calling remotely at all.
When HelloWordInvoking is built, Spring enters MyBeanPostProcessor's post-initialization hook, postProcessAfterInitialization. The method loops over the fields of the bean being built and checks whether any are marked with InvokingService. For each such field it looks in BeanProxyCache for a matching proxy instance. If one exists, it is set on the field via reflection. If not, a dynamic proxy is built, set on the field, and stored in BeanProxyCache.
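The scan, cache lookup and injection steps described above can be tried without Spring. The following is a minimal sketch under stated assumptions: @RemoteRef, Echo, Caller, PROXY_CACHE and the "/" key separator are all hypothetical stand-ins for InvokingService, the service interface, the bean, BeanProxyCache and ZkConstant.DELIMITED_MARKER, and the proxy simply echoes the call instead of going over the network.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo, not part of blackRpc and not using Spring.
class InjectionSketch {

    // Stand-in for a marker annotation such as InvokingService.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface RemoteRef {
    }

    interface Echo {
        String echo(String text);
    }

    static class Caller {
        @RemoteRef
        private Echo echo;      // gets a proxy injected
        private Echo ignored;   // no annotation, stays null

        String run() {
            return echo.echo("ping");
        }

        boolean ignoredIsNull() {
            return ignored == null;
        }
    }

    // Stand-in for the proxy cache, keyed by bean name plus field name.
    static final Map<String, Object> PROXY_CACHE = new ConcurrentHashMap<>();

    // Scans the bean's declared fields and injects a cached or new proxy
    // into every field carrying @RemoteRef.
    static void inject(Object bean, String beanName) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(RemoteRef.class)) {
                continue;
            }
            String key = beanName + "/" + field.getName();
            Object proxy = PROXY_CACHE.computeIfAbsent(key, k -> newProxy(field.getType()));
            try {
                field.setAccessible(true);
                field.set(bean, proxy);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot inject " + key, e);
            }
        }
    }

    // The handler only supports the single-argument demo method; a real
    // handler would also deal with toString, hashCode and equals.
    static Object newProxy(Class<?> iface) {
        return Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface },
                (proxy, method, args) -> method.getName() + ":" + args[0]);
    }

    public static void main(String[] args) {
        Caller caller = new Caller();
        inject(caller, "caller");
        System.out.println(caller.run());
    }
}
```

Keying the cache by bean name plus field name, as the article's code does, means two prototype-scoped instances of the same bean share one proxy per field instead of creating a new one on every construction.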
When the interface method helloWord.zijihan(); is called, the following happens. The proxy calls the RemoteInvoking.invoking() method.

package com.black.blackrpc.code.invoking;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.black.blackrpc.code.base.entity.RpcRequest;
import com.black.blackrpc.code.base.entity.RpcResponse;
import com.black.blackrpc.code.cache.SyncFutureCatch;
import com.black.blackrpc.code.enums.LoadBalanceStrategyEnum;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.code.synchronization.SyncFuture;
import com.black.blackrpc.common.util.ListUtil;
import com.black.blackrpc.common.util.UUIDGenerator;
import com.black.blackrpc.communication.message.Head;
import com.black.blackrpc.communication.netty.tcp.client.NettyTcpClient;
import com.black.blackrpc.lb.LoadBalance;
import com.black.blackrpc.serialization.SerializationIntegrate;

/**
 * Remote invocation.
 * @author v_wangshiyu
 */
public class RemoteInvoking {
    private static final Logger log = LoggerFactory.getLogger(RemoteInvoking.class);

    /**
     * Performs the remote call.
     * @param serviceName
     * @param serializationType
     * @param timeOut timeout in milliseconds
     * @param loadBalanceStrategy
     * @param method
     * @param parameters
     * @return the result carried by the response
     * @throws Exception
     */
    public static Object invoking(String serviceName, SerializationTypeEnum serializationType, long timeOut,
            LoadBalanceStrategyEnum loadBalanceStrategy, Method method, Object[] parameters) throws Exception {
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setServiceName(serviceName);
        rpcRequest.setMethodName(method.getName());
        // Some serializers do not handle empty arrays, so send null when there are no parameter types.
        rpcRequest.setParameterTypes(ListUtil.isEmpty(method.getParameterTypes()) ? null : method.getParameterTypes());
        rpcRequest.setParameters(parameters);
        rpcRequest.setRequestId(UUIDGenerator.generate());

        // Build the message header and assemble the data.
        byte[] data = SerializationIntegrate.serialize(rpcRequest, serializationType);
        Head head = new Head(data.length, 0, 0, serializationType.getValue());
        byte[] head_data = head.getHeadData();
        System.arraycopy(data, 0, head_data, 8, data.length);

        // Register the future before sending, so a fast response cannot arrive
        // before the future is in the map.
        SyncFuture<RpcResponse> syncFuture = new SyncFuture<RpcResponse>();
        SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(), syncFuture);
        try {
            // LB: pick a connection and send.
            NettyTcpClient nettyTcpClient = LoadBalance.getTcpConnect(serviceName, loadBalanceStrategy);
            nettyTcpClient.sendMsg(head_data);
            log.debug("Tcp Client send head:" + head + "Tcp Client send data:" + rpcRequest);

            RpcResponse rpcResponse = syncFuture.get(timeOut, TimeUnit.MILLISECONDS);
            if (rpcResponse == null) {
                // get() returns null on timeout; report it instead of failing with a NullPointerException.
                throw new TimeoutException("request " + rpcRequest.getRequestId() + " timed out after " + timeOut + " ms");
            }
            return rpcResponse.getResult();
        } finally {
            SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());
        }
    }
}

package com.black.blackrpc.code.synchronization;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Synchronous result holder.
 * @author Administrator
 *
 * @param <T>
 */
public class SyncFuture<T> implements Future<T> {
    // Requests and responses map one to one, so the latch starts at 1.
    private CountDownLatch latch = new CountDownLatch(1);
    // The result, set by the response thread.
    private T response;
    // Creation time of the future, used to work out whether it has timed out.
    private long beginTime = System.currentTimeMillis();

    public SyncFuture() {
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return response != null;
    }

    // Blocks until a result is available.
    @Override
    public T get() throws InterruptedException {
        latch.await();
        return this.response;
    }

    // Blocks until a result is available or the timeout elapses; returns null on timeout.
    @Override
    public T get(long timeOut, TimeUnit unit) throws InterruptedException {
        if (latch.await(timeOut, unit)) {
            return this.response;
        }
        return null;
    }

    // Sets the result and counts down the latch to wake the requesting thread.
    public void setResponse(T response) {
        this.response = response;
        latch.countDown();
    }

    public long getBeginTime() {
        return beginTime;
    }
}
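RemoteInvoking.invoking() does the following. It builds the request message with a unique RequestId, asks the LB for a NettyTcpClient instance, creates a SyncFuture and stores it in the SyncFutureCatch cache with the RequestId as the key and the future as the value, and sends the data to the server. The current thread then blocks. If no result comes back within the specified time, a timeout is reported. If a result comes back in time, the client handler uses the RequestId in the returned message to look up the matching SyncFuture and sets the result on it, which also wakes the blocked thread.
RemoteInvoking.invoking() then returns the result to the caller of helloWord.zijihan();. That completes one request.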
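The request/response matching described above can be reproduced in a few lines with the JDK alone. The sketch below is not the article's SyncFuture: it swaps in java.util.concurrent.CompletableFuture, and the names CorrelationSketch, PENDING, call, onResponse and the transport callback are all hypothetical. The "server" is simulated by a thread that upper-cases the payload.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Hypothetical demo, not part of blackRpc.
class CorrelationSketch {

    // Pending calls keyed by request id, playing the role of a future cache.
    static final Map<String, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();

    // Caller side: register a future, send, then block until a reply or timeout.
    static String call(String payload, long timeoutMillis, BiConsumer<String, String> transport)
            throws InterruptedException, ExecutionException, TimeoutException {
        String requestId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        // Register before sending so a fast reply always finds its future.
        PENDING.put(requestId, future);
        try {
            transport.accept(requestId, payload);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            // Clean up on success, timeout and failure alike.
            PENDING.remove(requestId);
        }
    }

    // I/O thread side: complete the future that matches the request id.
    static void onResponse(String requestId, String result) {
        CompletableFuture<String> future = PENDING.get(requestId);
        if (future != null) {
            future.complete(result);
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulated server: replies from another thread with the upper-cased payload.
        BiConsumer<String, String> transport =
                (id, body) -> new Thread(() -> onResponse(id, body.toUpperCase())).start();
        System.out.println(call("ping", 1000, transport));
    }
}
```

Unlike a get that returns null on timeout, CompletableFuture.get with a timeout throws TimeoutException, so the caller cannot mistake a timeout for a null result.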