您的位置:首页 > 其它

Codis源码解析——处理slot操作(2)

2017-08-16 20:40 579 查看
这一篇我们把处理slot操作的机制讲完。还剩最后两个部分。一个是fillSlot,一个是每一个槽具体的处理方式。

本例中有两个group,将之前auto-rebalance过的slot(0-511属于group1,512-1023属于group2)



现在将slot 400到500移动到group2。我们之前走到了

s.newProxyClient(p).FillSlots(ctx.toSlotSlice(slots, p)...)


首先new一个ApiClient,里面存储了proxy的地址,以及xauth信息(xauth是根据ProductName,ProductAuth以及proxy的token生成的)。下面调用Proxy的FillSlots方法,填充slot

// FillSlots applies each of the given slot models to the proxy's router.
// (In this walkthrough only one slot is passed in: the SlotMapping with
// the smallest Action.State in the whole cluster.)
// The proxy mutex is held for the entire batch; ErrClosedProxy is
// returned if the proxy has already been closed.
func (s *Proxy) FillSlots(slots []*models.Slot) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return ErrClosedProxy
	}
	for _, slot := range slots {
		err := s.router.FillSlot(slot)
		if err != nil {
			return err
		}
	}
	return nil
}


// fillSlot rebuilds the in-proxy state of the slot described by m: it
// blocks the slot, releases whatever connections it currently holds,
// then re-acquires backend/migrate/replica connections from the
// router's pools according to m. If method is non-nil the slot's
// forward method is replaced as well. The slot is unblocked afterwards
// unless m.Locked is true.
func (s *Router) fillSlot(m *models.Slot, switched bool, method forwardMethod) {
slot := &s.slots[m.Id]
// Set slot.lock.hold to true and take the slot lock, so no request can
// be forwarded through this slot while it is being refilled.
slot.blockAndWait()

// bc is the sharedBackendConn that serves redis requests (introduced
// earlier). On the very first fill the backend is still empty, so
// Release() returns immediately (it checks for a nil receiver).
slot.backend.bc.Release()
slot.backend.bc = nil
slot.backend.id = 0
slot.migrate.bc.Release()
slot.migrate.bc = nil
slot.migrate.id = 0
for i := range slot.replicaGroups {
for _, bc := range slot.replicaGroups[i] {
bc.Release()
}
}
slot.replicaGroups = nil

slot.switched = switched

// addr is the redis server address (10.0.2.15:6379 in this example).
if addr := m.BackendAddr; len(addr) != 0 {
// Fetch (or lazily create) the sharedBackendConn for this address.
slot.backend.bc = s.pool.primary.Retain(addr)
slot.backend.id = m.BackendAddrGroupId
}
if from := m.MigrateFrom; len(from) != 0 {
slot.migrate.bc = s.pool.primary.Retain(from)
slot.migrate.id = m.MigrateFromGroupId
}
if !s.config.BackendPrimaryOnly {
for i := range m.ReplicaGroups {
var group []*sharedBackendConn
for _, addr := range m.ReplicaGroups[i] {
group = append(group, s.pool.replica.Retain(addr))
}
if len(group) == 0 {
continue
}
slot.replicaGroups = append(slot.replicaGroups, group)
}
}
if method != nil {
slot.method = method
}

// Unless the mapping asks for the slot to stay locked, reopen it so
// requests can flow again.
if !m.Locked {
slot.unblock()
}
if !s.closed {
if slot.migrate.bc != nil {
if switched {
log.Warnf("fill slot %04d, backend.addr = %s, migrate.from = %s, locked = %t, +switched",
slot.id, slot.backend.bc.Addr(), slot.migrate.bc.Addr(), slot.lock.hold)
} else {
log.Warnf("fill slot %04d, backend.addr = %s, migrate.from = %s, locked = %t",
slot.id, slot.backend.bc.Addr(), slot.migrate.bc.Addr(), slot.lock.hold)
}
} else {
if switched {
log.Warnf("fill slot %04d, backend.addr = %s, locked = %t, +switched",
slot.id, slot.backend.bc.Addr(), slot.lock.hold)
} else {
log.Warnf("fill slot %04d, backend.addr = %s, locked = %t",
slot.id, slot.backend.bc.Addr(), slot.lock.hold)
}
}
}
}


上面的根据addr取得sharedBackendConn,是先从Router.pool.primary这个sharedBackendConnPool中取,如果能取出就直接取,缓存中为空的话就新建

// Retain returns the cached shared connection for addr, bumping its
// reference count; on a cache miss it creates a new connection, caches
// it, and returns it.
func (p *sharedBackendConnPool) Retain(addr string) *sharedBackendConn {
	if cached := p.pool[addr]; cached != nil {
		return cached.Retain()
	}
	fresh := newSharedBackendConn(addr, p)
	p.pool[addr] = fresh
	return fresh
}
// Retain increments the connection's reference count and returns the
// receiver. A nil receiver is a no-op that returns nil; retaining a
// connection whose refcount has already dropped to zero panics.
func (s *sharedBackendConn) Retain() *sharedBackendConn {
	if s == nil {
		return nil
	}
	if s.refcnt > 0 {
		s.refcnt++
		return s
	}
	log.Panicf("shared backend conn has been closed")
	return s
}

func (s *haredBackendConn) Release() {
if s == nil {
return
}
if s.refcnt <= 0 {
log.Panicf("shared backend conn has been closed, close too many times")
} else {
s.refcnt--
}
if s.refcnt != 0 {
return
}
for _, parallel := range s.conns {
for _, bc := range parallel {
bc.Close()
}
}
delete(s.owner.pool, s.addr)
}


之前我们说过,具体的redis请求都是通过sharedBackendConn来处理的,而新建sharedBackendConn就在这个地方。根据addr(也就是"10.0.2.15:6379")获取sharedBackendConn这个过程比较复杂,详见Codis源码解析——sharedBackendConn

经过上面的过程之后,slot已经被填充了。slot的conns属性就是与redis16个库的连接



下一步,再回到之前的真正对slot做操作的一步。即上一篇中的,取出plans,对每个slot进行处理。以id为510的slot为例,这个slot要从group1被迁移到group2

for sid, _ := range plans {
fut.Add()
go func(sid int) {
log.Warnf("slot-[%d] process action", sid)
//针对每个slot做处理
var err = s.processSlotAction(sid)
if err != nil {
status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err)
s.action.progress.status.Store(status)
} else {
s.action.progress.status.Store("")
}
//在Future的vmap中存储slotId和对应的error,并调用WaitGroup.Done
fut.Done(strconv.Itoa(sid), err)
}(sid)
}


// processSlotAction drives slot sid's pending action to completion,
// looping while the dashboard is online. Each iteration obtains the
// executor for the slot and runs one step against the current redis DB;
// when a step reports no remaining keys and no next DB, the action is
// marked complete.
func (s *Topom) processSlotAction(sid int) error {
	db := 0
	for s.IsOnline() {
		// exec is the function that performs one step of the slot action.
		exec, err := s.newSlotActionExecutor(sid)
		if err != nil {
			return err
		}
		if exec == nil {
			// Executor not available yet (action disabled or a group is
			// promoting) — back off and retry.
			time.Sleep(time.Second)
			continue
		}
		n, nextdb, err := exec(db)
		if err != nil {
			return err
		}
		log.Debugf("slot-[%d] action executor %d", sid, n)

		if n == 0 && nextdb == -1 {
			// Nothing left to migrate in any DB: finish the action.
			return s.SlotActionComplete(sid)
		}
		status := fmt.Sprintf("[OK] Slot[%04d]@DB[%d]=%d", sid, db, n)
		s.action.progress.status.Store(status)

		// Optional throttling between steps, in microseconds.
		if us := s.GetSlotActionInterval(); us != 0 {
			time.Sleep(time.Microsecond * time.Duration(us))
		}
		db = nextdb
	}
	return nil
}
// newSlotActionExecutor returns a closure that performs one step of the
// pending action on slot sid, or (nil, nil) when the action must wait.
// The closure takes the redis DB index to operate on and returns the
// number of keys still remaining in that DB, the next DB index to
// process (-1 when none is left), and any error.
func (s *Topom) newSlotActionExecutor(sid int) (func(db int) (remains int, nextdb int, err error), error) {
s.mu.Lock()
defer s.mu.Unlock()
ctx, err := s.newContext()
if err != nil {
return nil, err
}

// Look up the SlotMapping for this slot id; essentially just
// "return ctx.slots[sid], nil".
m, err := ctx.getSlotMapping(sid)
if err != nil {
return nil, err
}

switch m.Action.State {

// At first the slot is still being migrated, i.e. state "migrating".
case models.ActionMigrating:

if s.action.disabled.IsTrue() {
return nil, nil
}
// isGroupPromoting inspects the group m belongs to: it returns true if
// the group exists and its Promoting.State is a non-empty string,
// false otherwise.
if ctx.isGroupPromoting(m.GroupId) {
return nil, nil
}
if ctx.isGroupPromoting(m.Action.TargetId) {
return nil, nil
}

// A slot can only be migrated while Promoting.State is empty on both
// its own group and the target group (checked above).
from := ctx.getGroupMaster(m.GroupId)
// Take the first server of the target group, i.e. its master (in this
// walkthrough dest is "10.0.2.15:6380").
dest := ctx.getGroupMaster(m.Action.TargetId)

// Bump the executor counter in Topom's action state.
s.action.executor.Incr()

return func(db int) (int, int, error) {
// Decrement the counter once this migration step has run.
defer s.action.executor.Decr()
if from == "" {
return 0, -1, nil
}
// Get the source group's redis client from the cache. The client is
// made of addr (10.0.2.15:6379 here), auth, timeout, Database and a
// redigo.Conn; a new one is built on a cache miss. See the Client
// section at the end of the article.
c, err := s.action.redisp.GetClient(from)
if err != nil {
return 0, -1, err
}
// Put the client (fresh or cached) back into Topom.action.redisp.
defer s.action.redisp.PutClient(c)

// db is 0 here, i.e. SELECT database 0 out of redis' 16 databases.
if err := c.Select(db); err != nil {
return 0, -1, err
}
var do func() (int, error)

method, _ := models.ParseForwardMethod(s.config.MigrationMethod)
switch method {
case models.ForwardSync:
do = func() (int, error) {
// Issue redis' SLOTSMGRTTAGSLOT command: pick a random key in this
// slot and migrate every k-v sharing that key's tag to the target.
return c.MigrateSlot(sid, dest)
}
case models.ForwardSemiAsync:
var option = &redis.MigrateSlotAsyncOption{
MaxBulks: s.config.MigrationAsyncMaxBulks,
MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
NumKeys:  s.config.MigrationAsyncNumKeys,
Timeout: math2.MinDuration(time.Second*5,
s.config.MigrationTimeout.Duration()),
}
do = func() (int, error) {
// Issue redis' SLOTSMGRTTAGSLOT-ASYNC command; the arguments are the
// target redis' ip and port (10.0.2.15 and 6380 here).
return c.MigrateSlotAsync(sid, dest, option)
}
default:
log.Panicf("unknown forward method %d", int(method))
}

n, err := do()
if err != nil {
return 0, -1, err
} else if n != 0 {
return n, db, nil
}

nextdb := -1
// Query keyspace info via the INFO command; in this walkthrough the
// returned map m is empty.
m, err := c.InfoKeySpace()
if err != nil {
return 0, -1, err
}
// Pick the smallest DB index greater than the current one that still
// has keys.
for i := range m {
if (nextdb == -1 || i < nextdb) && db < i {
nextdb = i
}
}
// nextdb stays -1 when no further DB needs migrating.
return 0, nextdb, nil
}, nil

case models.ActionFinished:

return func(int) (int, int, error) {
return 0, -1, nil
}, nil

default:

return nil, errors.Errorf("slot-[%d] action state is invalid", m.Id)

}
}
// SlotActionComplete advances slot sid's action state from migrating to
// finished, resyncs the mapping to the proxies, and finally clears the
// action by rewriting the SlotMapping with the target group id.
func (s *Topom) SlotActionComplete(sid int) error {
s.mu.Lock()
defer s.mu.Unlock()
ctx, err := s.newContext()
if err != nil {
return err
}

m, err := ctx.getSlotMapping(sid)
if err != nil {
return err
}

log.Warnf("slot-[%d] action complete:\n%s", m.Id, m.Encode())

switch m.Action.State {

// The state is "migrating" when we first get here.
case models.ActionMigrating:

defer s.dirtySlotsCache(m.Id)

// Advance the state to "finished" and persist it, then fall through to
// the finished handling below.
m.Action.State = models.ActionFinished
if err := s.storeUpdateSlotMapping(m); err != nil {
return err
}

fallthrough

case models.ActionFinished:

log.Warnf("slot-[%d] resync to finished", m.Id)

// resyncSlotMappings pushes the updated mapping to every proxy; it was
// described in the previous article, so it is not repeated here.
if err := s.resyncSlotMappings(ctx, m); err != nil {
log.Warnf("slot-[%d] resync to finished failed", m.Id)
return err
}
defer s.dirtySlotsCache(m.Id)

// Replace the mapping with a clean one owned by the target group; the
// Action field is zeroed, which clears the migration plan.
m = &models.SlotMapping{
Id:      m.Id,
GroupId: m.Action.TargetId,
}
return s.storeUpdateSlotMapping(m)

default:

return errors.Errorf("slot-[%d] action state is invalid", m.Id)

}
}


在Codis中,对于redis的操作都是通过Client中的redigo.Conn(一个第三方redis客户端)发命令的。如下所示

// Client bundles a single redigo connection with the metadata used to
// manage it (address, auth, selected database, last-use time, timeout).
type Client struct {
conn redigo.Conn // underlying third-party redis connection; all commands go through it
Addr string // redis server address, e.g. host:port
Auth string // password used when authenticating the connection

Database int // currently selected redis database index

LastUse time.Time // time of the most recent successful command
Timeout time.Duration // I/O timeout applied to the connection
}
// Do sends cmd with args over the underlying redigo connection, records
// the time of use on success, and converts redis-level error replies
// into Go errors.
func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) {
	reply, err := c.conn.Do(cmd, args...)
	if err != nil {
		return nil, errors.Trace(err)
	}
	c.LastUse = time.Now()

	// A reply may itself be a redis error (e.g. WRONGTYPE); surface it
	// as a traced Go error instead of a value.
	if redisErr, ok := reply.(redigo.Error); ok {
		return nil, errors.Trace(redisErr)
	}
	return reply, nil
}


到这里,slot的操作就结束了。在页面中可以看到slot已经迁移成功



从dashboard的日志中,我们也能了解到,首先是将所有slot的action.state设为pending,然后再逐个迁移每个slot。下面以id为1022的slot迁移到group1举例

//之前已经先把所有slot的action.state设为pending
.
.
.
2017/10/26 10:04:45 topom_slots.go:240: [WARN] slot-[1022] action prepare:
{
"id": 1022,
"group_id": 0,
"action": {
"index": 1023,
"state": "pending",
"target_id": 1
}
}
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
"id": 1022,
"group_id": 0,
"action": {
"index": 1023,
"state": "preparing",
"target_id": 1
}
}
2017/10/26 10:04:45 topom_slots.go:259: [WARN] slot-[1022] resync to prepared
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
"id": 1022,
"group_id": 0,
"action": {
"index": 1023,
"state": "prepared",
"target_id": 1
}
}
2017/10/26 10:04:45 topom_slots.go:279: [WARN] slot-[1022] resync to migrating
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
"id": 1022,
"group_id": 0,
"action": {
"index": 1023,
"state": "migrating",
"target_id": 1
}
}
2017/10/26 10:04:45 topom_action.go:56: [WARN] slot-[1022] process action
2017/10/26 10:04:45 topom_slots.go:320: [WARN] slot-[1022] action complete:
{
"id": 1022,
"group_id": 0,
"action": {
"index": 1023,
"state": "migrating",
"target_id": 1
}
}
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
"id": 1022,
"group_id": 0,
"action": {
"index": 1023,
"state": "finished",
"target_id": 1
}
}
2017/10/26 10:04:45 topom_slots.go:337: [WARN] slot-[1022] resync to finished
2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]:
{
"id": 1022,
"group_id": 1,
"action": {}
}


总结一下,slot的迁移工作是很复杂的。对集群中的slot手动进行rebalance之后,每个slot都被指定了相应的迁移计划。对集群中的slot做SlotActionPrepareFilter处理,先找Action.State既不为空也不是pending的SlotMapping中Action.Index最小的SlotMapping,找不到的话就去找Action.State为pending的SlotMapping中Action.Index最小的SlotMapping,找到之后逐个变更每个SlotMapping的action.state,在zk上更新,Action.state符合preparing或者prepared的时候,要调用Topom的resyncSlotMappings方法,根据SlotMapping的参数同步到models.Slot(ctx.toSlotSlice方法),再同步到Proxy.Slot(Router.fillslot方法),这个过程中,每个Slot都被分配了backendConn,这个backendConn是从Proxy.Router.pool中取出来的(没有就新建,再放到pool中)。

上面的迁移准备工作完成之后,再逐个处理每一个slot的迁移。只有一个slot本身所在的group以及目标group的Promoting.State都为空时,才可以做迁移。槽的迁移是由Topom.action.redisp里面的from client来进行的,分为sync和semi-sync。每一个slot迁移完成之后,再调用SlotActionComplete,推进slot的action.state,更新zk上的信息,并调用resyncSlotMappings同步集群中的slot信息,例如释放掉migrate.bc。

说明

如有转载,请注明出处

http://blog.csdn.net/antony9118/article/details/77277540
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: