分布式缓存技术redis学习系列(八)——JedisCluster源码解读:集群初始化、slot(槽)的分配、值的存取
2016-11-16 15:46
971 查看
redis集群环境,客户端使用JedisCluster获取连接并操作redis服务,上一篇 分布式缓存技术redis学习系列(七)——spring整合jediscluster 简单介绍了spring使用JedisCluster,这篇从JedisCluster源码层面看看是如何使用。
2、在单元测试中我们注入了JedisCluster jedisCluster,它来源于spring配置文件对其的注册:
3、JedisClusterFactory:
它实现了InitializingBean接口,重写了afterPropertiesSet()方法,当JedisClusterFactory Bean被注册之后,此方法被调用,它里面调用parseHostAndPort()方法,具体代码如下:
4、从初始化集群代码一路跟踪,直至如下代码处:
注意上面代码段for循环里面的break:
它让for循环立刻跳出,导致for循环实际上只执行了一次,也就是说只使用了配置文件中所有节点配置的第一个,如此说来,集群配置文件中节点配置一个和配置多个效果一致。
同时还要注意到:通过获取的第一个节点配置实例化了一个Jedis jedis,如果配置文件中第一个节点指向的服务挂机或无法连接,将导致程序无法使用整个集群,虽然redis集群中其它节点是可用的。
5、discoverClusterNodesAndSlots:从redis服务器获取集群节点信息以及slot槽信息
关键信息:String localNodes = jedis.clusterNodes();
调用jedis实例的clusterNodes,实际上在redis服务端执行了cluster nodes命令,执行结果如下:
6、通过cluster nodes命令获取集群所有节点的信息字符串,然后进行解析封装,解析函数parse代码如下:
关键信息:String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
获取slot槽分配的区间值,比如0-5460,具体获取代码如下:
parse函数中另一个关键信息:fillSlotInformation(slotInfoPartArray, info);
fillSlotInformation函数调用了fillSlotInformationFromSlotRange函数,fillSlotInformationFromSlotRange函数作用是把所有的slot槽的index值存放到addAvailableSlot集合中,具体代码如下:
7、初始化核心工作:把当前节点存放入nodes集合中,key为节点host:port,value为JedisPool实例;把slot槽index索引值与当前节点的JedisPool进行映射,存入Map
关键信息:slots.put(slot, targetPool);实现slot槽index索引值与当前节点的JedisPool进行映射
2、jedisCluster.set():使用匿名内部类重写execute方法从redis服务器存取数据,调用run方法获取分配的slot槽,进而获取slot槽对应的JedisPool来生成jedis连接实例
3、run(),调用runWithRetries()方法:
关键信息解析:redirections参数
重试连接次数,与配置的redis集群环境主从服务个数相同,如果集群中一个master挂机,并且哨兵机制还没有发现该master挂机,客户端程序发送请求,出现无法连接的情况,之后会不断重试连接,超过最大集群服务个数,则抛出
关键信息解析:tryRandomNode参数
是否重试随机连接,首次为false,如果出现master挂机无法连接,执行重试,该参数变为true。
关键信息解析:connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
1)、通过JedisCluster的CRC16算法,对当前的key进行计算,获取一个整形值,区间为16384 - 1,redis集群分配的所有slot数
2)、通过获取的整形值,从集群初始化的slots集合中获取对应的JedisPool,进而获取Jedis连接实例
=====20170622更新=====
关键信息解析:两个catch语句块
两个catch语句块,实现了master挂机时,客户端重连的机制,这也是JedisCluster实现集群高可用的关键。
但是,如果master挂机,客户端程序执行重连,redis的哨兵机制还没有发现该master挂机或者发现但是没有完成挂机master替换或者替换失败,最终还是会抛出
=====20170622更新=====
总结:
程序启动初始化集群环境:
1)、读取配置文件中的节点配置,无论是主从,无论多少个,只拿第一个,获取redis连接实例
2)、用获取的redis连接实例执行clusterNodes()方法,实际执行redis服务端cluster nodes命令,获取主从配置信息
3)、解析主从配置信息,先把所有节点存放到nodes的map集合中,key为节点的ip:port,value为当前节点的jedisPool
4)、解析主节点分配的slots区间段,把slot对应的索引值作为key,第三步中拿到的jedisPool作为value,存储在slots的map集合中
综上,就实现了slot槽索引值与jedisPool的映射,这个jedisPool包含了master的节点信息,所以槽和几点是对应的,与redis服务端一致
从集群环境存取值:
1)、把key作为参数,执行CRC16算法,获取key对应的slot值
2)、通过该slot值,去slots的map集合中获取jedisPool实例
3)、通过jedisPool实例获取jedis实例,最终完成redis数据存取工作
jediscluster并不能实现客户端程序高可用:上述初始化和存取值的过程,如果客户端程序运行过程中,某一个master挂了,redis服务端的sentinel哨兵执行了主从替换,但是程序还是必然报错;
因为集群节点初始化是程序启动时执行的,并且在程序执行过程中只初始化一次;后续服务端master变更,客户端程序初始化的slots与master对应关系并没有同步实现变更;
存取值过程,依据slot索引拿到jedisPool实例,其包含的redis连接还是原来的也就是挂掉的ip:port,所以集群配置并没有让客户端程序实现高可用,只是实现了分布式功能。
只有重启服务重新初始化新的集群环境,程序方可正常运行。
===20170622更新======
上述蓝色字体结论部分有错误,经过重新验证,得出一下结论:
jediscluster可以实现客户端程序高可用,当出现master挂机,客户端程序通过catch语句块形式进行回调,完成重连操作,实现高可用机制;
但是,redis集群环境是通过哨兵机制进行监控的,要考虑的哨兵机制的延迟性(替换挂机的master),客户端程序需要对因哨兵机制的延迟造成的影响进行处理(比如增加延迟重试、对指定jedisCluster异常进行捕获后再重试等),充分保证客户端程序高可用
一、集群初始化
1、从单元测试开始,代码如下:@Autowired private JedisCluster jedisCluster; @Test public void testJedisCluster(){ jedisCluster.set("name", "啊芝"); String val = jedisCluster.get("name"); System.out.println(val); }
2、在单元测试中我们注入了JedisCluster jedisCluster,它来源于spring配置文件对其的注册:
<bean id="jedisCluster" class="com.jsun.service.redis.impl.JedisClusterFactory"> <property name="addressConfig"> <value>classpath:redis.properties</value> </property> <property name="addressKeyPrefix" value="cluster" /> <!-- 属性文件里 key的前缀 --> <property name="timeout" value="300000" /> <property name="maxRedirections" value="6" /> <property name="genericObjectPoolConfig" ref="genericObjectPoolConfig" /> </bean>
3、JedisClusterFactory:
它实现了InitializingBean接口,重写了afterPropertiesSet()方法,当JedisClusterFactory Bean被注册之后,此方法被调用,它里面调用parseHostAndPort()方法,具体代码如下:
@Override public void afterPropertiesSet() throws Exception { //拿到所有节点配置 Set<HostAndPort> haps = this.parseHostAndPort(); //初始化集群 jedisCluster = new JedisCluster(haps, timeout, maxRedirections,genericObjectPoolConfig); }
private Set<HostAndPort> parseHostAndPort() throws Exception { try { //读取配置文件,即redis.properties,把配置的节点全部存放入Set<HostAndPort> haps里面返回 Properties prop = new Properties(); prop.load(this.addressConfig.getInputStream()); Set<HostAndPort> haps = new HashSet<HostAndPort>(); for (Object key : prop.keySet()) { if (!((String) key).startsWith(addressKeyPrefix)) { continue; } String val = (String) prop.get(key); boolean isIpPort = p.matcher(val).matches(); if (!isIpPort) { throw new IllegalArgumentException("ip 或 port 不合法"); } String[] ipAndPort = val.split(":"); HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1])); haps.add(hap); } return haps; } catch (IllegalArgumentException ex) { throw ex; } catch (Exception ex) { throw new Exception("解析 jedis 配置文件失败", ex); } }
4、从初始化集群代码一路跟踪,直至如下代码处:
private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig) { //遍历所有从配置文件读取的节点 for (HostAndPort hostAndPort : startNodes) { Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); try { cache.discoverClusterNodesAndSlots(jedis); //这个break是个重点,虽然使用的for循环遍历,但是此处的break让此循环立刻跳出 break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } } for (HostAndPort node : startNodes) { cache.setNodeIfNotExist(node); } }
注意上面代码段for循环里面的break:
它让for循环立刻跳出,导致for循环实际上只执行了一次,也就是说只使用了配置文件中所有节点配置的第一个,如此说来,集群配置文件中节点配置一个和配置多个效果一致。
同时还要注意到:通过获取的第一个节点配置实例化了一个Jedis jedis,如果配置文件中第一个节点指向的服务挂机或无法连接,将导致程序无法使用整个集群,虽然redis集群中其它节点是可用的。
5、discoverClusterNodesAndSlots:从redis服务器获取集群节点信息以及slot槽信息
//存放 private Map<String, JedisPool> nodes = new HashMap<String, JedisPool>(); private Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>(); public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock(); try { this.nodes.clear(); this.slots.clear(); String localNodes = jedis.clusterNodes(); for (String nodeInfo : localNodes.split("\n")) { ClusterNodeInformation clusterNodeInfo = nodeInfoParser.parse(nodeInfo, new HostAndPort( jedis.getClient().getHost(), jedis.getClient().getPort())); HostAndPort targetNode = clusterNodeInfo.getNode(); setNodeIfNotExist(targetNode); assignSlotsToNode(clusterNodeInfo.getAvailableSlots(), targetNode); } } finally { w.unlock(); } }
关键信息:String localNodes = jedis.clusterNodes();
调用jedis实例的clusterNodes,实际上在redis服务端执行了cluster nodes命令,执行结果如下:
2a0ebb6d554fc8aa8a936bc0c0c2a6583425cf7e 119.254.166.136:7031 myself,master - 0 0 1 connected 0-5460 6f7119b06bb3316119f0bed3f793c2ce87983566 119.254.166.136:7036 slave 6fd9a873b29f5e9a61756606ececa4a953a11db7 0 1479278388635 6 connected 1b42b9d25779ccd5555fb804d01ddfbdd20635bf 119.254.166.136:7032 master - 0 1479278389637 2 connected 5461-10922 e22f9b0d7cb3fdb932926a1b4d9c3140e70255eb 119.254.166.136:7034 slave 2a0ebb6d554fc8aa8a936bc0c0c2a6583425cf7e 0 1479278390639 4 connected cd480d1b437ad323ae3e15e548db8faf43a5d766 119.254.166.136:7035 slave 1b42b9d25779ccd5555fb804d01ddfbdd20635bf 0 1479278385630 5 connected 6fd9a873b29f5e9a61756606ececa4a953a11db7 119.254.166.136:7033 master - 0 1479278386632 3 connected 10923-16383
6、通过cluster nodes命令获取集群所有节点的信息字符串,然后进行解析封装,解析函数parse代码如下:
public static final int SLOT_INFORMATIONS_START_INDEX = 8; public static final int HOST_AND_PORT_INDEX = 1; public ClusterNodeInformation parse(String nodeInfo, HostAndPort current) { String[] nodeInfoPartArray = nodeInfo.split(" "); HostAndPort node = getHostAndPortFromNodeLine(nodeInfoPartArray, current); ClusterNodeInformation info = new ClusterNodeInformation(node); if (nodeInfoPartArray.length >= SLOT_INFORMATIONS_START_INDEX) { String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray); fillSlotInformation(slotInfoPartArray, info); } return info; }
关键信息:String[] slotInfoPartArray = extractSlotParts(nodeInfoPartArray);
获取slot槽分配的区间值,比如0-5460,具体获取代码如下:
private String[] extractSlotParts(String[] nodeInfoPartArray) { String[] slotInfoPartArray = new String[nodeInfoPartArray.length - SLOT_INFORMATIONS_START_INDEX]; for (int i = SLOT_INFORMATIONS_START_INDEX; i < nodeInfoPartArray.length; i++) { slotInfoPartArray[i - SLOT_INFORMATIONS_START_INDEX] = nodeInfoPartArray[i]; } return slotInfoPartArray; }
parse函数中另一个关键信息:fillSlotInformation(slotInfoPartArray, info);
fillSlotInformation函数调用了fillSlotInformationFromSlotRange函数,fillSlotInformationFromSlotRange函数作用是把所有的slot槽的index值存放到addAvailableSlot集合中,具体代码如下:
private void fillSlotInformation(String[] slotInfoPartArray, ClusterNodeInformation info) { for (String slotRange : slotInfoPartArray) { fillSlotInformationFromSlotRange(slotRange, info); } }
private void fillSlotInformationFromSlotRange(String slotRange, ClusterNodeInformation info) { if (slotRange.startsWith(SLOT_IN_TRANSITION_IDENTIFIER)) { // slot is in transition int slot = Integer.parseInt(slotRange.substring(1).split("-")[0]); if (slotRange.contains(SLOT_IMPORT_IDENTIFIER)) { // import info.addSlotBeingImported(slot); } else { // migrate (->-) info.addSlotBeingMigrated(slot); } } else if (slotRange.contains("-")) { // slot range String[] slotRangePart = slotRange.split("-"); for (int slot = Integer.valueOf(slotRangePart[0]); slot <= Integer.valueOf(slotRangePart[1]); slot++) { info.addAvailableSlot(slot); } } else { // single slot info.addAvailableSlot(Integer.valueOf(slotRange)); } }
7、初始化核心工作:把当前节点存放入nodes集合中,key为节点host:port,value为JedisPool实例;把slot槽index索引值与当前节点的JedisPool进行映射,存入Map
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) { w.lock(); try { JedisPool targetPool = nodes.get(getNodeKey(targetNode)); if (targetPool == null) { setNodeIfNotExist(targetNode); targetPool = nodes.get(getNodeKey(targetNode)); } for (Integer slot : targetSlots) { slots.put(slot, targetPool); } } finally { w.unlock(); } }
关键信息:slots.put(slot, targetPool);实现slot槽index索引值与当前节点的JedisPool进行映射
二、使用JedisCluster客户端存取值
1、还是看单元测试:@Autowired private JedisCluster jedisCluster; //验证高可用性 @Test public void testJedisCluster1() throws InterruptedException{ for(int i=0;i<100;i++){ try{ jedisCluster.set("name"+i, "hello world="+i); String val = jedisCluster.get("name"+i); System.out.println(val); //重连次数超过redis集群环境主从服务个数 }catch(JedisClusterMaxRedirectionsException m){ System.err.println("start Too many Cluster redirections?"); //Thread.sleep(2000); i =i-1; System.err.println("continue Too many Cluster redirections?"); continue; }catch(JedisClusterException jce){ //redis服务器的哨兵机制,还没有完成挂机master替换或者替换失败 //此时客户端请求从挂机的master中获取数据,所得到的结果就是the cluster is down System.out.println("start CLUSTERDOWN the cluster is down"); //Thread.sleep(2000); i =i-1; System.out.println("continue CLUSTERDOWN the cluster is down"); continue; } //修改该睡眠时间的大小,会影响抛出异常的类型, //较小值更容易抛出JedisClusterMaxRedirectionsException异常, //较小值更容易抛出JedisClusterException异常 //以上三句结论,缺乏充分验证,请勿轻信!!! Thread.sleep(1000); } }
2、jedisCluster.set():使用匿名内部类重写execute方法从redis服务器存取数据,调用run方法获取分配的slot槽,进而获取slot槽对应的JedisPool来生成jedis连接实例
@Override public String set(final String key, final String value) { return new JedisClusterCommand<String>(connectionHandler, maxRedirections) { @Override public String execute(Jedis connection) { return connection.set(key, value); } }.run(key); }
3、run(),调用runWithRetries()方法:
private T runWithRetries(String key, int redirections, boolean tryRandomNode, boolean asking) { //redirections 重试连接次数,与集群环境主从服务个数相同 if (redirections <= 0) { throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?"); } Jedis connection = null; try { if (asking) { // TODO: Pipeline asking with the original command to make it // faster.... connection = askConnection.get(); connection.asking(); // if asking success, reset asking flag asking = false; } else { //默认首次tryRandomNode=false if (tryRandomNode) { connection = connectionHandler.getConnection(); } else { connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key)); } } return execute(connection); } catch (JedisConnectionException jce) { if (tryRandomNode) { // maybe all connection is down throw jce; } releaseConnection(connection, true); connection = null; // retry with random connection //回调,重试连接 return runWithRetries(key, redirections - 1, true, asking); } catch (JedisRedirectionException jre) { if (jre instanceof JedisAskDataException) { asking = true; askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode())); } else if (jre instanceof JedisMovedDataException) { // it rebuilds cluster's slot cache // recommended by Redis cluster specification this.connectionHandler.renewSlotCache(); } else { throw new JedisClusterException(jre); } releaseConnection(connection, false); connection = null; //回调,重试连接 return runWithRetries(key, redirections - 1, false, asking); } finally { releaseConnection(connection, false); } }
关键信息解析:redirections参数
重试连接次数,与配置的redis集群环境主从服务个数相同,如果集群中一个master挂机,并且哨兵机制还没有发现该master挂机,客户端程序发送请求,出现无法连接的情况,之后会不断重试连接,超过最大集群服务个数,则抛出
JedisClusterMaxRedirectionsException异常
关键信息解析:tryRandomNode参数
是否重试随机连接,首次为false,如果出现master挂机无法连接,执行重试,该参数变为true。
关键信息解析:connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
1)、通过JedisCluster的CRC16算法,对当前的key进行计算,获取一个整形值,区间为16384 - 1,redis集群分配的所有slot数
2)、通过获取的整形值,从集群初始化的slots集合中获取对应的JedisPool,进而获取Jedis连接实例
=====20170622更新=====
关键信息解析:两个catch语句块
两个catch语句块,实现了master挂机时,客户端重连的机制,这也是JedisCluster实现集群高可用的关键。
但是,如果master挂机,客户端程序执行重连,redis的哨兵机制还没有发现该master挂机或者发现但是没有完成挂机master替换或者替换失败,最终还是会抛出
JedisClusterMaxRedirectionsException或者
JedisClusterException异常,客户端程序需要对此情况进行额外处理,延迟并增加重试次数,否则造成部分情况失败,额外处理方式,可以参考单元测试
=====20170622更新=====
总结:
程序启动初始化集群环境:
1)、读取配置文件中的节点配置,无论是主从,无论多少个,只拿第一个,获取redis连接实例
2)、用获取的redis连接实例执行clusterNodes()方法,实际执行redis服务端cluster nodes命令,获取主从配置信息
3)、解析主从配置信息,先把所有节点存放到nodes的map集合中,key为节点的ip:port,value为当前节点的jedisPool
4)、解析主节点分配的slots区间段,把slot对应的索引值作为key,第三步中拿到的jedisPool作为value,存储在slots的map集合中
综上,就实现了slot槽索引值与jedisPool的映射,这个jedisPool包含了master的节点信息,所以槽和几点是对应的,与redis服务端一致
从集群环境存取值:
1)、把key作为参数,执行CRC16算法,获取key对应的slot值
2)、通过该slot值,去slots的map集合中获取jedisPool实例
3)、通过jedisPool实例获取jedis实例,最终完成redis数据存取工作
jediscluster并不能实现客户端程序高可用:上述初始化和存取值的过程,如果客户端程序运行过程中,某一个master挂了,redis服务端的sentinel哨兵执行了主从替换,但是程序还是必然报错;
因为集群节点初始化是程序启动时执行的,并且在程序执行过程中只初始化一次;后续服务端master变更,客户端程序初始化的slots与master对应关系并没有同步实现变更;
存取值过程,依据slot索引拿到jedisPool实例,其包含的redis连接还是原来的也就是挂掉的ip:port,所以集群配置并没有让客户端程序实现高可用,只是实现了分布式功能。
只有重启服务重新初始化新的集群环境,程序方可正常运行。
===20170622更新======
上述蓝色字体结论部分有错误,经过重新验证,得出一下结论:
jediscluster可以实现客户端程序高可用,当出现master挂机,客户端程序通过catch语句块形式进行回调,完成重连操作,实现高可用机制;
但是,redis集群环境是通过哨兵机制进行监控的,要考虑的哨兵机制的延迟性(替换挂机的master),客户端程序需要对因哨兵机制的延迟造成的影响进行处理(比如增加延迟重试、对指定jedisCluster异常进行捕获后再重试等),充分保证客户端程序高可用
相关文章推荐
- JedisCluster源码解读:集群初始化、slot(槽)的分配、值的存取
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习系列(四)——redis高级应用(集群搭建、集群分区原理、集群操作)
- 分布式缓存技术redis学习系列(二)——详细讲解redis数据结构(内存模型)以及常用命令
- 分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)
- 分布式缓存技术redis学习系列(三)——redis高级应用(主从、事务与锁、持久化)
- 分布式缓存技术redis学习系列(一)——redis简介以及linux上的安装
- 分布式缓存技术redis学习系列----深入理解Spring Redis的使用
- 分布式缓存技术redis学习系列(九)——Redis主从实现读写分离
- 分布式缓存技术redis学习系列----深入理解Spring Redis的使用
- 分布式缓存技术redis学习系列(二)——详细讲解redis数据结构(内存模型)以及常用命令
- 分布式缓存技术redis学习系列(五)——redis实战(redis与spring整合,分布式锁实现)
- 分布式缓存技术redis学习系列(二)——详细讲解redis数据结构(内存模型)以及常用命令
- 分布式缓存技术redis学习系列(三)——redis高级应用(主从、事务与锁、持久化)
- 分布式缓存技术redis学习系列(二)——详细讲解redis数据结构(内存模型)以及常用命令
- 分布式缓存技术redis学习系列(五)——spring-data-redis与JedisPool的区别、使用ShardedJedisPool与spring集成的实现及一致性哈希分析