用Netty解析Redis网络协议
2015-06-19 21:45
711 查看
用Netty解析Redis网络协议
根据Redis官方文档的介绍,学习了一下Redis网络通信协议。然后偶然在GitHub上发现了个用Netty实现的Redis服务器,很有趣,于是就动手实现了一下!1.RESP协议
Redis的客户端与服务端采用一种叫做 RESP(REdis Serialization Protocol)的网络通信协议交换数据。RESP的设计权衡了实现简单、解析快速、人类可读这三个因素。Redis客户端通过RESP序列化整数、字符串、数据等数据类型,发送字符串数组表示参数的命令到服务端。服务端根据不同的请求命令响应不同的数据类型。除了管道和订阅外,Redis客户端和服务端都是以这种简单的请求-响应模型通信的。具体来看,RESP支持五种数据类型。以”*”消息头标识总长度,消息内部还可能有”$”标识字符串长度,每行以\r\n结束:
简单字符串(Simple String):以”+”开头,表示正确的状态信息,”+”后就是具体信息。许多Redis命令使用简单字符串作为成功的响应,例如”+OK\r\n”。但简单字符串因为不像Bulk String那样有长度信息,而只能靠\r\n确定是否结束,所以 Simple String不是二进制安全的,即字符串里不能包含\r\n。
错误(Error):以”-“开头,表示错误的状态信息,”-“后就是具体信息。
整数(Integer):以”:”开头,像SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTS***E, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD都返回整数。
批量字符串(Bulk String):以”$”开头,表示下一行的字符串长度,具体字符串在下一行中,字符串最大能达到512MB。”$-1\r\n”叫做Null Bulk String,表示没有数据存在。
数组(Array):以”*”开头,表示消息体总共有多少行(不包括当前行),”*”是具体行数。客户端用RESP数组表示命令发送到服务端,反过来服务端也可以用RESP数组返回数据的集合给客户端。数组可以是混合数据类型,例如一个整数加一个字符串”*2\r\n:1\r\n$6\r\nfoobar\r\n”。另外,嵌套数组也是可以的。
例如,观察下面命令对应的RESP,这一组set/get也正是我们要在Netty里实现的:
set name helloworld -> *3\r\n $3\r\n set\r\n $4\r\n name\r\n $10\r\n helloworld\r\n <- :1\r\n get name -> *2\r\n $3\r\n get\r\n $4\r\n name\r\n <- $10\r\n helloworld\r\n set name abc111 -> *3\r\n $3\r\n set\r\n $4\r\n name\r\n $6\r\n abc111\r\n <- :0\r\n get age -> *2\r\n $3\r\n get\r\n $3\r\n age\r\n <- :-1\r\n
2.用Netty解析协议
下面就用高性能的网络通信框架Netty实现一个简单的Redis服务器后端,解析set和get命令,并保存键值对。2.1 Netty版本
Netty版本,5.0还处于alpha,使用Final版里最新的。但即便是4.0.25.Final竟然也跟4.0的前几个版本有些不同,网上一些例子中用的API根本就找不到了。Netty的API改得有点太“任性”了吧?:)<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.25.Final</version> </dependency>
2.2 启动服务
Netty服务器启动代码,这套代码应该是Netty 4里的标准模板了,具体细节就不在本文赘述了。主要关注我们注册的几个Handler。Netty中Handler分为Inbound和Outbound,RedisCommandDecoder和RedisCommandHandler是Inbound,RedisCommandDecoder是Outbound:RedisCommandDecoder:解析Redis协议,将字节数组转为Command对象。
RedisReplyEncoder:将响应写入到输出流中,返回给客户端。
RedisCommandHandler:执行Command中的命令。
public class Main { public static void main(String[] args) throws Exception { new Main().start(6379); } public void start(int port) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap() .group(group) .channel(NioServerSocketChannel.class) .localAddress(port) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new RedisCommandDecoder()) .addLast(new RedisReplyEncoder()) .addLast(new RedisCommandHandler()); } }); // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shutdown the EventLoopGroup, which releases all resources. group.shutdownGracefully(); } } }
2.3 协议解析
RedisCommandDecoder开始时cmds是null,进入doDecodeNumOfArgs先解析出命令和参数的个数,并初始化cmds。之后就会进入doDecodeArgs逐一解析命令名和参数了。当最后完成时,会根据解析结果创建出RedisCommand对象,并加入到out列表里。这样下一个handler就能继续处理了。public class RedisCommandDecoder extends ReplayingDecoder<Void> { /** Decoded command and arguments */ private byte[][] cmds; /** Current argument */ private int arg; /** Decode in block-io style, rather than nio. */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (cmds == null) { if (in.readByte() == '*') { doDecodeNumOfArgs(in); } } else { doDecodeArgs(in); } if (isComplete()) { doSendCmdToHandler(out); doCleanUp(); } } /** Decode number of arguments */ private void doDecodeNumOfArgs(ByteBuf in) { // Ignore negative case int numOfArgs = readInt(in); System.out.println("RedisCommandDecoder NumOfArgs: " + numOfArgs); cmds = new byte[numOfArgs][]; checkpoint(); } /** Decode arguments */ private void doDecodeArgs(ByteBuf in) { for (int i = arg; i < cmds.length; i++) { if (in.readByte() == '$') { int lenOfBulkStr = readInt(in); System.out.println("RedisCommandDecoder LenOfBulkStr[" + i + "]: " + lenOfBulkStr); cmds[i] = new byte[lenOfBulkStr]; in.readBytes(cmds[i]); // Skip CRLF(\r\n) in.skipBytes(2); arg++; checkpoint(); } else { throw new IllegalStateException("Invalid argument"); } } } /** * cmds != null means header decode complete * arg > 0 means arguments decode has begun * arg == cmds.length means complete! */ private boolean isComplete() { return (cmds != null) && (arg > 0) && (arg == cmds.length); } /** Send decoded command to next handler */ private void doSendCmdToHandler(List<Object> out) { System.out.println("RedisCommandDecoder: Send command to next handler"); if (cmds.length == 2) { out.add(new RedisCommand(new String(cmds[0]), cmds[1])); } else if (cmds.length == 3) { out.add(new RedisCommand(new String(cmds[0]), cmds[1], cmds[2])); } else { throw new IllegalStateException("Unknown command"); } } /** Clean up state info */ private void doCleanUp() { this.cmds = null; this.arg = 0; } private int readInt(ByteBuf in) { int integer = 0; char c; while ((c = (char) in.readByte()) != '\r') { integer = (integer * 10) + (c - '0'); } if (in.readByte() != '\n') { throw new IllegalStateException("Invalid number"); } return integer; } }
因为我们只是简单实现set和get命令,所以只可能有一个参数或两个参数:
public class RedisCommand { /** Command name */ private final String name; /** Optional arguments */ private byte[] arg1; private byte[] arg2; public RedisCommand(String name, byte[] arg1) { this.name = name; this.arg1 = arg1; } public RedisCommand(String name, byte[] arg1, byte[] arg2) { this.name = name; this.arg1 = arg1; this.arg2 = arg2; } public String getName() { return name; } public byte[] getArg1() { return arg1; } public byte[] getArg2() { return arg2; } @Override public String toString() { return "Command{" + "name='" + name + '\'' + ", arg1=" + Arrays.toString(arg1) + ", arg2=" + Arrays.toString(arg2) + '}'; } }
2.4 命令执行
RedisCommandHandler拿到RedisCommand后,根据命令名执行命令。这里用一个HashMap模拟数据库了,set就往Map里放,get就从里面取。除了执行具体操作,还要根据执行结果返回不同的Reply对象:保存成功:返回:1\r\n。
修改成功:返回:0\r\n。说明之前Map中已存在此Key。
查询成功:返回Bulk String。具体见后面BulkReply。
Key不存在:返回:-1\r\n。
@ChannelHandler.Sharable public class RedisCommandHandler extends SimpleChannelInboundHandler<RedisCommand> { private HashMap<String, byte[]> database = new HashMap<String, byte[]>(); @Override protected void channelRead0(ChannelHandlerContext ctx, RedisCommand msg) throws Exception { System.out.println("RedisCommandHandler: " + msg); if (msg.getName().equalsIgnoreCase("set")) { if (database.put(new String(msg.getArg1()), msg.getArg2()) == null) { ctx.writeAndFlush(new IntegerReply(1)); } else { ctx.writeAndFlush(new IntegerReply(0)); } } else if (msg.getName().equalsIgnoreCase("get")) { byte[] value = database.get(new String(msg.getArg1())); if (value != null && value.length > 0) { ctx.writeAndFlush(new BulkReply(value)); } else { ctx.writeAndFlush(BulkReply.NIL_REPLY); } } } }
2.5 发送响应
RedisReplyEncoder实现比较简单,拿到RedisReply消息后,直接写入到ByteBuf中就可以了。具体的写入方法都在各个RedisReply的具体实现中。public class RedisReplyEncoder extends MessageToByteEncoder<RedisReply> { @Override protected void encode(ChannelHandlerContext ctx, RedisReply msg, ByteBuf out) throws Exception { System.out.println("RedisReplyEncoder: " + msg); msg.write(out); } }
public interface RedisReply<T> { byte[] CRLF = new byte[] { '\r', '\n' }; T data(); void write(ByteBuf out) throws IOException; } public class IntegerReply implements RedisReply<Integer> { private static final char MARKER = ':'; private final int data; public IntegerReply(int data) { this.data = data; } @Override public Integer data() { return this.data; } @Override public void write(ByteBuf out) throws IOException { out.writeByte(MARKER); out.writeBytes(String.valueOf(data).getBytes()); out.writeBytes(CRLF); } @Override public String toString() { return "IntegerReply{" + "data=" + data + '}'; } } public class BulkReply implements RedisReply<byte[]> { public static final BulkReply NIL_REPLY = new BulkReply(); private static final char MARKER = '$'; private final byte[] data; private final int len; public BulkReply() { this.data = null; this.len = -1; } public BulkReply(byte[] data) { this.data = data; this.len = data.length; } @Override public byte[] data() { return this.data; } @Override public void write(ByteBuf out) throws IOException { // 1.Write header out.writeByte(MARKER); out.writeBytes(String.valueOf(len).getBytes()); out.writeBytes(CRLF); // 2.Write data if (len > 0) { out.writeBytes(data); out.writeBytes(CRLF); } } @Override public String toString() { return "BulkReply{" + "bytes=" + Arrays.toString(data) + '}'; } }
2.6 运行测试
服务端跑起来后,用官方的redis-cli就能连上我们的服务,执行一些命令测试一下。看到自己实现的Redis“伪服务端”能够“骗过”redis-cli,还是很有成就感的!127.0.0.1:6379> set name helloworld (integer) 1 127.0.0.1:6379> get name "helloworld" 127.0.0.1:6379> set name abc123 (integer) 0 127.0.0.1:6379> get name "abc123" 127.0.0.1:6379> get age (nil)
3.Netty 4中的那些“坑”
因为是初次使用Netty 4,好多网上的资料都是Netty 3或者Netty 4早期版本的,API都不一样了,所以碰到了不少问题,官方文档里也没找到答案,一点点调试、猜测、看源码才摸出点儿“门道”:Handler的基础类:Netty 4里使用SimpleChannelInboundHandler就可以了,之前的API已经不适用了。
Inbound和Outbound处理器间的数据交换:Context对象是数据交换的接口,不同的是:Inbound之间是靠fireChannelRead()进行数据交换,但从Inbound到Outbound就要靠writeAndFlush()触发了。
Inbound和Outbound的顺序:fireChannelRead()会向后找下一个Inbound处理器,但writeAndFlush()会向前找前一个Outbound处理器。所以在ChannelInitializer中,Outbound要放在SimpleChannelInboundHandler前面才能进行数据交换。
@Sharable注解:如果Handler是无状态的话,可以标这个注解。
相关文章推荐
- android sax解析本地XML和网络XML
- 网络图片查看器
- 关于TCP保活功能及其应用
- 使用Apache commons-httpclient获取上传文件的进度
- BP神经网络
- 卷积神经网络
- httpd配置文件
- 能省则省:在ASP.NET Web API中通过HTTP Headers返回数据
- TCP/IP学习笔记(3)-----------TCP/IP协议详解
- HttpClient的认证(转载)
- 招对合适的网络营销推广人才
- TCP/IP学习笔记(2)----------IP协议与寻址
- 检测网络接口
- Nginx + Tomcat + HTTPS 配置原来不需要在 Tomcat 上启用 SSL 支持
- ios网络通信
- PowerShell统计本地的网络连接(CMD吐数据互访)
- C#网络编程(基本概念和操作) - Part.1
- java 获取https 网站内容(不需要导入证书)
- NAT穿透
- 思科路由器远程管理