[Redis]RedisCluster使用pipeline批量读取数据
2017-12-03 21:38
190 查看
因为公司的Redis是Cluster的,一共有三台主机,有个需求是同时读取三千条数据,因为是Cluster,所以没办法直接使用mget。
目前在网上找到的方法有两种,一种是写入时处理,一种是读取时处理。
先说一下RedisCluster针对每条数据的写入的方法。RedisCluster一共有16384个slot(槽位),cluster每一个节点分管一部分slot。当有数据写入时,当用户put或者是get一个数据的时候,首先会查找这个数据对应的槽位是多少,然后查找对应的节点,然后才把数据放入这个节点。查找这个数据对应的槽位的方法就是对数据的key取模,即 CRC16(key) mod 16384,得到的结果就是写入数据存放的slot位置。
但是当要对rediscluster进行mget批量读取的时候,因为是有不同的节点,每个节点管理一部分slot,将导致对存放在不同slot上的数据读取失败。
如果还想要使用批量读取应该怎么办呢?目前了解到的解决方法有两种,一种是使用hash_tag模式读写。简单说就是使用”{}”来将要hash的key的部分包裹起来,rediscluster写入数据时只会对key中被”{}”包裹部分进行哈希取模计算slot位置。即存入时使用 “a{123}”和”b{123}”是在同一个slot上。这样就可以批量读取存放在同一个slot上的数据。
第二种方法是在批量读取时,先计算所有数据的存放节点。具体做法是,我们已经知道了rediscluster对数据哈希取模的算法,可以先计算数据存放的slot位置,然后我们又可以很容易知道每个节点分管的slot段。这样,我们就可以通过key来计算出数据存放在哪个节点上。然后根据不同的节点将数据分成多批。对不同批的数据进行分批pipeline处理。
核心代码如下:
这是从我项目文件里直接复制出来的,有部分是冗余的,核心部分是getBatch、initJedisNodeMap、getJedisByKey、initSlotHostMap 这几个方法,其中getBatch是主入口
目前在网上找到的方法有两种,一种是写入时处理,一种是读取时处理。
先说一下RedisCluster针对每条数据的写入的方法。RedisCluster一共有16384个slot(槽位),cluster每一个节点分管一部分slot。当有数据写入时,当用户put或者是get一个数据的时候,首先会查找这个数据对应的槽位是多少,然后查找对应的节点,然后才把数据放入这个节点。查找这个数据对应的槽位的方法就是对数据的key取模,即 CRC16(key) mod 16384,得到的结果就是写入数据存放的slot位置。
但是当要对rediscluster进行mget批量读取的时候,因为是有不同的节点,每个节点管理一部分slot,将导致对存放在不同slot上的数据读取失败。
如果还想要使用批量读取应该怎么办呢?目前了解到的解决方法有两种,一种是使用hash_tag模式读写。简单说就是使用”{}”来将要hash的key的部分包裹起来,rediscluster写入数据时只会对key中被”{}”包裹部分进行哈希取模计算slot位置。即存入时使用 “a{123}”和”b{123}”是在同一个slot上。这样就可以批量读取存放在同一个slot上的数据。
第二种方法是在批量读取时,先计算所有数据的存放节点。具体做法是,我们已经知道了rediscluster对数据哈希取模的算法,可以先计算数据存放的slot位置,然后我们又可以很容易知道每个节点分管的slot段。这样,我们就可以通过key来计算出数据存放在哪个节点上。然后根据不同的节点将数据分成多批。对不同批的数据进行分批pipeline处理。
核心代码如下:
/** * JedisPool和Keys的映射关系 */ private Map<JedisPool, ArrayList<String>> jedisPoolKeysMap; /** * 批量查询数据 * * @param keys * c832 @return */ public Map<String, Object> getBatch(String... keys) { // 返回的结果,包括正确的keys的value的集合;和不存在的keys的集合 Map<String, Object> result = new HashMap<>(16); // 正确的keys的value的集合 Map<String, Map<String, Double>> existResult = new HashMap<>(16); // 错误的key的集合 Set<String> errorKeys = new HashSet<>(16); // JedisPool和Keys的映射关系 jedisPoolKeysMap = new HashMap<JedisPool, ArrayList<String>>(); for (String key : keys) { JedisPool jedisPool = getJedisByKey(key); if (jedisPool == null){ continue; } if (!jedisPoolKeysMap.keySet().contains(jedisPool)) { ArrayList<String> keysList = new ArrayList<>(); keysList.add(key); jedisPoolKeysMap.put(jedisPool, keysList); } else { ArrayList<String> keysList = jedisPoolKeysMap.get(jedisPool); keysList.add(key); } } for (JedisPool jedisPool : jedisPoolKeysMap.keySet()) { Jedis jedis = jedisPool.getResource(); Pipeline pipeline = jedis.pipelined(); try { if (pipeline != null) { pipeline.clear(); ArrayList<String> keysList = jedisPoolKeysMap.get(jedisPool); for (String key : keysList) { pipeline.get(key); } List<Object> results = pipeline.syncAndReturnAll(); for (int index = 0; index < results.size(); index++) { if (results.get(index) == null) { errorKeys.add(keysList.get(index)); } else { existResult.put(keysList.get(index), stringToMap(results.get(index).toString())); } } } returnResource(jedis); }catch(Exception e){ e.printStackTrace(); } } result.put("error", errorKeys); result.put("exist", existResult); return result; } /** * 通过key来获取对应的jedisPool对象 * * @param key * @return */ public JedisPool getJedisByKey(String key) { int slot = JedisClusterCRC16.getSlot(key); Map.Entry<Long, String> entry; entry = getSlotHostMap().lowerEntry(Long.valueOf(slot + 1)); if(entry == null){ logger.error("entry为空!!!!! key为:" + key + ",slot为:" + slot); return null; } return historyCtrJedisClusterBatchUtil.getNodeMap().get(entry.getValue()); } /** * 返还到连接池 * * @param jedis */ public static void returnResource(Jedis jedis) { if (jedis != null) { jedis.close(); } } /** * 节点映射关系 */ private Map<String, JedisPool> nodeMap; /** * slot和host之间的映射 */ private TreeMap<Long, String> slotHostMap; /** * 初始化JedisNodeMap */ private void initJedisNodeMap() { try { nodeMap = historyCtrJedisClusterFactory.getClusterNodes(); String anyHost = nodeMap.keySet().iterator().next(); initSlotHostMap(anyHost); } catch (JedisClusterException e) { logger.error(e.getMessage()); } } /** * 获取slot和host之间的对应关系 * * @param anyHostAndPortStr * @return */ private void initSlotHostMap(String anyHostAndPortStr) { TreeMap<Long, String> tree = new TreeMap<Long, String>(); String parts[] = anyHostAndPortStr.split(":"); HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1])); try { Jedis jedis = new Jedis(anyHostAndPort.getHost(), anyHostAndPort.getPort()); List<Object> list = jedis.clusterSlots(); for (Object object : list) { List<Object> list1 = (List<Object>) object; List<Object> master = (List<Object>) list1.get(2); String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1); tree.put((Long) list1.get(0), hostAndPort); tree.put((Long) list1.get(1), hostAndPort); } jedis.close(); } catch (Exception e) { e.printStackTrace(); } slotHostMap = tree; } @Override public void afterPropertiesSet() throws Exception { initJedisNodeMap(); }
这是从我项目文件里直接复制出来的,有部分是冗余的,核心部分是getBatch、initJedisNodeMap、getJedisByKey、initSlotHostMap 这几个方法,其中getBatch是主入口
相关文章推荐
- Redis 使用 Lua 实现 split 结合 HMGET 批量读取数据
- ASP中使用XMLHTTP读取远程数据3
- 使用 XMLHttpRequest 读取 XML 数据
- 使用C#进行基于PI的开发(三)——应用PISDK和PIAPI从PI数据库读取数据
- 演练:使用 DataGrid Web 控件读取和写入数据
- 使用 SqlDataReader 读取数据示例
- ASP.NET数据库使用精典-----读取数据库中数据
- 使用Robot循环读取Excel中的数据
- 使用SqlDataReader读取数据示例
- 使用SqlDataReader读取数据示例
- 使用ifstream::get()方法从文本文件中读取数据
- [教程]使用AODKeycap读取数据
- [教程]在ADOKeycap中使用DataReader读取数据
- ■ASP中使用XMLHTTP读取远程数据3
- 第四课 使用SqlDataReader读取数据(翻译)
- 使用 CFile 来读取特定格式的数据
- vbs中使用 ADO 读取所有数据均在一行上的文本文件的代码
- ASP中使用XMLHTTP读取远程数据2
- Linux下C语言实现的简单使用线程向FIFO里写入与读取数据的例子
- ASP中使用XMLHTTP读取远程数据1