Redis 客户端源码分析+实现
2017-10-13 17:41
483 查看
前言
小编也是从别处学到的,在这里做备忘,也会不断的补充,主要内容:普通jedis , 分布式代理,java 实现 redis 哨兵模式。
依赖:
协议分析 :
代码分析:
测试
如上结果可以看出来,jedis 遵循 TCP 应用层协议 resp 风格,接下来我们进行模拟。
(1)普通redis 客户端 :
启动redis服务, 端口默认 6379 。
第一种 :
非管道方案
第二种
管道方案
测试:
拓展:
代码实现对一个key 监听,实现最新动态
测试 :
命令 : publish agui sayHello 。
(2)分布式代理 :
我们启动三个redis 服务 , 端口 6380 , 6381 , 6382 。 我们提前在list 中存入三个redis 服务的ip:port 。通过对key 取模 , 来获取 list 中的值,进而切割字符串拿到ip 和端口。然后 模拟普通jedis客户端模式来对redis 服务进行连接并执行写操作,将redis服务返回值进行返回。
启动三个节点的redis 服务 :
代理类
启动代理类
测试 :
(3)java实现redis 哨兵 :
官方实现的哨兵逻辑更复杂,这里简单实现~
总结 :
通过如上可以对jedisSentinelPool 进行二次封装实现代码层次的读写分离。但是好像没有必要,redis 虽然是单线程,单机qps 1W–5W 是没有问题的,考虑fork指令和网络情况,每秒的qps 也是很可观的。故小编认为没有必要去弄读写分离。
小编也是从别处学到的,在这里做备忘,也会不断的补充,主要内容:普通jedis , 分布式代理,java 实现 redis 哨兵模式。
依赖:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency>
协议分析 :
# 分析redis RESP通信协议的方法: > 1 官方文档 > 2 抓包分析 > 3 mock redis服务 > 4 aof日志分析 # 基于TCP的应用层协议 RESP (REdis Serialization Protocol) 请求响应模型(pipelining,subscribes这两种情况例外) 特点: > 易于实现 > 快速解析 > 可阅读性强 内容: For Simple Strings the first byte of the reply is "+" > "+OK\r\n" For Errors the first byte of the reply is "-" > "-Error message\r\n" > -ERR unknown command 'foobar' > -WRONGTYPE Operation against a key holding the wrong kind of value For Integers the first byte of the reply is ":" > ":1000\r\n" For Bulk Strings the first byte of the reply is "$" > "$6\r\nfoobar\r\n" > "$0\r\n\r\n" > "$-1\r\n" For Arrays the first byte of the reply is "*" > "*2\r\n$1\r\na\r\n$1\r\nb\r\n" # 掌握通信协议,可以应用在哪些场景? > 1 客户端开发 > 2 实现redis代理(分布式redis解决方案,通过分片存储内存无限大) > 3 实现哨兵机制 # 著名的开源方案: > Jedis https://github.com/xetorthio/jedis > Redisson https://github.com/redisson/redisson > 推特开源Twemproxy https://github.com/twitter/twemproxy > 豌豆荚团队开源codis https://github.com/CodisLabs/codis > 官方高可用方案:sentinel(哨兵)
代码分析:
package com.tony.test; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; public class RedisVM { // 自己实现socketServer,client连接发出请求后,打印出redis客户端请求信息 public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(6378); Socket socket = serverSocket.accept(); byte[] request = new byte[1024]; InputStream inputStream = socket.getInputStream(); inputStream.read(request); System.out.println(new String(request)); socket.close(); serverSocket.close(); } }
测试
@Test public void jedis() { Jedis jedis = new Jedis("127.0.0.1", 6378); // jedis.set("hello", "tony"); String value = jedis.get("tony"); System.out.println("返回结果:" + value); jedis.close(); } //执行结果 *3 //三个参数 $3 //第一个参数长度为 3 SET //第一个参数 $4 //第二个参数长度为 4 agui //第二个参数 $7 //第三个参数长度为 7 welcome //第三个参数
如上结果可以看出来,jedis 遵循 TCP 应用层协议 resp 风格,接下来我们进行模拟。
(1)普通redis 客户端 :
启动redis服务, 端口默认 6379 。
第一种 :
非管道方案
package com.tony.redis.client; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; public class TonyRedisClient { Socket socket; InputStream reader; OutputStream writer; public TonyRedisClient() throws Exception { socket = new Socket("127.0.0.1", 6379); reader = socket.getInputStream(); writer = socket.getOutputStream(); } public String set(String k, String v) throws Exception { StringBuffer command = new StringBuffer(); command.append("*3").append("\r\n"); command.append("$3").append("\r\n"); command.append("SET").append("\r\n"); command.append("$").append(k.getBytes().length).append("\r\n"); command.append(k).append("\r\n"); command.append("$").append(v.getBytes().length).append("\r\n"); command.append(v).append("\r\n"); // System.out.println("字符串:"+command.toString()); writer.write(command.toString().getBytes()); byte[] reponse = new byte[1024]; reader.read(reponse); return new String(reponse); } public String get(String k) throws Exception { StringBuffer command = new StringBuffer(); command.append("*2").append("\r\n"); command.append("$3").append("\r\n"); command.append("GET").append("\r\n"); command.append("$").append(k.getBytes().length).append("\r\n"); command.append(k).append("\r\n"); writer.write(command.toString().getBytes()); byte[] reponse = new byte[1024]; reader.read(reponse); return new String(reponse); // return null ; } public 13cdc Pipeline pipelined() { return new Pipeline(reader, writer); } public Subscribe subscribe() { return new Subscribe(reader, writer); } }
第二种
管道方案
package com.tony.redis.client; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class Pipeline { InputStream reader; OutputStream writer; public Pipeline(InputStream reader, OutputStream writer) { this.reader = reader; this.writer = writer; } public void set(String k, String v) throws Exception { StringBuffer command = new StringBuffer(); command.append("*3").append("\r\n"); command.append("$3").append("\r\n"); command.append("SET").append("\r\n"); command.append("$").append(k.getBytes().length).append("\r\n"); command.append(k).append("\r\n"); command.append("$").append(v.getBytes().length).append("\r\n"); command.append(v).append("\r\n"); writer.write(command.toString().getBytes()); } public String response() throws Exception { byte[] reponse = new byte[10000 * 100]; reader.read(reponse); return new String(reponse); } }
测试:
// 普通的实现,未使用管道 @Test public void batchTest() throws Exception { TonyRedisClient tonyRedisClient = new TonyRedisClient(); long now = System.currentTimeMillis(); for (int i = 0; i < 10000; i++) { tonyRedisClient.set("counter_batch", i + ""); } String value = tonyRedisClient.get("counter_batch"); System.out.println("##################开始打印普通结果"); System.out.println(value); System.out.println("未使用管道执行时间:" + (System.currentTimeMillis() - now)); System.out.println("##################打印结束"); } // 未使用管道执行时间:1471 // 使用管道的执行时间:378 @Test public void batchPipelineTest() throws Exception { TonyRedisClient tonyRedisClient = new TonyRedisClient(); long now = System.currentTimeMillis(); Pipeline pipelined = tonyRedisClient.pipelined(); for (int i = 0; i < 10000; i++) { pipelined.set("counter_pipeline_batch", i + ""); } pipelined.response(); String value = tonyRedisClient.get("counter_pipeline_batch"); System.out.println("##################开始pipeline普通结果"); System.out.println(value); System.out.println("使用管道的执行时间:" + (System.currentTimeMillis() - now)); System.out.println("##################打印结束"); } //结果分析 # 第一种: ##################开始打印普通结果 $4 9999 未使用管道执行时间:608 ##################打印结束 #第二种: ##################开始pipeline普通结果 $4 9999 使用管道的执行时间:167 ##################打印结束 //608 167 , 明显管道更快,无乱哪一种我能够实现和redis 服务的通信。
拓展:
代码实现对一个key 监听,实现最新动态
package com.tony.redis.client; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class Subscribe { InputStream reader; OutputStream writer; public Subscribe(InputStream reader, OutputStream writer) { this.reader = reader; this.writer = writer; } public void sub(String topic) throws Exception { StringBuffer command = new StringBuffer(); command.append("*2").append("\r\n"); command.append("$9").append("\r\n"); command.append("SUBSCRIBE").append("\r\n"); command.append("$").append(topic.getBytes().length).append("\r\n"); command.append(topic).append("\r\n"); writer.write(command.toString().getBytes()); while(true){ byte[] reponse = new byte[1024]; reader.read(reponse); System.out.println("最新动态:"); System.out.println(new String(reponse)); } } }
测试 :
命令 : publish agui sayHello 。
//订阅最新动态 @Test public void subscribeTest() throws Exception { TonyRedisClient tonyRedisClient = new TonyRedisClient(); tonyRedisClient.subscribe().sub("agui"); } //运行之后,我们打开redis-cli.exe 客户端,修改 key为 agui 的值,我们可以在控制到看到修改后的值,以及一些属性(参数个数,参数长度)。
(2)分布式代理 :
我们启动三个redis 服务 , 端口 6380 , 6381 , 6382 。 我们提前在list 中存入三个redis 服务的ip:port 。通过对key 取模 , 来获取 list 中的值,进而切割字符串拿到ip 和端口。然后 模拟普通jedis客户端模式来对redis 服务进行连接并执行写操作,将redis服务返回值进行返回。
启动三个节点的redis 服务 :
代理类
启动代理类
package com.tony.redis.proxy; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.List; public class TonyRedisProxy { private static List<String> servers = new ArrayList<String>(); static { servers.add("127.0.0.1:6380"); servers.add("127.0.0.1:6381"); servers.add("127.0.0.1:6382"); } // 最简单的代理实现负载均衡 public static void main(String[] args) throws Exception { // 监听端口 ServerSocket serverSocket = new ServerSocket(19000); Socket socket; while ((socket = serverSocket.accept()) != null) { try { while(true) { System.out.println("一个链接...."); InputStream inputStream = socket.getInputStream(); byte[] request = new byte[1024]; inputStream.read(request); // 解析请求 String req = new String(request); System.out.println("收到请求:"); System.out.println(req); String[] params = req.split("\r\n"); // 获取key的长度 int keyLenth = Integer.parseInt(params[3].split("\\$")[1]); // 根据key长度取模 int mod = keyLenth % servers.size(); // 根据取模结果获取地址 System.out.println("根据算法选择服务器:" + servers.get(mod)); String[] serverInfo = servers.get(mod).split(":"); // 处理请求 Socket client = new Socket(serverInfo[0], Integer.parseInt(serverInfo[1])); client.getOutputStream().write(request); // 返回结果 byte[] response = new byte[1024]; client.getInputStream().read(response); client.close(); socket.getOutputStream().write(response); System.out.println("##############打印结束"); System.out.println(); } }catch (Exception e) { } } } }
测试 :
// 通过不同长度的key,测试数据是否均匀分布在三个redis实例 @Test public void proxyTest() { Jedis jedis = null; String response = null; jedis = new Jedis("127.0.0.1", 19000); response = jedis.set("a", "tony_a"); jedis.close(); System.out.println("返回结果:" + response); jedis = new Jedis("127.0.0.1", 19000); response = jedis.set("ab", "tony_ab"); jedis.close(); System.out.println("返回结果:" + response); jedis = new Jedis("127.0.0.1", 19000); response = jedis.set("abc", "tony_abc"); jedis.close(); System.out.println("返回结果:" + response); jedis = new Jedis("127.0.0.1", 19000); response = jedis.set("abcd", "tony_abcd"); jedis.close(); System.out.println("返回结果:" + response); } //结果分析 //客户端输出: 返回结果:OK 返回结果:OK 返回结果:OK 返回结果:OK //服务端输出: 一个链接.... 收到请求: *3 $3 SET $1 a $6 tony_a 根据算法选择服务器:127.0.0.1:6381 ##############打印结束 一个链接.... 一个链接.... 收到请求: *3 $3 SET $2 ab $7 tony_ab 根据算法选择服务器:127.0.0.1:6382 ##############打印结束 一个链接.... 一个链接.... 收到请求: *3 $3 SET $3 abc $8 tony_abc 根据算法选择服务器:127.0.0.1:6380 ##############打印结束 一个链接.... 一个链接.... 收到请求: *3 $3 SET $4 abcd $9 tony_abcd 根据算法选择服务器:127.0.0.1:6381 ##############打印结束
(3)java实现redis 哨兵 :
官方实现的哨兵逻辑更复杂,这里简单实现~
package com.tony.redis.sentinel; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.Arrays; import java.util.Iterator; import java.util.Timer; import java.util.TimerTask; import java.util.Vector; import redis.clients.jedis.Jedis; public class TonyRedisSentinel { static String master; // 所有 slave static final Vector<String> slaveRedisServers = new Vector<String>(); // 坏掉的实例 static final Vector<String> badRedisServers = new Vector<String>(); public static void main(String[] args) throws Exception { // 配置 redis master config("127.0.0.1:6380"); // 定时任务 new Timer().schedule(new TimerTask() { @Override public void run() { // 检测 master是否可以 checkMaster(); // 更新slave列表 updateSlaves(); // 检测坏掉的实例是否恢复正常 checkBadServer(); } }, 1000L, 3000L); // 开启端口接收请求 open(); } private static void checkBadServer() { // 获取所有slave Iterator<String> iterator = badRedisServers.iterator(); while (iterator.hasNext()) { String bad = iterator.next(); try { String badHost = bad.split(":")[0]; int badPort = Integer.parseInt(bad.split(":")[1]); Jedis badServer = new Jedis(badHost, badPort); badServer.ping(); // 如果ping没有问题,则挂在当前的master badServer.slaveof(master.split(":")[0], Integer.parseInt(master.split(":")[1])); badServer.close(); slaveRedisServers.add(bad); iterator.remove(); System.out.println(bad + " 恢复正常,当前master:" + master); } catch (Exception e) { } } } private static void updateSlaves() { // 获取所有slave try { String masterHost = master.split(":")[0]; int masterPort = Integer.parseInt(master.split(":")[1]); Jedis jedis = new Jedis(masterHost, masterPort); String info_replication = jedis.info("replication"); // 解析info replication String[] lines = info_replication.split("\r\n"); int slaveCount = Integer.parseInt(lines[2].split(":")[1]); if (slaveCount > 0) { slaveRedisServers.clear(); for (int i = 0; i < slaveCount; i++) { String port = lines[3 + i].split(",")[1].split("=")[1]; slaveRedisServers.add("127.0.0.1:" + port); } } System.out.println("更新slave列表:" + Arrays.toString(slaveRedisServers.toArray(new String[] {}))); jedis.close(); } catch (Exception e) { e.printStackTrace(); System.out.println("更新slave失败:" + e.getMessage()); } } private static void config(String ms) { master = ms; } private static void checkMaster() { // 主从切换 // 检查状态 System.out.println("检查master状态:" + master); String masterHost = master.split(":")[0]; int masterPort = Integer.parseInt(master.split(":")[1]); try { Jedis jedis = new Jedis(masterHost, masterPort); jedis.ping(); jedis.close(); } catch (Exception e) { // master挂掉啦 badRedisServers.add(master); // 切换master changeMaster(); } } /** 切换master */ private static void changeMaster() { Iterator<String> iterator = slaveRedisServers.iterator(); while (iterator.hasNext()) { String slave = iterator.next(); try { String slaveHost = slave.split(":")[0]; int slavePort = Integer.parseInt(slave.split(":")[1]); Jedis jedis = new Jedis(slaveHost, slavePort); jedis.slaveofNoOne(); jedis.close(); master = slave; System.out.println("产生新的master:" + master); break; } catch (Exception e) { badRedisServers.add(slave); } finally { iterator.remove(); } } // 所有slave切到新的master for (String slave : slaveRedisServers) { String slaveHost = slave.split(":")[0]; int slavePort = Integer.parseInt(slave.split(":")[1]); Jedis jedis = new Jedis(slaveHost, slavePort); jedis.slaveof(master.split(":")[0], Integer.parseInt(master.split(":")[1])); jedis.close(); } } private static void open() throws Exception { // SENTINEL get-master-addr-by-name master // tcp port ServerSocket sentlnel = new ServerSocket(26380); Socket socket; while ((socket = sentlnel.accept()) != null) { try { while(true) { System.out.println("一个链接...."); InputStream inputStream = socket.getInputStream(); byte[] request = new byte[1024]; inputStream.read(request); // 解析 get-master-addr-by-name 请求 String req = new String(request); System.out.println("收到请求:"); System.out.println(req); System.out.println("##############打印结束"); System.out.println(); String[] params = req.split("\r\n"); if("get-master-addr-by-name".equals(params[4])){ // 返回结果 String result = "*2\r\n" + "$9\r\n" + master.split(":")[0]+"\r\n" + // master host "$4\r\n" + master.split(":")[1]+"\r\n"; // master port socket.getOutputStream().write(result.getBytes()); } } }catch (Exception e) { } } } }
// 哨兵机制测试 @Test public static void sentinelTest() { // SENTINEL get-master-addr-by-name master Set<String> sentinels = new HashSet<String>(); // 这里添加哨兵服务器 sentinels.add("localhost:26380"); JedisSentinelPool jedisSentinelPool = new JedisSentinelPool("mymaster", sentinels); Jedis jedis = jedisSentinelPool.getResource(); jedis.set("hello_sentinel_3", "tony_sentinel_3"); } // 通过这个socket客户端发出请求 // 获取sentinel的响应内容 @Test public void sentinelClientTest() throws Exception { // SENTINEL get-master-addr-by-name master Socket socket = new Socket("127.0.0.1", 26380); socket.getOutputStream().write(("*3\r\n" + "$8\r\n" + "SENTINEL\r\n" + "$23\r\n" + "get-master-addr-by-name\r\n" + "$6\r\n" + "master\r\n").getBytes()); byte[] response = new byte[1024]; socket.getInputStream().read(response); System.out.println(new String(response)); }
总结 :
通过如上可以对jedisSentinelPool 进行二次封装实现代码层次的读写分离。但是好像没有必要,redis 虽然是单线程,单机qps 1W–5W 是没有问题的,考虑fork指令和网络情况,每秒的qps 也是很可观的。故小编认为没有必要去弄读写分离。
相关文章推荐
- Redis源码分析(三十三)--- redis-cli.c客户端命令行接口的实现(2)
- redis事务实现原理(源码分析)【转】
- redis客户端Jedis源码分析系列——集合JedisByteHashMap
- 豆瓣客户端的实现09 源码分析
- cpp_redis (Windows C++ Redis客户端)(C++11实现)官方最新源码编译
- 基于Redis实现分布式锁,Redisson使用及源码分析
- 基于Redis实现分布式锁,Redisson使用及源码分析
- 基于Redis实现分布式锁-Redisson使用及源码分析
- redis源码分析 dict字典的实现和内部应用
- shiro源码分析篇5:结合redis实现session跨域
- Redis源码分析(二十四)——客户端与服务器
- Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现之DFSPacket
- Redis源码分析(十九)--- replication主从数据复制的实现
- redis之整数集合的实现源码分析
- Redis源码分析(二十五)--- zmalloc内存分配实现
- owncloud源码分析6--客户端单点实现
- Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)
- [置顶] Redis客户端Jedis源码阅读及连接池分析
- Redis源码分析(三十四)--- redis.h服务端的实现分析(1)
- redis源码分析 -- cs结构分析之客户端