Codis源码解析——dashboard的启动(1)
2017-07-23 18:23
836 查看
dashboard是codis的集群管理工具,支持proxy和server的添加、删除、数据迁移,所有对集群的操作必须通过dashboard。dashboard的启动过程和proxy类似。dashboard的启动只是初始化一些必要的数据结构,复杂的在于对集群的操作,这个日后的文章会有详细的描述,本文先不管这些。
启动的时候,首先读取配置文件,填充config信息。根据coordinator的信息,如果是zookeeper而不是etcd的话,就创建一个zk客户端。然后根据client和config创建一个topom。首先来看看topom中有哪些信息。这个类在/pkg/topom/topom.go中。Topom非常重要,这个结构里面存储了集群中某一时刻所有的节点信息,在深入codis的过程中我们会逐步看到
创建topom的方法如下所示,这里传入的client,是根据coordinator创建的zkClient
新建redis pool的方法在/pkg/utils/redis/client.go中。auth如果在配置文件中没有设置,就是一个空字符串。timeout时间的单位是纳秒,配置文件中默认是30s,也就是3乘以10的10次方纳秒。
如果连接池收到退出的消息,就直接return,并且每隔一分钟清理连接池中的数据。清理规则是,从当前Pool的pool中,遍历取出每个pool属性。前面已经说过,这个pool属性其实就是map[string]*list.List,从每个list中取出头一个元素,转为Client类型,判断是否还是可再利用的,如果是可再利用的,就重新将该Client放回到队列的尾部。可再利用的规则是,如果Pool的timeout为0,或者该Client上次距离最近一次被引用到现在的时间小于Pool的timeout,就是可再利用的。
细心的读者可能已经发现,上一步初始化的redis pool是Topom.action.Pool,在Topom中实际上还有另外两个池,分别在stats和ha中。可以看一下redis pool此时的结构,在dashboard的初始化过程中,这三个池都只是把基础的数据结构建好。
与之前初始化Proxy的方式类似,现在我们正在初始化的结构是/pkg/topom/topom.go中的结构,而/pkg/models/topom.go中存储了系统的相关信息。接下来几步,在/pkg/models/topom.go中填充相关信息,初始化完成之后可以在zk中看到。
创建Topom的最后两步,监听、并得到路由handler
以上两步的处理方式和proxy的启动中类似,监听18080端口(dashboard与codis集群交互的默认接口),并采用martini框架对发送过来的请求进行转发
topom初始化成功之后,看一下控制台上打印的日志。
创建Topom之后,下一步就是创建一个channel,专门用来接收系统signal。这个signal在随不同的系统而变化。着重说一下signal.Notify方法。第一个参数是channel,后面的可变参数是往channel中写入的信号。如果没有制定任何信号参数,就默认所有收到的信号都会写入channel。
这样做的目的是什么呢?
比方说,当你强行停止掉dashboard进程,console上就会出现日志
然后调用defer s.Close()来删除dashboard在zk的注册路径,下次启动dashboard就不会报acquire lock of codis-demo failed的错。
到这里,Topom创建结束,下一步就是传入固定参数true,调用Topom的启动方法。
很多人启动dashboard的时候会报错acquire lock of codis-demo failed,就是这里报的错。意思是说,创建路径filepath.Join(CodisDir, product, “topom”)错误,报了node already exits。通常解决这个问题的方式就是递归删除codis3文件夹的内容,然后重新创建
之前我们已经说过,整个集群的操作和管理都要经过dashboard,因此dashboard中必须存有集群状态。codis如何处理状态缓存的有效性和过期问题呢?没错,就是上面代码中看起来很像的四个Goroutine。前两个方法的具体实现在/pkg/topom/topom_stats.go中,后两个方法的具体实现则是在/pkg/topom/topom_action.go。在下一节 Codis源码解析——dashboard的启动(2)我们具体讲这四个方法的实现
总结一下,启动dashboard的过程中,需要连接zk,创建Topom这个struct,通过18080这个端口与集群进项交互,并将该端口收到的信息进行转发。最重要的是启动了四个goroutine,刷新集群中的redis和proxy的状态,以及处理slot和同步操作。
说明
如有转载,请注明出处:
http://blog.csdn.net/antony9118/article/details/75944458
启动的时候,首先读取配置文件,填充config信息。根据coordinator的信息,如果是zookeeper而不是etcd的话,就创建一个zk客户端。然后根据client和config创建一个topom。首先来看看topom中有哪些信息。这个类在/pkg/topom/topom.go中。Topom非常重要,这个结构里面存储了集群中某一时刻所有的节点信息,在深入codis的过程中我们会逐步看到
type Topom struct { mu sync.Mutex //初始化之后,这个属性中的信息可以在zk中看到,就像models.Proxy一样 //路径是/codis3/codis-wujiang/topom model *models.Topom //存储着zkClient以及product-name,Topom与zk交互都是通过这个store store *models.Store //缓存结构,如果缓存为空就通过store从zk中取出slot的信息并填充cache //不是只有第一次启动的时候cache会为空,如果集群中的元素(server,slot等等)发生变化,都会调用dirtyCache,将cache中的信息置为nil //这样下一次调用s.newContext()获取上下文信息获取上下文信息的时候,就会通过Topom.store从zk中重新拉取 cache struct { hooks list.List slots []*models.SlotMapping group map[int]*models.Group proxy map[string]*models.Proxy sentinel *models.Sentinel } exit struct { C chan struct{} } //与dashboard相关的所有配置信息 config *Config online bool closed bool ladmin net.Listener //槽进行迁移的时候使用 action struct { //这个pool,其实就是map[string]*list.List,用于保存redis的结构,里面有addr,auth和Timeout。相当于缓存,需要的时候从这里取,否则就新建然后put进来 //键为redis服务器的地址,值为与这台服务器建立的连接,过期的连接会被删除 //timeout为配置文件dashboard.toml中的migration_timeout选项所配 redisp *redis.Pool interval atomic2.Int64 disabled atomic2.Bool progress struct { status atomic.Value } //一个计数器,有一个slot等待迁移,就加一;执行一个slot的迁移,就减一 executor atomic2.Int64 } //存储集群中redis和proxy详细信息,goroutine每次刷新redis和proxy之后,都会将结果存在这里 stats struct { //timeout为5秒 redisp *redis.Pool servers map[string]*RedisStats proxies map[string]*ProxyStats } //这个在使用哨兵的时候会用到,存储在fe中配置的哨兵以及哨兵所监控的redis主服务器 ha struct { //timeout为5秒 redisp *redis.Pool monitor *redis.Sentinel masters map[int]string } }
创建topom的方法如下所示,这里传入的client,是根据coordinator创建的zkClient
func New(client models.Client, config *Config) (*Topom, error) { //配置文件校验 if err := config.Validate(); err != nil { return nil, errors.Trace(err) } if err := models.ValidateProduct(config.ProductName); err != nil { return nil, errors.Trace(err) } s := &Topom{} s.config = config s.exit.C = make(chan struct{}) //新建redis pool s.action.redisp = redis.NewPool(config.ProductAuth, config.MigrationTimeout.Duration()) s.action.progress.status.Store("") s.ha.redisp = redis.NewPool("", time.Second*5) s.model = &models.Topom{ StartTime: time.Now().String(), } s.model.ProductName = config.ProductName s.model.Pid = os.Getpid() s.model.Pwd, _ = os.Getwd() if b, err := exec.Command("uname", "-a").Output(); err != nil { log.WarnErrorf(err, "run command uname failed") } else { s.model.Sys = strings.TrimSpace(string(b)) } s.store = models.NewStore(client, config.ProductName) s.stats.redisp = redis.NewPool(config.ProductAuth, time.Second*5) s.stats.servers = make(map[string]*RedisStats) s.stats.proxies = make(map[string]*ProxyStats) if err := s.setup(config); err != nil { s.Close() return nil, err } log.Warnf("create new topom:\n%s", s.model.Encode()) go s.serveAdmin() return s, nil }
新建redis pool的方法在/pkg/utils/redis/client.go中。auth如果在配置文件中没有设置,就是一个空字符串。timeout时间的单位是纳秒,配置文件中默认是30s,也就是3乘以10的10次方纳秒。
如果连接池收到退出的消息,就直接return,并且每隔一分钟清理连接池中的数据。清理规则是,从当前Pool的pool中,遍历取出每个pool属性。前面已经说过,这个pool属性其实就是map[string]*list.List,从每个list中取出头一个元素,转为Client类型,判断是否还是可再利用的,如果是可再利用的,就重新将该Client放回到队列的尾部。可再利用的规则是,如果Pool的timeout为0,或者该Client上次距离最近一次被引用到现在的时间小于Pool的timeout,就是可再利用的。
func NewPool(auth string, timeout time.Duration) *Pool { p := &Pool{ auth: auth, timeout: timeout, pool: make(map[string]*list.List), } p.exit.C = make(chan struct{}) if timeout != 0 { go func() { var ticker = time.NewTicker(time.Minute) defer ticker.Stop() for { select { case <-p.exit.C: return case <-ticker.C: //每隔一分钟清理Pool中无效的Client p.Cleanup() } } }() } return p } //RedisClient结构,对于每台redis服务器,都会有多个连接,过期的连接将会被清除 type Client struct { conn redigo.Conn Addr string Auth string Database int //上次使用时间,用于看某个client是否应该被回收 LastUse time.Time Timeout time.Duration }
细心的读者可能已经发现,上一步初始化的redis pool是Topom.action.Pool,在Topom中实际上还有另外两个池,分别在stats和ha中。可以看一下redis pool此时的结构,在dashboard的初始化过程中,这三个池都只是把基础的数据结构建好。
与之前初始化Proxy的方式类似,现在我们正在初始化的结构是/pkg/topom/topom.go中的结构,而/pkg/models/topom.go中存储了系统的相关信息。接下来几步,在/pkg/models/topom.go中填充相关信息,初始化完成之后可以在zk中看到。
创建Topom的最后两步,监听、并得到路由handler
//Topom的ladmin监听配置文件中的admin_addr,生成Token和Xauth if err := s.setup(config); err != nil { s.Close() return nil, err } log.Warnf("create new topom:\n%s", s.model.Encode()) //采用martini框架,得到路由,并从路由得到handler。这一步的原理和proxy的类似,就不再赘述 go s.serveAdmin()
以上两步的处理方式和proxy的启动中类似,监听18080端口(dashboard与codis集群交互的默认接口),并采用martini框架对发送过来的请求进行转发
func (s *Topom) setup(config *Config) error { if l, err := net.Listen("tcp", config.AdminAddr); err != nil { return errors.Trace(err) } else { s.ladmin = l x, err := utils.ReplaceUnspecifiedIP("tcp", l.Addr().String(), s.config.HostAdmin) if err != nil { return err } s.model.AdminAddr = x } s.model.Token = rpc.NewToken( config.ProductName, s.ladmin.Addr().String(), ) s.xauth = rpc.NewXAuth(config.ProductName) return nil } func (s *Topom) serveAdmin() { if s.IsClosed() { return } defer s.Close() log.Warnf("admin start service on %s", s.ladmin.Addr()) eh := make(chan error, 1) go func(l net.Listener) { h := http.NewServeMux() h.Handle("/", newApiServer(s)) hs := &http.Server{Handler: h} eh <- hs.Serve(l) }(s.ladmin) select { case <-s.exit.C: log.Warnf("admin shutdown") case err := <-eh: log.ErrorErrorf(err, "admin exit on error") } } func newApiServer(t *Topom) http.Handler { m := martini.New() m.Use(martini.Recovery()) m.Use(render.Renderer()) m.Use(func(w http.ResponseWriter, req *http.Request, c martini.Context) { path := req.URL.Path if req.Method != "GET" && strings.HasPrefix(path, "/api/") { var remoteAddr = req.RemoteAddr var headerAddr string for _, key := range []string{"X-Real-IP", "X-Forwarded-For"} { if val := req.Header.Get(key); val != "" { headerAddr = val break } } log.Warnf("[%p] API call %s from %s [%s]", t, path, remoteAddr, headerAddr) } c.Next() }) m.Use(gzip.All()) m.Use(func(c martini.Context, w http.ResponseWriter) { w.Header().Set("Content-Type", "application/json; charset=utf-8") }) api := &apiServer{topom: t} r := martini.NewRouter() r.Get("/", func(r render.Render) { r.Redirect("/topom") }) r.Any("/debug/**", func(w http.ResponseWriter, req *http.Request) { http.DefaultServeMux.ServeHTTP(w, req) }) r.Group("/topom", func(r martini.Router) { r.Get("", api.Overview) r.Get("/model", api.Model) r.Get("/stats", api.StatsNoXAuth) r.Get("/slots", api.SlotsNoXAuth) }) r.Group("/api/topom", func(r martini.Router) { r.Get("/model", api.Model) r.Get("/xping/:xauth", api.XPing) r.Get("/stats/:xauth", api.Stats) r.Get("/slots/:xauth", api.Slots) r.Put("/reload/:xauth", api.Reload) r.Put("/shutdown/:xauth", api.Shutdown) r.Put("/loglevel/:xauth/:value", api.LogLevel) r.Group("/proxy", func(r martini.Router) { r.Put("/create/:xauth/:addr", api.CreateProxy) r.Put("/online/:xauth/:addr", api.OnlineProxy) r.Put("/reinit/:xauth/:token", api.ReinitProxy) r.Put("/remove/:xauth/:token/:force", api.RemoveProxy) }) r.Group("/group", func(r martini.Router) { r.Put("/create/:xauth/:gid", api.CreateGroup) r.Put("/remove/:xauth/:gid", api.RemoveGroup) r.Put("/resync/:xauth/:gid", api.ResyncGroup) r.Put("/resync-all/:xauth", api.ResyncGroupAll) r.Put("/add/:xauth/:gid/:addr", api.GroupAddServer) r.Put("/add/:xauth/:gid/:addr/:datacenter", api.GroupAddServer) r.Put("/del/:xauth/:gid/:addr", api.GroupDelServer) r.Put("/promote/:xauth/:gid/:addr", api.GroupPromoteServer) r.Put("/replica-groups/:xauth/:gid/:addr/:value", api.EnableReplicaGroups) r.Put("/replica-groups-all/:xauth/:value", api.EnableReplicaGroupsAll) r.Group("/action", func(r martini.Router) { r.Put("/create/:xauth/:addr", api.SyncCreateAction) r.Put("/remove/:xauth/:addr", api.SyncRemoveAction) }) r.Get("/info/:addr", api.InfoServer) }) r.Group("/slots", func(r martini.Router) { r.Group("/action", func(r martini.Router) { r.Put("/create/:xauth/:sid/:gid", api.SlotCreateAction) r.Put("/create-some/:xauth/:src/:dst/:num", api.SlotCreateActionSome) r.Put("/create-range/:xauth/:beg/:end/:gid", api.SlotCreateActionRange) r.Put("/remove/:xauth/:sid", api.SlotRemoveAction) r.Put("/interval/:xauth/:value", api.SetSlotActionInterval) r.Put("/disabled/:xauth/:value", api.SetSlotActionDisabled) }) r.Put("/assign/:xauth", binding.Json([]*models.SlotMapping{}), api.SlotsAssignGroup) r.Put("/assign/:xauth/offline", binding.Json([]*models.SlotMapping{}), api.SlotsAssignOffline) r.Put("/rebalance/:xauth/:confirm", api.SlotsRebalance) }) r.Group("/sentinels", func(r martini.Router) { r.Put("/add/:xauth/:addr", api.AddSentinel) r.Put("/del/:xauth/:addr/:force", api.DelSentinel) r.Put("/resync-all/:xauth", api.ResyncSentinels) r.Get("/info/:addr", api.InfoSentinel) r.Get("/info/:addr/monitored", api.InfoSentinelMonitored) }) }) m.MapTo(r, (*martini.Routes)(nil)) m.Action(r.Handle) return m }
topom初始化成功之后,看一下控制台上打印的日志。
创建Topom之后,下一步就是创建一个channel,专门用来接收系统signal。这个signal在随不同的系统而变化。着重说一下signal.Notify方法。第一个参数是channel,后面的可变参数是往channel中写入的信号。如果没有制定任何信号参数,就默认所有收到的信号都会写入channel。
go func() { defer s.Close() c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM) //将收到的系统信号读取并打印在日志 sig := <-c log.Warnf("[%p] dashboard receive signal = '%v'", s, sig) }()
这样做的目的是什么呢?
比方说,当你强行停止掉dashboard进程,console上就会出现日志
Process finished with exit code 137 (interrupted by signal 9: SIGKILL)
然后调用defer s.Close()来删除dashboard在zk的注册路径,下次启动dashboard就不会报acquire lock of codis-demo failed的错。
到这里,Topom创建结束,下一步就是传入固定参数true,调用Topom的启动方法。
很多人启动dashboard的时候会报错acquire lock of codis-demo failed,就是这里报的错。意思是说,创建路径filepath.Join(CodisDir, product, “topom”)错误,报了node already exits。通常解决这个问题的方式就是递归删除codis3文件夹的内容,然后重新创建
cd /app/zookeeper-3.4.6/bin/ ./zkCli.sh rmr /codis3
func (s *Topom) Start(routines bool) error { s.mu.Lock() defer s.mu.Unlock() if s.closed { return ErrClosedTopom } if s.online { return nil } else { //创建zk路径 if err := s.store.Acquire(s.model); err != nil { log.ErrorErrorf(err, "store: acquire lock of %s failed", s.config.ProductName) return errors.Errorf("store: acquire lock of %s failed", s.config.ProductName) } s.online = true } if !routines { return nil } go func() { for !s.IsClosed() { if s.IsOnline() { //刷新redis状态 w, _ := s.RefreshRedisStats(time.Second) if w != nil { w.Wait() } } time.Sleep(time.Second) } }() go func() { for !s.IsClosed() { if s.IsOnline() { //刷新proxy状态 w, _ := s.RefreshProxyStats(time.Second) if w != nil { w.Wait() } } time.Sleep(time.Second) } }() go func() { for !s.IsClosed() { if s.IsOnline() { //处理slot操作 if err := s.ProcessSlotAction(); err != nil { log.WarnErrorf(err, "process slot action failed") time.Sleep(time.Second * 5) } } time.Sleep(time.Second) } }() go func() { for !s.IsClosed() { if s.IsOnline() { //处理同步操作 if err := s.ProcessSyncAction(); err != nil { log.WarnErrorf(err, "process sync action failed") time.Sleep(time.Second * 5) } } time.Sleep(time.Second) } }() return nil }
之前我们已经说过,整个集群的操作和管理都要经过dashboard,因此dashboard中必须存有集群状态。codis如何处理状态缓存的有效性和过期问题呢?没错,就是上面代码中看起来很像的四个Goroutine。前两个方法的具体实现在/pkg/topom/topom_stats.go中,后两个方法的具体实现则是在/pkg/topom/topom_action.go。在下一节 Codis源码解析——dashboard的启动(2)我们具体讲这四个方法的实现
总结一下,启动dashboard的过程中,需要连接zk,创建Topom这个struct,通过18080这个端口与集群进项交互,并将该端口收到的信息进行转发。最重要的是启动了四个goroutine,刷新集群中的redis和proxy的状态,以及处理slot和同步操作。
说明
如有转载,请注明出处:
http://blog.csdn.net/antony9118/article/details/75944458
相关文章推荐
- Codis源码解析——dashboard的启动(2)
- Codis源码解析——proxy的启动
- Codis源码解析——fe的启动
- SuperSocket源码解析之启动过程
- 注册中心 Eureka 源码解析 —— Eureka-Server 启动(一)之 ServerConfig
- android源码解析之(十一)-->应用进程启动流程
- Netty 4源码解析:服务端启动
- Spring源码解析-Web容器启动过程
- tomcat的启动过程(Tomcat源码解析(三))
- Tomcat启动解析web.xml源码分析
- MTK Kernel启动流程源码解析 4 start_kernel 上
- 源码解析:Activity启动过程全解析
- Flume-ng源码解析之启动流程
- Spring源码解析之:Spring Security启动细节和工作模式--转载
- Android源码基础解析之系统启动并解析Manifest的流程
- Spark资源调度分配内幕解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
- 注册中心 Eureka 源码解析 —— Eureka-Server 启动(一)之 ServerConfig
- Tomcat源码解析(十):启动和关闭
- Android应用程序的启动过程源码解析—点击图标启动过程
- RocketMQ源码解析-Consumer启动(2)