Codis源码解析——处理slot操作(2)
2017-08-16 20:40
579 查看
这一篇我们把处理slot操作的机制讲完。还剩最后两个部分。一个是fillSlot,一个是每一个槽具体的处理方式。
本例中有两个group,将之前auto-rebalance过的slot(0-511属于group1,512-1023属于group2)
现在将slot 400到500移动到group2。我们之前走到了
首先new一个ApiClient,里面存储了proxy的地址,以及xauth信息(xauth是根据ProductName,ProductAuth以及proxy的token生成的)。下面调用Proxy的FillSlots方法,填充slot
上面的根据addr取得sharedBackendConnPool,是先从Router.pool.pimary这个sharedBackendConnPool中取,如果能取出就直接取,缓存中为空的话就新建
之前我们说过,具体的redis请求都是通过sharedBackendConn来处理的,而新建sharedBackendConn就在这个地方。根据addr(也就是”10.0.2.15:6379”)获取sharedBackendConn这个过程比较复杂,详见Codis源码解析——sharedBackendConn
经过上面的过程之后,slot已经被填充了。slot的conns属性就是与redis16个库的连接
下一步,再回到之前的真正对slot做操作的一步。即上一篇中的,取出plans,对每个slot进行处理。以id为510的slot为例,这个slot要从group1被迁移到group2
在Codis中,对于redis的操作都是通过Client中的redigo.Conn(一个第三方redis客户端)发命令的。如下所示
到这里,slot的操作就结束了。在页面中可以看到slot已经迁移成功
从dashboard的日志中,我们也能了解到,首先是将所有slot的action.state设为pending,然后再逐个迁移每个slot。下面以id为1022的slot迁移到group1举例
总结一下,slot的迁移工作是很复杂的。对集群中的slot手动进行rebalance之后,每个slot都被指定了相应的迁移计划。对集群中的slot做SlotActionPrepareFilter处理,先找Action.State既不为空也不是pending的SlotMapping中Action.Index最小的SlotMapping,找不到的话就去找Action.State为pending的SlotMapping中Action.Index最小的SlotMapping,找到之后逐个变更每个SlotMapping的action.state,在zk上更新,Action.state符合preparing或者prepared的时候,要调用Topom的resyncSlotMappings方法,根据SlotMapping的参数同步到models.Slot(ctx.toSlotSlice方法),再同步到Proxy.Slot(Router.fillslot方法),这个过程中,每个Slot都被分配了backendConn,这个backendConn是从Proxy.Router.pool中取出来的(没有就新建,再放到pool中)。
上面的迁移准备工作完成之后,再逐个处理每一个slot的迁移。只有一个slot本身所在的group以及目标group的Promoting.State都为空时,才可以做迁移。槽的迁移是由Topom.action.redisp里面的from client来进行的,分为sync和semi-sync。每一个slot迁移完成之后,再调用SlotActionComplete,推进slot的action.state,更新zk上的信息,并调用resyncSlotMappings同步集群中的slot信息,例如释放掉migrate.bc。
说明
如有转载,请注明出处
http://blog.csdn.net/antony9118/article/details/77277540
本例中有两个group,将之前auto-rebalance过的slot(0-511属于group1,512-1023属于group2)
现在将slot 400到500移动到group2。我们之前走到了
s.newProxyClient(p).FillSlots(ctx.toSlotSlice(slots, p)...)
首先new一个ApiClient,里面存储了proxy的地址,以及xauth信息(xauth是根据ProductName,ProductAuth以及proxy的token生成的)。下面调用Proxy的FillSlots方法,填充slot
//这里传过来的slots实际上只有一个,就是整个集群中最小的Action.State对应的Slotmapping func (s *Proxy) FillSlots(slots []*models.Slot) error { s.mu.Lock() defer s.mu.Unlock() if s.closed { return ErrClosedProxy } for _, m := range slots { if err := s.router.FillSlot(m); err != nil { return err } } return nil }
func (s *Router) fillSlot(m *models.Slot, switched bool, method forwardMethod) { slot := &s.slots[m.Id] //将slot的lock.hold属性设为true,并加锁 slot.blockAndWait() //这里的bc就是我们之前提到过的处理redis请求的sharedBackendConn。第一次进来的时候,backend为空,这里直接返回 slot.backend.bc.Release() slot.backend.bc = nil slot.backend.id = 0 slot.migrate.bc.Release() slot.migrate.bc = nil slot.migrate.id = 0 for i := range slot.replicaGroups { for _, bc := range slot.replicaGroups[i] { bc.Release() } } slot.replicaGroups = nil slot.switched = switched //这里的addr就是我们的redis服务器地址,10.0.2.15:6379 if addr := m.BackendAddr; len(addr) != 0 { //根据地址获得sharedBackendConn slot.backend.bc = s.pool.primary.Retain(addr) slot.backend.id = m.BackendAddrGroupId } if from := m.MigrateFrom; len(from) != 0 { slot.migrate.bc = s.pool.primary.Retain(from) slot.migrate.id = m.MigrateFromGroupId } if !s.config.BackendPrimaryOnly { for i := range m.ReplicaGroups { var group []*sharedBackendConn for _, addr := range m.ReplicaGroups[i] { group = append(group, s.pool.replica.Retain(addr)) } if len(group) == 0 { continue } slot.replicaGroups = append(slot.replicaGroups, group) } } if method != nil { slot.method = method } if !m.Locked { slot.unblock() } if !s.closed { if slot.migrate.bc != nil { if switched { log.Warnf("fill slot %04d, backend.addr = %s, migrate.from = %s, locked = %t, +switched", slot.id, slot.backend.bc.Addr(), slot.migrate.bc.Addr(), slot.lock.hold) } else { log.Warnf("fill slot %04d, backend.addr = %s, migrate.from = %s, locked = %t", slot.id, slot.backend.bc.Addr(), slot.migrate.bc.Addr(), slot.lock.hold) } } else { if switched { log.Warnf("fill slot %04d, backend.addr = %s, locked = %t, +switched", slot.id, slot.backend.bc.Addr(), slot.lock.hold) } else { log.Warnf("fill slot %04d, backend.addr = %s, locked = %t", slot.id, slot.backend.bc.Addr(), slot.lock.hold) } } } }
上面的根据addr取得sharedBackendConnPool,是先从Router.pool.pimary这个sharedBackendConnPool中取,如果能取出就直接取,缓存中为空的话就新建
func (p *sharedBackendConnPool) Retain(addr string) *sharedBackendConn { if bc := p.pool[addr]; bc != nil { return bc.Retain() } else { bc = newSharedBackendConn(addr, p) p.pool[addr] = bc return bc } } //更新每个sharedBackendConn的引用次数 func (s *sharedBackendConn) Retain() *sharedBackendConn { if s == nil { return nil } if s.refcnt <= 0 { log.Panicf("shared backend conn has been closed") } else { s.refcnt++ } return s } func (s *haredBackendConn) Release() { if s == nil { return } if s.refcnt <= 0 { log.Panicf("shared backend conn has been closed, close too many times") } else { s.refcnt-- } if s.refcnt != 0 { return } for _, parallel := range s.conns { for _, bc := range parallel { bc.Close() } } delete(s.owner.pool, s.addr) }
之前我们说过,具体的redis请求都是通过sharedBackendConn来处理的,而新建sharedBackendConn就在这个地方。根据addr(也就是”10.0.2.15:6379”)获取sharedBackendConn这个过程比较复杂,详见Codis源码解析——sharedBackendConn
经过上面的过程之后,slot已经被填充了。slot的conns属性就是与redis16个库的连接
下一步,再回到之前的真正对slot做操作的一步。即上一篇中的,取出plans,对每个slot进行处理。以id为510的slot为例,这个slot要从group1被迁移到group2
for sid, _ := range plans { fut.Add() go func(sid int) { log.Warnf("slot-[%d] process action", sid) //针对每个slot做处理 var err = s.processSlotAction(sid) if err != nil { status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err) s.action.progress.status.Store(status) } else { s.action.progress.status.Store("") } //在Future的vmap中存储slotId和对应的error,并调用WaitGroup.Done fut.Done(strconv.Itoa(sid), err) }(sid) }
func (s *Topom) processSlotAction(sid int) error { var db int = 0 for s.IsOnline() { //返回的exec就是具体的slot操作执行函数 if exec, err := s.newSlotActionExecutor(sid); err != nil { return err } else if exec == nil { time.Sleep(time.Second) } else { n, nextdb, err := exec(db) if err != nil { return err } log.Debugf("slot-[%d] action executor %d", sid, n) if n == 0 && nextdb == -1 { return s.SlotActionComplete(sid) } status := fmt.Sprintf("[OK] Slot[%04d]@DB[%d]=%d", sid, db, n) s.action.progress.status.Store(status) if us := s.GetSlotActionInterval(); us != 0 { time.Sleep(time.Microsecond * time.Duration(us)) } db = nextdb } } return nil } func (s *Topom) newSlotActionExecutor(sid int) (func(db int) (remains int, nextdb int, err error), error) { s.mu.Lock() defer s.mu.Unlock() ctx, err := s.newContext() if err != nil { return nil, err } //根据slot的id获取SlotMapping,主要方法就是return ctx.slots[sid], nil m, err := ctx.getSlotMapping(sid) if err != nil { return nil, err } switch m.Action.State { //最初slot还处在迁移过程中,即migrating case models.ActionMigrating: if s.action.disabled.IsTrue() { return nil, nil } //查看m所在的group,如果存在group,而且其Promoting.State不为空字符串,就返回true;否则返回false if ctx.isGroupPromoting(m.GroupId) { return nil, nil } if ctx.isGroupPromoting(m.Action.TargetId) { return nil, nil } //迁移过程中,一个slot本身所在的group以及目标group的Promoting.State都必须为空才可以做迁移 from := ctx.getGroupMaster(m.GroupId) //取出group2的第一个server,也就是master,dest是"10.0.2.15:6380" dest := ctx.getGroupMaster(m.Action.TargetId) //Topom的action中的计数器加一 s.action.executor.Incr() return func(db int) (int, int, error) { //每执行一个槽的迁移操作,Topom的action中的计数器就减1 defer s.action.executor.Decr() if from == "" { return 0, -1, nil } //从cache中得到group1的redisClient,这个client由addr(10.0.2.15:6379), auth, timeout,Database,redigo.Conn组成;如果cache没有,就新建。详见文末的Client c, err := s.action.redisp.GetClient(from) if err != nil { return 0, -1, err } //将刚才新建的或者从cache中取出的redis client再put到Topom.action.redisp中 defer s.action.redisp.PutClient(c) //这里db是0,相当于redis从16个库中选择0号 if err := c.Select(db); err != nil { return 0, -1, err } var do func() (int, error) method, _ := models.ParseForwardMethod(s.config.MigrationMethod) switch method { case models.ForwardSync: do = func() (int, error) { //调用redis的SLOTSMGRTTAGSLOT命令,随机选择当前slot的一个key,并将与这个key的tag相同的k-v全部迁移到目标机 return c.MigrateSlot(sid, dest) } case models.ForwardSemiAsync: var option = &redis.MigrateSlotAsyncOption{ MaxBulks: s.config.MigrationAsyncMaxBulks, MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(), NumKeys: s.config.MigrationAsyncNumKeys, Timeout: math2.MinDuration(time.Second*5, s.config.MigrationTimeout.Duration()), } do = func() (int, error) { //调用redis的SLOTSMGRTTAGSLOT-ASYNC命令,参数是target redis的ip和port,也就是10.0.2.15和6380 return c.MigrateSlotAsync(sid, dest, option) } default: log.Panicf("unknown forward method %d", int(method)) } n, err := do() if err != nil { return 0, -1, err } else if n != 0 { return n, db, nil } nextdb := -1 //通过info命令查keyspace信息并做处理,这里取出的m为空 m, err := c.InfoKeySpace() if err != nil { return 0, -1, err } for i := range m { if (nextdb == -1 || i < nextdb) && db < i { nextdb = i } } //返回的nextdb是-1 return 0, nextdb, nil }, nil case models.ActionFinished: return func(int) (int, int, error) { return 0, -1, nil }, nil default: return nil, errors.Errorf("slot-[%d] action state is invalid", m.Id) } } func (s *Topom) SlotActionComplete(sid int) error { s.mu.Lock() defer s.mu.Unlock() ctx, err := s.newContext() if err != nil { return err } m, err := ctx.getSlotMapping(sid) if err != nil { return err } log.Warnf("slot-[%d] action complete:\n%s", m.Id, m.Encode()) switch m.Action.State { //首先状态是migrating case models.ActionMigrating: defer s.dirtySlotsCache(m.Id) //推进到finished m.Action.State = models.ActionFinished if err := s.storeUpdateSlotMapping(m); err != nil { return err } fallthrough case models.ActionFinished: log.Warnf("slot-[%d] resync to finished", m.Id) //这个方法在上一节介绍过,本节就不再赘述了 if err := s.resyncSlotMappings(ctx, m); err != nil { log.Warnf("slot-[%d] resync to finished failed", m.Id) return err } defer s.dirtySlotsCache(m.Id) m = &models.SlotMapping{ Id: m.Id, GroupId: m.Action.TargetId, } return s.storeUpdateSlotMapping(m) default: return errors.Errorf("slot-[%d] action state is invalid", m.Id) } }
在Codis中,对于redis的操作都是通过Client中的redigo.Conn(一个第三方redis客户端)发命令的。如下所示
type Client struct { conn redigo.Conn Addr string Auth string Database int LastUse time.Time Timeout time.Duration } func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error) { r, err := c.conn.Do(cmd, args...) if err != nil { return nil, errors.Trace(err) } c.LastUse = time.Now() if err, ok := r.(redigo.Error); ok { return nil, errors.Trace(err) } return r, nil }
到这里,slot的操作就结束了。在页面中可以看到slot已经迁移成功
从dashboard的日志中,我们也能了解到,首先是将所有slot的action.state设为pending,然后再逐个迁移每个slot。下面以id为1022的slot迁移到group1举例
//之前已经先把所有slot的action.state设为pending . . . 2017/10/26 10:04:45 topom_slots.go:240: [WARN] slot-[1022] action prepare: { "id": 1022, "group_id": 0, "action": { "index": 1023, "state": "pending", "target_id": 1 } } 2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]: { "id": 1022, "group_id": 0, "action": { "index": 1023, "state": "preparing", "target_id": 1 } } 2017/10/26 10:04:45 topom_slots.go:259: [WARN] slot-[1022] resync to prepared 2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]: { "id": 1022, "group_id": 0, "action": { "index": 1023, "state": "prepared", "target_id": 1 } } 2017/10/26 10:04:45 topom_slots.go:279: [WARN] slot-[1022] resync to migrating 2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]: { "id": 1022, "group_id": 0, "action": { "index": 1023, "state": "migrating", "target_id": 1 } } 2017/10/26 10:04:45 topom_action.go:56: [WARN] slot-[1022] process action 2017/10/26 10:04:45 topom_slots.go:320: [WARN] slot-[1022] action complete: { "id": 1022, "group_id": 0, "action": { "index": 1023, "state": "migrating", "target_id": 1 } } 2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]: { "id": 1022, "group_id": 0, "action": { "index": 1023, "state": "finished", "target_id": 1 } } 2017/10/26 10:04:45 topom_slots.go:337: [WARN] slot-[1022] resync to finished 2017/10/26 10:04:45 topom_cache.go:161: [WARN] update slot-[1022]: { "id": 1022, "group_id": 1, "action": {} }
总结一下,slot的迁移工作是很复杂的。对集群中的slot手动进行rebalance之后,每个slot都被指定了相应的迁移计划。对集群中的slot做SlotActionPrepareFilter处理,先找Action.State既不为空也不是pending的SlotMapping中Action.Index最小的SlotMapping,找不到的话就去找Action.State为pending的SlotMapping中Action.Index最小的SlotMapping,找到之后逐个变更每个SlotMapping的action.state,在zk上更新,Action.state符合preparing或者prepared的时候,要调用Topom的resyncSlotMappings方法,根据SlotMapping的参数同步到models.Slot(ctx.toSlotSlice方法),再同步到Proxy.Slot(Router.fillslot方法),这个过程中,每个Slot都被分配了backendConn,这个backendConn是从Proxy.Router.pool中取出来的(没有就新建,再放到pool中)。
上面的迁移准备工作完成之后,再逐个处理每一个slot的迁移。只有一个slot本身所在的group以及目标group的Promoting.State都为空时,才可以做迁移。槽的迁移是由Topom.action.redisp里面的from client来进行的,分为sync和semi-sync。每一个slot迁移完成之后,再调用SlotActionComplete,推进slot的action.state,更新zk上的信息,并调用resyncSlotMappings同步集群中的slot信息,例如释放掉migrate.bc。
说明
如有转载,请注明出处
http://blog.csdn.net/antony9118/article/details/77277540
相关文章推荐
- Codis源码解析——处理slot操作(1)
- Codis源码解析——slot的分配
- Android中handler消息处理机制完全解析,带你从源码的角度彻底理解
- nginx事件处理框架及源码解析
- kafka源码解析之五Broker处理的request的来源
- [分布式监控CAT] Server端源码解析——消息消费\报表处理\展示
- Android异步消息处理机制完全解析,带你从源码的角度彻底理解
- ArrayList源码解析(二)自动扩容机制与add操作
- libjingle源码解析(4)-【PseudoTcp】建立UDP之上的TCP(2):对交互数据流的处理
- Spring对注解(Annotation)处理源码分析2——解析和注入注解配置的资源
- spring boot 源码解析14-默认错误页面处理流程, 自定义,及EnableAutoConfigurationImportSelector处理
- 传智播客strus2源码解析~~~超NBstruts2错误提示后chain操作
- Android异步消息处理机制完全解析,带你从源码的角度彻底理解
- Hadoop源码解析之: TextInputFormat如何处理跨split的行
- Android异步消息处理机制完全解析,带你从源码的角度彻底理解
- HDFS原理解析(总体架构,读写操作流程及源码查看等)
- Scala深入浅出进阶经典 第65讲:Scala中隐式转换内幕操作规则揭秘、最佳实践及其在Spark中的应用源码解析
- spring xd 源码解析-job 操作过程
- Codis源码解析——sharedBackendConn
- jstorm源码解析之bolt异常处理方法