您的位置:首页 > 编程语言 > Java开发

Tars | 第5篇 基于TarsGo Subset路由规则的Java JDK实现方式(上)

2021-09-11 14:47 846 查看

TOC

前言

利开园导师(下称“利导师")用Go语言实现了Subset路由规则,并在中期汇报分享会里介绍出来;这篇文章将基于利导师的实现方式,对Subset路由规则的细节做些理解与补充。

此篇文章为上半部分,旨在记录利导师对TarsGo代码的修改,并对分析其Subset路由规则。下半部分将对照与参考Go语言JDK的实现方式,对TarsJava相关Subset路由规则做代码改进。

上下部分文章在目录上一一对应,上半注重TarsGo分析,下半部分注重TarsJava实现方式。如上篇文章第一点修改.tars协议文件记录利导师在TarsGo的代码修改,下片文章第一点也是修改.tars协议文件,侧重点在如何用Java语言实现。上下文章相辅相成,建议对照学习。

一些资源链接如下:

下半部分文章链接 https://www.cnblogs.com/dlhjw/p/15245116.html

TarsJava 实现Subset路由规则JDK链接地址 https://github.com/dlhjw/TarsJava/commit/cc2fe884ecbe8455a8e1f141e21341f4f3dd98a3

TarsGo 实现Subset路由规则JDK链接地址 https://github.com/defool/TarsGo/commit/136878e9551d68c4b54c402df564729f51f3dd9c#

1. 修改.tars协议文件

在Tars协议文件里;

1.1 Go语言修改部分

协议文件共有两处地方需要更改,一是给EndpointF节点增加Subset配置,二是在查找助手里添加根据ID获取Subset配置信息的接口配置;

给EndpointF节点增加Subset配置

根据ID获取Subset配置信息的接口

1.2 修改地方的逻辑

原逻辑 现逻辑
给节点增加Subset配置,增加的是一个Tars协议的结构体
增加获取Subset信息的接口,同样Tars协议的结构体

注意

  • 第一处:给EndpointF节点增加Subset配置 最终结果可能不是这样,要看Registry接口是怎样的;
  • 修改协议文件后需要运行一段命令自动生成相应代码;
  • TarsGo的自动生成命令在
    tars/protocol/res/Makefile
    里;
  • 第二处:根据ID获取Subset配置信息的接口
      String id 为“应用名.服务名.端口名”;
    • id这样设置是考虑到与其他接口命令对其;
    • 该接口要与Tars Registry新增的接口名对上;

    1.3 通过协议文件自动生成代码

    Tars有个强大的功能,它能根据.tars里的配置文件自动生成相应Bean代码;

    而在TarsGo里,对应代码如下:

    执行上述命令后,对应代码会发生改变,如下:

    这告诉我们老师发布的TarsGo代码里,有些代码不需要我们手动去更改,而是通过命令自动生成的。在Java中命令为在项目根路径执行

    mvn tars:tars2java
    。具体过程将在下半部分文章里详解,这里仅记录TarsGo代码在哪发生变化。


    2. 【核心】增添Subset核心功能

    Go语言在tars/subset.go内

    2.1 Go语言修改部分

    package tars
    
    import (
    "encoding/json"
    "math/rand"
    "regexp"
    "strconv"
    "sync"
    "time"
    
    "github.com/TarsCloud/TarsGo/tars/protocol/res/endpointf"
    "github.com/TarsCloud/TarsGo/tars/protocol/res/queryf"
    "github.com/TarsCloud/TarsGo/tars/util/consistenthash"
    "github.com/TarsCloud/TarsGo/tars/util/endpoint"
    "github.com/serialx/hashring"
    )
    
    var (
    enableSubset = true
    subsetMg     = &subsetManager{}
    )
    
    type hashString string
    
    func (h hashString) String() string {
    return string(h)
    }
    
    type subsetConf struct {
    enable    bool
    ruleType  string // ratio/key
    ratioConf *ratioConfig
    keyConf   *keyConfig
    
    lastUpdate time.Time
    }
    
    type ratioConfig struct {
    ring *hashring.HashRing
    }
    
    type keyRoute struct {
    action string
    value  string
    route  string
    }
    
    type keyConfig struct {
    rules        []keyRoute
    defaultRoute string
    }
    
    type subsetManager struct {
    lock  *sync.RWMutex
    cache map[string]*subsetConf
    
    registry *queryf.QueryF
    }
    
    //根据服务名获取它的Subset方法,返回subsetConf配置项
    func (s *subsetManager) getSubsetConfig(servantName string) *subsetConf {
    s.lock.RLock()
    defer s.lock.RUnlock()
    var ret *subsetConf
    //如果缓存在最近时间之内,就直接返回
    if v, ok := s.cache[servantName]; ok {
    ret = v
    if v.lastUpdate.Add(time.Second * 10).After(time.Now()) {
    return ret
    }
    }
    //如果上次获取时间超时,则调用registry接口获取对应配置
    // get config from registry
    conf := &endpointf.SubsetConf{}
    retVal, err := s.registry.FindSubsetConfigById(servantName, conf)
    if err != nil || retVal != 0 {
    // log error
    return ret
    }
    
    ret = &subsetConf{
    ruleType:   conf.RuleType,
    lastUpdate: time.Now(),
    }
    s.cache[servantName] = ret
    
    // 解析从registry那获取的配置信息
    // parse subset conf
    if !conf.Enable {
    ret.enable = false
    return ret
    }
    //按比例路由
    if conf.RuleType == "ratio" {
    kv := make(map[string]int)
    json.Unmarshal([]byte(conf.RuteData), &kv)
    ret.ratioConf = &ratioConfig{ring: hashring.NewWithWeights(kv)}
    } else {
    keyConf := &keyConfig{}
    kvlist := make([]map[string]string, 0)
    json.Unmarshal([]byte(conf.RuteData), &kvlist)
    for _, kv := range kvlist {
    //默认路由
    if vv, ok := kv["default"]; ok {
    keyConf.defaultRoute = vv
    }
    if vv, ok := kv["match"]; ok {
    //精确匹配
    keyConf.rules = append(keyConf.rules, keyRoute{
    action: "match",
    value:  vv,
    route:  kv["route"],
    })
    } else if vv, ok := kv["equal"]; ok {
    //正则匹配
    keyConf.rules = append(keyConf.rules, keyRoute{
    action: "equal",
    value:  vv,
    route:  kv["route"],
    })
    }
    }
    ret.keyConf = keyConf
    }
    return ret
    }
    
    func (s *subsetManager) getSubset(servantName, routeKey string) string {
    // check subset config exists
    subsetConf := subsetMg.getSubsetConfig(servantName)
    if subsetConf == nil {
    return ""
    }
    // route key to subset
    if subsetConf.ruleType == "ratio" {
    return subsetConf.ratioConf.findSubet(routeKey)
    }
    return subsetConf.keyConf.findSubet(routeKey)
    }
    
    //根据subset规则过滤节点
    func subsetEndpointFilter(servantName, routeKey string, eps []endpoint.Endpoint) []endpoint.Endpoint {
    if !enableSubset {
    return eps
    }
    subset := subsetMg.getSubset(servantName, routeKey)
    if subset == "" {
    return eps
    }
    
    ret := make([]endpoint.Endpoint, 0)
    for i := range eps {
    if eps[i].Subset == subset {
    ret = append(ret, eps[i])
    }
    }
    return ret
    }
    
    func subsetHashEpFilter(servantName, routeKey string, m *consistenthash.ChMap) *consistenthash.ChMap {
    if !enableSubset {
    return m
    }
    subset := subsetMg.getSubset(servantName, routeKey)
    if subset == "" {
    return m
    }
    
    ret := consistenthash.NewChMap(32)
    for _, v := range m.GetNodes() {
    vv, ok := v.(endpoint.Endpoint)
    if ok && vv.Subset == subset {
    ret.Add(vv)
    }
    }
    return ret
    }
    
    func (k *ratioConfig) findSubet(key string) string {
    // 为空时使用随机方式
    if key == "" {
    key = strconv.Itoa(rand.Int())
    }
    v, _ := k.ring.GetNode(key)
    return v
    }
    
    func (k *keyConfig) findSubet(key string) string {
    for _, v := range k.rules {
    if v.action == "equal" && key == v.value {
    return v.route
    } else if v.action == "match" {
    if matched, _ := regexp.Match(v.value, []byte(key)); matched {
    return v.route
    }
    }
    }
    return k.defaultRoute
    }

    2.2 新增地方的逻辑

    新增类型 新增内容
    结构体 新增Subset配置项的结构体
    subsetConf
    结构体 新增路由规则配置项的结构体
    ratioConfig
    结构体 新增染色路径的结构体
    keyRoute
    结构体 新增染色配置项的结构体
    keyConfig
    结构体 新增subset管理者的结构体
    subsetManager
    方法 新增获取subset配置项的方法
    getSubsetConfig
    方法 新增获取比例 / 染色路由配置项的方法
    getSubset
    方法 新增根据subset规则过滤节点的方法
    subsetEndpointFilter
    方法 新增根据一致hash的subset规则过滤节点的方法
    subsetHashEpFilter
    方法 新增按比例路由路由路径的方法
    findSubet
    方法 新增按默认路由路径
    findSubet

    3. 添加常量与获取染色key的方法

    在tars/util/current/tarscurrent相关包里,处理上下文信息相关;

    3.1 Go语言修改部分

    3.2 修改地方的逻辑

    原逻辑 现逻辑
    新增一个常量字段
    STATUS_ROUTE_KEY
    新增两个方法,分别是设置与获取染色Key

    4. 【核心】修改获取服务IP规则

    在节点管理的相关文件里;方法是实现在第8点;

    4.1 Go语言修改部分

    4.2 修改地方的逻辑

    原逻辑 现逻辑
    获取所有的服务IP列表 在原来IP列表的基础上根据请求包的
    current.STATUS_ROUTE_KEY
    值过滤部分节点

    5. 实现透传染色Key功能(客户端)

    在tars/tarsprotocol相关文件里;

    5.1 Go语言修改部分

    5.2 修改地方的逻辑

    原逻辑 现逻辑
    无透传染色Key 在客户端最终执行的方法里增加透传染色Key功能

    注意:

    • 这里的染色Key为新建的,与源代码里的染色Key不同;
    • 可以参考染色一致的实现方式,区别是Key的名称不同,实现思路类似;
    • 在TarsJava客户端中,这里的最终执行的方法指
      TarsInvoker
      类里的三个执行方法; 即:同步调用方法
      invokeWithSync()
      、异步调用方法
      invokeWithAsync()
      和协程调用方法
      invokeWithPromiseFuture()

    6. 实现透传染色Key功能(服务端)

    在tars/tarsprotocol.go相关文件里;

    6.1 Go语言修改部分

    6.2 修改地方的逻辑

    原逻辑 现逻辑
    无透传染色Key 在服务端最终执行的方法里增加透传染色Key功能
    • 在TarsJava服务端中,这里的最终执行的方法指
      TarsServantProcessor.process()

    7. 给节点信息增添Subset字段

    在节点信息相关文件里;

    7.1 Go语言修改部分

    7.2 修改地方的逻辑

    原逻辑 现逻辑
    给节点信息增加Subset字段
    修改解析函数,能识别出sunset字段

    注意

    • 不一定是String类型,只要在Endpoint对象结构体里添加一个Subset相关属性,有地方用即可;
    • 这部分在Java中仅为Endpoint.java类,故放在一起;

    * 8. 新增工具类

    在工具类包里;

    8.1 Go语言修改部分


    最后

    新人制作,如有错误,欢迎指出,感激不尽! 欢迎关注公众号,会分享一些更日常的东西! 如需转载,请标注出处!
  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: 
    相关文章推荐