您的位置:首页 > 产品设计 > UI/UE

Netty之实现自定义简单的编解码器二(MessageToMessageEncoder<CharSequence>和MessageToMessageDecoder<ByteBuf>)

2016-12-14 21:46 986 查看
1、对于MessageToMessageEncoder的理解

         MessageToMessageEncoder编码器,这里的第二个Message可以理解为任意一个对象。如果是使用ByteBuf对象的话,就和之前的MessageToByte的原理是一样的了。所以要在MessageToMessageDecoder<ByteBuf>的解码器里面,手动的指定,是对ByteBuf类型的对象进行解码的操作。

2、编写MyStringEncoder编码器和MyStringDecoder解码器,以便于,Netty中可以直接发送和接收String类型的数据

     2.1  MyStringEncoder编码器的代码

import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.List;

public class MyStringEncoder extends MessageToMessageEncoder<CharSequence> {
private final Charset charset;

public MyStringEncoder() {
this(Charset.defaultCharset());
}

public MyStringEncoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}

protected void encode(ChannelHandlerContext ctx, CharSequence msg,
List<Object> out) throws Exception {
if (msg.length() == 0) {
return;
}

out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg),
this.charset));
}
}


     2.2  MyStringDecoder解码器的代码

import java.nio.charset.Charset;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

public class MyStringDecoder extends MessageToMessageDecoder<ByteBuf> {
private final Charset charset;

public MyStringDecoder() {
this(Charset.defaultCharset());
}

public MyStringDecoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}

protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
List<Object> out) throws Exception {
out.add(msg.toString(this.charset));
}
}
3、服务端的实现

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {
public void bind(int port) throws Exception {
// 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接
// 另一个线程组用于处理SocketChannel的网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// NIO服务器端的辅助启动类 降低服务器开发难度
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)// 类似NIO中serverSocketChannel
.option(ChannelOption.SO_BACKLOG, 1024)// 配置TCP参数
.option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区
.option(ChannelOption.SO_SNDBUF, 32 * 1024) // 设置发送缓冲大小
.option(ChannelOption.SO_RCVBUF, 32 * 1024) // 这是接收缓冲大小
.option(ChannelOption.SO_KEEPALIVE, true) // 保持连接
.childHandler(new ChildChannelHandler());// 最后绑定I/O事件的处理类
// 处理网络IO事件

// 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler
ChannelFuture f = serverBootstrap.bind(port).sync();
System.out.println("Server启动");
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();

} finally {
// 优雅退出 释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
System.out.println("服务器优雅的释放了线程资源...");
}

}

/**
* 网络事件处理器
*/
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 增加自定义的编码器和解码器
ch.pipeline().addLast(new MyStringEncoder());
ch.pipeline().addLast(new MyStringDecoder());
// 服务端的处理器
ch.pipeline().addLast(new ServerHandler());
}
}

public static void main(String[] args) throws Exception {
int port = 9998;
new Server().bind(port);
}
}
4、服务端Handler的实现

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// 接受客户端的数据
String body = (String) msg;
System.out.println("Client :" + body);
// 服务端,回写数据给客户端
// 直接回写整形的数据
String data = "Hello ,I am Server ...";
ctx.writeAndFlush(data);

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
5、客户端的实现

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
/**
* 连接服务器
*
* @param port
* @param host
* @throws Exception
*/
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
// 客户端辅助启动类 对客户端配置
Bootstrap b = new Bootstrap();
b.group(group)//
.channel(NioSocketChannel.class)//
.option(ChannelOption.TCP_NODELAY, true)//
.handler(new MyChannelHandler());//
// 异步链接服务器 同步等待链接成功
ChannelFuture f = b.connect(host, port).sync();
System.out.println(f);
// 发送消息
Thread.sleep(1000);
f.channel().writeAndFlush("777");
f.channel().writeAndFlush("666");
Thread.sleep(2000);
f.channel().writeAndFlush("888");

// 等待链接关闭
f.channel().closeFuture().sync();

} finally {
group.shutdownGracefully();
System.out.println("客户端优雅的释放了线程资源...");
}

}

/**
* 网络事件处理器
*/
private class MyChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 增加自定义的编码器和解码器
ch.pipeline().addLast(new MyStringEncoder());
ch.pipeline().addLast(new MyStringDecoder());
// 客户端的处理器
ch.pipeline().addLast(new ClientHandler());
}

}

public static void main(String[] args) throws Exception {
new Client().connect(9998, "127.0.0.1");

}
}
6、客户端Handler的实现

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
String body = (String) msg;
System.out.println("Client :" + body);

// 只是读数据,没有写数据的话
// 需要自己手动的释放的消息

} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}

}
7、关于直接发送和接收String类型的编码

     7.1 直接发送String类型的数据



     7.2 直接接收String类型的数据

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Netty