您的位置:首页 > 编程语言

Kubernetes编排工具-helm源码分析(模板解析流程)

2017-04-01 23:42 591 查看
应用编排一直是docker生态中,大家极力去解决的问题,作为docker生态中,发展最快的调度和编排引擎Kubernetes,其对应用的部署能力离大家的预期还有比较大的提升空间。helm作为Kubernetes一个包管理引擎,基于chart的概念,有效的对Kubernetes上应用的部署进行了优化。Chart通过模板引擎,下方对接Kubernetes中services模型,上端打造包管理仓库。最后的使得Kubernetes中,对应用的部署能够达到像使用apt-get和yum一样简单易用。本文将对helm中,模板引擎的解析流程进行分析,helm版本拉取master上代码,时间为2017年03月30号,当前最新版本为v2.2.3.Helm目录结构和Kubernetes中其他组件目录结构类似,模块的起始位置在cmd文件夹中,实际的工作代码在pkg包中。从目录结构上可以看出,helm包含两个组件helm客户端和tiller服务端。helm客户端主要是将对应分命令进行解析,然后使用gRPC协议发送到服务端处理。这里将分析的重点放在服务端的处理流程,后续的文章中会对helm客户端的解析流程做分析。Tiller主要通过gRPC协议从客户端接收对应的命令,然后通过模板解析,生成对应的yaml文件,再调用kubeClient创建服务。同时,将生成的yaml文件保存到后端存储中,用于版本升级和回退。Tiller的main函数在 cmd/tiller文件中,进行简单的命令解析后,调用cobra包(github.com/spf13/cobra)中的Execute函数。
func main() {
p := rootCommand.PersistentFlags()
p.StringVarP(&grpcAddr, "listen", "l", ":44134", "address:port to listen on")
p.StringVar(&store, "storage", storageConfigMap, "storage driver to use. One of 'configmap' or 'memory'")
p.BoolVar(&enableTracing, "trace", false, "enable rpc tracing")

if err := rootCommand.Execute(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
}
var rootCommand = &cobra.Command{
Use:   "tiller",
Short: "The Kubernetes Helm server.",
Long:  globalUsage,
Run:   start,
}
Execute然后中,跳转到start函数,start函数中,开始真正的初始化话逻辑:
func start(c *cobra.Command, args []string) {
clientset, err := kube.New(nil).ClientSet()
if err != nil {
fmt.Fprintf(os.Stderr, "Cannot initialize Kubernetes connection: %s\n", err)
os.Exit(1)
}

switch store {
case storageMemory:
env.Releases = storage.Init(driver.NewMemory())
case storageConfigMap:
env.Releases = storage.Init(driver.NewConfigMaps(clientset.Core().ConfigMaps(namespace())))
}

lstn, err := net.Listen("tcp", grpcAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "Server died: %s\n", err)
os.Exit(1)
}

fmt.Printf("Starting Tiller %s\n", version.GetVersion())
fmt.Printf("GRPC listening on %s\n", grpcAddr)
fmt.Printf("Probes listening on %s\n", probeAddr)
fmt.Printf("Storage driver is %s\n", env.Releases.Name())

if enableTracing {
startTracing(traceAddr)
}

srvErrCh := make(chan error)
probeErrCh := make(chan error)
go func() {
svc := tiller.NewReleaseServer(env, clientset)
services.RegisterReleaseServiceServer(rootServer, svc)
if err := rootServer.Serve(lstn); err != nil {
srvErrCh <- err
}
}()

go func() {
mux := newProbesMux()
if err := http.ListenAndServe(probeAddr, mux); err != nil {
probeErrCh <- err
}
}()

select {
case err := <-srvErrCh:
fmt.Fprintf(os.Stderr, "Server died: %s\n", err)
os.Exit(1)
case err := <-probeErrCh:
fmt.Fprintf(os.Stderr, "Probes server died: %s\n", err)
}
}
start函数中初始化分为如下几步:1、创建一个kubeclient对象,引用helm/kube/client.go2、创建后端存储对象,目前支持的存储有memory和confgmap,并设置到env变量中。3、创建http服务器,用于与helm客户端通信4、创建ReleaseServer对象5、启动协程,进行监听,开启工作循环env对象为全局对象,其中包含yard对象,storage对象和KubeClient。tiller的工作流程,都基于这三个对象展开。
// Environment provides the context for executing a client request.
//
// All services in a context are concurrency safe.
type Environment struct {
// EngineYard provides access to the known template engines.
EngineYard EngineYard
// Releases stores records of releases.
Releases *storage.Storage
// KubeClient is a Kubernetes API client.
KubeClient KubeClient
}
创建的ReleaseServer对象,真正的处理了客户端发送过来的请求ReleaseServer对象中,函数结构如下图所示
从图中可以看出,ReleaseServer对外暴露的函数与helm提供的命令,基本是一一对应的。接下来,选择InstallRelease命令,分析ReleaseServer中的处理流程,下面是InstallRelease 函数的源代码:// InstallRelease installs a release and stores the release record.
func (s *ReleaseServer) InstallRelease(c ctx.Context, req *services.InstallReleaseRequest) (*services.InstallReleaseResponse, error) {rel, err := s.prepareRelease(req)if err != nil {log.Printf("Failed install prepare step: %s", err)res := &services.InstallReleaseResponse{Release: rel}// On dry run, append the manifest contents to a failed release. This is// a stop-gap until we can revisit an error backchannel post-2.0.if req.DryRun && strings.HasPrefix(err.Error(), "YAML parse error") {err = fmt.Errorf("%s\n%s", err, rel.Manifest)}return res, err}res, err := s.performRelease(rel, req)if err != nil {log.Printf("Failed install perform step: %s", err)}return res, err}
InstallRelease中,分为以下几步进行处理:1、install之前的准备,主要完成模板的渲染和组装(s.prepareRelease)2、执行部署操作s.performRelease(rel, req)(1)模板渲染和组装处理源代码如下:
// prepareRelease builds a release for an install operation.func (s *ReleaseServer) prepareRelease(req *services.InstallReleaseRequest) (*release.Release, error) {if req.Chart == nil {return nil, errMissingChart}name, err := s.uniqName(req.Name, req.ReuseName)if err != nil {return nil, err}caps, err := capabilities(s.clientset.Discovery())if err != nil {return nil, err}revision := 1ts := timeconv.Now()options := chartutil.ReleaseOptions{Name:      name,Time:      ts,Namespace: req.Namespace,Revision:  revision,IsInstall: true,}valuesToRender, err := chartutil.ToRenderValuesCaps(req.Chart, req.Values, options, caps)if err != nil {return nil, err}hooks, manifestDoc, notesTxt, err := s.renderResources(req.Chart, valuesToRender, caps.APIVersions)if err != nil {// Return a release with partial data so that client can show debugging// information.rel := &release.Release{Name:      name,Namespace: req.Namespace,Chart:     req.Chart,Config:    req.Values,Info: &release.Info{FirstDeployed: ts,LastDeployed:  ts,Status:        &release.Status{Code: release.Status_UNKNOWN},Description:   fmt.Sprintf("Install failed: %s", err),},Version: 0,}if manifestDoc != nil {rel.Manifest = manifestDoc.String()}return rel, err}// Store a release.rel := &release.Release{Name:      name,Namespace: req.Namespace,Chart:     req.Chart,Config:    req.Values,Info: &release.Info{FirstDeployed: ts,LastDeployed:  ts,Status:        &release.Status{Code: release.Status_UNKNOWN},Description:   "Initial install underway", // Will be overwritten.},Manifest: manifestDoc.String(),Hooks:    hooks,Version:  int32(revision),}if len(notesTxt) > 0 {rel.Info.Status.Notes = notesTxt}err = validateManifest(s.env.KubeClient, req.Namespace, manifestDoc.Bytes())return rel, err}
主要的处理流程为:1、chart和服务信息的预处理2、value中格式的转换,转换成map形式(ToRenderValuesCaps)3、根据chart的原始文件和value表,生成渲染后的结果(renderResources)4、根据渲染后的结果,拼接成release.Release对象5、修改和更新Manifest。release.Release结构如下:
// Release describes a deployment of a chart, together with the chart// and the variables used to deploy that chart.type Release struct {// Name is the name of the releaseName string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`// Info provides information about a releaseInfo *Info `protobuf:"bytes,2,opt,name=info" json:"info,omitempty"`// Chart is the chart that was released.Chart *hapi_chart3.Chart `protobuf:"bytes,3,opt,name=chart" json:"chart,omitempty"`// Config is the set of extra Values added to the chart.// These values override the default values inside of the chart.Config *hapi_chart.Config `protobuf:"bytes,4,opt,name=config" json:"config,omitempty"`// Manifest is the string representation of the rendered template.Manifest string `protobuf:"bytes,5,opt,name=manifest" json:"manifest,omitempty"`// Hooks are all of the hooks declared for this release.Hooks []*Hook `protobuf:"bytes,6,rep,name=hooks" json:"hooks,omitempty"`// Version is an int32 which represents the version of the release.Version int32 `protobuf:"varint,7,opt,name=version" json:"version,omitempty"`// Namespace is the kubernetes namespace of the release.Namespace string `protobuf:"bytes,8,opt,name=namespace" json:"namespace,omitempty"`}
renderResources的处理逻辑为:func (s *ReleaseServer) renderResources(ch *chart.Chart, values chartutil.Values, vs chartutil.VersionSet) ([]*release.Hook, *bytes.Buffer, string, error) {renderer := s.engine(ch)files, err := renderer.Render(ch, values)先获取全局的engine对象,然后使用engine对象的Render函数进行渲染。render函数,需要跳转到engine包中engine.go文件,其源代码如下:
// render takes a map of templates/values and renders them.func (e *Engine) render(tpls map[string]renderable) (map[string]string, error) {// Basically, what we do here is start with an empty parent template and then// build up a list of templates -- one for each file. Once all of the templates// have been parsed, we loop through again and execute every template.//// The idea with this process is to make it possible for more complex templates// to share common blocks, but to make the entire thing feel like a file-based// template engine.t := template.New("gotpl")if e.Strict {t.Option("missingkey=error")} else {// Not that zero will attempt to add default values for types it knows,// but will still emit <no value> for others. We mitigate that later.t.Option("missingkey=zero")}funcMap := e.alterFuncMap(t)files := []string{}for fname, r := range tpls {t = t.New(fname).Funcs(funcMap)if _, err := t.Parse(r.tpl); err != nil {return map[string]string{}, fmt.Errorf("parse error in %q: %s", fname, err)}files = append(files, fname)}rendered := make(map[string]string, len(files))var buf bytes.Bufferfor _, file := range files {// At render time, add information about the template that is being rendered.vals := tpls[file].valsvals["Template"] = map[string]interface{}{"Name": file, "BasePath": tpls[file].basePath}if err := t.ExecuteTemplate(&buf, file, vals); err != nil {return map[string]string{}, fmt.Errorf("render error in %q: %s", file, err)}// Work around the issue where Go will emit "<no value>" even if Options(missing=zero)// is set. Since missing=error will never get here, we do not need to handle// the Strict case.rendered[file] = strings.Replace(buf.String(), "<no value>", "", -1)buf.Reset()}return rendered, nil}
render的处理,主要使用 t := template.New("gotpl")对象,其来自text/template包。其主要包括下面几步:1、创建template对象2、获取funcMap,funcMap使用了"github.com/Masterminds/sprig"库3、funcMap的处理,变量所有的文件4、遍历所有的文件,进行变量替换5、返回处理后的结果。(2) 执行部署操作s.performRelease(rel, req)
performRelease其源代码如下:
// performRelease runs a release.func (s *ReleaseServer) performRelease(r *release.Release, req *services.InstallReleaseRequest) (*services.InstallReleaseResponse, error) {res := &services.InstallReleaseResponse{Release: r}if req.DryRun {log.Printf("Dry run for %s", r.Name)res.Release.Info.Description = "Dry run complete"return res, nil}// pre-install hooksif !req.DisableHooks {if err := s.execHook(r.Hooks, r.Name, r.Namespace, hooks.PreInstall, req.Timeout); err != nil {return res, err}}switch h, err := s.env.Releases.History(req.Name); {// if this is a replace operation, append to the release historycase req.ReuseName && err == nil && len(h) >= 1:// get latest release revisionrelutil.Reverse(h, relutil.SortByRevision)// old releaseold := h[0]// update old release statusold.Info.Status.Code = release.Status_SUPERSEDEDs.recordRelease(old, true)// update new release with next revision number// so as to append to the old release's historyr.Version = old.Version + 1if err := s.performKubeUpdate(old, r, false, req.Timeout, req.Wait); err != nil {msg := fmt.Sprintf("Release replace %q failed: %s", r.Name, err)log.Printf("warning: %s", msg)old.Info.Status.Code = release.Status_SUPERSEDEDr.Info.Status.Code = release.Status_FAILEDr.Info.Description = msgs.recordRelease(old, true)s.recordRelease(r, false)return res, err}default:// nothing to replace, create as normal// regular manifestsb := bytes.NewBufferString(r.Manifest)if err := s.env.KubeClient.Create(r.Namespace, b, req.Timeout, req.Wait); err != nil {msg := fmt.Sprintf("Release %q failed: %s", r.Name, err)log.Printf("warning: %s", msg)r.Info.Status.Code = release.Status_FAILEDr.Info.Description = msgs.recordRelease(r, false)return res, fmt.Errorf("release %s failed: %s", r.Name, err)}}// post-install hooksif !req.DisableHooks {if err := s.execHook(r.Hooks, r.Name, r.Namespace, hooks.PostInstall, req.Timeout); err != nil {msg := fmt.Sprintf("Release %q failed post-install: %s", r.Name, err)log.Printf("warning: %s", msg)r.Info.Status.Code = release.Status_FAILEDr.Info.Description = msgs.recordRelease(r, false)return res, err}}r.Info.Status.Code = release.Status_DEPLOYEDr.Info.Description = "Install complete"// This is a tricky case. The release has been created, but the result// cannot be recorded. The truest thing to tell the user is that the// release was created. However, the user will not be able to do anything// further with this release.//// One possible strategy would be to do a timed retry to see if we can get// this stored in the future.s.recordRelease(r, false)return res, nil}
其主要步骤为:1、基于输入的Release结构体,构造res := &services.InstallReleaseResponse{Release: r)2、执行设置的Hook操作3、使用后端存储进行记录 s.recordRelease(old, true)4、执行真正的部署操作 err := s.env.KubeClient.Create(r.Namespace, b, req.Timeout, req.Wait)5、在执行设置的Hook操作6、后端存储记录信息,完成部署至此,tiller中大致的处理请求的过程基本完成。主要基于env对象中,yard(engine)对象,store对象,kubeClient对象。

                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息