二.Go微服务--令牌桶
2021-09-01 07:57
826 查看
1. 令牌桶
1.1 原理
- 我们以 r/s 的速度向桶内放置令牌,桶的容量为 b , 如果桶满了令牌将会丢弃
- 当请求到达时,我们向桶内获取令牌,如果令牌足够,我们就通过转发请求
- 如果桶内的令牌数量不够,那么这个请求会被缓存等待令牌足够时转发,或者是被直接丢弃掉
由于桶的存在,所以令牌桶算法不仅可以限流还可以应对突发流量的情况
举个例子:假设我们桶的容量是 100,速度是 10 rps,那么在我们桶满的情况下,如果突然来 100 个请求是可以满足的,但是后续的请求就会被限制到 10 rps
存在下面两种特殊情况
- 如果桶的容量为 0,那么相当于禁止请求,因为所有的令牌都被丢弃了
- 如果令牌放置速率为无穷大,那么相当于没有限制
令牌桶最常见的实现就是 Go 官方的 golang.org/x/time/rate
1.2 使用方法
方法如下
type Limiter struct { // contains filtered or unexported fields } // 构建一个限流器,r 是每秒放入的令牌数量,b 是桶的大小 func NewLimiter(r Limit, b int) *Limiter // 分别返回 b 和 r 的值 func (lim *Limiter) Burst() int func (lim *Limiter) Limit() Limit // token 消费方法 func (lim *Limiter) Allow() bool func (lim *Limiter) AllowN(now time.Time, n int) boolfunc (lim *Limiter) Reserve() *Reservation func (lim *Limiter) ReserveN(now time.Time, n int) *Reservationfunc (lim *Limiter) Wait(ctx context.Context) (err error) func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) // 动态流控 func (lim *Limiter) SetBurst(newBurst int) func (lim *Limiter) SetBurstAt(now time.Time, newBurst int) func (lim *Limiter) SetLimit(newLimit Limit) func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
1.2.1 初始化令牌桶
直接调用
NewLimiter(r Limit, b int)即可,
r表示每秒产生 token 的速度,
b表示桶的大小
1.2.2 Token 消费
总共有三种 token 消费的方式,最常用的是使用 Wait 阻塞等待
Allow
Allow就是
AllowN(now,1)的别名,
AllowN表示截止到 now 这个时间点,是否存在 n 个 token,如果存在那么就返回
true反之返回
false,如果我们限流比较严格,没有资源就直接丢弃可以使用这个方法
func (lim *Limiter) Allow() bool func (lim *Limiter) AllowN(now time.Time, n int) bool
Reserve 同理
Reserve也是
ReserveN(now, 1)的别名,
ReserveN其实和
AllowN类似,表示截止到
now这个时间点,是否存在 n 个
token,只是
AllowN直接返回 true or false,但是
ReserveN返回一个
Reservation对象
func (lim *Limiter) Reserve() *Reservation func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
Reservation有 5 个方法,通过调用 OK 我们可以知道是否通过等待可以获取到 N 个 token,如果可以通过 Delay 方法我们可以得知需要等待的时间,如果我们不想等了可以调用 Cancel 方法归还 token
type Reservation func (r *Reservation) Cancel() func (r *Reservation) CancelAt(now time.Time) func (r *Reservation) Delay() time.Duration func (r *Reservation) DelayFrom(now time.Time) time.Duration func (r *Reservation) OK() bool
Wait Wait 是最常用的,
Wait是
WaitN(ctx, 1)的别名,
WaitN(ctx, n)表示如果存在 n 个令牌就直接转发,不存在我们就等,等待存在为止,传入的 ctx 的 Deadline 就是等待的 Deadline
func (lim *Limiter) Wait(ctx context.Context) (err error) func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
1.2.3 动态流控
通过调用
SetBurst和
SetLimit可以动态的设置桶的大小和 token 生产速率,其中
SetBurstAt和
SetLimitAt会将传入的时间 now 设置为流控最后的更新时间
func (lim *Limiter) SetBurst(newBurst int) func (lim *Limiter) SetBurstAt(now time.Time, newBurst int) func (lim *Limiter) SetLimit(newLimit Limit) func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit)
1.3 基于ip的gin限流中间件
主要就是使用了
sync.map来为每一个
ip创建一个 limiter,当然这个 key 也可以是其他的值,例如用户名等
func NewLimiter(r rate.Limit, b int, t time.Duration) gin.HandlerFunc { limiters := &sync.Map{} return func(c *gin.Context) { // 获取限速器 // key 除了 ip 之外也可以是其他的,例如 header,user name 等 key := c.ClientIP() l, _ := limiters.LoadOrStore(key, rate.NewLimiter(r, b)) // 这里注意不要直接使用 gin 的 context 默认是没有超时时间的 ctx, cancel := context.WithTimeout(c, t) defer cancel() if err := l.(*rate.Limiter).Wait(ctx); err != nil { // 这里先不处理日志了,如果返回错误就直接 429 c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": err}) } c.Next() } }
使用的时候只需要 use 一下中间件就可以了
func main() { e := gin.Default() // 新建一个限速器,允许突发 10 个并发,限速 3rps,超过 500ms 就不再等待 e.Use(NewLimiter(3, 10, 500*time.Millisecond)) e.GET("ping", func(c *gin.Context) { c.String(http.StatusOK, "pong") }) e.Run(":8080") }
我们使用
go-stress-testing来压测一下,20 个并发
~/gopath/bin/go-stress-testing -c 20 -n 1 -u http://127.0.0.1:8080/ping 开始启动 并发数:20 请求数:1 请求参数: ─────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬──────── 耗时│ 并发数│ 成功数│ 失败数│ qps │ 最长耗时│ 最短耗时│ 平均耗时│ 下载字节│ 字节每秒│ 错误码 ─────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼──────── 1s│ 20│ 11│ 9│ 63.79│ 438.48│ 45.37│ 313.53│ 152│ 259│200:11;429:9 ************************* 结果 stat **************************** 处理协程数量: 20 请求总数(并发数*请求数 -c * -n): 20 总请求时间: 0.586 秒 successNum: 11 failureNum: 9 ************************* 结果 end ****************************
可以发现总共成功了 11 个请求,失败了 9 个,这是因为我们桶的大小是 10 ,所以前 10 个请求都很快就结束了,第 11 个请求等待 333.3 ms 就可以完成,小于超时时间 500ms,所以可以放行,但是后面的请求确是等不了了,所以就都失败了,并且可以看到最后一个成功的请求的耗时为 336.83591ms 而其他的请求耗时都很短
[GIN-debug] Listening and serving HTTP on :8080 [GIN] 2021/03/29 - 13:15:55 | 200 | 1.48104ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 429 | 1.107689ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 429 | 1.746222ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 429 | 866.35µs | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 429 | 1.870403ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 429 | 2.231912ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 429 | 1.832506ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 429 | 613.741µs | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 1.454753ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 1.37802ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 1.428062ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 40.782µs | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 1.046146ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 429 | 1.7624ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 429 | 1.803124ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 41.67µs | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 1.42315ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 1.371483ms | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 731.091µs | 127.0.0.1 | GET "/ping" [GIN] 2021/03/29 - 13:15:55 | 200 | 336.83591ms | 127.0.0.1 | GET "/ping"
1.3 完整代码
demo.main
package main import ( "context" "fmt" "net/http" "sync" "time" "github.com/gin-gonic/gin" "golang.org/x/time/rate" ) // NewLimiter, 定义中间件 func NewLimiter(r rate.Limit, b int, t time.Duration) gin.HandlerFunc { limiters := &sync.Map{} return func(c *gin.Context) { // 获取限速器 // key 除了 ip 之外也可以是其他的,例如 header,user name 等 key := c.ClientIP() l, _ := limiters.LoadOrStore(key, rate.NewLimiter(r, b)) // 这里注意不要直接使用 gin 的 context 默认是没有超时时间的 ctx, cancel := context.WithTimeout(c, t) defer cancel() if err := l.(*rate.Limiter).Wait(ctx); err != nil { // 这里先不处理日志了,如果返回错误就直接 429 c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": err}) } c.Next() } } func main() { e := gin.Default() // 新建一个限速器,允许突发 10 个并发,限速 3rps,超过 500ms 就不再等待 e.Use(NewLimiter(3, 10, 500*time.Millisecond)) e.GET("ping", func(c *gin.Context) { c.String(http.StatusOK, "pong") }) err := e.Run(":8080") if err != nil { fmt.Print("start server err:", err.Error()) } }
下载go-stress-test
wget https://github.91chifun.workers.dev/https://github.com//link1st/go-stress-testing/releases/download/v1.0.3/go-stress-testing-linux
将gostress-tesing添加环境变量
mv go-stress-testing-linux /usr/local/bin/go-stress-testing
启动测试
go-stress-testing -c 20 -n 1 -u http://172.20.80.1:8080/ping
2. 参考
相关文章推荐
- 三.Go微服务--令牌桶实现原理
- 如何用 Go 快速编写出 HTTP REST API 服务?
- [斗鱼]没人比我更懂微服务--Go微服务框架Jupiter
- 使用Go Hijack和jQuery轻松实现异步推送服务
- 微信开发者获取服务令牌(component_access_token){"errcode":41002,"errmsg":"appid missing"}
- GO --微服务框架(二) goa
- 用 Go + WebSocket 快速实现一个 chat 服务
- Android O_GO后台启动服务改动
- 基于Go技术栈的微服务构建
- 清晰架构(Clean Architecture)的Go微服务: 日志管理
- [分享]一个在线网页缩略图服务(中国聚客网www.crossgo.com)
- 构建自定义安全令牌服务
- 你还在手撕微服务?快试试 go-zero 的微服务自动生成
- gomicro微服务系列之一
- go 简单的web服务
- PHP调用Go服务的正确方式 - Unix Domain Sockets
- 清晰架构的 Go 微服务: 程序容器
- Go 微服务实践
- 斗鱼开源基于Go实现的微服务框架 Jupiter
- 服务计算作业6——cloudgo-data