您的位置:首页 > 数据库 > Redis

Redis 客户端源码分析+实现

2017-10-13 17:41 483 查看
前言

小编也是从别处学到的,在这里做备忘,也会不断的补充,主要内容:普通jedis , 分布式代理,java 实现 redis 哨兵模式。

依赖:

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>


协议分析 :

# 分析redis RESP通信协议的方法:
> 1 官方文档

> 2 抓包分析

> 3 mock redis服务

> 4 aof日志分析

# 基于TCP的应用层协议 RESP (REdis Serialization Protocol)
请求响应模型(pipelining,subscribes这两种情况例外)

特点:
> 易于实现

> 快速解析

> 可阅读性强

内容:

For Simple Strings the first byte of the reply is "+"
> "+OK\r\n"

For Errors the first byte of the reply is "-"
> "-Error message\r\n"

> -ERR unknown command 'foobar'

> -WRONGTYPE Operation against a key holding the wrong kind of value

For Integers the first byte of the reply is ":"
> ":1000\r\n"

For Bulk Strings the first byte of the reply is "$"
> "$6\r\nfoobar\r\n"

> "$0\r\n\r\n"

> "$-1\r\n"

For Arrays the first byte of the reply is "*"
> "*2\r\n$1\r\na\r\n$1\r\nb\r\n"

# 掌握通信协议,可以应用在哪些场景?
> 1 客户端开发

> 2 实现redis代理(分布式redis解决方案,通过分片存储内存无限大)

> 3 实现哨兵机制

# 著名的开源方案:
> Jedis     https://github.com/xetorthio/jedis 
> Redisson      https://github.com/redisson/redisson 
> 推特开源Twemproxy  https://github.com/twitter/twemproxy 
> 豌豆荚团队开源codis https://github.com/CodisLabs/codis 
> 官方高可用方案:sentinel(哨兵)


代码分析:

package com.tony.test;

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class RedisVM {
// 自己实现socketServer,client连接发出请求后,打印出redis客户端请求信息
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(6378);

Socket socket = serverSocket.accept();

byte[] request = new byte[1024];

InputStream inputStream = socket.getInputStream();

inputStream.read(request);

System.out.println(new String(request));
socket.close();
serverSocket.close();
}
}


测试

@Test
public void jedis() {
Jedis jedis = new Jedis("127.0.0.1", 6378);

//      jedis.set("hello", "tony");

String value = jedis.get("tony");

System.out.println("返回结果:" + value);

jedis.close();

}

//执行结果
*3   //三个参数
$3   //第一个参数长度为  3
SET  //第一个参数
$4   //第二个参数长度为  4
agui //第二个参数
$7   //第三个参数长度为  7
welcome  //第三个参数


如上结果可以看出来,jedis 遵循 TCP 应用层协议 resp 风格,接下来我们进行模拟。

(1)普通redis 客户端 :

启动redis服务, 端口默认 6379 。

第一种 :

非管道方案

package com.tony.redis.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class TonyRedisClient {
Socket socket;
InputStream reader;
OutputStream writer;

public TonyRedisClient() throws Exception {
socket = new Socket("127.0.0.1", 6379);
reader = socket.getInputStream();
writer = socket.getOutputStream();
}

public String set(String k, String v) throws Exception {
StringBuffer command = new StringBuffer();
command.append("*3").append("\r\n");
command.append("$3").append("\r\n");
command.append("SET").append("\r\n");
command.append("$").append(k.getBytes().length).append("\r\n");
command.append(k).append("\r\n");
command.append("$").append(v.getBytes().length).append("\r\n");
command.append(v).append("\r\n");

//      System.out.println("字符串:"+command.toString());

writer.write(command.toString().getBytes());

byte[] reponse = new byte[1024];
reader.read(reponse);

return new String(reponse);
}

public String get(String k) throws Exception {
StringBuffer command = new StringBuffer();
command.append("*2").append("\r\n");
command.append("$3").append("\r\n");
command.append("GET").append("\r\n");
command.append("$").append(k.getBytes().length).append("\r\n");
command.append(k).append("\r\n");

writer.write(command.toString().getBytes());

byte[] reponse = new byte[1024];
reader.read(reponse);

return new String(reponse);
//      return null ;
}

public
13cdc
Pipeline pipelined() {
return new Pipeline(reader, writer);
}

public Subscribe subscribe() {
return new Subscribe(reader, writer);
}
}


第二种

管道方案

package com.tony.redis.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class Pipeline {
InputStream reader;
OutputStream writer;

public Pipeline(InputStream reader, OutputStream writer) {
this.reader = reader;
this.writer = writer;
}

public void set(String k, String v) throws Exception {
StringBuffer command = new StringBuffer();
command.append("*3").append("\r\n");
command.append("$3").append("\r\n");
command.append("SET").append("\r\n");
command.append("$").append(k.getBytes().length).append("\r\n");
command.append(k).append("\r\n");
command.append("$").append(v.getBytes().length).append("\r\n");
command.append(v).append("\r\n");

writer.write(command.toString().getBytes());

}

public String response() throws Exception {
byte[] reponse = new byte[10000 * 100];
reader.read(reponse);

return new String(reponse);
}

}


测试:

// 普通的实现,未使用管道
@Test
public void batchTest() throws Exception {
TonyRedisClient tonyRedisClient = new TonyRedisClient();

long now = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
tonyRedisClient.set("counter_batch", i + "");
}
String value = tonyRedisClient.get("counter_batch");
System.out.println("##################开始打印普通结果");
System.out.println(value);
System.out.println("未使用管道执行时间:" + (System.currentTimeMillis() - now));
System.out.println("##################打印结束");
}

// 未使用管道执行时间:1471
// 使用管道的执行时间:378
@Test
public void batchPipelineTest() throws Exception {
TonyRedisClient tonyRedisClient = new TonyRedisClient();

long now = System.currentTimeMillis();
Pipeline pipelined = tonyRedisClient.pipelined();
for (int i = 0; i < 10000; i++) {
pipelined.set("counter_pipeline_batch", i + "");
}
pipelined.response();
String value = tonyRedisClient.get("counter_pipeline_batch");
System.out.println("##################开始pipeline普通结果");
System.out.println(value);
System.out.println("使用管道的执行时间:" + (System.currentTimeMillis() - now));
System.out.println("##################打印结束");
}

//结果分析
# 第一种:
##################开始打印普通结果
$4
9999
未使用管道执行时间:608
##################打印结束
#第二种:
##################开始pipeline普通结果
$4
9999
使用管道的执行时间:167
##################打印结束

//608  167 , 明显管道更快,无乱哪一种我能够实现和redis 服务的通信。


拓展:

代码实现对一个key 监听,实现最新动态

package com.tony.redis.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class Subscribe {
InputStream reader;
OutputStream writer;

public Subscribe(InputStream reader, OutputStream writer) {
this.reader = reader;
this.writer = writer;
}

public void sub(String topic) throws Exception {
StringBuffer command = new StringBuffer();
command.append("*2").append("\r\n");
command.append("$9").append("\r\n");
command.append("SUBSCRIBE").append("\r\n");
command.append("$").append(topic.getBytes().length).append("\r\n");
command.append(topic).append("\r\n");

writer.write(command.toString().getBytes());

while(true){
byte[] reponse = new byte[1024];
reader.read(reponse);
System.out.println("最新动态:");
System.out.println(new String(reponse));
}

}
}


测试 :

命令 : publish agui sayHello 。

//订阅最新动态
@Test
public void subscribeTest() throws Exception {
TonyRedisClient tonyRedisClient = new TonyRedisClient();

tonyRedisClient.subscribe().sub("agui");
}

//运行之后,我们打开redis-cli.exe 客户端,修改 key为 agui 的值,我们可以在控制到看到修改后的值,以及一些属性(参数个数,参数长度)。




(2)分布式代理 :

我们启动三个redis 服务 , 端口 6380 , 6381 , 6382 。 我们提前在list 中存入三个redis 服务的ip:port 。通过对key 取模 , 来获取 list 中的值,进而切割字符串拿到ip 和端口。然后 模拟普通jedis客户端模式来对redis 服务进行连接并执行写操作,将redis服务返回值进行返回。

启动三个节点的redis 服务 :



代理类

启动代理类

package com.tony.redis.proxy;

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class TonyRedisProxy {
private static List<String> servers = new ArrayList<String>();

static {
servers.add("127.0.0.1:6380");
servers.add("127.0.0.1:6381");
servers.add("127.0.0.1:6382");
}

// 最简单的代理实现负载均衡
public static void main(String[] args) throws Exception {
// 监听端口
ServerSocket serverSocket = new ServerSocket(19000);
Socket socket;
while ((socket = serverSocket.accept()) != null) {
try {

while(true) {

System.out.println("一个链接....");
InputStream inputStream = socket.getInputStream();
byte[] request = new byte[1024];
inputStream.read(request);
// 解析请求
String req = new String(request);
System.out.println("收到请求:");
System.out.println(req);

String[] params = req.split("\r\n");
// 获取key的长度
int keyLenth = Integer.parseInt(params[3].split("\\$")[1]);
// 根据key长度取模
int mod = keyLenth % servers.size();
// 根据取模结果获取地址
System.out.println("根据算法选择服务器:" + servers.get(mod));
String[] serverInfo = servers.get(mod).split(":");
// 处理请求
Socket client = new Socket(serverInfo[0], Integer.parseInt(serverInfo[1]));
client.getOutputStream().write(request);
// 返回结果
byte[] response = new byte[1024];
client.getInputStream().read(response);
client.close();
socket.getOutputStream().write(response);

System.out.println("##############打印结束");
System.out.println();

}
}catch (Exception e) {
}

}
}
}


测试 :

// 通过不同长度的key,测试数据是否均匀分布在三个redis实例
@Test
public void proxyTest() {
Jedis jedis = null;
String response = null;

jedis = new Jedis("127.0.0.1", 19000);
response = jedis.set("a", "tony_a");
jedis.close();
System.out.println("返回结果:" + response);

jedis = new Jedis("127.0.0.1", 19000);
response = jedis.set("ab", "tony_ab");
jedis.close();
System.out.println("返回结果:" + response);

jedis = new Jedis("127.0.0.1", 19000);
response = jedis.set("abc", "tony_abc");
jedis.close();
System.out.println("返回结果:" + response);

jedis = new Jedis("127.0.0.1", 19000);
response = jedis.set("abcd", "tony_abcd");
jedis.close();
System.out.println("返回结果:" + response);

}
//结果分析
//客户端输出:
返回结果:OK
返回结果:OK
返回结果:OK
返回结果:OK
//服务端输出:
一个链接....
收到请求:
*3
$3
SET
$1
a
$6
tony_a
根据算法选择服务器:127.0.0.1:6381
##############打印结束
一个链接....
一个链接....
收到请求:
*3
$3
SET
$2
ab
$7
tony_ab
根据算法选择服务器:127.0.0.1:6382
##############打印结束
一个链接....
一个链接....
收到请求:
*3
$3
SET
$3
abc
$8
tony_abc
根据算法选择服务器:127.0.0.1:6380
##############打印结束
一个链接....
一个链接....
收到请求:
*3
$3
SET
$4
abcd
$9
tony_abcd
根据算法选择服务器:127.0.0.1:6381
##############打印结束


(3)java实现redis 哨兵 :

官方实现的哨兵逻辑更复杂,这里简单实现~

package com.tony.redis.sentinel;

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;

import redis.clients.jedis.Jedis;

public class TonyRedisSentinel {
static String master;
// 所有 slave
static final Vector<String> slaveRedisServers = new Vector<String>();
// 坏掉的实例
static final Vector<String> badRedisServers = new Vector<String>();

public static void main(String[] args) throws Exception {
// 配置 redis master
config("127.0.0.1:6380");
// 定时任务
new Timer().schedule(new TimerTask() {

@Override
public void run() {
// 检测 master是否可以
checkMaster();
// 更新slave列表
updateSlaves();
// 检测坏掉的实例是否恢复正常
checkBadServer();

}
}, 1000L, 3000L);

// 开启端口接收请求
open();
}

private static void checkBadServer() {
// 获取所有slave
Iterator<String> iterator = badRedisServers.iterator();
while (iterator.hasNext()) {
String bad = iterator.next();
try {
String badHost = bad.split(":")[0];
int badPort = Integer.parseInt(bad.split(":")[1]);
Jedis badServer = new Jedis(badHost, badPort);
badServer.ping();

// 如果ping没有问题,则挂在当前的master
badServer.slaveof(master.split(":")[0], Integer.parseInt(master.split(":")[1]));
badServer.close();

slaveRedisServers.add(bad);
iterator.remove();
System.out.println(bad + " 恢复正常,当前master:" + master);
} catch (Exception e) {
}
}
}

private static void updateSlaves() {
// 获取所有slave
try {
String masterHost = master.split(":")[0];
int masterPort = Integer.parseInt(master.split(":")[1]);
Jedis jedis = new Jedis(masterHost, masterPort);
String info_replication = jedis.info("replication");
// 解析info replication
String[] lines = info_replication.split("\r\n");
int slaveCount = Integer.parseInt(lines[2].split(":")[1]);
if (slaveCount > 0) {
slaveRedisServers.clear();
for (int i = 0; i < slaveCount; i++) {
String port = lines[3 + i].split(",")[1].split("=")[1];
slaveRedisServers.add("127.0.0.1:" + port);
}
}
System.out.println("更新slave列表:" + Arrays.toString(slaveRedisServers.toArray(new String[] {})));
jedis.close();
} catch (Exception e) {
e.printStackTrace();
System.out.println("更新slave失败:" + e.getMessage());
}
}

private static void config(String ms) {
master = ms;
}

private static void checkMaster() {
// 主从切换
// 检查状态
System.out.println("检查master状态:" + master);
String masterHost = master.split(":")[0];
int masterPort = Integer.parseInt(master.split(":")[1]);
try {
Jedis jedis = new Jedis(masterHost, masterPort);
jedis.ping();
jedis.close();
} catch (Exception e) {
// master挂掉啦
badRedisServers.add(master);
// 切换master
changeMaster();
}
}

/** 切换master */
private static void changeMaster() {
Iterator<String> iterator = slaveRedisServers.iterator();
while (iterator.hasNext()) {
String slave = iterator.next();
try {
String slaveHost = slave.split(":")[0];
int slavePort = Integer.parseInt(slave.split(":")[1]);
Jedis jedis = new Jedis(slaveHost, slavePort);
jedis.slaveofNoOne();
jedis.close();
master = slave;
System.out.println("产生新的master:" + master);
break;
} catch (Exception e) {
badRedisServers.add(slave);
} finally {
iterator.remove();
}
}

// 所有slave切到新的master
for (String slave : slaveRedisServers) {
String slaveHost = slave.split(":")[0];
int slavePort = Integer.parseInt(slave.split(":")[1]);
Jedis jedis = new Jedis(slaveHost, slavePort);

jedis.slaveof(master.split(":")[0], Integer.parseInt(master.split(":")[1]));

jedis.close();
}
}

private static void open() throws Exception {
// SENTINEL get-master-addr-by-name master
// tcp port
ServerSocket sentlnel = new ServerSocket(26380);
Socket socket;
while ((socket = sentlnel.accept()) != null) {
try {

while(true) {

System.out.println("一个链接....");
InputStream inputStream = socket.getInputStream();
byte[] request = new byte[1024];
inputStream.read(request);
// 解析 get-master-addr-by-name 请求
String req = new String(request);
System.out.println("收到请求:");
System.out.println(req);
System.out.println("##############打印结束");
System.out.println();

String[] params = req.split("\r\n");

if("get-master-addr-by-name".equals(params[4])){
// 返回结果
String result = "*2\r\n" +
"$9\r\n" +
master.split(":")[0]+"\r\n" + // master host
"$4\r\n" +
master.split(":")[1]+"\r\n"; // master port

socket.getOutputStream().write(result.getBytes());
}
}
}catch (Exception e) {
}

}

}

}


// 哨兵机制测试
@Test
public static void sentinelTest() {
// SENTINEL get-master-addr-by-name master
Set<String> sentinels = new HashSet<String>();
// 这里添加哨兵服务器
sentinels.add("localhost:26380");

JedisSentinelPool jedisSentinelPool = new JedisSentinelPool("mymaster", sentinels);
Jedis jedis = jedisSentinelPool.getResource();

jedis.set("hello_sentinel_3", "tony_sentinel_3");
}

// 通过这个socket客户端发出请求
// 获取sentinel的响应内容
@Test
public void sentinelClientTest() throws Exception {
// SENTINEL get-master-addr-by-name master
Socket socket = new Socket("127.0.0.1", 26380);

socket.getOutputStream().write(("*3\r\n" +
"$8\r\n" +
"SENTINEL\r\n" +
"$23\r\n" +
"get-master-addr-by-name\r\n" +
"$6\r\n" +
"master\r\n").getBytes());

byte[] response = new byte[1024];
socket.getInputStream().read(response);

System.out.println(new String(response));
}


总结 :

通过如上可以对jedisSentinelPool 进行二次封装实现代码层次的读写分离。但是好像没有必要,redis 虽然是单线程,单机qps 1W–5W 是没有问题的,考虑fork指令和网络情况,每秒的qps 也是很可观的。故小编认为没有必要去弄读写分离。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: