Netty - 简单入门实例,线程模型
2018-10-10 23:32
344 查看
服务器端和客户端通信流程:
1、client调用writeAndFlush()把信息传到serverHandler
2、serverHandler在channelRead()方法中读取数据并调用writeAndFlush()把信息传到clientHandler
3、clientHandler在channelRead()方法中读取数据
添加依赖
[code] <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency>
注意依赖要选5版本的,如果选其他版本会报以下警告,并且服务端接受不到客户端数据,因为4版本的不能重写channelRead和exceptionCaught方法
[code]警告: Unknown channel option 'SO_SNDBUF' for channel '[id: 0x3b118c19]' 警告: Unknown channel option 'SO_KEEPALIVE' for channel '[id: 0x3b118c19]'
服务端
server
[code]package com.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; //服务器端 public class MyServer { //监听线程组,监听客户端请求 private EventLoopGroup acceptorGroup = null; //处理客户端相关操作线程组,负责处理与客户端的数据通信 private EventLoopGroup clientGroup = null; //服务启动相关配置信息,服务端Bootstrap带server private ServerBootstrap serverBootstrap = null; public MyServer(){ init(); } private void init(){ //初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量 acceptorGroup = new NioEventLoopGroup(); clientGroup = new NioEventLoopGroup(); /** * 单线程模型,个人机开发测试使用 * 监听线程组构造参数为1 * acceptorGroup = new NioEventLoopGroup(1); * group传递参数为同一个 * serverBootstrap.group(acceptorGroup,acceptorGroup); * * 多线程模型,长连接,客户端连接数量较少,连接持续时间较长 * 监听线程组构造参数为1 * acceptorGroup = new NioEventLoopGroup(1); * 处理客户端任务线程组构造参数》1 * serverBootstrap = new ServerBootstrap(); * group传递参数为不是同一个 * serverBootstrap.group(acceptorGroup,clientGroup); * * 主从多线程模型,长连接,客户端数量较多,连接持续时间较长 * 监听线程组构造参数》1 * acceptorGroup = new NioEventLoopGroup(); * 处理客户端任务线程组构造参数》1 * serverBootstrap = new ServerBootstrap(); * group传递参数为不是同一个 * serverBootstrap.group(acceptorGroup,clientGroup); */ //初始化服务的配置 serverBootstrap = new ServerBootstrap(); //绑定线程组,acceptorGroup监听信息,clientGroup客户端信息 serverBootstrap.group(acceptorGroup,clientGroup); //设定通信模式为NIO,同步非阻塞 serverBootstrap.channel(NioServerSocketChannel.class); //设定缓冲区大小,缓冲区单位是字节 serverBootstrap.option(ChannelOption.SO_BACKLOG,1024); //SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效) serverBootstrap.option(ChannelOption.SO_SNDBUF,16*1024) .option(ChannelOption.SO_RCVBUF,16*1024) .option(ChannelOption.SO_KEEPALIVE,true); } public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException { /** * childHandler是服务的bootstrap独有的方法,用于提供处理对象 * 可以一次性增加若干个处理逻辑,是类似责任链模式的处理方式 * 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A->B顺序依次处理 * * ChannelInitializer - 用于提供处理器的一个模型对象 * 其中定义了一个initChannel方法,用于初始化处理逻辑责任链条 * 可以保证服务端的bootstrap只初始化一次处理器,尽量提供处理逻辑的重用 * 避免反复创建处理对象,节约资源开销 */ serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(acceptorHandlers); } }); /** * bind方法 - 绑定监听端口,serverBootstrap可以绑定多个监听端口,多次调用即可 * sync - 开始监听逻辑,返回一个ChannelFuture,返回结果代表的是监听成功后的一个对应的未来结果 * 可以使用ChannelFuture实现后续的服务器和客户端的交互 */ ChannelFuture future = serverBootstrap.bind(port).sync(); /*绑定多个端口 serverBootstrap.bind(port); serverBootstrap.bind(port);*/ return future; } /** * shutdownGracefully - 是一个安全关闭的方法,可以保证不放弃任何一个已接收的客户端请求 */ public void release(){ this.acceptorGroup.shutdownGracefully(); this.clientGroup.shutdownGracefully(); } public static void main(String[] args){ ChannelFuture future = null; MyServer myServer = null; try { myServer = new MyServer(); future = myServer.doAccept(9999,new MyServerHandler()); System.out.println("server started"); //关闭连接 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { if (null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if (null != myServer){ myServer.release(); } } } }
serverHandler
[code]package com.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.io.UnsupportedEncodingException; /** * @Sharable代表当前Handler是一个可以分享的处理器,可以分享给多个客户端同时使用 * 如不使用注解类型,每次客户请求时,必须为客户重新创建一个新的Handler对象 */ @Sharable public class MyServerHandler extends ChannelHandlerAdapter{ /** * 业务处理逻辑 * 用于处理读取数据请求的逻辑 * ctx - 上下文对象,其中包含于客户端建立连接的所有资源,如:对应的Channel * msg - 读取到的数据,默认类型是ByteBuf,是Netty自定义的,是对ByteBuffer的封装,不用考虑复位问题 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException { // 获取读取的数据,是一个缓冲 ByteBuf readBuffer = (ByteBuf) msg; //创建一个字节数组,用于保存缓存中的数据 byte[] tempDatas = new byte[readBuffer.readableBytes()]; //将缓存中的数据读取到字节数组中 readBuffer.readBytes(tempDatas); String message = new String(tempDatas,"utf-8"); System.out.println("from client :"+message); if ("exit".equals(message)){ ctx.close(); return; } String line = "server message to client!"; //写操作自动释放缓存,避免内存溢出 ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8"))); /* 如果调用的是write方法,不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行 ctx.write(Unpooled.copiedBuffer(line.getBytes("utf-8"))); ctx.close();*/ } /** * 异常处理逻辑,当客户端异常退出时也会执行 * ChannelHandlerContext关闭,也代表当前与客户端连接资源关闭 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ System.out.println("server exceptionCaught method run.."); ctx.close(); } }
客户端
client
[code]package com.client; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.Scanner; import java.util.Timer; import java.util.concurrent.TimeUnit; /** * 客户端是请求的发起者,不需要监听 * 只需要定义唯一的一个线程组即可 */ public class CustorClient { //处理请求和处理服务端响应的线程组 private EventLoopGroup group = null; //客户端服务启动相关配置信息 private Bootstrap bootstrap = null; public CustorClient(){ init(); } private void init(){ group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); //绑定线程组 bootstrap.group(group); //设定通讯模式为NIO bootstrap.channel(NioSocketChannel.class); } public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException { /** * 客户端的bootstrap没有childHandler方法,只有handler方法 * 方法含义等同于ServerBootstrap中的childHandler * 在客户端必须绑定处理器(必须调用handler方法) * 服务器必须绑定处理器(必须调用childHandler方法) */ this.bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(handlers); } }); //建立连接 ChannelFuture future = this.bootstrap.connect(host,port).sync(); return future; } public void release(){ this.group.shutdownGracefully(); } public static void main(String[] atgs){ CustorClient client = null; ChannelFuture future = null; try { client = new CustorClient(); future = client.doRequest("localhost", 9999, new CustorClientHandler()); Scanner s = null; while (true) { s = new Scanner(System.in); System.out.println("enter message send to server(enter 'exit' for close client)"); String line = s.nextLine(); if ("exit".equals(line)) { /** * addListener - 增加监听,当条件满足时候,出发监听器 * ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接 */ future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8"))) .addListener(ChannelFutureListener.CLOSE); break; } //Unpooled工具类用来做buffer转换 future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8"))); //睡一秒读取信息 TimeUnit.SECONDS.sleep(1); } }catch (Exception e){ e.printStackTrace(); }finally { if (null != future){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } if (null != client){ client.release(); } } } }
clientHandler
[code]package com.client; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; import java.io.UnsupportedEncodingException; public class CustorClientHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext cxt, Object msg) throws UnsupportedEncodingException { try { ByteBuf readBuffer = (ByteBuf) msg; byte[] tempDatas = new byte[readBuffer.readableBytes()]; readBuffer.readBytes(tempDatas); System.out.println("form server:"+new String(tempDatas,"utf-8")); } finally { //释放资源,避免内存溢出 ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext cxt,Throwable cause){ System.out.println("client exceptionCaught method run.."); cxt.close(); } }
启动服务器测试,在客户端输入信息,在服务端显示,输入exit退出客户端
阅读更多
相关文章推荐
- netty入门学习(2)-一个简单的netty实例
- js入门·循环与判断/利用函数的简单实例/使用对象/列举对象属性的名称
- Android基础入门教程——5.2.5 Fragment实例精讲——新闻(购物)类App列表Fragment的简单实现
- springMVC入门实例 springMVC简单入门
- JMS两种模型的介绍和ActiveMQ的简单实例
- RBAC模型与Shiro简单的实例介绍
- storm 入门教程+简单实例
- Ant入门简单实例
- RabbitMQ入门之安装配置与简单实例
- Axis的简单入门实例
- Mina、Netty、Twisted一起学(十):线程模型
- java 从零开始,学习笔记之基础入门<线程及实例分析>(十九)
- VB.net数据库编程(01):简单的入门实例--连接到Access文件
- Mina、Netty、Twisted一起学(十):线程模型
- Netty入门实例及分析
- 超级简单的单元测试JUnit4入门实例
- [netty]--Reactor线程模型以及在netty中的应用
- Zend Framework教程之模型Model用法简单实例
- Netty系列之Netty线程模型
- Netty(二) 从线程模型的角度看 Netty 为什么是高性能的?