您的位置:首页 > 其它

Netty - 简单入门实例,线程模型

2018-10-10 23:32 344 查看

服务器端和客户端通信流程: 

1、client调用writeAndFlush()把信息传到serverHandler

2、serverHandler在channelRead()方法中读取数据并调用writeAndFlush()把信息传到clientHandler

3、clientHandler在channelRead()方法中读取数据

添加依赖

[code]    <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>

注意依赖要选5版本的,如果选其他版本会报以下警告,并且服务端接受不到客户端数据,因为4版本的不能重写channelRead和exceptionCaught方法

[code]警告: Unknown channel option 'SO_SNDBUF' for channel '[id: 0x3b118c19]'
警告: Unknown channel option 'SO_KEEPALIVE' for channel '[id: 0x3b118c19]'

 服务端

server

[code]package com.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

//服务器端
public class MyServer {
//监听线程组,监听客户端请求
private EventLoopGroup acceptorGroup = null;
//处理客户端相关操作线程组,负责处理与客户端的数据通信
private EventLoopGroup clientGroup = null;
//服务启动相关配置信息,服务端Bootstrap带server
private ServerBootstrap serverBootstrap = null;
public MyServer(){
init();
}

private void init(){
//初始化线程组,构建线程组的时候,如果不传递参数,则默认构建的线程组线程数是CPU核心数量
acceptorGroup = new NioEventLoopGroup();
clientGroup = new NioEventLoopGroup();
/**
* 单线程模型,个人机开发测试使用
* 监听线程组构造参数为1
* acceptorGroup = new NioEventLoopGroup(1);
* group传递参数为同一个
* serverBootstrap.group(acceptorGroup,acceptorGroup);
*
* 多线程模型,长连接,客户端连接数量较少,连接持续时间较长
* 监听线程组构造参数为1
* acceptorGroup = new NioEventLoopGroup(1);
* 处理客户端任务线程组构造参数》1
* serverBootstrap = new ServerBootstrap();
* group传递参数为不是同一个
* serverBootstrap.group(acceptorGroup,clientGroup);
*
* 主从多线程模型,长连接,客户端数量较多,连接持续时间较长
* 监听线程组构造参数》1
* acceptorGroup = new NioEventLoopGroup();
* 处理客户端任务线程组构造参数》1
* serverBootstrap = new ServerBootstrap();
* group传递参数为不是同一个
* serverBootstrap.group(acceptorGroup,clientGroup);
*/

//初始化服务的配置
serverBootstrap = new ServerBootstrap();
//绑定线程组,acceptorGroup监听信息,clientGroup客户端信息
serverBootstrap.group(acceptorGroup,clientGroup);
//设定通信模式为NIO,同步非阻塞
serverBootstrap.channel(NioServerSocketChannel.class);
//设定缓冲区大小,缓冲区单位是字节
serverBootstrap.option(ChannelOption.SO_BACKLOG,1024);
//SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
serverBootstrap.option(ChannelOption.SO_SNDBUF,16*1024)
.option(ChannelOption.SO_RCVBUF,16*1024)
.option(ChannelOption.SO_KEEPALIVE,true);
}

public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException {
/**
* childHandler是服务的bootstrap独有的方法,用于提供处理对象
* 可以一次性增加若干个处理逻辑,是类似责任链模式的处理方式
* 增加A,B两个处理逻辑,在处理客户端请求数据的时候,根据A->B顺序依次处理
*
* ChannelInitializer - 用于提供处理器的一个模型对象
* 其中定义了一个initChannel方法,用于初始化处理逻辑责任链条
* 可以保证服务端的bootstrap只初始化一次处理器,尽量提供处理逻辑的重用
* 避免反复创建处理对象,节约资源开销
*/
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(acceptorHandlers);
}
});
/**
* bind方法 - 绑定监听端口,serverBootstrap可以绑定多个监听端口,多次调用即可
* sync - 开始监听逻辑,返回一个ChannelFuture,返回结果代表的是监听成功后的一个对应的未来结果
* 可以使用ChannelFuture实现后续的服务器和客户端的交互
*/
ChannelFuture future = serverBootstrap.bind(port).sync();
/*绑定多个端口
serverBootstrap.bind(port);
serverBootstrap.bind(port);*/
return future;
}

/**
* shutdownGracefully - 是一个安全关闭的方法,可以保证不放弃任何一个已接收的客户端请求
*/
public void release(){
this.acceptorGroup.shutdownGracefully();
this.clientGroup.shutdownGracefully();
}
public static void main(String[] args){
ChannelFuture future = null;
MyServer myServer = null;
try {
myServer = new MyServer();
future = myServer.doAccept(9999,new MyServerHandler());
System.out.println("server started");
//关闭连接
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (null != myServer){
myServer.release();
}
}
}
}

serverHandler

[code]package com.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.io.UnsupportedEncodingException;

/**
* @Sharable代表当前Handler是一个可以分享的处理器,可以分享给多个客户端同时使用
* 如不使用注解类型,每次客户请求时,必须为客户重新创建一个新的Handler对象
*/
@Sharable
public class MyServerHandler extends ChannelHandlerAdapter{
/**
* 业务处理逻辑
* 用于处理读取数据请求的逻辑
* ctx - 上下文对象,其中包含于客户端建立连接的所有资源,如:对应的Channel
* msg - 读取到的数据,默认类型是ByteBuf,是Netty自定义的,是对ByteBuffer的封装,不用考虑复位问题
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
// 获取读取的数据,是一个缓冲
ByteBuf readBuffer = (ByteBuf) msg;
//创建一个字节数组,用于保存缓存中的数据
byte[] tempDatas = new byte[readBuffer.readableBytes()];
//将缓存中的数据读取到字节数组中
readBuffer.readBytes(tempDatas);
String message = new String(tempDatas,"utf-8");
System.out.println("from client :"+message);
if ("exit".equals(message)){
ctx.close();
return;
}
String line = "server message to client!";
//写操作自动释放缓存,避免内存溢出
ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));
/*
如果调用的是write方法,不会刷新缓存,缓存中的数据不会发送到客户端,必须再次调用flush方法才行
ctx.write(Unpooled.copiedBuffer(line.getBytes("utf-8")));
ctx.close();*/
}
/**
*  异常处理逻辑,当客户端异常退出时也会执行
*  ChannelHandlerContext关闭,也代表当前与客户端连接资源关闭
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
System.out.println("server exceptionCaught method run..");
ctx.close();
}
}

客户端

client

[code]package com.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Scanner;
import java.util.Timer;
import java.util.concurrent.TimeUnit;

/**
* 客户端是请求的发起者,不需要监听
* 只需要定义唯一的一个线程组即可
*/
public class CustorClient {
//处理请求和处理服务端响应的线程组
private EventLoopGroup group = null;
//客户端服务启动相关配置信息
private Bootstrap bootstrap = null;
public CustorClient(){
init();
}
private void init(){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
//绑定线程组
bootstrap.group(group);
//设定通讯模式为NIO
bootstrap.channel(NioSocketChannel.class);
}
public ChannelFuture doRequest(String host, int port, final ChannelHandler... handlers) throws InterruptedException {
/**
* 客户端的bootstrap没有childHandler方法,只有handler方法
* 方法含义等同于ServerBootstrap中的childHandler
* 在客户端必须绑定处理器(必须调用handler方法)
* 服务器必须绑定处理器(必须调用childHandler方法)
*/
this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(handlers);
}
});
//建立连接
ChannelFuture future = this.bootstrap.connect(host,port).sync();
return future;
}
public void release(){
this.group.shutdownGracefully();
}
public static void  main(String[] atgs){
CustorClient client = null;
ChannelFuture future = null;
try {
client = new CustorClient();
future = client.doRequest("localhost", 9999, new CustorClientHandler());
Scanner s = null;
while (true) {
s = new Scanner(System.in);
System.out.println("enter message send to server(enter 'exit' for close client)");
String line = s.nextLine();
if ("exit".equals(line)) {
/**
* addListener - 增加监听,当条件满足时候,出发监听器
* ChannelFutureListener.CLOSE - 关闭监听器,代表ChannelFuture执行返回后,关闭连接
*/
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")))
.addListener(ChannelFutureListener.CLOSE);
break;
}
//Unpooled工具类用来做buffer转换
future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("utf-8")));
//睡一秒读取信息
TimeUnit.SECONDS.sleep(1);
}
}catch (Exception e){
e.printStackTrace();
}finally {
if (null != future){
try {
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (null != client){
client.release();
}
}
}
}

clientHandler

[code]package com.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

import java.io.UnsupportedEncodingException;

public class CustorClientHandler extends ChannelHandlerAdapter{

@Override
public void channelRead(ChannelHandlerContext cxt, Object msg) throws UnsupportedEncodingException {
try {
ByteBuf readBuffer = (ByteBuf) msg;
byte[] tempDatas = new byte[readBuffer.readableBytes()];
readBuffer.readBytes(tempDatas);
System.out.println("form server:"+new String(tempDatas,"utf-8"));
} finally {
//释放资源,避免内存溢出
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext cxt,Throwable cause){
System.out.println("client exceptionCaught method run..");
cxt.close();
}
}

启动服务器测试,在客户端输入信息,在服务端显示,输入exit退出客户端

 

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: