Codis源码解析——dashboard的启动(2)
2017-07-24 20:07
375 查看
1 刷新redis状态
首先认识两个重要的structtype Future struct { sync.Mutex wait sync.WaitGroup vmap map[string]interface{} } type RedisStats struct { //储存了集群中Redis服务器的各种信息和统计数值,详见redis的info命令 Stats map[string]string `json:"stats,omitempty"` Error *rpc.RemoteError `json:"error,omitempty"` Sentinel map[string]*redis.SentinelGroup `json:"sentinel,omitempty"` UnixTime int64 `json:"unixtime"` Timeout bool `json:"timeout,omitempty"` }
接下来看看dashboard如何刷新redis状态
func (s *Topom) RefreshRedisStats(timeout time.Duration) (*sync2.Future, error) { s.mu.Lock() defer s.mu.Unlock() //从缓存中读出slots,group,proxy,sentinel等信息封装在context struct中 ctx, err := s.newContext() if err != nil { return nil, err } var fut sync2.Future goStats := func(addr string, do func(addr string) (*RedisStats, error)) { fut.Add() go func() { stats := s.newRedisStats(addr, timeout, do) stats.UnixTime = time.Now().Unix() //vmap中添加键为addr,值为RedisStats的map fut.Done(addr, stats) }() } //遍历ctx中的group,再遍历每个group中的Server。如果对group和Server结构不清楚的,可以看看/pkg/models/group.go文件 //每个Group除了id,还有一个属性就是GroupServer。每个GroupServer有自己的地址、数据中心、action等等 for _, g := range ctx.group { for _, x := range g.Servers { goStats(x.Addr, func(addr string) (*RedisStats, error) { //前面我们已经说过,Topom中有三个redis pool,分别是action,stats,ha。pool本质上就是map[String]*list.List。 //这个是从stats的pool中根据Server的地址从pool中取redis client,如果没有client,就创建 //然后加入到pool里面,并通过Info命令获取详细信息。整个流程和下面的sentinel类似,这里就不放具体的方法实现了 m, err := s.stats.redisp.InfoFull(addr) if err != nil { return nil, err } return &RedisStats{Stats: m}, nil }) } } //通过sentinel维护codis集群中每一组的主备关系 for _, server := range ctx.sentinel.Servers { goStats(server, func(addr string) (*RedisStats, error) { c, err := s.ha.redisp.GetClient(addr) if err != nil { return nil, err } //实际上就是将client加入到Pool的pool属性里面去,pool本质上就是map[String]*list.List //键是client的addr,值是client本身 //如果client不存在,就新建一个空的list defer s.ha.redisp.PutClient(c) m, err := c.Info() if err != nil { return nil, err } sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth) //获得map[string]*SentinelGroup,键是每一组的master的名字,SentinelGroup则是主从对 p, err := sentinel.MastersAndSlavesClient(c) if err != nil { return nil, err } return &RedisStats{Stats: m, Sentinel: p}, nil }) } //前面的所有gostats执行完之后,遍历Future的vmap,将值赋给Topom.stats.servers go func() { stats := make(map[string]*RedisStats) for k, v := range fut.Wait() { stats[k] = v.(*RedisStats) } s.mu.Lock() defer s.mu.Unlock() s.stats.servers = stats }() return &fut, nil } func (p *Pool) GetClient(addr string) (*Client, error) { c, err := p.getClientFromCache(addr) if err != nil || c != nil { return c, err } return NewClient(addr, p.auth, p.timeout) } func (p *Pool) getClientFromCache(addr string) (*Client, error) { p.mu.Lock() defer p.mu.Unlock() if p.closed { return nil, ErrClosedPool } if list := p.pool[addr]; list != nil { for i := list.Len(); i != 0; i-- { c := list.Remove(list.Front()).(*Client) //一个client可被回收的条件是,Pool的timeout为0,或者这个client上一次使用距离现在小于Pool.timeout //ha和stats里面的Pool的timeout为5秒,action的则根据配置文件dashboard.toml中的migration_timeout一项来决定 if p.isRecyclable(c) { return c, nil } else { c.Close() } } } return nil, nil } type Client struct { conn redigo.Conn Addr string Auth string Database int LastUse time.Time Timeout time.Duration }
RedisStats中的sentinel如下所示,有几组主备,就有几组SentinelGroup,键是product-name与group-id拼起来的
newContext一步主要就是调用refillCache,重载了四个缓存,分别是refillCacheSlots,refillCacheGroup,refillCacheProxy和refillCacheSentinel。这四个方法基本一致,以refillCacheSlots为例。方法传入的是Topom.cache.slots
type context struct { slots []*models.SlotMapping group map[int]*models.Group proxy map[string]*models.Proxy sentinel *models.Sentinel hosts struct { sync.Mutex m map[string]net.IP } method int } //重新填充topom.cache中的数据,并赋给context结构 func (s *Topom) newContext() (*context, error) { if s.closed { return nil, ErrClosedTopom } if s.online { if err := s.refillCache(); err != nil { return nil, err } else { ctx := &context{} ctx.slots = s.cache.slots ctx.group = s.cache.group ctx.proxy = s.cache.proxy ctx.sentinel = s.cache.sentinel ctx.hosts.m = make(map[string]net.IP) ctx.method, _ = models.ParseForwardMethod(s.config.MigrationMethod) return ctx, nil } } else { return nil, ErrNotOnline } } func (s *Topom) refillCacheSlots(slots []*models.SlotMapping) ([]*models.SlotMapping, error) { //如果cache中的slots为空,就直接返回store里面的slots if slots == nil { return s.store.SlotMappings() } for i, _ := range slots { //如果cache中的slots[i]不为空,直接进入下一个循环 if slots[i] != nil { continue } //如果slots[i]为空,就从store中取出对应的SlotMapping并赋值给cache中的这个slot m, err := s.store.LoadSlotMapping(i, false) if err != nil { return nil, err } if m != nil { slots[i] = m } else { //如果store中取出的对应的SlotMapping也为空,就新建一个SlotMapping赋值给当前slot slots[i] = &models.SlotMapping{Id: i} } } return slots, nil } func (s *Store) LoadSlotMapping(sid int, must bool) (*SlotMapping, error) { //返回值b是zkClient根据路径转化成的byte数组 b, err := s.client.Read(s.SlotPath(sid), must) if err != nil || b == nil { return nil, err } m := &SlotMapping{} //将byte数组封装在实体类SlotMapping实体类中 if err := jsonDecode(m, b); err != nil { return nil, err } return m, nil } func (s *Store) SlotPath(sid int) string { return SlotPath(s.product, sid) } //这里的codisDir是/codis3 func SlotPath(product string, sid int) string { return filepath.Join(CodisDir, product, "slots", fmt.Sprintf("slot-%04d", sid)) } type SlotMapping struct { Id int `json:"id"` GroupId int `json:"group_id"` Action struct { Index int `json:"index,omitempty"` State string `json:"state,omitempty"` TargetId int `json:"target_id,omitempty"` } `json:"action"` }
总结一下,刷新redis的过程中,首先创建上下文,从cache中读取slots,group,proxy,sentinel等信息,如果读不到就通过store从zk上获取,如果zk中也为空就创建。遍历集群中的redis服务器以及主从关系,创建RedisStats并与addr关联形成map,存储在future的vmap中。全部存储完后,再把vmap写入Topom.stats.servers
我们可以在控制台上打印出Topom.stats.redisp的相关信息。因为goroutine中设置了每个一秒休眠,所以集群的redisp实际上是每秒刷新一次
stats redisp: &{{0 0} map[*.*.*.*:6379:0xc4206933e0 *.*.*.*:6380:0xc420693890 127.0.0.1:6379:0xc4206be540] 5000000000 {0xc420320000} false}
2 刷新proxy状态
刷新proxy状态的代码和刷新redis的类似,就不赘述了。可以参照Codis源码解析——proxy添加到集群最后的步骤
3 处理同步操作
首先要明白,同步操作,指的就是一个group中的主从codis-server服务器之间进行数据的同步,GroupServer是Group的一个属性,标明了当前group中的所有codis-server的地址和action等等信息type Group struct { Id int `json:"id"` Servers []*GroupServer `json:"servers"` Promoting struct { Index int `json:"index,omitempty"` State string `json:"state,omitempty"` } `json:"promoting"` OutOfSync bool `json:"out_of_sync"` } type GroupServer struct { Addr string `json:"server"` DataCenter string `json:"datacenter"` Action struct { Index int `json:"index,omitempty"` State string `json:"state,omitempty"` } `json:"action"` ReplicaGroup bool `json:"replica_group"` }
我们直接看ProcessSyncAction,在/pkg/topom/topom_action.go文件中
func (s *Topom) ProcessSyncAction() error { //同步操作之前的准备工作 addr, err := s.SyncActionPrepare() if err != nil || addr == "" { return err } log.Warnf("sync-[%s] process action", addr) //执行同步操作 exec, err := s.newSyncActionExecutor(addr) if err != nil || exec == nil { return err } return s.SyncActionComplete(addr, exec() != nil) }
同步操作之前的准备工作是,使用s.newContext()获取上下文,从上下文中,遍历每个group中的每个codis-server,从Action.State为pending的codis-server中,选出Action.Index最小的那台服务器,并获取其所在的group,如果这个group的Promoting.State为nothing,这台服务器就可以从主服务器同步数据。将这个codis-server的Action.Index设为0,Action.State设为syncing,更新zk中存储的信息,并将cache中关于这台服务器的信息设为nil,这样下次就会从store中重新载入数据到cache。
下一步,检查当前server在group中的index,如果index不为0,就表示这台server不是group中的主服务器(codis是将group中index为0的那台server作为主的),下一步就是当前server从主服务同步数据,通过redigo发送同步命令
return func() error { c, err := redis.NewClient(addr, s.config.ProductAuth, time.Minute*30) if err != nil { log.WarnErrorf(err, "create redis client to %s failed", addr) return err } defer c.Close() if err := c.SetMaster(master); err != nil { log.WarnErrorf(err, "redis %s set master to %s failed", addr, master) return err } return nil }, nil func NewClient(addr string, auth string, timeout time.Duration) (*Client, error) { c, err := redigo.Dial("tcp", addr, []redigo.DialOption{ redigo.DialConnectTimeout(math2.MinDuration(time.Second, timeout)), redigo.DialPassword(auth), redigo.DialReadTimeout(timeout), redigo.DialWriteTimeout(timeout), }...) if err != nil { return nil, errors.Trace(err) } return &Client{ conn: c, Addr: addr, Auth: auth, LastUse: time.Now(), Timeout: timeout, }, nil } func (c *Client) SetMaster(master string) error { host, port, err := net.SplitHostPort(master) if err != nil { return errors.Trace(err) } c.conn.Send("MULTI") c.conn.Send("CONFIG", "SET", "masterauth", c.Auth) c.conn.Send("SLAVEOF", host, port) c.conn.Send("CONFIG", "REWRITE") c.conn.Send("CLIENT", "KILL", "TYPE", "normal") values, err := redigo.Values(c.Do("EXEC")) if err != nil { return errors.Trace(err) } for _, r := range values { if err, ok := r.(redigo.Error); ok { return errors.Trace(err) } } return nil }
同步之后,会将这台codis-server的Action.State设置为”synced”或者”synced_failed”,并在zk中更新相关信息,抹除cache。
注意,尽管整个过程中,都用了锁,每次还是会检查group的Promoting.State是否nothing,codis-server的Action.Index是否为0,Action.State是否为syncing,只有全部符合才进行同步
4 处理slot操作
对槽的操作是很复杂的,因为有五种状态,挂起、准备中、准备完成、迁移中、迁移完成,这个详见另外两篇博客Codis源码解析——处理slot操作(1) 以及 Codis源码解析——处理slot操作(2)到这里,dashboard的启动工作已经完成,可以看到,dashboard启动过程中,实际上启动了很多goroutine来对后续操作进行处理,这些我们都会在后面的文章的具体章节中做分析,这一节只需要关注到dashboard启动过程中做了什么即可。
说明
如有转载,请注明出处
http://blog.csdn.net/antony9118/article/details/76037488
相关文章推荐
- Codis源码解析——dashboard的启动(1)
- Codis源码解析——fe的启动
- Codis源码解析——proxy的启动
- Android ContentProvider启动流程源码解析(8.0)
- RocketMQ源码解析-Producer启动
- MTK Kernel启动流程源码解析 4 start_kernel 上
- 注册中心 Eureka 源码解析 —— Eureka-Server 启动(二)之 EurekaBootStrap
- codis 的dashboard服务无法启动 提示pid已经运行
- Android源码解析四大组件系列(四)---Activity启动详细流程
- [Spark内核] 第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
- tomcat源码解析(一)——Bootstrap和Catalina启动部分
- Hyperledger Fabric继peer启动之后的源码解析三
- Android源码解析之(八)-->Zygote进程启动流程
- Android Zygote启动流程源码解析
- 注册中心 Eureka 源码解析 —— Eureka-Server 启动(二)之 EurekaBootStrap
- Quartz源码解析 ---- 触发器按时启动原理
- Android Zygote启动流程源码解析
- spring启动component-scan类扫描加载,以及@Resource,postConstruct等等注解的解析生效源码
- Android Small插件化框架--启动插件Activity源码解析(上)
- ActivityThread启动源码解析