golang 利用mysql 实现分布式锁
需求背景
我们的服务近期刚开始上线,之前在测试环境一直都是单实例运行。其中有一项功能是搜集kubernetes pod中的日志,上线之后发现,日志搜集有重复,问题定位较容易,因为重复数量恰好是我们线上服务的副本数,想到没有做分布式锁。线上运行的每个实例都在搜集每个pod的日志,导致日志重复。
对于实现分布式锁,最常见的是通过zk、redis来实现,因为项目刚开始上线,目前暂不打算额外引入其他中间件,所以打算通过mysql来实现分布式锁。参考了以下两篇文章:
mysql实现分布式锁
简单实现依赖 MySQL 的分布式锁
通过对比 上面第一篇文章中提高的集中方式,我们采用了文章中提到的第一种方式,也就是基于表主键唯一做分布式锁。
文章提到的集中方式中,基于mysql排他锁来实现是最简单的方式,缺点是可能引起表锁。再者,我们要搜集日志的pod数量是不断变化的,所以通过select~for update这种方式前提还需要不断的更新维护数据。既然如此,不如直接insert更新数据时,利用主键唯一直接来实现分布式锁。这两点最大区别是一个利用了mysql自身的排他锁,一个则是自己像mysql写入主键相同的数据来实现。
另外,文章中针对这种方式提出的几个问题,我们也有一些常见的场景:
-
可重入锁问题:项目中对于pod日志收集的实现,是通过每个pod单独开启一个协程去处理的,并且搜集日志时,保持一个长链接,read流不断搜集日志,即在我们目前的场景中是可以忽略分布式锁不可重入的问题的。但这里有一个问题,既然协程是需要长久占用锁的,那就需要不断更新其超时时间,使对锁永不超时,保持对锁的占用。直至进程因为某些原因挂掉,自动超时,被其他协程获取。
针对这一点,扩展到golang项目中,在golang开发中使用多个协程处理公共资源一般情况下不需要考虑锁的重入问题,因为golang 协程一般不像在java线程那样,使用线程池对线程进行复用,协程处理完任务golang进行回收,所以不存在协程再次需要获取锁的情况。对于java多线程的情况,一般考虑在数据库中增加字段owner来记录当前资源在被哪个线程占用,java中可以直接获取线程唯一标识threadname,将其写入到表中的owner字段即可,golang中获取唯一标识这点也略微麻烦
-
锁释放问题,分布式锁需要设置超时时间,避免在获取锁的协程挂掉之后锁不能被释放。
-
mysql单点问题:我们项目直接使用了公司层面的数据库,本身是高可用的,单点问题不用我们考虑。
//分布式锁实现代码 //过期时间10分钟 const TIMEOUT int64 = 10 * 60 // mysql table struct type distributedLock struct { Id string `gorm:"primary_key"` ExpireTime int64 } type lockObject struct { id string expireTime int64 db *gorm.DB } func NewLock(id string, db *gorm.DB) *lockObject { return &lockObject{ id: id, expireTime: time.Now().Unix() + TIMEOUT, db: db, } } func (lock *lockObject) TryLock() bool { // clean timeout lock lock.deleleExpiredLock() var newLock = distributedLock{ Id: lock.id, ExpireTime: lock.expireTime, } // insert lock if err := lock.db.Table("distributed_lock").Create(&newLock).Error; err != nil { return false } return true } //我们业务的特殊场景,因为协程保持读取流监听,常驻任务,所以需要保持锁不超时,如果超时后,其他协程获取锁再对pod进行搜集处理,就会出现重复 func (lock *lockObject) KeepLock(ctx context.Context) { for { select { case <-ctx.Done(): return default: var oldLock = distributedLock{ Id: lock.id, } lock.db.Table("distributed_lock").Model(&oldLock).Update("expire_time", time.Now().Unix()+TIMEOUT) timer := time.NewTimer(time.Second * 10) <-timer.C } } } func (lock *lockObject) deleleExpiredLock() { var now = time.Now().Unix() lock.db.Table("distributed_lock").Where("id = ? AND expire_time < ?", lock.id, now).Delete(distributedLock{}) } func (lock *lockObject) DeleleLock() { lock.db.Table("distributed_lock").Where("id = ?", lock.id).Delete(distributedLock{}) }
go func() { //等待日志生成 time.Sleep(time.Second * 10) for { //定时获取搜集的pod,将每个pod交由一个协程处理 act.pollPods() timer := time.NewTimer(time.Second * 10) <-timer.C } }()
//pollPods方法 for _, podItem := range podList.Items { pod := podItem go func() { lock := NewLock(pod.GetObjectMeta().GetName(), act.db) if !lock.TryLock() { return } ctx, cancel := context.WithCancel(context.Background()) go lock.KeepLock(ctx) defer cancel() //搜集日志任务 act.writeActivityInfo(&pod) }() } }
写在最后
我司专业专注各种服务器代理:
1 来自于阿里、腾讯等大厂开发人员免费答疑,针对您的使用场景规划最合理产品方案和架构,最大限度节约成本。
2.各类云服务器全网超低价。 联想、华为、阿里、腾讯等厂商指定合作代理公司,购买有保障。
以远低于原厂的价格享受原厂的服务!
- 利用Sqoop实现MySQL与HDFS数据互导
- 利用mysql-cluster软件实现mysql集群
- 利用MySQL的一个特性实现MySQL查询结果的分页显示
- 利用ZooKeeper简单实现分布式锁
- 利用简易Tomcat服务器结合MysqL实现Android手机注册与登录(客户端部分)
- 利用golang的反射包,实现根据函数名自动调用函数。
- MySQL5.7 利用keepalived来实现mysql双主高可用方案的详细过程
- PHP利用闭包实现MySQL事务场景下缓存一致性的模型
- MySQL5.7 利用keepalived来实现mysql双主热备
- 利用PHP的GD2图像函数 + mysql实现的一个简单的投票系统
- 在MySQL中利用外键实现级联删除
- 利用Ajax实现数据的同步传输,从mysql中提取数据,通过echarts可视化
- golang使用go-sql-driver实现mysql增删改操作 推荐
- Linux-(C)利用Mysql相关API实现类似mysql的操作
- 利用MySQL的一个特性实现MySQL查询结果的分页显示
- 利用MySQLDriverCS实现在.NET环境下访问MySQL数据库
- 在MySQL中利用外键实现级联删除!
- 利用简易Tomcat服务器结合MysqL实现Android手机注册与登录(服务器部分)
- MySQL利用正则匹配函数实现多个条件查询
- 利用MyCAT实现MySQL的读写分离和主从切换