您的位置:首页 > 数据库 > Redis

使用BookSleeve通过twemproxy 操作 Redis集群

2014-01-17 21:28 537 查看
本文地址:http://blog.csdn.net/wangjia184/article/details/18418911

Redis Cluster 正式版目前来说还遥遥无期。
比较流行的替代方式是twitter开源的 twemproxy (code name:nutcrack) ,作为Redis的负载均衡器来实现数据分区(data partition)与故障转移(failure over)

典型的twemproxy配置文件(/etc/nutcracker/nutcracker.yml)如下
twemproxy:
listen: 0.0.0.0:22121
hash: fnv1a_64
distribution: ketama
auto_eject_hosts: true
redis: true
server_connections: 10
server_retry_timeout: 2000
server_failure_limit: 5
servers:
- 127.0.0.1:6379:1
- 192.168.1.189:6379:1

具体的配置介绍可以在GitHub上查到。

需要注意的是,通过twemproxy访问redis的话,只能调用单服务器即时命令,而像阻塞类命令、多命令事务执行、订阅与发布、以及执行脚本查询服务器信息等命令等都是不被支持的。从https://github.com/twitter/twemproxy/blob/master/notes/redis.md 可以查到支持的命令。

booksleeve是Stack Exchange开源的高性能redis客户端库, 它提供纯异步TPL的接口, 并实现了pipeline模式。但如果使用它直连twemproxy的话,因为它使用到了不被twemproxy支持的命令,导致链接在打开后会马上关闭,这个问题目前已经报告在此

为解决此问题,可采用如下方式.
首先,当创建了RedisConnection对象后,应首先调用SetKeepAlive屏蔽掉KeepAlive特性(PING命令不被twemproxy支持);然后调用SetServerVersion指定服务器的版本与类型,这样可以避免它执行不被twemproxy所支持的INFO等操作。
public class RedisConn : RedisConnection
{
private static readonly string _server = ConfigurationManager.AppSettings["Redis.Server"];
private static readonly int _port = int.Parse(ConfigurationManager.AppSettings["Redis.Port"]);

public RedisConn()
: base ( _server, _port)
{
base.SetKeepAlive(0);
base.SetServerVersion( new Version("2.6.16"), BookSleeve.ServerType.Master);
}
}
然后就需要修改Booksleeve的源码了。
修改RedisConnectionBase.cs文件

protected Task<long> PingImpl(bool queueJump, bool duringInit = false, object state = null)
{
var msg = new PingMessage();
if(duringInit) msg.DuringInit();
return ExecuteInt64(msg, queueJump, state);
}
改为
protected Task PingImpl(bool queueJump, bool duringInit = false, object state = null)
{
return Task.Delay(0);
}

修改IServerCommands.cs,Task<long> Ping(bool queueJump = false);

Task<long> IServerCommands.Ping(bool queueJump)
{
return base.PingImpl(queueJump, duringInit: false);
}

[Obsolete("Please use the Server API", false), EditorBrowsable(EditorBrowsableState.Never)]
public Task<long> Ping(bool queueJump = false)
{
return Server.Ping(queueJump);
}

改为
Task Ping(bool queueJump = false);

Task IServerCommands.Ping(bool queueJump)
{
return base.PingImpl(queueJump, duringInit: false);
}
[Obsolete("Please use the Server API", false), EditorBrowsable(EditorBrowsableState.Never)]
public Task Ping(bool queueJump = false)
{
return Server.Ping(queueJump);
}


修改RedisConnection.cs文件
public Counters GetCounters(bool allowTalkToServer)
{
int messagesSent, messagesReceived, queueJumpers, messagesCancelled, unsent, errorMessages, timeouts, syncCallbacks, asyncCallbacks, syncCallbacksInProgress, asyncCallbacksInProgress;
GetCounterValues(out messagesSent, out messagesReceived, out queueJumpers, out messagesCancelled, out unsent, out errorMessages, out timeouts, out syncCallbacks, out asyncCallbacks, out syncCallbacksInProgress, out asyncCallbacksInProgress);
return new Counters(
messagesSent, messagesReceived, queueJumpers, messagesCancelled,
timeouts, unsent, errorMessages, syncCallbacks, asyncCallbacks, syncCallbacksInProgress, asyncCallbacksInProgress,
GetSentCount(),
GetDbUsage(), LastSentMillisecondsAgo, LastKeepAliveMillisecondsAgo, KeepAliveSeconds, State,
// important that ping happens last, as this may artificially drain the queues
allowTalkToServer ? (int)Wait(Server.Ping()) : -1
);
}
改为
public Counters GetCounters(bool allowTalkToServer)
{
int messagesSent, messagesReceived, queueJumpers, messagesCancelled, unsent, errorMessages, timeouts, syncCallbacks, asyncCallbacks, syncCallbacksInProgress, asyncCallbacksInProgress;
GetCounterValues(out messagesSent, out messagesReceived, out queueJumpers, out messagesCancelled, out unsent, out errorMessages, out timeouts, out syncCallbacks, out asyncCallbacks, out syncCallbacksInProgress, out asyncCallbacksInProgress);
return new Counters(
messagesSent, messagesReceived, queueJumpers, messagesCancelled,
timeouts, unsent, errorMessages, syncCallbacks, asyncCallbacks, syncCallbacksInProgress, asyncCallbacksInProgress,
GetSentCount(),
GetDbUsage(), LastSentMillisecondsAgo, LastKeepAliveMillisecondsAgo, KeepAliveSeconds, State,
// important that ping happens last, as this may artificially drain the queues
allowTalkToServer ? 0 : -1
);
}
本文地址:http://blog.csdn.net/wangjia184/article/details/18418911
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: