您的位置:首页 > 数据库 > SQL

golang 利用mysql 实现分布式锁

2020-06-09 04:34 609 查看

需求背景
我们的服务近期刚开始上线,之前在测试环境一直都是单实例运行。其中有一项功能是搜集kubernetes pod中的日志,上线之后发现,日志搜集有重复,问题定位较容易,因为重复数量恰好是我们线上服务的副本数,想到没有做分布式锁。线上运行的每个实例都在搜集每个pod的日志,导致日志重复。
对于实现分布式锁,最常见的是通过zk、redis来实现,因为项目刚开始上线,目前暂不打算额外引入其他中间件,所以打算通过mysql来实现分布式锁。参考了以下两篇文章:
mysql实现分布式锁
简单实现依赖 MySQL 的分布式锁

通过对比 上面第一篇文章中提高的集中方式,我们采用了文章中提到的第一种方式,也就是基于表主键唯一做分布式锁。
文章提到的集中方式中,基于mysql排他锁来实现是最简单的方式,缺点是可能引起表锁。再者,我们要搜集日志的pod数量是不断变化的,所以通过select~for update这种方式前提还需要不断的更新维护数据。既然如此,不如直接insert更新数据时,利用主键唯一直接来实现分布式锁。这两点最大区别是一个利用了mysql自身的排他锁,一个则是自己像mysql写入主键相同的数据来实现。

另外,文章中针对这种方式提出的几个问题,我们也有一些常见的场景:

  1. 可重入锁问题:项目中对于pod日志收集的实现,是通过每个pod单独开启一个协程去处理的,并且搜集日志时,保持一个长链接,read流不断搜集日志,即在我们目前的场景中是可以忽略分布式锁不可重入的问题的。但这里有一个问题,既然协程是需要长久占用锁的,那就需要不断更新其超时时间,使对锁永不超时,保持对锁的占用。直至进程因为某些原因挂掉,自动超时,被其他协程获取。

    针对这一点,扩展到golang项目中,在golang开发中使用多个协程处理公共资源一般情况下不需要考虑锁的重入问题,因为golang 协程一般不像在java线程那样,使用线程池对线程进行复用,协程处理完任务golang进行回收,所以不存在协程再次需要获取锁的情况。对于java多线程的情况,一般考虑在数据库中增加字段owner来记录当前资源在被哪个线程占用,java中可以直接获取线程唯一标识threadname,将其写入到表中的owner字段即可,golang中获取唯一标识这点也略微麻烦

  2. 锁释放问题,分布式锁需要设置超时时间,避免在获取锁的协程挂掉之后锁不能被释放。

  3. mysql单点问题:我们项目直接使用了公司层面的数据库,本身是高可用的,单点问题不用我们考虑。

//分布式锁实现代码

//过期时间10分钟
const TIMEOUT int64 = 10 * 60

// mysql table struct
type distributedLock struct {
Id         string `gorm:"primary_key"`
ExpireTime int64
}

type lockObject struct {
id         string
expireTime int64
db         *gorm.DB
}

func NewLock(id string, db *gorm.DB) *lockObject {
return &lockObject{
id:         id,
expireTime: time.Now().Unix() + TIMEOUT,
db:         db,
}
}

func (lock *lockObject) TryLock() bool {
// clean timeout lock
lock.deleleExpiredLock()

var newLock = distributedLock{
Id:         lock.id,
ExpireTime: lock.expireTime,
}
// insert lock
if err := lock.db.Table("distributed_lock").Create(&newLock).Error; err != nil {
return false
}

return true
}

//我们业务的特殊场景,因为协程保持读取流监听,常驻任务,所以需要保持锁不超时,如果超时后,其他协程获取锁再对pod进行搜集处理,就会出现重复
func (lock *lockObject) KeepLock(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
var oldLock = distributedLock{
Id: lock.id,
}
lock.db.Table("distributed_lock").Model(&oldLock).Update("expire_time", time.Now().Unix()+TIMEOUT)

timer := time.NewTimer(time.Second * 10)
<-timer.C
}
}
}

func (lock *lockObject) deleleExpiredLock() {
var now = time.Now().Unix()
lock.db.Table("distributed_lock").Where("id = ? AND expire_time < ?", lock.id, now).Delete(distributedLock{})
}

func (lock *lockObject) DeleleLock() {
lock.db.Table("distributed_lock").Where("id = ?", lock.id).Delete(distributedLock{})
}
go func() {
//等待日志生成
time.Sleep(time.Second * 10)
for {
//定时获取搜集的pod,将每个pod交由一个协程处理
act.pollPods()
timer := time.NewTimer(time.Second * 10)
<-timer.C
}
}()
//pollPods方法
for _, podItem := range podList.Items {
pod := podItem
go func() {
lock := NewLock(pod.GetObjectMeta().GetName(), act.db)
if !lock.TryLock() {
return
}
ctx, cancel := context.WithCancel(context.Background())
go lock.KeepLock(ctx)
defer cancel()

//搜集日志任务
act.writeActivityInfo(&pod)
}()
}
}

写在最后
我司专业专注各种服务器代理:
1 来自于阿里、腾讯等大厂开发人员免费答疑,针对您的使用场景规划最合理产品方案和架构,最大限度节约成本。
2.各类云服务器全网超低价。 联想、华为、阿里、腾讯等厂商指定合作代理公司,购买有保障。
以远低于原厂的价格享受原厂的服务!

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: