您的位置:首页 > 其它

自己设计一个的轻量级的RPC框架--服务端netty

2019-03-04 14:31 204 查看

自己设计一个的轻量级的RPC框架--服务端netty

  • netty解码器和TCP的粘包和拆包
  • 创建netty服务类
  • 创建调用服务工具类
  • 启动netty服务
  • #前言

    Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。由于jdk的nio使用比较复杂,并且自己之前看过netty的相关书籍,刚刚好拿来练手

    创建netty服务

    public static void connect(int port){
    //netty主从线程模型(建立2个线程组) 一个用于网络读写   一个用于和客户的进行连接
    EventLoopGroup bossGroup=new NioEventLoopGroup();
    EventLoopGroup workerGroup=new NioEventLoopGroup();
    try {
    //启动辅助类 用于配置各种参数
    ServerBootstrap b=new ServerBootstrap();
    b.group(bossGroup,workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG,1024)//最大排列队数
    .childHandler(new ChannelInitializer<SocketChannel>(){
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline().addLast(new LineBasedFrameDecoder(8192));//以换行符为结束位置进行分包
    socketChannel.pipeline().addLast(new StringDecoder());//将接收到的对象转为字符串
    socketChannel.pipeline().addLast(new RPCResponseHandler());//处理类
    }
    });
    //绑定端口 同步等待成功
    ChannelFuture future=b.bind(port).sync();
    System.out.println("netty server start on port:"+port);
    //同步等待服务端监听端口关闭
    future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    //释放资源退出
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }

    梳理一下流程

    1.创建了一个 ServerBootstrap
    是启动辅助类

    2.创建了2个EventLoopGroup
    是一个主从的线程模型并将其放入启动类中 一个用于网络读写 一个用于和客户的进行连接
    3.设置 NioServerSocketChannel

    这个意思是用于服务端非阻塞地接收TCP连接。当然还有好几种类型包括阻塞的channel等。Netty的Channel在JDK NIO的Channel基础上做了一层封装,提供了更多的功能

    4.设置 option
    option(ChannelOption.SO_BACKLOG,1024) 意思是当很多客户的进行与服务器连接会进行排队,这里设置的1024指的是队列的最大排列数量

    5.设置childHandler
    LineBasedFrameDecoder 是一种是用于防止粘包和分包的解码器,以换行符为结尾会自动解析成一个包。
    StringDecoder 将对象变成String的解码器
    RPCResponseHandler 自己编写的处理类(接收客户端请求和发起应答)

    6.绑定端口
    7.同步等待服务端监听端口关闭
    8.释放资源

    netty解码器和TCP的粘包和拆包

    介绍

    TCP是一个"流"的协议,所以底层就是一串没有界限的数据。一个业务可能会发生一个包拆解成N个小包进行发送,也有可能会把N个包合并成一个包进行发送。

    TCP 拆包/粘包

    当客户端发送2个数据包给服务端的时候会出现好几种情况。(B1,B2 两个包)
    1.服务器读到2个独立的包 B1,B2。没有问题
    2.服务器读到1个包,B1,B2合并在一起,这个就是粘包
    3.服务器读到2个包,一个完整的B1和部分B2,一个是剩下B2。这个就是拆包
    …等其他情况

    TCP 拆包/粘包 解决办法

    1.定长的数据包比如说 1024 。不到的用空格补起
    2.以某种类型的符号进行分割。例如 换行符
    3.更为复杂的协议 例如http webservice等

    netty解决拆包/粘包问题 解码器

    DelimiterBasedFrameDecoder 分隔解码器
    LineBasedFrameDecoder 换行符分隔解码器
    FixedLengthFrameDecoder 固定长度解码器,二进制

    创建netty服务类

    public class RPCResponseHandler extends ChannelHandlerAdapter {
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
    String requestJson= (String) msg;
    RPCRequest request= RPC.requestDeocde(requestJson);
    //通过反射调用服务
    Object result=InvokeServiceUtil.invoke(request);
    RPCResponse response=new RPCResponse();
    response.setRequestID(request.getRequestID());
    response.setResult(result);
    String respStr=RPC.responseEncode(response);
    ByteBuf responseBuf= Unpooled.copiedBuffer(respStr.getBytes());
    //写入缓存数组
    System.out.println("response : " + respStr);
    System.out.println(respStr.getBytes().length);
    ctx.writeAndFlush(responseBuf);
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    //flush方法再全部写到通道中
    ctx.flush();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
    }
    
    }

    这个很简单 继承ChannelHandlerAdapter
    channelRead() 方法用于读取到客户端发过来的数据 进行业务操作之后把响应数据写入缓存数组
    channelReadComplete() 方法用于发送数据 将数据发送至channel通道

    创建调用服务工具类

    public static Object invoke(RPCRequest request){
    Object result=null;
    try {
    Class implClass=Class.forName(request.getClassName());
    Object[] parameters=request.getParameters();
    int parameterNums=request.getParameters().length;
    Class[] parameterTypes=new Class[parameterNums];
    for (int i = 0; i <parameterNums ; i++) {
    parameterTypes[i]=parameters[i].getClass();
    }
    Method method=implClass.getDeclaredMethod(request.getMethodName(),parameterTypes);
    //获取到注册的服务bean
    Object implObj=ZkServer.serverContext.getBean(StringUtil.toLowerCaseFirstOne(implClass.getSimpleName()));
    result=method.invoke(implObj,parameters);
    } catch (ClassNotFoundException e) {
    e.printStackTrace();
    } catch (NoSuchMethodException e) {
    e.printStackTrace();
    } catch (IllegalAccessException e) {
    e.printStackTrace();
    } catch (InvocationTargetException e) {
    e.printStackTrace();
    }
    return result;
    }

    启动netty服务

    public void Initialized() {
    //初始化方法 不开线程会和tomcat 冲突
    new Thread() {
    public void run() {
    RPC.start();
    }
    }.start();//开启线程
    }
    
    /**
    * 实现端启动RPC服务
    */
    public static void start(){
    RPCResponseNet.connect(port);
    }

    注意 这里在Spring中注入bean 初始化调换用start() 方法是不行的,
    会和tomcat有冲突 导致tomcat 并没有启动 只启动了netty服务。所以必须启动一个线程了单独启动netty服务。

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: