Jedis不相信眼泪
2015-12-15 20:16
746 查看
最近愉快地上线了一个新的调度系统,主节点和从节点使用redis队列进行通信,然后奇怪的事情发生了,有一个海外的节点,和redis之间的网络一直比较差,经常出现异常(类似下面):
一开始以为是纯粹的网络延时太大,于是加大了的socket的超时时间,可是这个异常还是如大姨妈一般时常来访,而在重启服务之后,异常便会消失。
于是在本地做了下测试,发现当断开redis连接之后重连,便会不断出现如上异常。
可是奇怪的是异常只出现在使用redis管道的地方,其他redis操作一切正常,如下:
在调用管道的sync方法时,抛出了异常,本着先简单快速解决问题的思路,试着把jedis客户端从2.1升级到2.7,再测试,异常消失了,于是更怀疑这是jedis客户端的bug。
异常是没出现了,可是原因依旧未知,于是继续测试,先贴一下部分测试代码:
代码很好理解,不断往redis队列里面push消息,再批量pop出来消费,断开重连redis之后,batchPop方法抛异常,push方法正常。再贴一下其中使用到redis管道的方法,目光如炬的小伙伴可能已经发现问题所在了:
该方法使用jedis连接池获取连接,每次批量从redis队列中pop消息。看到这里的代码,立刻把Jedis连接池pool列入可疑名单中,猜测会不会是returnBrokenResource的时候,没有把断开的连接给销毁掉。
为了排除是否连接池也有问题,所以将以上代码改为每次操作时,先new一个新的Jedis,而不是使用连接池,结果异常消失了。
然后自己又根据jedis的源代码,使用Apache的Commons Pool 1.5.5写了个简单的连接池(jedis 2.1也是使用了这个版本的Commons Pool),将创建对象,销毁对象每一步的日志都打印出来,并且对jedis进行一层封装,赋予一个ID,这样易于观察每个jedis链接的生命周期,部分代码如下:
使用Commons Pool时,需要创建一个工厂类,将jedis连接池源代码中的工厂类拷贝出来使用即可:
经测试,确实调用了destroyObject方法,但是每次从连接池中拿到的都是上次销毁的jedis连接。
回头看一下上面的batchPop方法,发现问题所在了,当抛出JedisConnectionException异常时,调用了returnBrokenResource,将连接放回池中销毁,但是最后的finally代码块里又调用了一次returnResource。没错,这里两次将连接放回了连接池里,所以这个broken的jedis根本没有被销毁掉,又被重复拿出来使用了。
但是,真相到这里还远远没有明了,为啥使用管道lpop的时候异常,使用非管道push的时候却是正常的?
深入看一下jedis管道lpop的源代码,如下:
发现它们其实都使用了this.client.lpop(key),继续进入这个方法,一直点啊点,直到这里:
一个重要的方法出现了:connect(),点进去:
没错,所有的redis操作都会调用到sendCommand方法,而该方法会先判断jedis客户端是否连接,如果不是,则重新new一个socket,这就解释了,为什么断开redis之后重连,可以继续往redis队列中push消息,但是,更疑惑的是,既然重新new了socket,为何管道操作还是会报异常。
继续测试,发现虽然 JedisUtil.batchPop抛出了异常,但是redis队列的长度并没有在增加,所以队列里面的消息应该已经被成功pop出来了,而且,当batchPop的数量为1时,程序竟然偶尔会恢复正常。
继续研究p
ab41
ipline的sync代码,发下如下:
其中pipelinedCommands记录了往redis发送的命令的条数(见上面的sendCommand方法),所以调用sync取返回结果时,要循环pipelinedCommands次,从输入流中读取返回结果,程序也是在这里抛出了SocketTimeoutException。对比这里的sync方法与其他非管道操作获取结果的方法,区别便在于其他操作只调用了一次Protocol.read(this.inputStream),而管道要调用pipelinedCommands次。
注意到这点,真相已经隐隐约约出现了,做个假设,如果pipelinedCommands的数量大于所要读取的返回结果,那么这里是否就读不到输入流的结果,然后就一直阻塞直到socket timeout?
重新回过头看一下前面的代码,梳理整个流程,测试过程中,最耗时的其实是在调用sync读取返回结果,所以连接中断最有可能在这个时候发生,抛出异常,然后这个已经broken的jedis又被放回到了连接池中,下一次又被重新从连接池中拿出来,此时调用pipline.sync之前,虽然socket又重新new了一遍,但是pipelinedCommands的值并没有清0啊,因此调用batchPop的时候,pipelinedCommands的数量要大于所要返回的结果数量,因此读取输入流时阻塞,直到抛出socket timeout异常。
测试中,如果批量获取1条记录,就减少了pipelinedCommands大于返回结果数量的概率,所以偶尔可以测试正常!
另外,看一下jedis的配置文件:
发现从连接池获取jedis之前都会检查是否已连接,如果获取到的是broken的jedis,应该会重新创建一个才对。仔细打印出redis.pool.testOnBorrow的值,发现被坑了,这里后面多了个空格!空格!空格……WTF!
再接着看一下jedis 2.7,发现其用了commons pool2-2.3,对比Commons Pool 1.5.5的源代码(好累,不贴代码了),在新的版本中,如果一个对象如果已经被销毁,则不能重新放回对象池中,因此每次从连接池中拿到的是新的jedis,所以没有出现上面的异常。
到这里,总结一下,这个问题的发生其实是正好三方的代码都有问题:
业务代码中,重复将已经broken的jedis放回了连接池中;
Commons Pool 1.5.5又没有对重复returnObject做限制;
jedis重新new socket的时候,也没有清空pipelinedCommands。
在新版本的jedis客户端中,其实已经简化了编程,如下使用即可:
看一下jedis.close的源代码,如下:
这里已经根据客户端是否为broken,决定如何返还连接。当发生连接异常时,client.borken将被标志。
回头想一想,在查找这个问题时,犯了一些错误,如果一开始就可以仔细梳理代码流程,不要盲目测试,也不用花这么多时间。
与君共勉吧!
简单,率性,不虚妄
redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out at redis.clients.jedis.Protocol.process(Protocol.java:79) at redis.clients.jedis.Protocol.read(Protocol.java:131) at redis.clients.jedis.Connection.getAll(Connection.java:225) at redis.clients.jedis.Connection.getAll(Connection.java:217) at redis.clients.jedis.Pipeline.sync(Pipeline.java:68) at com.game.data.stat.share.redis.JedisUtil.batchPop(JedisUtil.java:151) at com.game.data.stat.share.db.JedisUtilTest.testBatchPop(JedisUtilTest.java:185) at com.game.data.stat.share.db.JedisUtilTest.main(JedisUtilTest.java:488) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(Unknown Source) at java.net.SocketInputStream.read(Unknown Source) at java.net.SocketInputStream.read(Unknown Source) at redis.clients.util.RedisInputStream.fill(RedisInputStream.java:109) at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:45) at redis.clients.jedis.Protocol.process(Protocol.java:64) ... 7 more
一开始以为是纯粹的网络延时太大,于是加大了的socket的超时时间,可是这个异常还是如大姨妈一般时常来访,而在重启服务之后,异常便会消失。
于是在本地做了下测试,发现当断开redis连接之后重连,便会不断出现如上异常。
可是奇怪的是异常只出现在使用redis管道的地方,其他redis操作一切正常,如下:
Pipeline pipe = jedis.pipelined(); List<Response<String>> responseList = new ArrayList<Response<String>>(); for (int i = 0; i < size; i++) { responseList.add(pipe.lpop(key)); } pipe.sync();
在调用管道的sync方法时,抛出了异常,本着先简单快速解决问题的思路,试着把jedis客户端从2.1升级到2.7,再测试,异常消失了,于是更怀疑这是jedis客户端的bug。
异常是没出现了,可是原因依旧未知,于是继续测试,先贴一下部分测试代码:
public static void testBatchPop() { String key = "test_key"; // for (int i = 0; i < 10; i++) { // JedisUtil.push(key, i + ""); // } List<String> list = null; String value; while (true) { try { value = System.currentTimeMillis() + ""; System.out.println("push:" + value + ", return:" + JedisUtil.push(key, value)); System.out.println("push:" + value + ", return:" + JedisUtil.push(key, value)); // batch pop try { // St 4000 ring str = JedisUtil.pop(key); // System.out.println("pop:" + str); // 使用redis管道批量lpop list = JedisUtil.batchPop(key, 3); if (list == null || list.isEmpty()) { System.out.println("empty list"); } else { for (String str : list) { System.out.println("pop:" + str); } } } catch (Exception e) { System.out.println(ExceptionUtil.parseString(e)); } System.out.println("length of " + key + ":" + JedisUtil.llen(key)); } catch (Exception e) { System.out.println(ExceptionUtil.parseString(e)); } try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println(ExceptionUtil.parseString(e)); } } }
代码很好理解,不断往redis队列里面push消息,再批量pop出来消费,断开重连redis之后,batchPop方法抛异常,push方法正常。再贴一下其中使用到redis管道的方法,目光如炬的小伙伴可能已经发现问题所在了:
public static List<String> batchPop(String key, int size) {
Jedis jedis = pool.getResource();
try {
Pipeline pipe = jedis.pipelined(); List<Response<String>> responseList = new ArrayList<Response<String>>(); for (int i = 0; i < size; i++) { responseList.add(pipe.lpop(key)); } pipe.sync();
List<String> list = new ArrayList<String>();
for (Response<String> response : responseList) {
if (response != null && StringUtils.isNotBlank(response.get())
&& !NULL_RES.equals(response.get())) {
list.add(response.get());
}
}
return list;
} catch (JedisConnectionException ex) {
log.error("Jedis Connection Exception");
returnBrokenResource(pool, jedis);
throw ex;
} finally {
returnResource(pool, jedis);
}
}
该方法使用jedis连接池获取连接,每次批量从redis队列中pop消息。看到这里的代码,立刻把Jedis连接池pool列入可疑名单中,猜测会不会是returnBrokenResource的时候,没有把断开的连接给销毁掉。
为了排除是否连接池也有问题,所以将以上代码改为每次操作时,先new一个新的Jedis,而不是使用连接池,结果异常消失了。
然后自己又根据jedis的源代码,使用Apache的Commons Pool 1.5.5写了个简单的连接池(jedis 2.1也是使用了这个版本的Commons Pool),将创建对象,销毁对象每一步的日志都打印出来,并且对jedis进行一层封装,赋予一个ID,这样易于观察每个jedis链接的生命周期,部分代码如下:
private static class JedisTest extends Jedis { private int jedisID; public int getJedisID() { return jedisID; } public void setJedisID(int jedisID) { this.jedisID = jedisID; } public JedisTest(String host, int port, int connectionTimeout, int soTimeout, int jedisID) { super(host, port, connectionTimeout, soTimeout); this.jedisID = jedisID; } }
使用Commons Pool时,需要创建一个工厂类,将jedis连接池源代码中的工厂类拷贝出来使用即可:
private static int NUM; // jedis 2.1 private static class JedisFactory extends BasePoolableObjectFactory { private final String host; private final int port; private final int timeout; private final String password; private final int database; public JedisFactory(String host, int port, int timeout, String password, int database) { this.host = host; this.port = port; this.timeout = timeout; this.password = password; this.database = database; } public Object makeObject() throws Exception { NUM++; JedisTest jedis = new JedisTest(this.host, this.port, this.timeout,NUM); System.out.println("makeObject jedisID:" + jedis.getJedisID()); jedis.connect(); if (null != this.password) { jedis.auth(this.password); } if (this.database != 0) { jedis.select(this.database); } return jedis; } public void destroyObject(Object obj) throws Exception { if (obj instanceof Jedis) { JedisTest jedis = (JedisTest) obj; System.out.println("destroyObject jedisID:" + jedis.getJedisID()); if (!(jedis.isConnected())) return; try { try { jedis.quit(); } catch (Exception e) { } jedis.disconnect(); } catch (Exception e) { } } } public boolean validateObject(Object obj) { boolean res = false; if (obj instanceof Jedis) { JedisTest jedis = (JedisTest) obj; System.out.println("validateObject jedisID:" + jedis.getJedisID()); try { res = ((jedis.isConnected()) && (jedis.ping() .equals("PONG"))); } catch (Exception e) { res = false; } } System.out.println("validateObject return:" + res); return res; } }
经测试,确实调用了destroyObject方法,但是每次从连接池中拿到的都是上次销毁的jedis连接。
回头看一下上面的batchPop方法,发现问题所在了,当抛出JedisConnectionException异常时,调用了returnBrokenResource,将连接放回池中销毁,但是最后的finally代码块里又调用了一次returnResource。没错,这里两次将连接放回了连接池里,所以这个broken的jedis根本没有被销毁掉,又被重复拿出来使用了。
但是,真相到这里还远远没有明了,为啥使用管道lpop的时候异常,使用非管道push的时候却是正常的?
深入看一下jedis管道lpop的源代码,如下:
public Response<String> lpop(String key) { this.client.lpop(key); return getResponse(BuilderFactory.STRING); } 对比下非管道时的lpop方法: public String lpop(String key) { checkIsInMulti(); this.client.lpop(key); return this.client.getBulkReply(); }
发现它们其实都使用了this.client.lpop(key),继续进入这个方法,一直点啊点,直到这里:
protected Connection sendCommand(Protocol.Command cmd, byte[][] args) { connect(); Protocol.sendCommand(this.outputStream, cmd, args); this.pipelinedCommands += 1; return this; }
一个重要的方法出现了:connect(),点进去:
public void connect() { if (isConnected()) return; try { this.socket = new Socket(); this.socket.setReuseAddress(true); this.socket.setKeepAlive(true); this.socket.setTcpNoDelay(true); this.socket.setSoLinger(true, 0); this.socket.connect(new InetSocketAddress(this.host, this.port), this.timeout); this.socket.setSoTimeout(this.timeout); this.outputStream = new RedisOutputStream( this.socket.getOutputStream()); this.inputStream = new RedisInputStream( this.socket.getInputStream()); } catch (IOException ex) { throw new JedisConnectionException(ex); } }
没错,所有的redis操作都会调用到sendCommand方法,而该方法会先判断jedis客户端是否连接,如果不是,则重新new一个socket,这就解释了,为什么断开redis之后重连,可以继续往redis队列中push消息,但是,更疑惑的是,既然重新new了socket,为何管道操作还是会报异常。
继续测试,发现虽然 JedisUtil.batchPop抛出了异常,但是redis队列的长度并没有在增加,所以队列里面的消息应该已经被成功pop出来了,而且,当batchPop的数量为1时,程序竟然偶尔会恢复正常。
继续研究p
ab41
ipline的sync代码,发下如下:
public List<Object> getAll(int except) { List all = new ArrayList(); flush(); while (this.pipelinedCommands > except) { try { all.add(Protocol.read(this.inputStream)); } catch (JedisDataException e) { all.add(e); } this.pipelinedCommands -= 1; } return all; }
其中pipelinedCommands记录了往redis发送的命令的条数(见上面的sendCommand方法),所以调用sync取返回结果时,要循环pipelinedCommands次,从输入流中读取返回结果,程序也是在这里抛出了SocketTimeoutException。对比这里的sync方法与其他非管道操作获取结果的方法,区别便在于其他操作只调用了一次Protocol.read(this.inputStream),而管道要调用pipelinedCommands次。
注意到这点,真相已经隐隐约约出现了,做个假设,如果pipelinedCommands的数量大于所要读取的返回结果,那么这里是否就读不到输入流的结果,然后就一直阻塞直到socket timeout?
重新回过头看一下前面的代码,梳理整个流程,测试过程中,最耗时的其实是在调用sync读取返回结果,所以连接中断最有可能在这个时候发生,抛出异常,然后这个已经broken的jedis又被放回到了连接池中,下一次又被重新从连接池中拿出来,此时调用pipline.sync之前,虽然socket又重新new了一遍,但是pipelinedCommands的值并没有清0啊,因此调用batchPop的时候,pipelinedCommands的数量要大于所要返回的结果数量,因此读取输入流时阻塞,直到抛出socket timeout异常。
测试中,如果批量获取1条记录,就减少了pipelinedCommands大于返回结果数量的概率,所以偶尔可以测试正常!
另外,看一下jedis的配置文件:
redis.pool.testOnBorrow=true redis.pool.testOnReturn=true redis.pool.testWhileIdle=true
发现从连接池获取jedis之前都会检查是否已连接,如果获取到的是broken的jedis,应该会重新创建一个才对。仔细打印出redis.pool.testOnBorrow的值,发现被坑了,这里后面多了个空格!空格!空格……WTF!
再接着看一下jedis 2.7,发现其用了commons pool2-2.3,对比Commons Pool 1.5.5的源代码(好累,不贴代码了),在新的版本中,如果一个对象如果已经被销毁,则不能重新放回对象池中,因此每次从连接池中拿到的是新的jedis,所以没有出现上面的异常。
到这里,总结一下,这个问题的发生其实是正好三方的代码都有问题:
业务代码中,重复将已经broken的jedis放回了连接池中;
Commons Pool 1.5.5又没有对重复returnObject做限制;
jedis重新new socket的时候,也没有清空pipelinedCommands。
在新版本的jedis客户端中,其实已经简化了编程,如下使用即可:
public static long push(String queueName, String value) { Jedis jedis = null; try { jedis = pool.getResource(); // 以队列右边为尾,将数据从右边插入 return jedis.rpush(queueName, value); } finally { if (jedis != null) { jedis.close(); } } }
看一下jedis.close的源代码,如下:
public void close() { if (this.dataSource != null) { if (this.client.isBroken()) this.dataSource.returnBrokenResource(this); else this.dataSource.returnResource(this); } else this.client.close(); }
这里已经根据客户端是否为broken,决定如何返还连接。当发生连接异常时,client.borken将被标志。
回头想一想,在查找这个问题时,犯了一些错误,如果一开始就可以仔细梳理代码流程,不要盲目测试,也不用花这么多时间。
与君共勉吧!
简单,率性,不虚妄