您的位置:首页 > 数据库 > Redis

[Redis]RedisCluster使用pipeline批量读取数据

2017-12-03 21:38 190 查看
 因为公司的Redis是Cluster的,一共有三台主机,有个需求是同时读取三千条数据,因为是Cluster,所以没办法直接使用mget。

 目前在网上找到的方法有两种,一种是写入时处理,一种是读取时处理。

 先说一下RedisCluster针对每条数据的写入的方法。RedisCluster一共有16384个slot(槽位),cluster每一个节点分管一部分slot。当有数据写入时,当用户put或者是get一个数据的时候,首先会查找这个数据对应的槽位是多少,然后查找对应的节点,然后才把数据放入这个节点。查找这个数据对应的槽位的方法就是对数据的key取模,即 CRC16(key) mod 16384,得到的结果就是写入数据存放的slot位置。

 但是当要对rediscluster进行mget批量读取的时候,因为是有不同的节点,每个节点管理一部分slot,将导致对存放在不同slot上的数据读取失败。

 如果还想要使用批量读取应该怎么办呢?目前了解到的解决方法有两种,一种是使用hash_tag模式读写。简单说就是使用”{}”来将要hash的key的部分包裹起来,rediscluster写入数据时只会对key中被”{}”包裹部分进行哈希取模计算slot位置。即存入时使用 “a{123}”和”b{123}”是在同一个slot上。这样就可以批量读取存放在同一个slot上的数据。

 第二种方法是在批量读取时,先计算所有数据的存放节点。具体做法是,我们已经知道了rediscluster对数据哈希取模的算法,可以先计算数据存放的slot位置,然后我们又可以很容易知道每个节点分管的slot段。这样,我们就可以通过key来计算出数据存放在哪个节点上。然后根据不同的节点将数据分成多批。对不同批的数据进行分批pipeline处理。

核心代码如下:

/**
* JedisPool和Keys的映射关系
*/
private Map<JedisPool, ArrayList<String>> jedisPoolKeysMap;

/**
* 批量查询数据
*
* @param keys
*
c832
@return
*/
public Map<String, Object> getBatch(String... keys) {

// 返回的结果,包括正确的keys的value的集合;和不存在的keys的集合
Map<String, Object> result = new HashMap<>(16);

// 正确的keys的value的集合
Map<String, Map<String, Double>> existResult = new HashMap<>(16);

// 错误的key的集合
Set<String> errorKeys = new HashSet<>(16);

// JedisPool和Keys的映射关系
jedisPoolKeysMap = new HashMap<JedisPool, ArrayList<String>>();

for (String key : keys) {
JedisPool jedisPool = getJedisByKey(key);
if (jedisPool == null){
continue;
}
if (!jedisPoolKeysMap.keySet().contains(jedisPool)) {
ArrayList<String> keysList = new ArrayList<>();
keysList.add(key);
jedisPoolKeysMap.put(jedisPool, keysList);
} else {
ArrayList<String> keysList = jedisPoolKeysMap.get(jedisPool);
keysList.add(key);
}
}

for (JedisPool jedisPool : jedisPoolKeysMap.keySet()) {
Jedis jedis = jedisPool.getResource();

Pipeline pipeline = jedis.pipelined();
try {
if (pipeline != null) {
pipeline.clear();

ArrayList<String> keysList = jedisPoolKeysMap.get(jedisPool);
for (String key : keysList) {
pipeline.get(key);
}
List<Object> results = pipeline.syncAndReturnAll();

for (int index = 0; index < results.size(); index++) {
if (results.get(index) == null) {
errorKeys.add(keysList.get(index));
} else {
existResult.put(keysList.get(index), stringToMap(results.get(index).toString()));
}
}
}
returnResource(jedis);
}catch(Exception e){
e.printStackTrace();
}
}
result.put("error", errorKeys);
result.put("exist", existResult);
return result;
}

/**
* 通过key来获取对应的jedisPool对象
*
* @param key
* @return
*/
public JedisPool getJedisByKey(String key) {
int slot = JedisClusterCRC16.getSlot(key);
Map.Entry<Long, String> entry;
entry = getSlotHostMap().lowerEntry(Long.valueOf(slot + 1));

if(entry == null){
logger.error("entry为空!!!!! key为:" + key + ",slot为:" + slot);
return null;
}
return historyCtrJedisClusterBatchUtil.getNodeMap().get(entry.getValue());
}

/**
* 返还到连接池
*
* @param jedis
*/
public static void returnResource(Jedis jedis) {
if (jedis != null) {
jedis.close();
}
}

/**
* 节点映射关系
*/
private Map<String, JedisPool> nodeMap;

/**
* slot和host之间的映射
*/
private TreeMap<Long, String> slotHostMap;

/**
* 初始化JedisNodeMap
*/
private void initJedisNodeMap() {
try {
nodeMap = historyCtrJedisClusterFactory.getClusterNodes();
String anyHost = nodeMap.keySet().iterator().next();
initSlotHostMap(anyHost);
} catch (JedisClusterException e) {
logger.error(e.getMessage());
}
}

/**
* 获取slot和host之间的对应关系
*
* @param anyHostAndPortStr
* @return
*/
private void initSlotHostMap(String anyHostAndPortStr) {
TreeMap<Long, String> tree = new TreeMap<Long, String>();
String parts[] = anyHostAndPortStr.split(":");
HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1]));
try {
Jedis jedis = new Jedis(anyHostAndPort.getHost(), anyHostAndPort.getPort());
List<Object> list = jedis.clusterSlots();
for (Object object : list) {
List<Object> list1 = (List<Object>) object;
List<Object> master = (List<Object>) list1.get(2);
String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1);
tree.put((Long) list1.get(0), hostAndPort);
tree.put((Long) list1.get(1), hostAndPort);
}
jedis.close();
} catch (Exception e) {
e.printStackTrace();
}
slotHostMap = tree;
}

@Override
public void afterPropertiesSet() throws Exception {
initJedisNodeMap();
}


这是从我项目文件里直接复制出来的,有部分是冗余的,核心部分是getBatch、initJedisNodeMap、getJedisByKey、initSlotHostMap 这几个方法,其中getBatch是主入口
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  redis