您的位置:首页 > 数据库 > Redis

使用spring-data-redis兼容redis单机和集群操作

2018-05-18 11:13 741 查看

一、简介

有时候我们部署了集群版的redis,还希望开发的时候使用单机版(毕竟省事儿),但是比较常用的Java客户端(Jedis和Lettuce)的单机和集群api是不统一的。经过调研,发现spring-data-redis可以实现这个需求,本文就是将spring-data-redis和Jedis整合,来统一操作的API。

本文假设你已熟悉spring-data-redis和Jedis的基本使用,这些教程Google一下一大堆的。

二、配置

首先引入Maven依赖,注意1.8.12版本要求spring 4.3.17以上

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.8.12.RELEASE</version>
</dependency>

spring-data-redis配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="50"/>
<property name="maxTotal" value="100"/>
<property name="maxWaitMillis" value="20000"/>
</bean>

<bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate">
<constructor-arg name="connectionFactory" ref="jedisConnectionFactory"/>
</bean>

<!-- 单机配置 -->
<!--<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">-->
<!--<property name="hostName" value="127.0.0.1"/>-->
<!--<property name="port" value="6379"/>-->
<!--<property name="timeout" value="20000"/>-->
<!--<property name="poolConfig" ref="jedisPoolConfig" />-->
<!--</bean>-->

<!-- 集群配置 -->
<bean id="redisClusterConfiguration" class="org.springframework.data.redis.connection.RedisClusterConfiguration">
<property name="maxRedirects" value="3"/>
<property name="clusterNodes">
<set>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="127.0.0.1"/>
<constructor-arg name="port" value="6481"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="127.0.0.1"/>
<constructor-arg name="port" value="6482"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="127.0.0.1"/>
<constructor-arg name="port" value="6483"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="127.0.0.1"/>
<constructor-arg name="port" value="6484"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="127.0.0.1"/>
<constructor-arg name="port" value="6485"/>
</bean>
<bean class="org.springframework.data.redis.connection.RedisClusterNode">
<constructor-arg name="host" value="127.0.0.1"/>
<constructor-arg name="port" value="6486"/>
</bean>
</set>
</property>
</bean>

<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<constructor-arg name="clusterConfig" ref="redisClusterConfiguration" />
<constructor-arg name="poolConfig" ref="jedisPoolConfig" />
<property name="timeout" value="20000"/>
</bean>
</beans>

这里由集群切换为单机只要注释掉集群配置,打开单机配置即可。

三、代码使用

上述配置中声明了一个redisTemplate的Bean,我们直接将其注入到Service中直接使用即可。这里主要说下批量操作的问题:由于Redis集群是将所有的key使用Hash分布到了16384个槽(slot,下同)中,所以如果你使用批量的操作会报错“CROSSSLOT Keys in request don't hash to the same slot”。我们项目中的批量操作只有批量删除操作,这里就说一下Redis集群的批量删除应该怎么搞。直接上代码:

private static final String REDIS_CURSOR_START = "0";

/**
* 模糊删除
* @param keyPattern :形如"MESSAGE_*",将删除所有MESSAGE_开头的key的数据
*/
protected void deleteByPattern(String keyPattern) {
RedisConnection redisConnection = redisTemplate.getConnectionFactory().getConnection();
if(redisConnection instanceof RedisClusterConnection) {
//集群模式
RedisClusterConnection clusterConnection = (RedisClusterConnection) redisConnection;
Set<RedisClusterNode> clusterNodes = clusterConnection.clusterGetMasterSlaveMap().keySet();
//首先查出所有的key
Set<byte[]> allKeys = new HashSet<>();
for(RedisClusterNode redisClusterNode : clusterNodes) {
allKeys.addAll(clusterConnection.keys(redisClusterNode, keyPattern.getBytes(StandardCharsets.UTF_8)));
}
//将每个key和slot相对应
Map<Integer, List<byte[]>> keySlotMap = new HashMap<>();
for(byte[] key : allKeys) {
int slot = JedisClusterCRC16.getSlot(key);
if(keySlotMap.containsKey(slot)) {
keySlotMap.get(slot).add(key);
} else {
keySlotMap.put(slot, Lists.newArrayList(key));
}
}
//将key按slot删除
for(List<byte[]> delKeys : keySlotMap.values()) {
clusterConnection.del(delKeys.toArray(new byte[][]{}));
}
} else {
//单机模式
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
Set<String> keys = this.keysInSingleNode(keyPattern, jedis);
if(CollectionUtils.isEmpty(keys) == false) {
jedis.del(keys.toArray(new String[]{}));
}
}
}

/**
* 取单个Redis节点符合表达式的key
*/
private Set<String> keysInSingleNode(String keyPattern, Jedis jedis) {
String cursor = REDIS_CURSOR_START;
ScanParams params = new ScanParams().match(keyPattern).count(10000);
Set<String> keys = new HashSet<>();
do {
ScanResult<String> scanResult = jedis.scan(cursor, params);
keys.addAll(scanResult.getResult());
cursor = scanResult.getStringCursor();
} while(!REDIS_CURSOR_START.equals(cursor));
return keys;
}

这里有一个问题,就是keys这个操作是比较危险的,尤其是在redis中数据量很大的情况,keys会导致redis的线程阻塞,由于redis是单线程模型,所以阻塞时系统所有的操作都会卡死。这里用keys是因为比较清晰一点,下边我们优化一下:

protected void deleteByPattern(String keyPattern) {
RedisConnection redisConnection = redisTemplate.getConnectionFactory().getConnection();
if(redisConnection instanceof RedisClusterConnection) {
//集群模式
RedisClusterConnection clusterConnection = (RedisClusterConnection) redisConnection;
JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection();
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
Map<Integer, List<String>> keySlotMap = new HashMap<>();
for (JedisPool jedisPool : clusterNodes.values()) {
try(Jedis jedis = jedisPool.getResource()) {
//只取master节点的数据
if (!jedis.info("replication").contains("role:slave")) {
Set<String> keys = this.keysInSingleNode(keyPattern, jedis);
if (!CollectionUtils.isEmpty(keys)) {
for (String key : keys) {
// cluster模式执行多key操作的时候,这些key必须在同一个slot上,
int slot = JedisClusterCRC16.getSlot(key);
// 按slot将key分组,相同slot的key一起提交
if (keySlotMap.containsKey(slot)) {
keySlotMap.get(slot).add(key);
} else {
keySlotMap.put(slot, Lists.newArrayList(key));
}
}
}
}
}
}
for (List<String> delKeys : keySlotMap.values()) {
redisTemplate.delete(delKeys);
}
}
}

可能有人看到这里有点懵逼了,不是说单机和集群操作统一吗?你这不还是分成了一个if-else?

是的,redisTemplate对于单个key的操作确实是统一了的,但是对于批量操作redisCluster原生其实是不支持的,redisTemplate自然也就无法统一了,只能由我们自己实现。

最后插一句:redisson也是单机和集群api统一的,有兴趣的同学可以研究一下
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息