简单的分布式RPC框架 《blackRpc》 四(服务端启动流程)
2018-01-23 10:38
330 查看
上一节说了blackRpc的组成,这一节我们直接来撸代码。
先帖上项目gitHup地址:https://github.com/wangshiyu/blackRpc
先帖上pom.xml文件的引用
引入的java包很简单,三种序列化fastjson、msgpack、protostuff ,万金油spring,通讯框架netty , 分布式应用程序协调服务 zookeeper 以及日志包slf4j,下面先帖上对应组件。
netty服务端:
服务端Handler
ZooKeeper 操作
Spring 上下文监听
rpc服务端核心
这里先简单介绍一下,如果想跑起来,请去git上下载源码。
1.当项目跑起来的时候,首先会进入ContextRefreshedListener 这个上下文监听,监听里的onApplicationEvent方法会遍历spring上下文中所有的bean,检测各个bean中是否包含@RegisterService,如果包含,说明这个是一个需要发布的服务,获取对应信息(别名,权值,序列化方式)存储到ProvideServiceCache缓存当中。这里如果为定义别名,会以该类实现的接口的名称为别名,存储进入缓存当中。
2.ContextRefreshedListener 执行完之后,会去调用ServiceCodeInit(服务端核心)类的onApplicationEvent方法。这里根据配置,和ProvideServiceCache缓存情况判断是否初始化ZooKeeperOperation(zookeeper操作) 和NettyTcpService(netty服务端)。一旦ZooKeeperOperation初始化完成,回去发布ProvideServiceCache缓存里的服务。
先帖上项目gitHup地址:https://github.com/wangshiyu/blackRpc
先帖上pom.xml文件的引用
<dependencies> <!-- JUnit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!-- SLF4J --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <!-- Spring --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.9.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.3.9.RELEASE</version> <scope>test</scope> </dependency> <!-- Netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.0.Final</version> </dependency> <!-- Protostuff --> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-core</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>io.protostuff</groupId> <artifactId>protostuff-runtime</artifactId> <version>1.4.0</version> </dependency> <!-- msgpack--> <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack-core</artifactId> <version>0.8.13</version> </dependency> <!--msgpack --> <dependency> <groupId>org.msgpack</groupId> <artifactId>jackson-dataformat-msgpack</artifactId> <version>0.8.13</version> </dependency> <!-- ZooKeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.0-alpha</version> </dependency> <!-- Json --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.6</version> </dependency> </dependencies>
引入的java包很简单,三种序列化fastjson、msgpack、protostuff ,万金油spring,通讯框架netty , 分布式应用程序协调服务 zookeeper 以及日志包slf4j,下面先帖上对应组件。
netty服务端:
package com.black.blackrpc.communication.netty.tcp.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; /*** * netty tcp 服务端 * @author v_wangshiyu * */ public class NettyTcpService { private static final Logger log = LoggerFactory.getLogger(NettyTcpService.class); private String host; private int port; public NettyTcpService(String host,int port) throws Exception{ this.host=host; this.port=port; } /**用于分配处理业务线程的线程组个数 */ private static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认 /** 业务出现线程大小*/ private static final int BIZTHREADSIZE = 4; /* * NioEventLoopGroup实际上就是个线程, * NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件, * 每一个NioEventLoop负责处理m个Channel, * NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel */ private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE); private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE); public void start() throws Exception { log.info("Netty Tcp Service Run..."); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChanne 4000 l(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new ByteArrayDecoder()); pipeline.addLast("encoder", new ByteArrayEncoder()); // pipeline.addLast(new Encoder()); // pipeline.addLast(new Decoder()); pipeline.addLast(new TcpServerHandler()); } }); b.bind(host, port).sync(); log.info("Netty Tcp Service Success!"); } /** * 停止服务并释放资源 */ public void shutdown() { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
服务端Handler
package com.black.blackrpc.communication.netty.tcp.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.black.blackrpc.code.base.entity.ProvideServiceBase; import com.black.blackrpc.code.base.entity.RpcRequest; import com.black.blackrpc.code.base.entity.RpcResponse; import com.black.blackrpc.code.cache.ProvideServiceCache; import com.black.blackrpc.code.enums.SerializationTypeEnum; import com.black.blackrpc.code.invoking.BeanInvoking; import com.black.blackrpc.common.constant.ErrorConstant; import com.black.blackrpc.communication.message.Head; import com.black.blackrpc.communication.message.HeadAnalysis; import com.black.blackrpc.serialization.SerializationIntegrate; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class TcpServerHandler extends SimpleChannelInboundHandler<Object>{ private static final Logger log = LoggerFactory.getLogger(TcpServerHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { RpcResponse rpcResponse =new RpcResponse(); byte[] head_data=(byte[])msg; HeadAnalysis headAnalysis =new HeadAnalysis(head_data); if(head_data.length!=headAnalysis.getLength()+8){ throw new Exception("TcpServer Receive Data Length is not Agreement!!!"); } byte[] data=new byte[headAnalysis.getLength()]; System.arraycopy(head_data,8,data,0,data.length); SerializationTypeEnum serializationType= SerializationTypeEnum.getSerializationTypeEnum(headAnalysis.getSerializationType()); RpcRequest rpcRequest= SerializationIntegrate.deserialize(data, RpcRequest.class,serializationType); //如果参数数组为null,添加一个空的数组,有些序列化不识别空的数组 rpcRequest.setParameterTypes(rpcRequest.getParameterTypes()==null?new Class<?>[0]:rpcRequest.getParameterTypes()); log.debug("Tcp Server receive head:"+headAnalysis+"Tcp Server receive data:" +rpcRequest); ProvideServiceBase provideServiceBase= ProvideServiceCache.provideServiceMap.get(rpcRequest.getServiceName()); rpcResponse.setRequestId(rpcRequest.getRequestId()); if(provideServiceBase!=null){ rpcResponse.setResult(BeanInvoking.invoking(provideServiceBase.getBean(), rpcRequest.getMethodName(), rpcRequest.getParameterTypes(), rpcRequest.getParameters())); }else{ rpcResponse.setError(ErrorConstant.Servicer222); } /********定义报文头,组装数据*******/ byte[] response_data=SerializationIntegrate.serialize(rpcResponse, serializationType); Head response_head =new Head(response_data.length,0,0,serializationType.getValue()); byte[] response_head_data=response_head.getHeadData(); System.arraycopy(response_data,0,response_head_data,8,response_data.length); /********定义报文头,组装数据*******/ ctx.channel().writeAndFlush(response_head_data); log.debug("Tcp Server send head:"+response_head+"Tcp Server send data:" +rpcResponse); } }
ZooKeeper 操作
package com.black.blackrpc.zk; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.black.blackrpc.code.cache.InvokingServiceCache; import com.black.blackrpc.common.constant.ZkConstant; import com.black.blackrpc.common.util.MapUtil; /** * ZooKeeper 操作 * wangshiyu */ public class ZooKeeperOperation { private static final Logger log = LoggerFactory.getLogger(ZooKeeperOperation.class); private CountDownLatch latch = new CountDownLatch(1); private ZooKeeper zk; private String zkAddress; public ZooKeeperOperation(String zkAddress) { this.zkAddress = zkAddress; } /** * 连接服务器 * @return */ public boolean connectServer() { try { zk = new ZooKeeper(zkAddress, ZkConstant.ZK_SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); latch.await(); } catch (IOException e) { log.error(e.toString()); return false; }catch (InterruptedException ex){ log.error(ex.toString()); return false; } return true; } /** * 添加root节点 * @return */ public boolean AddRootNode(){ try { Stat s = zk.exists(ZkConstant.ZK_RPC_DATA_PATH, false); if (s == null) { //同步创建临时持久节点 zk.create(ZkConstant.ZK_RPC_DATA_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { log.error(e.toString()); return false; } catch (InterruptedException e) { log.error(e.toString()); return false; } return true; } /** * 创建node节点 * @param zk 105fb * @param data */ public boolean createNode(String node, String data) { try { byte[] bytes = data.getBytes(); //同步创建临时顺序节点 String path = zk.create(ZkConstant.ZK_RPC_DATA_PATH+"/"+node+"-", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); log.info("create zookeeper node ({} => {})", path, data); }catch (KeeperException e) { log.error("", e); return false; }catch (InterruptedException ex){ log.error("", ex); return false; } return true; } /** * 同步节点 * 这是一个通知模式 * syncNodes会通过级联方式,在每次watcher被触发后,就会再挂一次watcher。完成了一个类似链式触发的功能 * @param zk */ @SuppressWarnings("unchecked") public boolean syncNodes() { try { List<String> nodeList = zk.getChildren(ZkConstant.ZK_RPC_DATA_PATH, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { syncNodes(); } } }); Map<String,List<String>> map =new HashMap<String,List<String>>(); for (String node : nodeList) { byte[] bytes = zk.getData(ZkConstant.ZK_RPC_DATA_PATH + "/" + node, false, null); String key =node.substring(0, node.lastIndexOf(ZkConstant.DELIMITED_MARKER)); String value=new String(bytes); Object object =map.get(key); if(object!=null){ ((List<String>)object).add(value); }else { List<String> dataList = new ArrayList<String>(); dataList.add(value); map.put(key,dataList); } log.info("node:"+node+" data:"+new String(bytes)); } if(MapUtil.isNotEmpty(map)){/**修改连接的地址缓存*/ log.debug("invoking service cache updateing...."); InvokingServiceCache.updataInvokingServiceMap(map); } return true; } catch (KeeperException | InterruptedException e) { log.error(e.toString()); return false; } } /** * 停止服务 * @return */ public boolean zkStop(){ if(zk!=null){ try { zk.close(); return true; } catch (InterruptedException e) { log.error(e.toString()); return false; } } return true; } }
Spring 上下文监听
package com.black.blackrpc.code.spring.listener; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import com.black.blackrpc.code.annotation.RegisterService; import com.black.blackrpc.code.annotation.RegisterServiceInContext; import com.black.blackrpc.code.annotation.SerializationType; import com.black.blackrpc.code.annotation.Weight; import com.black.blackrpc.code.base.entity.ProvideServiceBase; import com.black.blackrpc.code.cache.ProvideServiceCache; import com.black.blackrpc.code.enums.SerializationTypeEnum; import com.black.blackrpc.common.constant.ZkConstant; import com.black.blackrpc.common.util.ListUtil; import com.black.blackrpc.common.util.math.Arith; import com.black.blackrpc.communication.netty.tcp.service.NettyTcpService; /** * 上下文监听 * * @author v_wangshiyu * */ @Component @Order(1)//使其在服务端核心初始化之前加载 public class ContextRefreshedListener implements ApplicationListener<ContextRefreshedEvent> { private static final Logger log = LoggerFactory.getLogger(NettyTcpService.class); @Override public void onApplicationEvent(ContextRefreshedEvent event){ log.info("Context Refreshed Listener Is Run!"); // 根容器为Spring容器 if (event.getApplicationContext().getParent() == null) { /**监听被标记@RegisterService 注解的类,同时将其加入缓存**/ Map<String, Object> beans = event.getApplicationContext().getBeansWithAnnotation(RegisterService.class); beans.putAll(event.getApplicationContext().getBeansWithAnnotation(RegisterServiceInContext.class)); ProvideServiceCache.provideServiceMapInit();// 初始化服务缓存 for (Object bean : beans.values()) { RegisterService registerService = bean.getClass().getAnnotation(RegisterService.class); SerializationType serializationType = bean.getClass().getAnnotation(SerializationType.class); Weight weight = bean.getClass().getAnnotation(Weight.class); if (!"".equals(registerService.value())) {//是否存在别名 ProvideServiceBase provideServiceBase = new ProvideServiceBase(); provideServiceBase.setServiceName(registerService.value()); provideServiceBase.setServiceClass(bean.getClass()); provideServiceBase.setBean(bean); if(serializationType==null){ provideServiceBase.setSerializationType(SerializationTypeEnum.Protostuff); }else{ provideServiceBase.setSerializationType(serializationType.value()); } if(weight==null){ provideServiceBase.setWeight(1.0); }else{ try { provideServiceBase.setWeight(Arith.round(weight.value(), 1)); } catch (Exception e) { provideServiceBase.setWeight(1.0); e.printStackTrace(); } } //发布的名称加上序列化方式 ProvideServiceCache.provideServiceMap.put(registerService.value()+ZkConstant.DELIMITED_MARKER+serializationType.value(), provideServiceBase); } else { Class<?>[] classs = bean.getClass().getInterfaces(); if (ListUtil.isNotEmpty(classs)) { for (Class<?> class_ : classs) { ProvideServiceBase provideServiceBase = new ProvideServiceBase(); provideServiceBase.setServiceName(class_.getName()); provideServiceBase.setServiceClass(bean.getClass()); if(serializationType==null){ provideServiceBase.setSerializationType(SerializationTypeEnum.Protostuff); }else{ provideServiceBase.setSerializationType(serializationType.value()); } if(weight==null){ provideServiceBase.setWeight(1.0); }else{ try { provideServiceBase.setWeight(Arith.round(weight.value(), 1)); } catch (Exception e) { provideServiceBase.setWeight(1.0); e.printStackTrace(); } } provideServiceBase.setBean(bean); //发布的名称加上序列化方式 ProvideServiceCache.provideServiceMap.put(class_.getName(), provideServiceBase); } } } } /**监听被标记@RegisterService 注解的类,同时将其加入缓存**/ } } }
rpc服务端核心
package com.black.blackrpc.code.service; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import com.black.blackrpc.code.base.entity.ProvideServiceBase; import com.black.blackrpc.code.cache.ObjectCache; import com.black.blackrpc.code.cache.ProvideServiceCache; import com.black.blackrpc.common.configure.BreakRpcConfigure; import com.black.blackrpc.common.util.MapUtil; import com.black.blackrpc.common.util.StringUtil; import com.black.blackrpc.communication.netty.tcp.service.NettyTcpService; import com.black.blackrpc.zk.ZooKeeperOperation; /** * 服务端核心初始化 * @author v_wangshiyu * */ @Component @Order(2)//使其在上下文监听之后加载 public class ServiceCodeInit implements ApplicationListener<ContextRefreshedEvent> { private static final Logger log = LoggerFactory.getLogger(ServiceCodeInit.class); @Autowired private BreakRpcConfigure breakRpcConfigure; /*** * 初始化操作涉及: * 启动 netty, * 连接 zk, * 注册本地服务。 * 执行时间:上下文监听之后加载 * @throws Exception */ @Override public void onApplicationEvent(ContextRefreshedEvent event) { log.info("Service Code Init...."); //有需要发布的服务时开启netty服务端 和服务注册 if(MapUtil.isNotEmpty(ProvideServiceCache.provideServiceMap)){ try { NettyTcpService nettyTcpService = new NettyTcpService("localhost",8888); nettyTcpService.start(); } catch (Exception e) { e.printStackTrace(); } } //服务端开启并且有需要发布的服务时开启服务注册 if(MapUtil.isNotEmpty(ProvideServiceCache.provideServiceMap)&&breakRpcConfigure.getServerOpen()!=null&&breakRpcConfigure.getServerOpen()){ String zkAddress= breakRpcConfigure.getZkAddress(); if(StringUtil.isNotEmpty(zkAddress)){ ZooKeeperOperation zo =new ZooKeeperOperation(zkAddress); if(zo.connectServer()){//连接 zo.AddRootNode(); } ObjectCache.zooKeeperOperation =zo; //zo.syncNodes();//测试使用 }else{ throw new RuntimeException("zookeeper address is null!"); } Map<String,ProvideServiceBase> provideServiceMap=ProvideServiceCache.provideServiceMap; for(String key:provideServiceMap.keySet()){ ProvideServiceBase provideServiceBase= provideServiceMap.get(key); //发布的时候加上权值 ObjectCache.zooKeeperOperation.createNode(key, breakRpcConfigure.getServerTcpAddress()+provideServiceBase.getZkDate()); } } log.info("Service Code Init Success!"); } }
这里先简单介绍一下,如果想跑起来,请去git上下载源码。
1.当项目跑起来的时候,首先会进入ContextRefreshedListener 这个上下文监听,监听里的onApplicationEvent方法会遍历spring上下文中所有的bean,检测各个bean中是否包含@RegisterService,如果包含,说明这个是一个需要发布的服务,获取对应信息(别名,权值,序列化方式)存储到ProvideServiceCache缓存当中。这里如果为定义别名,会以该类实现的接口的名称为别名,存储进入缓存当中。
2.ContextRefreshedListener 执行完之后,会去调用ServiceCodeInit(服务端核心)类的onApplicationEvent方法。这里根据配置,和ProvideServiceCache缓存情况判断是否初始化ZooKeeperOperation(zookeeper操作) 和NettyTcpService(netty服务端)。一旦ZooKeeperOperation初始化完成,回去发布ProvideServiceCache缓存里的服务。
相关文章推荐
- 简单的分布式RPC框架 《blackRpc》 五(客户端启动及调用)
- 简单的分布式RPC框架 《blackRpc》 三
- 简单的分布式RPC框架 《blackRpc》 一
- 简单的分布式RPC框架 《blackRpc》 二
- 基于Netty的RPC简单框架实现(二):RPC服务端
- 分布式学习笔记1通过Java自己实现简单的HTTP RPC框架
- Zookeeper实现简单的分布式RPC框架
- MFC简单框架启动流程(CWinApp,CFrameWnd)
- Zookeeper实现简单的分布式RPC框架
- Nginx启动框架处理流程
- 轻量级分布式 RPC 框架
- 一个简单RPC框架是如何炼成的(I)——开局篇
- 简单介绍google protobuf rpc框架使用方法
- 浅谈分布式框架的搭建(一) 简单工程的搭建
- 轻量级分布式 RPC 框架
- 轻量级分布式 RPC 框架
- 轻量级分布式 RPC 框架
- selenium之多线程启动grid分布式测试框架封装(三)
- Android启动流程的简单分析
- RPC框架/高性能远程同步调用框架/分布式服务框架