您的位置:首页 > 数据库 > Redis

redis cluster 3.0

2016-11-18 14:49 183 查看

redis cluster 集群

本文内容从redis官方文档整理而来,简要概述redis cluster的优点和缺点

redis cluster需要3.0版本的支持,详细说明参照下面文档

集群指南:http://redis.io/topics/cluster-tutorial

集群规范:http://redis.io/topics/cluster-spec

redis cluster 集群
redis cluster 30 架构概述

集群环境搭建

java客户端支持

redis cluster集群环境 项目中的具体实现

Spring-data-redis

redis cluster 3.0 架构概述



上图是redis cluster的整体架构

1.分布式

从redis cluster的架构图中可以看出采用分布式架构

键空间被分割为 16384 槽(slot),事实上集群的最大节点数量是 16384 个

然而建议最大节点数量设置在1000这个数量级上,所有的主节点都负责 16384 个哈希槽中的一部分

集群中的每个节点负责处理一部分哈希槽

举个例子, 一个集群可以有三个哈希槽, 其中:

- 节点 A 负责处理 0 号至 5500 号哈希槽
- 节点 B 负责处理 5501 号至 11000 号哈希槽
- 节点 C 负责处理 11001 号至 16384 号哈希槽


这样的好处是便于水平拓展集群环境,如果添加一个新的节点,直接添加就可以了

如果要删除一个节点只需要将该节点中的所有哈希槽移动到其他节点即可

简而言之:

redis cluster自动分割数据到不同的节点上,并非所有节点都存储了全部的数据

整个节点部分节点挂掉不会导致整个集群环境宕机,保证了一定程度上的可用性

2.主从复制

对于集群环境而言,容灾是必须要考虑的问题

为了使在部分节点失败或者大部分节点无法通信的情况下集群仍然可用

集群使用了主从复制模型,每个节点都会有N-1个复制品

如果主节点挂掉,redis cluster会从子节点中选举一个作为新的master

而redis cluster实现了这种模式,举例说明:

- 节点 A 负责处理 0 号至 5500 号哈希槽
- 节点 B 负责处理 5501 号至 11000 号哈希槽
- 节点 C 负责处理 11001 号至 16384 号哈希槽


假设有A,B,C三个节点的集群,在没有复制模型的情况下,如果节点B失败了,那么整个集群就会以为缺少5501-11000这个范围的槽而不可用.

然而如果在集群创建的时候(或者过一段时间)我们为每个节点添加一个从节点A1,B1,C1,那么整个集群便有三个master节点和三个slave节点组成,这样在节点B失败后,集群便会选举B1为新的主节点继续服务,整个集群便不会因为槽找不到而不可用了

3.异步复写

这一点是对主从复制的补充,redis cluster并不能保证数据的强一致性

在生产环境中特定的条件下可能会丢失写操作,导致这一现象的本质原因是redis cluster采用异步复制,异步写入

整个执行过程如下:

1.客户端向主节点B写入一条命令.

2.主节点B向客户端回复命令状态.

3.主节点将写操作复制给他的从节点 B1, B2 和 B3

主节点对命令的复制工作发生在返回命令回复之后,即第二部之后

因为如果每次处理命令请求都需要等待复制操作完成的话, 那么主节点处理命令请求的速度将极大地降低

由此可见redis为了保证性能,在性能和一致性之间做了一定的权衡(也许redis后续版本会支持同步复写)

对于上述情况,我个人对可能会丢失写操作的理解如下:

假设有A B C A1 B1 C1 留个节点ABC为master节点,A1B1C1分别为这三个master节点的slave节点

假设集群环境中存在网络分区,一方包含节点A 、C、A1、B1、C1 ,另一方包含节点 B 和客户端

由于网络分区之间的通信问题,客户端能正常向B写入数据

如果网络分区发生时间较短,这个操作能正常执行,如果分区的时间足够让大部分的另一方将B1选举为新的master,那么客户端写入B中得数据便丢失了

生产环境应当尽量避免网络分区带来的种种问题,据我所知RabbitMQ集群的网络分区容错性也并不是非常高

4.架构细节

在 Redis 集群中节点并不是把命令转发到管理所给出的键值的正确节点上:

1.所有的redis节点彼此互联(PING-PONG机制),内部使用二进制协议优化传输速度和带宽.

2.节点的fail是通过集群中超过半数的节点检测失效时才生效,此时将会判定整个集群不可用

3.客户端与redis节点直连,不需要中间proxy层.客户端不需要连接集群所有节点,连接集群中任何一个 可用节点即可

4.redis-cluster把所有的物理节点映射到[0-16383]slot上,cluster 负责维护node<->slot<->value,官方推荐节点数量为1000个(实际中能用到这么多的有几个)

5.数据读取表现

1.redis cluster把客户端重定向到服务一定范围内的键值的节点上,

2.如果这个节点没有客户端需要的数据,将重定向其他节点,直到节点查询完毕为止,或者查询到所需数据就提前终止

3.客户端获得一份最新的集群表示,里面有写着哪些节点服务哪些键值子集,所以在正常操作中客户端是直接联系到对应的节点并把给定的命令发过去

6.功能实现

1.redis cluster实现了所有在非分布式 redis 版本中出现的处理单一键值的命令

2.多个键值的复杂操作未实现, 比如 set 里的并集(unions)和交集(intersections)操作

因为执行这些复杂操作命令需要在多个redis 节点之间移动数据, 并且在高负载的情况下

这些命令将降低 Redis 集群的性能, 并导致不可预测的行为,又一次为了性能而舍弃了

集群环境搭建

快速在linux环境下搭建redis cluster集群

准备6个redis节点,3从三主,对应的redis节点的ip和端口对应关系如下

127.0.0.1:7000
127.0.0.1:7001
127.0.0.1:7002
127.0.0.1:7003
127.0.0.1:7004
127.0.0.1:7005


下载最新版redis

wget http://download.redis.io/releases/redis-3.0.0.tar.gz[/code] 
解压并安装

tar xf redis-3.0.0.tar.gz
cd redis-3.0.0
make && make install


创建存放多个实例的目录

mkdir /data/cluster -p
cd /data/cluster
mkdir 7000 7001 7002 7003 7004 7005


修改配置文件

cp redis-3.0.0/redis.conf /data/cluster/7000/


修改配置文件中的下面参数,修改完成后,把修改完成的redis.conf复制到7001-7005目录下,并且端口修改成和文件夹对应

#修改为对应端口
port 7000
#是否在后台运行
daemonize yes
#是否启用cluster
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000
appendonly yes


分别启动实例

cd /data/cluster/7000
redis-server redis.conf
cd /data/cluster/7001
redis-server redis.conf
cd /data/cluster/7002
redis-server redis.conf
cd /data/cluster/7003
redis-server redis.conf
cd /data/cluster/7004
redis-server redis.conf
cd /data/cluster/7005
redis-server redis.conf


创建集群前先安装依赖

首先安装依赖,否则创建集群失败,这个用的是ruby脚本

yum install ruby rubygems -y


安装gem-redis

下载地址:https://rubygems.org/gems/redis/versions/3.0.0

gem install -l redis-3.0.0.gem


复制集群管理程序到/usr/local/bin

cp redis-3.0.0/src/redis-trib.rb /usr/local/bin/redis-trib


创建集群

redis-trib create --replicas 1 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005


命令的意义如下:

给定 redis-trib.rb 程序的命令是 create , 表示创建一个新的集群

选项 –replicas 1 表示我们希望为集群中的每个主节点创建一个从节点

之后跟着的其他参数则是实例的地址列表, 使用这些地址所指示的实例来创建新集群

简单来说, 以上命令的意思就是让 redis-trib 程序创建一个包含三个主节点和三个从节点的集群

java客户端支持

这里只介绍java版本的客户端支持,其他版本的可以从http://redis.io/ 客户端页面(Clients)查看

Jedis

对redis的实现比较好,稳定,有一定的用户量

项目:https://github.com/xetorthio/jedis

维基:https://github.com/xetorthio/jedis/wiki

Jedis文档中关于jedis-cluster的使用:

Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
//Jedis Cluster will attempt to discover cluster nodes automatically
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 7379));
JedisCluster jc = new JedisCluster(jedisClusterNodes);
jc.set("foo", "bar");
String value = jc.get("foo");


下面介绍如何与spring集成:

添加如下依赖到项目中

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>


添加redis.cluster.properties文件到项目中,内容如下,其中填写

node1=192.168.1.110:6379
node2=192.168.1.110:6380
node3=192.168.1.110:6381
node4=192.168.1.110:6382
node5=192.168.1.110:6383
node6=192.168.1.110:6384


自定义JedisClusterFactory类实现FactoryBean用于和spring集成

import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

public class JedisClusterFactory implements FactoryBean<JedisCluster>, InitializingBean{

private Resource addressConfig;
private String addressKeyPrefix ;

private JedisCluster jedisCluster;
private Integer timeout;
private Integer maxRedirections;
private GenericObjectPoolConfig genericObjectPoolConfig;

private Pattern p = Pattern.compile("^.+[:]\\d{1,5}\\s*$");

@Override
public JedisCluster getObject() throws Exception {
return jedisCluster;
}

@Override
public Class<? extends JedisCluster> getObjectType() {
return (this.jedisCluster != null ? this.jedisCluster.getClass() : JedisCluster.class);
}

@Override
public boolean isSingleton() {
return true;
}

private Set<HostAndPort> parseHostAndPort() throws Exception {
try {
Properties prop = new Properties();
prop.load(this.addressConfig.getInputStream());

Set<HostAndPort> haps = new HashSet<HostAndPort>();
for (Object key : prop.keySet()) {

if (!((String) key).startsWith(addressKeyPrefix)) {
continue;
}

String val = (String) prop.get(key);

boolean isIpPort = p.matcher(val).matches();

if (!isIpPort) {
throw new IllegalArgumentException("illegal ip or port");
}
String[] ipAndPort = val.split(":");

HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
haps.add(hap);
}

return haps;
} catch (IllegalArgumentException ex) {
throw ex;
} catch (Exception ex) {
throw new Exception("analysis jedis cluster configuration failed", ex);
}
}

@Override
public void afterPropertiesSet() throws Exception {
Set<HostAndPort> haps = this.parseHostAndPort();

jedisCluster = new JedisCluster(haps, timeout, maxRedirections,genericObjectPoolConfig);

}
public void setAddressConfig(Resource addressConfig) {
this.addressConfig = addressConfig;
}

public void setTimeout(int timeout) {
this.timeout = timeout;
}

public void setMaxRedirections(int maxRedirections) {
this.maxRedirections = maxRedirections;
}

public void setAddressKeyPrefix(String addressKeyPrefix) {
this.addressKeyPrefix = addressKeyPrefix;
}

public void setGenericObjectPoolConfig(GenericObjectPoolConfig genericObjectPoolConfig) {
this.genericObjectPoolConfig = genericObjectPoolConfig;
}

}


添加如下spring配置,连接池的配置参数根据生产环境需要修改

<bean name="genericObjectPoolConfig" class="org.apache.commons.pool2.impl.GenericObjectPoolConfig" >
<property name="maxWaitMillis" value="-1" />
<property name="maxTotal" value="1000" />
<property name="minIdle" value="8" />
<property name="maxIdle" value="100" />
</bean>
<bean id="jedisCluster" class="项目包名.JedisClusterFactory">
<property name="addressConfig">
<value>classpath:redis.cluster.properties</value>
</property>
<property name="addressKeyPrefix" value="node" />
<property name="timeout" value="300000" />
<property name="maxRedirections" value="6" />
<property name="genericObjectPoolConfig" ref="genericObjectPoolConfig" />
</bean>


最后在代码中注入JedisCluster即可

@Resource
JedisCluster jedisCluster;


redis cluster集群环境 项目中的具体实现

一,和spring的集成

<!--集成redis集群-->
<bean name="genericObjectPoolConfig" class="org.apache.commons.pool2.impl.GenericObjectPoolConfig">
<property name="maxWaitMillis" value="${redis.maxWaitMillis}"/>
<property name="maxTotal" value="${redis.maxTotal}"/>
<property name="minIdle" value="${redis.minIdle}"/>
<property name="maxIdle" value="${redis.maxIdle}"/>
</bean>
<bean id="jedisCluster" class="com.common.redis.factory.JedisClusterFactory">
<property name="addressConfig" value="classpath:redis.cluster.properties"/>
<property name="addressKeyPrefix" value="${redis.addressKeyPrefix}"/>
<property name="socketTimeout" value="${redis.socketTimeout}"/>
<property name="connectionTimeout" value="${redis.connectionTimeout}"/>
<property name="maxAttempts" value="${redis.maxAttempts}"/>
<property name="genericObjectPoolConfig" ref="genericObjectPoolConfig"/>
</bean>


二,创建两个properties文件 redis.cluster.properties和redis.properties

redis.cluster.properties

node1=192.168.0.133:7000
node2=192.168.0.133:7001
node3=192.168.0.133:7002
node4=192.168.0.135:7003
node5=192.168.0.135:7004
node6=192.168.0.135:7005


redis.properties

redis.host=192.168.0.199
redis.port=6379
redis.expire=1800
redis.maxIdle=300
redis.minIdle=30
redis.maxTotal=600
redis.maxAttempts=6
redis.socketTimeout=300
redis.connectionTimeout=300000
redis.addressKeyPrefix=node
redis.maxWaitMillis=-1


定义类JedisClusterFactory

package com.common.redis.factory;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

/**
* @author Sirius_KP
*         customized jedis cluster factory for spring
*         1.do host and ip check
*         2.set pool config
*         3.connectionTimeout and  maxAttempts property
*/
public class JedisClusterFactory implements FactoryBean<JedisCluster>, InitializingBean {
private Pattern p = Pattern.compile("^.+[:]\\d{1,5}\\s*$");
private JedisCluster jedisCluster;
private Resource addressConfig;
private String addressKeyPrefix;
private Integer connectionTimeout;
private Integer socketTimeout;
private Integer maxAttempts;
private String password;
private GenericObjectPoolConfig genericObjectPoolConfig;

@Override
public JedisCluster getObject() throws Exception {
return jedisCluster;
}

@Override
public Class<?> getObjectType() {
return jedisCluster != null ? jedisCluster.getClass() : JedisCluster.class;
}

@Override
public boolean isSingleton() {
return true;
}

/**
* after properties set,create an jedis cluster instance
*
* @throws Exception ex
*/
@Override
public void afterPropertiesSet() throws Exception {
jedisCluster = new JedisCluster(parseHostAndPort(), connectionTimeout, socketTimeout, maxAttempts, password, genericObjectPoolConfig);
}

/**
* parse host and port,it will return a set collection
*
* @return a set of hosts and ports
* @throws Exception if has illegal host and port
*/
private Set<HostAndPort> parseHostAndPort() throws Exception {
try {
Properties properties = new Properties();
//load host and port configuration
properties.load(this.addressConfig.getInputStream());
Set<HostAndPort> haps = new HashSet<>();
for (Object key : properties.keySet()) {
String keyStr = (String) key;
if (!keyStr.startsWith(addressKeyPrefix)) {
continue;
}
String val = properties.getProperty(keyStr);
boolean isIpPort = p.matcher(val).matches();
if (!isIpPort) {
throw new IllegalArgumentException("illegal ip or port");
}
String[] ipAndPort = val.split(":");
HostAndPort hap = new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
haps.add(hap);
}
return haps;
} catch (IllegalArgumentException ex) {
throw ex;
} catch (Exception ex) {
throw new Exception("analysis jedis cluster configuration failed", ex);
}
}

public void setAddressConfig(Resource addressConfig) {
this.addressConfig = addressConfig;
}

public void setAddressKeyPrefix(String addressKeyPrefix) {
this.addressKeyPrefix = addressKeyPrefix;
}

public void setConnectionTimeout(Integer connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}

public void setSocketTimeout(Integer socketTimeout) {
this.socketTimeout = socketTimeout;
}

public void setMaxAttempts(Integer maxAttempts) {
this.maxAttempts = maxAttempts;
}

public void setPassword(String password) {
this.password = password;
}

public void setGenericObjectPoolConfig(GenericObjectPoolConfig genericObjectPoolConfig) {
this.genericObjectPoolConfig = genericObjectPoolConfig;
}
}


定义接口

package com.common.redis.handler;

/**
* Any redis operation will through execute method
* logger will append at here
*
* @author Sirius_KP
*/
public interface GenericHandler {
<T> T execute(RedisCallback<T> redisCallback);
}


package com.common.redis.handler;

import redis.clients.jedis.JedisCluster;

/**
* common call back interface
*
* @author Sirius_KP
*/
public interface RedisCallback<T> {
/**
* redis cluster execute
*
* @return object value ,see redis command document
*/
T call(JedisCluster jedisCluster);
}


实现类

package com.common.redis.handler.support;

import com.common.redis.handler.GenericHandler;
import com.common.redis.handler.RedisCallback;
import org.springframework.stereotype.Service;
import redis.clients.jedis.JedisCluster;

import javax.annotation.Resource;

/**
* Generic support module for jedis cluster.
* Any redis operation will through execute method.
* Log error in this method
*
* @author Sirius_KP
*/
@Service
public class GenericHandlerSupport implements GenericHandler {
@Resource
private JedisCluster jedisCluster;

@Override
public <T> T execute(RedisCallback<T> redisCallback) {
try {
return redisCallback.call(jedisCluster);
} catch (Exception ex) {
ex.printStackTrace();
}
return null;
}
}


好了现在就可以定义自己要使用的key了,这个对于不同的key类型可以自己去封装这里简单介绍一种

定义一个用于子类共用的Key类所有的其它类型的key的操作可以基于这个扩展

package com.common.redis.handler;

import java.util.Set;

/**
* Any data type in redis are stored as key-value pairs,so this is a common module for any data type.
* This interface handle most key operation.
* U can diy operation method in this class.
*
* @author Sirius_KP
*/
public interface KeyHandler {
/**
* del keys in redis
*
* @param keys single key or multi keys
* @return a long type value of deleted keys
*/
Long del(String... keys);

/**
* check key exists
*
* @param key key
* @return boolean
*/
Boolean exists(String key);

/**
* set expire time for a key,if a key is expired,it will be deleted in redis
*
* @param key     key
* @param seconds seconds
* @return 1: the timeout was set 0: the timeout was not set since
* the key already has an associated timeout(maybe happen under redis version 2.1.3)
*/
Long expire(String key, int seconds);

/**
* return key time remaining
*
* @param key key
* @return key time remaining
*/
Long ttl(String key);

/**
* get all keys in jedis cluster matches pattern
*
* @param pattern regex pattern,支持正则表达式 查全部传*
* @return key set
*/
Set<String> keys(String pattern);
}


实现

package com.common.redis.handler.support;

import com.common.redis.handler.GenericHandler;
import com.common.redis.handler.KeyHandler;
import com.common.redis.handler.RedisCallback;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;

import javax.annotation.Resource;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
* key handler support
*
* @author Sirius_KP
*/
public abstract class KeyHandlerSupport implements KeyHandler {
@Resource
protected GenericHandler handler;

@Override
public Long del(final String... keys) {
return handler.execute(new RedisCallback<Long>() {
@Override
public Long call(JedisCluster jedisCluster) {
return jedisCluster.del(keys);
}
});
}

@Override
public Boolean exists(final String key) {
return handler.execute(new RedisCallback<Boolean>() {
@Override
public Boolean call(JedisCluster jedisCluster) {
return jedisCluster.exists(key);
}
});
}

@Override
public Long expire(final String key, final int seconds) {
return handler.execute(new RedisCallback<Long>() {
@Override
public Long call(JedisCluster jedisCluster) {
return jedisCluster.expire(key, seconds);
}
});
}

@Override
public Long ttl(final String key) {

return handler.execute(new RedisCallback<Long>() {

@Override
public Long call(JedisCluster jedisCluster) {
return jedisCluster.ttl(key);
}
});
}

@Override
public Set<String> keys(final String pattern) {
return handler.execute(new RedisCallback<Set<String>>() {
@Override
public Set<String> call(JedisCluster jedisCluster) {
Set<String> keys = new TreeSet<>();
Map<String, JedisPool> nodes = jedisCluster.getClusterNodes();
for (String node : nodes.keySet()) {
JedisPool jedisPool = nodes.get(node);
Jedis jedis = jedisPool.getResource();
try {
keys.addAll(jedis.keys(pattern));
} finally {
jedis.close();
}
}
return keys;
}
});
}
}


package com.common.redis.handler;

/**
* String data type operation interface,see more in redis document
*
* @author Sirius_KP
*/
public interface StringHandler extends KeyHandler {
String set(String key, String value);

String get(String key);
}


实现

package com.common.redis.handler.support;

import com.common.redis.handler.RedisCallback;
import com.common.redis.handler.StringHandler;
import org.springframework.stereotype.Service;
import redis.clients.jedis.JedisCluster;

/**
* String handler support
*
* @author Sirius_KP
*/
@Service
public class StringHandlerSupport extends KeyHandlerSupport implements StringHandler {
@Override
public String set(final String key, final String value) {
return handler.execute(new RedisCallback<String>() {
@Override
public String call(JedisCluster jedisCluster) {
return jedisCluster.set(key, value);
}
});
}

@Override
public String get(final String key) {
return handler.execute(new RedisCallback<String>() {
@Override
public String call(JedisCluster jedisCluster) {
return jedisCluster.get(key);
}
});
}
}


Spring-data-redis

Spring-data-redis spring子项目,支持redis3.0集群

下面简单的介绍一下Spring-data-redis的使用

文档:http://docs.spring.io/spring-data/redis/docs/1.7.4.RELEASE/reference/html/

在项目中添加以下依赖

<dependencies>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
</dependencies>


编码方式集成redis cluster

@Component
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class ClusterConfigurationProperties {

/*
* spring.redis.cluster.nodes[0] = 127.0.0.1:7379
* spring.redis.cluster.nodes[1] = 127.0.0.1:7380
* ...
*/
List<String> nodes;

/**
* Get initial collection of known cluster nodes in format {@code host:port}.
*
* @return
*/
public List<String> getNodes() {
return nodes;
}

public void setNodes(List<String> nodes) {
this.nodes = nodes;
}
}

@Configuration
public class AppConfig {

/**
* Type safe representation of application.properties
*/
@Autowired ClusterConfigurationProperties clusterProperties;

public @Bean RedisConnectionFactory connectionFactory() {

return new JedisConnectionFactory(
new RedisClusterConfiguration(clusterProperties.getNodes()));
}
}


简单使用

RedisClusterConnection connection = connectionFactory.getClusterConnnection();
connection.set("foo", value);
connection.set("bar", value);
connection.keys("*");
connection.keys(NODE_7379, "*");
connection.keys(NODE_7380, "*");
connection.keys(NODE_7381, "*");
connection.keys(NODE_7382, "*");


使用redistemplate进行操作

ClusterOperations clusterOps = redisTemplate.opsForCluster();
clusterOps.shutdown(NODE_7379);


配置方式整合spring可以参考这篇文章:http://www.cnblogs.com/moonandstar08/p/5149585.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  redis 集群 文档