使用spring-data-redis兼容redis单机和集群操作
2018-05-18 11:13
741 查看
一、简介
有时候我们部署了集群版的redis,还希望开发的时候使用单机版(毕竟省事儿),但是比较常用的Java客户端(Jedis和Lettuce)的单机和集群api是不统一的。经过调研,发现spring-data-redis可以实现这个需求,本文就是将spring-data-redis和Jedis整合,来统一操作的API。本文假设你已熟悉spring-data-redis和Jedis的基本使用,这些教程Google一下一大堆的。
二、配置
首先引入Maven依赖,注意1.8.12版本要求spring 4.3.17以上<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>1.8.12.RELEASE</version> </dependency>
spring-data-redis配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxIdle" value="50"/> <property name="maxTotal" value="100"/> <property name="maxWaitMillis" value="20000"/> </bean> <bean id="redisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate"> <constructor-arg name="connectionFactory" ref="jedisConnectionFactory"/> </bean> <!-- 单机配置 --> <!--<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">--> <!--<property name="hostName" value="127.0.0.1"/>--> <!--<property name="port" value="6379"/>--> <!--<property name="timeout" value="20000"/>--> <!--<property name="poolConfig" ref="jedisPoolConfig" />--> <!--</bean>--> <!-- 集群配置 --> <bean id="redisClusterConfiguration" class="org.springframework.data.redis.connection.RedisClusterConfiguration"> <property name="maxRedirects" value="3"/> <property name="clusterNodes"> <set> <bean class="org.springframework.data.redis.connection.RedisClusterNode"> <constructor-arg name="host" value="127.0.0.1"/> <constructor-arg name="port" value="6481"/> </bean> <bean class="org.springframework.data.redis.connection.RedisClusterNode"> <constructor-arg name="host" value="127.0.0.1"/> <constructor-arg name="port" value="6482"/> </bean> <bean class="org.springframework.data.redis.connection.RedisClusterNode"> <constructor-arg name="host" value="127.0.0.1"/> <constructor-arg name="port" value="6483"/> </bean> <bean class="org.springframework.data.redis.connection.RedisClusterNode"> <constructor-arg name="host" value="127.0.0.1"/> <constructor-arg name="port" value="6484"/> </bean> <bean class="org.springframework.data.redis.connection.RedisClusterNode"> <constructor-arg name="host" value="127.0.0.1"/> <constructor-arg name="port" value="6485"/> </bean> <bean class="org.springframework.data.redis.connection.RedisClusterNode"> <constructor-arg name="host" value="127.0.0.1"/> <constructor-arg name="port" value="6486"/> </bean> </set> </property> </bean> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <constructor-arg name="clusterConfig" ref="redisClusterConfiguration" /> <constructor-arg name="poolConfig" ref="jedisPoolConfig" /> <property name="timeout" value="20000"/> </bean> </beans>
这里由集群切换为单机只要注释掉集群配置,打开单机配置即可。
三、代码使用
上述配置中声明了一个redisTemplate的Bean,我们直接将其注入到Service中直接使用即可。这里主要说下批量操作的问题:由于Redis集群是将所有的key使用Hash分布到了16384个槽(slot,下同)中,所以如果你使用批量的操作会报错“CROSSSLOT Keys in request don't hash to the same slot”。我们项目中的批量操作只有批量删除操作,这里就说一下Redis集群的批量删除应该怎么搞。直接上代码:private static final String REDIS_CURSOR_START = "0"; /** * 模糊删除 * @param keyPattern :形如"MESSAGE_*",将删除所有MESSAGE_开头的key的数据 */ protected void deleteByPattern(String keyPattern) { RedisConnection redisConnection = redisTemplate.getConnectionFactory().getConnection(); if(redisConnection instanceof RedisClusterConnection) { //集群模式 RedisClusterConnection clusterConnection = (RedisClusterConnection) redisConnection; Set<RedisClusterNode> clusterNodes = clusterConnection.clusterGetMasterSlaveMap().keySet(); //首先查出所有的key Set<byte[]> allKeys = new HashSet<>(); for(RedisClusterNode redisClusterNode : clusterNodes) { allKeys.addAll(clusterConnection.keys(redisClusterNode, keyPattern.getBytes(StandardCharsets.UTF_8))); } //将每个key和slot相对应 Map<Integer, List<byte[]>> keySlotMap = new HashMap<>(); for(byte[] key : allKeys) { int slot = JedisClusterCRC16.getSlot(key); if(keySlotMap.containsKey(slot)) { keySlotMap.get(slot).add(key); } else { keySlotMap.put(slot, Lists.newArrayList(key)); } } //将key按slot删除 for(List<byte[]> delKeys : keySlotMap.values()) { clusterConnection.del(delKeys.toArray(new byte[][]{})); } } else { //单机模式 Jedis jedis = (Jedis) redisConnection.getNativeConnection(); Set<String> keys = this.keysInSingleNode(keyPattern, jedis); if(CollectionUtils.isEmpty(keys) == false) { jedis.del(keys.toArray(new String[]{})); } } } /** * 取单个Redis节点符合表达式的key */ private Set<String> keysInSingleNode(String keyPattern, Jedis jedis) { String cursor = REDIS_CURSOR_START; ScanParams params = new ScanParams().match(keyPattern).count(10000); Set<String> keys = new HashSet<>(); do { ScanResult<String> scanResult = jedis.scan(cursor, params); keys.addAll(scanResult.getResult()); cursor = scanResult.getStringCursor(); } while(!REDIS_CURSOR_START.equals(cursor)); return keys; }
这里有一个问题,就是keys这个操作是比较危险的,尤其是在redis中数据量很大的情况,keys会导致redis的线程阻塞,由于redis是单线程模型,所以阻塞时系统所有的操作都会卡死。这里用keys是因为比较清晰一点,下边我们优化一下:
protected void deleteByPattern(String keyPattern) { RedisConnection redisConnection = redisTemplate.getConnectionFactory().getConnection(); if(redisConnection instanceof RedisClusterConnection) { //集群模式 RedisClusterConnection clusterConnection = (RedisClusterConnection) redisConnection; JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection(); Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes(); Map<Integer, List<String>> keySlotMap = new HashMap<>(); for (JedisPool jedisPool : clusterNodes.values()) { try(Jedis jedis = jedisPool.getResource()) { //只取master节点的数据 if (!jedis.info("replication").contains("role:slave")) { Set<String> keys = this.keysInSingleNode(keyPattern, jedis); if (!CollectionUtils.isEmpty(keys)) { for (String key : keys) { // cluster模式执行多key操作的时候,这些key必须在同一个slot上, int slot = JedisClusterCRC16.getSlot(key); // 按slot将key分组,相同slot的key一起提交 if (keySlotMap.containsKey(slot)) { keySlotMap.get(slot).add(key); } else { keySlotMap.put(slot, Lists.newArrayList(key)); } } } } } } for (List<String> delKeys : keySlotMap.values()) { redisTemplate.delete(delKeys); } } }
可能有人看到这里有点懵逼了,不是说单机和集群操作统一吗?你这不还是分成了一个if-else?
是的,redisTemplate对于单个key的操作确实是统一了的,但是对于批量操作redisCluster原生其实是不支持的,redisTemplate自然也就无法统一了,只能由我们自己实现。
最后插一句:redisson也是单机和集群api统一的,有兴趣的同学可以研究一下
相关文章推荐
- Spring Data Redis 操作 redis 集群 --client
- 使用spring-data-redis操作redis
- spring data redis 操作redis 单机版和集群
- 使用spring-data-redis配置集群错误
- Java使用jedis操作Redis单机和集群
- 使用Spring-data-redis操作Redis的Sentinel
- java代码中操作Redis:单机redis、集群redis(spring+redis集成)
- 使用spring-data-redis操作Redis的Sentinel
- Spring集成Jedis(不依赖spring-data-redis)(单机/集群模式)(待实践)
- 使用Spring-data-redis操作Redis的Sentinel
- 使用Spring-data-redis操作Redis的Sentinel
- spring data redis 集群(sentinel实现)和simple spring memcached分布式初使用
- spring-data-redis操作redis集群
- 使用Spring-data-redis操作Redis的Sentinel
- 使用Spring-data进行Redis操作
- Redis 一二事 - 在spring中使用jedis 连接调试单机redis以及集群redis
- Spring中使用RedisTemplate操作Redis(spring-data-redis)
- 使用spring-data-redis进行对redis的操作,封装的一些操作方法
- Spring集成Jedis(不依赖spring-data-redis)(单机/集群模式)(待实践)
- spring data redis 操作redis 单机版和集群