您的位置:首页 > 编程语言 > Java开发

简单的分布式RPC框架 《blackRpc》 四(服务端启动流程)

2018-01-23 10:38 330 查看
上一节说了blackRpc的组成,这一节我们直接来撸代码。

先贴上项目的 GitHub 地址:https://github.com/wangshiyu/blackRpc

再贴上 pom.xml 文件中的依赖:

<dependencies>
<!-- JUnit: unit testing, test scope only -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- SLF4J: logging facade bound to Log4j 1.2 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<!-- Spring: application context (annotations, context-refreshed listeners) -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.9.RELEASE</version>
</dependency>
<!-- Spring test support, test scope only; keep the version in sync with spring-context -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.3.9.RELEASE</version>
<scope>test</scope>
</dependency>
<!-- Netty: TCP transport between RPC client and server -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.0.Final</version>
</dependency>
<!-- Protostuff: serialization option 1 (core + runtime schema; keep both versions in sync) -->
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.4.0</version>
</dependency>
<!-- MessagePack: serialization option 2, core library -->
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<version>0.8.13</version>
</dependency>
<!-- MessagePack: Jackson data format binding; keep the version in sync with msgpack-core -->
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>jackson-dataformat-msgpack</artifactId>
<version>0.8.13</version>
</dependency>
<!-- ZooKeeper: service registry / discovery -->
<!-- NOTE(review): 3.5.0-alpha is a pre-release build; consider a stable release - confirm compatibility first -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.0-alpha</version>
</dependency>
<!-- fastjson: serialization option 3 (JSON) -->
<!-- NOTE(review): old fastjson releases have published deserialization advisories; verify 1.2.6 against them before exposing this to untrusted peers -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.6</version>
</dependency>
</dependencies>


引入的 Java 依赖很简单:三种序列化方式 fastjson、msgpack、protostuff,万金油 Spring,通讯框架 Netty,分布式应用程序协调服务 ZooKeeper,以及日志包 slf4j。下面先贴上对应的组件。

netty服务端:

package com.black.blackrpc.communication.netty.tcp.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/***
* netty tcp 服务端
* @author v_wangshiyu
*
*/
public class NettyTcpService {
private static final Logger log = LoggerFactory.getLogger(NettyTcpService.class);
private String host;
private int port;

public NettyTcpService(String host,int port) throws Exception{
this.host=host;
this.port=port;
}
/**用于分配处理业务线程的线程组个数 */
private static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认
/** 业务出现线程大小*/
private static final int BIZTHREADSIZE = 4;
/*
* NioEventLoopGroup实际上就是个线程,
* NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件,
* 每一个NioEventLoop负责处理m个Channel,
* NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel
*/
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

public void start() throws Exception {
log.info("Netty Tcp Service Run...");
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChanne
4000
l(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast("decoder", new ByteArrayDecoder());
pipeline.addLast("encoder", new ByteArrayEncoder());
//  pipeline.addLast(new Encoder());
//  pipeline.addLast(new Decoder());
pipeline.addLast(new TcpServerHandler());
}
});

b.bind(host, port).sync();
log.info("Netty Tcp Service Success!");
}

/**
* 停止服务并释放资源
*/
public void shutdown() {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}


服务端Handler

package com.black.blackrpc.communication.netty.tcp.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.black.blackrpc.code.base.entity.ProvideServiceBase;
import com.black.blackrpc.code.base.entity.RpcRequest;
import com.black.blackrpc.code.base.entity.RpcResponse;
import com.black.blackrpc.code.cache.ProvideServiceCache;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.code.invoking.BeanInvoking;
import com.black.blackrpc.common.constant.ErrorConstant;
import com.black.blackrpc.communication.message.Head;
import com.black.blackrpc.communication.message.HeadAnalysis;
import com.black.blackrpc.serialization.SerializationIntegrate;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side inbound handler: decodes one RPC request frame, invokes the target
 * service bean and writes the encoded response back on the same channel.
 */
public class TcpServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(TcpServerHandler.class);

    /** Number of leading bytes in every frame that belong to the message head. */
    private static final int HEAD_LENGTH = 8;

    /**
     * Handles a single request frame.
     *
     * <p>The frame is a {@code byte[]} whose first 8 bytes are the head (parsed by
     * {@link HeadAnalysis}) and whose remainder is the serialized {@link RpcRequest}.
     * The response is serialized with the same serialization type the request used.
     *
     * @param ctx channel context used to write the response
     * @param msg the decoded frame; cast to {@code byte[]}
     * @throws Exception if the frame size disagrees with the length announced in the
     *         head, or if deserialization / invocation / serialization fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse reply = new RpcResponse();

        // ---- split the frame into head and body ----
        byte[] frame = (byte[]) msg;
        HeadAnalysis requestHead = new HeadAnalysis(frame);
        if (frame.length != requestHead.getLength() + HEAD_LENGTH) {
            throw new Exception("TcpServer Receive Data Length is not Agreement!!!");
        }
        byte[] body = new byte[requestHead.getLength()];
        System.arraycopy(frame, HEAD_LENGTH, body, 0, body.length);

        // ---- decode the request ----
        SerializationTypeEnum codec = SerializationTypeEnum.getSerializationTypeEnum(requestHead.getSerializationType());
        RpcRequest call = SerializationIntegrate.deserialize(body, RpcRequest.class, codec);
        // Some serializers drop empty arrays, so a missing parameter-type array is
        // replaced with an empty one.
        Class<?>[] declaredTypes = call.getParameterTypes();
        call.setParameterTypes(declaredTypes == null ? new Class<?>[0] : declaredTypes);

        log.debug("Tcp Server receive head:" + requestHead + "Tcp Server receive data:" + call);

        // ---- dispatch to the registered service bean ----
        ProvideServiceBase target = ProvideServiceCache.provideServiceMap.get(call.getServiceName());
        reply.setRequestId(call.getRequestId());
        if (target == null) {
            reply.setError(ErrorConstant.Servicer222);
        } else {
            Object outcome = BeanInvoking.invoking(
                    target.getBean(),
                    call.getMethodName(),
                    call.getParameterTypes(),
                    call.getParameters());
            reply.setResult(outcome);
        }

        // ---- build head + body for the response ----
        byte[] replyBody = SerializationIntegrate.serialize(reply, codec);
        Head replyHead = new Head(replyBody.length, 0, 0, codec.getValue());
        // NOTE(review): getHeadData() is assumed to return a buffer with room for the
        // body after the 8 head bytes - confirm against Head.
        byte[] outbound = replyHead.getHeadData();
        System.arraycopy(replyBody, 0, outbound, HEAD_LENGTH, replyBody.length);

        ctx.channel().writeAndFlush(outbound);
        log.debug("Tcp Server send head:" + replyHead + "Tcp Server send data:" + reply);
    }

}


ZooKeeper 操作

package com.black.blackrpc.zk;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.black.blackrpc.code.cache.InvokingServiceCache;
import com.black.blackrpc.common.constant.ZkConstant;
import com.black.blackrpc.common.util.MapUtil;
/**
* ZooKeeper 操作
* wangshiyu
*/
public class ZooKeeperOperation {

private static final Logger log = LoggerFactory.getLogger(ZooKeeperOperation.class);

private CountDownLatch latch = new CountDownLatch(1);

private ZooKeeper zk;

private String zkAddress;

public ZooKeeperOperation(String zkAddress) {
this.zkAddress = zkAddress;
}

/**
* 连接服务器
* @return
*/
public boolean connectServer() {
try {
zk = new ZooKeeper(zkAddress, ZkConstant.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException e) {
log.error(e.toString());
return false;
}catch (InterruptedException ex){
log.error(ex.toString());
return false;
}
return true;
}

/**
* 添加root节点
* @return
*/
public boolean AddRootNode(){
try {
Stat s = zk.exists(ZkConstant.ZK_RPC_DATA_PATH, false);
if (s == null) {
//同步创建临时持久节点
zk.create(ZkConstant.ZK_RPC_DATA_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
log.error(e.toString());
return false;
} catch (InterruptedException e) {
log.error(e.toString());
return false;
}
return true;
}

/**
* 创建node节点
* @param zk

105fb
* @param data
*/
public boolean createNode(String node, String data) {
try {
byte[] bytes = data.getBytes();
//同步创建临时顺序节点
String path = zk.create(ZkConstant.ZK_RPC_DATA_PATH+"/"+node+"-", bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("create zookeeper node ({} => {})", path, data);
}catch (KeeperException e) {
log.error("", e);
return false;
}catch (InterruptedException ex){
log.error("", ex);
return false;
}
return true;
}

/**
* 同步节点
* 这是一个通知模式
* syncNodes会通过级联方式,在每次watcher被触发后,就会再挂一次watcher。完成了一个类似链式触发的功能
* @param zk
*/
@SuppressWarnings("unchecked")
public boolean syncNodes() {
try {
List<String> nodeList = zk.getChildren(ZkConstant.ZK_RPC_DATA_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
syncNodes();
}
}
});

Map<String,List<String>> map =new HashMap<String,List<String>>();

for (String node : nodeList) {
byte[] bytes = zk.getData(ZkConstant.ZK_RPC_DATA_PATH + "/" + node, false, null);
String key =node.substring(0, node.lastIndexOf(ZkConstant.DELIMITED_MARKER));
String value=new String(bytes);
Object object =map.get(key);
if(object!=null){
((List<String>)object).add(value);
}else {
List<String> dataList = new ArrayList<String>();
dataList.add(value);
map.put(key,dataList);
}
log.info("node:"+node+"    data:"+new String(bytes));
}

if(MapUtil.isNotEmpty(map)){/**修改连接的地址缓存*/
log.debug("invoking service cache updateing....");
InvokingServiceCache.updataInvokingServiceMap(map);
}
return true;
} catch (KeeperException | InterruptedException e) {
log.error(e.toString());
return false;
}
}

/**
* 停止服务
* @return
*/
public boolean zkStop(){
if(zk!=null){
try {
zk.close();
return true;
} catch (InterruptedException e) {
log.error(e.toString());
return false;
}
}
return true;
}
}


Spring 上下文监听

package com.black.blackrpc.code.spring.listener;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import com.black.blackrpc.code.annotation.RegisterService;
import com.black.blackrpc.code.annotation.RegisterServiceInContext;
import com.black.blackrpc.code.annotation.SerializationType;
import com.black.blackrpc.code.annotation.Weight;
import com.black.blackrpc.code.base.entity.ProvideServiceBase;
import com.black.blackrpc.code.cache.ProvideServiceCache;
import com.black.blackrpc.code.enums.SerializationTypeEnum;
import com.black.blackrpc.common.constant.ZkConstant;
import com.black.blackrpc.common.util.ListUtil;
import com.black.blackrpc.common.util.math.Arith;
import com.black.blackrpc.communication.netty.tcp.service.NettyTcpService;

/**
 * Context-refresh listener that collects every bean to be published as an RPC
 * service and stores its description in {@link ProvideServiceCache}.
 *
 * @author v_wangshiyu
 */
@Component
@Order(1) // run before the server core initializer
public class ContextRefreshedListener implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(ContextRefreshedListener.class);

    /** Weight used when a bean has no {@code @Weight} or its value cannot be rounded. */
    private static final double DEFAULT_WEIGHT = 1.0;

    /**
     * Scans the root application context for beans annotated with
     * {@code @RegisterService} or {@code @RegisterServiceInContext} and caches them.
     *
     * <p>A bean with a non-empty {@code @RegisterService} value is cached once under
     * {@code alias + DELIMITED_MARKER + serializationType}. Any other bean is cached
     * once per implemented interface, keyed by the interface name. Serialization
     * defaults to Protostuff and weight to 1.0 when the corresponding annotation is
     * absent.
     *
     * @param event the refresh event; events from non-root contexts are ignored
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        log.info("Context Refreshed Listener Is Run!");
        // Only the root Spring container is scanned.
        if (event.getApplicationContext().getParent() != null) {
            return;
        }

        Map<String, Object> beans = event.getApplicationContext().getBeansWithAnnotation(RegisterService.class);
        beans.putAll(event.getApplicationContext().getBeansWithAnnotation(RegisterServiceInContext.class));
        ProvideServiceCache.provideServiceMapInit(); // initialize the service cache

        for (Object bean : beans.values()) {
            Class<?> beanClass = bean.getClass();
            RegisterService registerService = beanClass.getAnnotation(RegisterService.class);
            SerializationType serializationType = beanClass.getAnnotation(SerializationType.class);
            Weight weight = beanClass.getAnnotation(Weight.class);

            // Resolve the default once so the cache key below never dereferences a
            // missing @SerializationType annotation.
            SerializationTypeEnum serialization =
                    serializationType == null ? SerializationTypeEnum.Protostuff : serializationType.value();
            // Beans found only through @RegisterServiceInContext carry no
            // @RegisterService here; they are treated as having no alias.
            // NOTE(review): confirm this is the intended handling for that annotation.
            String alias = registerService == null ? "" : registerService.value();

            if (!"".equals(alias)) {
                // Published name = alias + serialization type.
                ProvideServiceCache.provideServiceMap.put(
                        alias + ZkConstant.DELIMITED_MARKER + serialization,
                        buildProvideService(alias, bean, serialization, weight));
            } else {
                Class<?>[] interfaces = beanClass.getInterfaces();
                if (ListUtil.isNotEmpty(interfaces)) {
                    for (Class<?> serviceInterface : interfaces) {
                        // NOTE(review): unlike the alias branch, this key carries no
                        // serialization suffix - confirm whether that is intended.
                        ProvideServiceCache.provideServiceMap.put(
                                serviceInterface.getName(),
                                buildProvideService(serviceInterface.getName(), bean, serialization, weight));
                    }
                }
            }
        }
    }

    /** Builds the cache entry describing one published service. */
    private static ProvideServiceBase buildProvideService(String serviceName, Object bean,
            SerializationTypeEnum serialization, Weight weight) {
        ProvideServiceBase provideServiceBase = new ProvideServiceBase();
        provideServiceBase.setServiceName(serviceName);
        provideServiceBase.setServiceClass(bean.getClass());
        provideServiceBase.setBean(bean);
        provideServiceBase.setSerializationType(serialization);
        if (weight == null) {
            provideServiceBase.setWeight(DEFAULT_WEIGHT);
        } else {
            try {
                provideServiceBase.setWeight(Arith.round(weight.value(), 1));
            } catch (Exception e) {
                provideServiceBase.setWeight(DEFAULT_WEIGHT);
                log.warn("invalid weight for service {}, falling back to {}", serviceName, DEFAULT_WEIGHT, e);
            }
        }
        return provideServiceBase;
    }
}


rpc服务端核心

package com.black.blackrpc.code.service;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import com.black.blackrpc.code.base.entity.ProvideServiceBase;
import com.black.blackrpc.code.cache.ObjectCache;
import com.black.blackrpc.code.cache.ProvideServiceCache;
import com.black.blackrpc.common.configure.BreakRpcConfigure;
import com.black.blackrpc.common.util.MapUtil;
import com.black.blackrpc.common.util.StringUtil;
import com.black.blackrpc.communication.netty.tcp.service.NettyTcpService;
import com.black.blackrpc.zk.ZooKeeperOperation;
/**
 * Server core initializer: starts the Netty server and registers the locally
 * provided services in ZooKeeper.
 *
 * @author v_wangshiyu
 */
@Component
@Order(2) // run after the context-refresh listener has filled ProvideServiceCache
public class ServiceCodeInit implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(ServiceCodeInit.class);

    @Autowired
    private BreakRpcConfigure breakRpcConfigure;

    /**
     * Performs the server-side start-up once the root context has been refreshed:
     * <ol>
     * <li>start the Netty server when there is at least one service to publish;</li>
     * <li>when the server switch in the configuration is on, connect to ZooKeeper and
     * register every cached service.</li>
     * </ol>
     *
     * <p>Refresh events of child contexts are ignored, matching
     * {@code ContextRefreshedListener}; otherwise each child refresh would try to bind
     * the port again and register duplicate nodes.
     *
     * @param event the refresh event
     * @throws RuntimeException if registration is enabled but no ZooKeeper address is configured
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() != null) {
            return;
        }
        log.info("Service Code Init....");

        Map<String, ProvideServiceBase> provideServiceMap = ProvideServiceCache.provideServiceMap;
        boolean hasServices = MapUtil.isNotEmpty(provideServiceMap);

        // Start the Netty server only when there is something to publish.
        if (hasServices) {
            startNettyServer();
        }

        // Register in ZooKeeper only when the server switch is on as well.
        if (hasServices && Boolean.TRUE.equals(breakRpcConfigure.getServerOpen())) {
            registerServices(provideServiceMap);
        }
        log.info("Service Code Init Success!");
    }

    /** Starts the Netty TCP server; a start-up failure is logged and not rethrown. */
    private void startNettyServer() {
        try {
            // NOTE(review): host and port are hard-coded while the address published to
            // ZooKeeper comes from the configuration - confirm the two agree.
            NettyTcpService nettyTcpService = new NettyTcpService("localhost", 8888);
            nettyTcpService.start();
        } catch (Exception e) {
            log.error("failed to start netty tcp service", e);
        }
    }

    /** Connects to ZooKeeper and publishes one node per cached service. */
    private void registerServices(Map<String, ProvideServiceBase> provideServiceMap) {
        String zkAddress = breakRpcConfigure.getZkAddress();
        if (!StringUtil.isNotEmpty(zkAddress)) {
            throw new RuntimeException("zookeeper address is null!");
        }

        ZooKeeperOperation zo = new ZooKeeperOperation(zkAddress);
        boolean connected = zo.connectServer();
        if (connected) {
            zo.AddRootNode();
        }
        ObjectCache.zooKeeperOperation = zo;
        if (!connected) {
            // Without a session, createNode would be called on a client that never connected.
            log.error("could not connect to zookeeper at {}, services are not registered", zkAddress);
            return;
        }

        for (Map.Entry<String, ProvideServiceBase> entry : provideServiceMap.entrySet()) {
            ProvideServiceBase provideServiceBase = entry.getValue();
            // The published data is the server address followed by the service's zk data.
            boolean created = zo.createNode(entry.getKey(),
                    breakRpcConfigure.getServerTcpAddress() + provideServiceBase.getZkDate());
            if (!created) {
                log.warn("failed to register service {} in zookeeper", entry.getKey());
            }
        }
    }
}


这里先简单介绍一下流程,如果想把项目跑起来,请到 GitHub 上下载源码。

1. 当项目跑起来的时候,首先会进入 ContextRefreshedListener 这个上下文监听。监听里的 onApplicationEvent 方法会遍历 Spring 上下文中所有的 bean,检测各个 bean 是否标注了 @RegisterService;如果标注了,说明这是一个需要发布的服务,于是获取对应信息(别名、权值、序列化方式)并存储到 ProvideServiceCache 缓存当中。如果未定义别名,则会以该类实现的接口的名称作为别名存入缓存。

2. ContextRefreshedListener 执行完之后,会去调用 ServiceCodeInit(服务端核心)类的 onApplicationEvent 方法。这里根据配置和 ProvideServiceCache 缓存的情况,判断是否初始化 ZooKeeperOperation(ZooKeeper 操作)和 NettyTcpService(Netty 服务端)。一旦 ZooKeeperOperation 初始化完成,就会去发布 ProvideServiceCache 缓存里的服务。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息