咱们来自字节跳动飞书商业运用研发部(Lark Business Applications),现在咱们在北京、深圳、上海、武汉、杭州、成都、广州、三亚都设立了工作区域。咱们重视的产品范畴主要在企业经历管理软件上,包含飞书 OKR、飞书绩效、飞书招聘、飞书人事等 HCM 范畴体系,也包含飞书审批、OA、法务、财务、收购、差旅与报销等体系。欢迎各位参加咱们。

本文作者:飞书商业运用研发部 李成武

欢迎咱们重视飞书技能,每周定时更新飞书技能团队技能干货内容,想看什么内容,欢迎咱们谈论区留言~

为什么需求锁

小热身

下面程序运转的成果是多少?

package main
import (
   "sync"
)
func main() {
   count := 0
   wg := &sync.WaitGroup{}
   for i := 0; i < 100; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         count++
      }()
   }
   wg.Wait()
   println(count)
}

答案

屡次运转会得到不同的成果。那么怎么得到正确的成果呢

package main
import (
   "sync"
)
func main() {
   count := 0
   wg := &sync.WaitGroup{}
   lock := &sync.Mutex{}
   for i := 0; i < 100; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         lock.Lock()
         count++
         lock.Unlock()
      }()
   }
   wg.Wait()
   println(count)
}

当某个资源数据具有同享性的时分,假如同一时刻有多个恳求更新同享资源就或许导致数据的纷歧致,这时分就必须要求在同一时刻只能被一个恳求访问,所以咱们需求运用锁来协调关于同享数据的更新,以确保数据的一致性

Golang中的锁/单机锁

互斥锁 Mutex

type Mutex struct {
   state int32
   sema  uint32
}
type Locker interface {
   Lock()
   Unlock()
}
// Mutex 完成了Locker接口
func (m *Mutex) Lock()
func (m *Mutex) Unlock()

运用留意点:

  1. 调用 unlock 办法时假如锁未被占用则会触发 panic fatal error: sync: unlock of unlocked mutex
  2. 调用 lock 办法时假如锁被占用则该协程会堵塞知道锁被开释,所以一定要记住履行完后及时开释锁,防止这种状况的最有效办法便是运用defer。
lock.Lock()
defer lock.Unlock()
  1. 互斥锁不支撑可重入,假如接连调用 lock 则会触发panic fatal error: all goroutines are asleep - deadlock!
  2. 锁作为参数时需求传递指针。原因在于 Mutex 是一个有状况的方针,假如传递方针那么会仿制一个 Mutex 并将相应的状况也仿制过来,这样每个线程履行函数时内部便是独自的锁,就达不到预期效果了。
func Add(lock *sync.Mutex) {
   lock.Lock()
   defer lock.Unlock()
   count++
}

读写锁 RWMutex

type RWMutex struct {
   w           Mutex  // held if there are pending writers
   writerSem   uint32 // semaphore for writers to wait for completing readers
   readerSem   uint32 // semaphore for readers to wait for completing writers
   readerCount int32  // number of pending readers
   readerWait  int32  // number of departing readers
}
// 加读锁
func (rw *RWMutex) RLock()
// 解读锁
func (rw *RWMutex) RUnlock()
// 加写锁
func (rw *RWMutex) Lock()
// 解写锁
func (rw *RWMutex) Unlock()

读写锁是针对读写操作的互斥锁,能够别离针对读操作与写操作进行锁定和解锁操作,这样关于功能有一定的提高。

读写锁的访问操控规则如下:

  1. 多个写操作之间是互斥的
  2. 写操作与读操作之间也是互斥的
  3. 多个读操作之间不是互斥的
不互斥 互斥
互斥 互斥

运用留意点: 读写锁运用留意点和互斥锁类似,可是因为多个读操作之间不是互斥的,因而关于读解锁更简略被忽视。关于同一个读写锁,添加多少个读锁定,就必要有等量的读解锁,不然程序会呈现异常。

分布式锁

到了分布式体系时代,这种线程之间的锁就发挥不了效果了,体系会有多份并且部署在不同的机器上,这些资源现已不是在线程之间同享了,而是属于不同实例之间同享的资源。为了处理这个问题,咱们就要引进「分布式锁」。现在常见的分布式锁完成办法有三种,别离是依据数据库完成、依据Redis完成以及依据ZooKeeper完成。

分布式锁的三个特点:

  • 互斥(Mutual Exclusion): 这是锁最根本的功能,同一时刻只能有一个客户端持有锁。
  • 防止死锁(Dead lock free): 假如某个客户端取得锁之后花了太长时刻处理,或许客户端产生了毛病,锁无法开释会导致整个处理流程无法进行下去,所以要防止死锁。
  • 容错(Fault tolerance): 为防止单点毛病,锁服务需求具有高可用性。

依据数据库

简略场景

比较简略的场景是并发更新数据库中某条记载。关于这种场景咱们能够选用达观锁的办法来完成,即在更新之前先查出更新字段的值,更新时查看对应值是否和方才查出的相同,假如相同则更新。

-- 账户原有100元全部转出
update account set balance=0 where id=账户ID and balance=100

可是这种办法存在一个问题,即ABA问题:假设要修正的值V原先为A,它被更新到B,后面又被更新到A,此时客户端经过CAS操作无法分辨当时V值是否产生过改动。这样就会导致程序运转成果不符合预期。

聊聊「锁」事

  1. T1小明查询账户余额100元并全部转出,可是更新之前产生了程序暂停。
  2. T2时刻小明发现卡顿并重新操作。
  3. T3时刻小明成功转出100元。
  4. T4时刻小红又给小明转账100元并操作成功。
  5. T5时刻程序康复时去履行更新,成果更新成功。

上述操作成果实践小明转出了200元,不符合预期只转出100元。处理ABA问题的一个办法便是在数据库表中引进一个版本号(version)字段来完成的。 当咱们要从数据库中读取数据的时分,一起把这个version字段也读出来,更新后写回数据库时则需求将version加1,且必须在更新的时分一起查看现在数据库里version值是不是之前的那个version。假如是则正常更新,假如不是则更新失利,需求重新读取数据再操作。

聊聊「锁」事

因为T4时刻version现已被更新成2,这样T5时刻再履行 where version=1 的时分会履行失利,防止了预期外的更新

通用场景

假如要完成一个通用化的锁时,就要在数据库中创立一张独自的表用来记载锁,以下介绍一下shedlock的完成。

-- 这儿name是全局仅有用来作为锁的标识
CREATE  TABLE shedlock
(
  name VARCHAR(64),
  lock_until TIMESTAMP(3) NULL,
  locked_at TIMESTAMP(3) NULL,
  locked_by VARCHAR(255),
  PRIMARY KEY (name)
)
  • 加锁进程

    • 经过插入同一个name(primary key)或许更新同一个name进行抢锁。先依据name查询数据库里是否有该记载,假如没有则履行insert操作,假如有则履行update操作。
    • -- insert
      INSERT  INTO shedlock
      (name, lock_until, locked_at, locked_by)
      VALUES
      (锁名字,当时时刻+最多锁多久,当时时刻, 客户端称号)
      -- update
      UPDATE shedlock 
      SET lock_until = 当时时刻+最多锁多久,
      locked_at = 当时时刻,
      locked_by = 客户端称号 WHERE name = 锁名字 AND lock_until <= 当时时刻 
      
  • 解锁进程

    • 经过设置lock_until来开释锁
    • UPDATE shedlock
      SET lock_until = 当时时刻 WHERE name = 锁名字 and locked_by = 客户端称号
      
  • 存在的问题

    • 写操作只能访问主库,高并发场景下对数据库的压力比较大

依据Redis

依据Redis完成分布式锁应该是最常见了,说到 Redis 分布锁咱们第一反响想到的便是 setnx SET if Not Exists

加锁办法

一说到 setnx 有些同学会想到 setnx 这个指令,那么加锁会是

SETNX lockKey value
EXPIRE lockKey time

可是这样却存在问题:这两个指令履行不是原子的,假如客户端在履行完 SETNX 后溃散了,那么就没有机会履行 EXPIRE 了,导致它一向持有这个锁。

正确的加锁姿态是运用 SET 指令加上 NX 参数

SET lockKey lockValue NX PX 30000
  • lockKey: 加锁的锁名。
  • lockValue: 由客户端生成的一个随机字符串,需求确保其仅有性。
  • NX: 表明只有当lockKey对应的key值不存在的时分才能SET成功。这确保了只有第一个恳求的客户端才能取得锁,而其它客户端在锁被开释之前都无法取得锁。
  • PX 30000: 设置过期时刻,单位是毫秒(ms),表明这个锁有一个30000ms即30s的自动过期时刻,当然这儿需求依据不同的事务场景设置合适的过期时刻。

lockValue的效果

咱们有没有遇到这样的代码?这样的加锁办法存在什么问题?

lock.Lock(key, "1", lockTime)

聊聊「锁」事

  1. T1时刻客户端A获取锁成功。
  2. 客户端A在某个操作上堵塞了很长时刻。
  3. T2过期时刻到了,锁自动开释了。
  4. T3时刻客户端B获取到了对应同一个资源的锁。
  5. 客户端A履行结束,开释掉了客户端B持有的锁。
  6. 客户端B还未履行结束,此时客户端C现已能够成功恳求到锁了。

所以lockValue的效果便是仅有标识加锁的 客户端 ,确保不能去开释他人的锁

解锁办法

上面介绍了lockValue要用来标识客户端,那么解锁的时分就需求判别value是否是自己设置的value,假如是才能履行解锁操作。

// 伪代码
String uuid := xxxx;
// 加锁
set lockName uuid NX PX 3000;
... 履行操作
// 解锁
if redis.get("lockName")==uuid {
    redis.del("lockName");
}

到了这儿又有细心的小伙伴发现了问题:这解锁的时分先get,判别后再 del ,这不是原子操作啊! 假如客户端A在比照后产生了程序暂停,此时锁到了过期时刻自动开释了,客户端B成功申请到了锁,那么客户端A康复后就又会开释了客户端B的锁。

那么删去锁的正确姿态之一便是运用 lua 脚本,经过 redis 的 eval/evalsha 指令来运转:

  -- 这段Lua脚本在履行的时分需求传递参数
 -- lockKey 作为KEYS[1]的值传进去
 -- lockValue 作为ARGV[1]的值传进去
 if redis.call("get",KEYS[1]) == ARGV[1] then  return redis.call("del",KEYS[1])  else   return 0  end

存在的问题

  1. 锁超时问题:客户端A获取锁后履行的进程中,因为履行时刻太长锁过期后客户端B能够成功获取锁。处理这个问题的办法便是锁续期, 这个机制在redisson框架中现已完成,参考资料:redisson中的看门狗机制总结。
  2. 锁丢掉问题:Redis的高可用都是依据「主从架构数据同步仿制」完成的。这就意味着假如在master节点上拿到了锁,可是这个加锁的key还没有同步到slave节点,这时分master产生毛病转移,slave节点升级为master节点,因为slave节点上没有刚刚加锁的key,所以导致呈现了锁丢掉。针对这个问题,redis之父antirez规划了Redlock算法,官网链接: Distributed Locks with Redis。

依据Zookeeper

Zookeeper简略介绍

ZooKeeper 是一个分布式运用程序协同服务,它规划方针是将那些杂乱且简略犯错的分布式一致性服务封装起来,构成一个高效牢靠的原语集,并以一系列简略易用的接口供给给用户运用。

ZooKeeper供给的命名空间与标准文件体系的称号空间十分相似,命名空间中的每个节点都由途径标识。与标准文件体系不同的是 ZooKeeper 中每个节点都能够具有与其关联的数据以及子节点。这就像具有一个允许文件也成为目录的文件体系,运用标准文件体系的“数据节点“的概念,ZooKeeper 数据节点称之为Znode。

聊聊「锁」事

在 Zookeeper 中 Znode 都是有生命周期的,其生命周期的长短取决于Znode的节点类型。节点类型可分为耐久节点(PERSISTENT)、暂时节点(EPHEMERAL)和次序节点(SEQUENTIAL)三大类,在节点创立进程中能够运用以下四种组合型节点类型:

  • 耐久节点: 数据节点被创立之后,会一向存在于Zookeeper服务器上,直至有删去操作来自动删去这个节点。
  • 耐久次序节点:和耐久节点的根本特性保持一致,不同之处表现在次序上。在Zookeeper中,每一个父节点都会为它的第一级子节点维护一份次序,在创立子节点时,Zookeeper会自动给节点称号添加一个数字后缀,作为Znode的最终称号。
  • 暂时节点: 暂时节点的生命周期和客户端的会话绑定,客户端的会话一旦失效,那么这个节点就会被自动清理掉。
  • 暂时次序节点:和暂时节点的根本特性保持一致,不同之处表现在次序上。在创立子节点时,Zookeeper依据创立的时刻次序给该节点称号进行编号。

Zookeeper的Watch机制

Zookeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些事件(如节点创立、节点改动、节点删去等)触发了这个 Watcher,那么就会给指定客服端发送一个事件告诉。

依据这个机制就能够完成堵塞式锁。客户端在获取锁失利时能够注册一个 Watcher 监听,等候 lock 节点被删去时客户端会收到告诉,这样就能够获取锁持续进行操作了。这个特功能够让分布式锁在客户端用起来就像一个本地的锁一样:加锁失利就堵塞住,直到获取到锁为止。

加锁&&解锁流程

  • 比较简略的完成是创立暂时节点的办法,运用 Zookeeper 同一级节点 key 称号是仅有的特性。

    • 加锁的时分创立暂时节点,创立成功则加锁成功
    • 加锁失利则等候并给锁节点添加监听器
    • 收到锁节点被删去的告诉再次测验加锁

聊聊「锁」事

可是这儿一切节点都会监听锁节点,那么当锁节点删去时一切监听的客户端都要被唤醒,这便是惊群效应。 这种状况很多告诉操作会形成zookeeper功能忽然下降。

  • 要处理这个问题能够运用创立暂时次序节点的办法完成,Zookeeper 会确保子节点的有序性。

    • 加锁的时分在耐久节点下创立暂时次序节点,然后获取该耐久节点的一切子节点,假如当时节点为一个节点,则表明加锁成功。
    • 加锁失利则等候并给前一个节点添加监听器,这样开释锁后就只会唤醒一个客户端
    • 客户端收到锁节点被删去的告诉,然后重新获取耐久节点的一切子节点,判别加锁是否成功。

聊聊「锁」事

存在的问题

到这儿咱们这个锁看起来形似现已比较完美了,支撑公平排队(先恳求的先得到锁)并且既不必设置过期时刻还能够在客户端呈现问题时自动开释锁。可是问题来了:怎么判别客户端呈现问题呢?

实践上Zookeeper是经过与每个客户端维护着一个Session,而这个Session依赖定时的心跳(heartbeat)来维持。假如ZooKeeper长时刻收不到客户端的心跳(这个时刻称为Sesion的过期时刻),那么它就认为Session过期了,经过这个Session所创立的一切的暂时节点都会被自动删去。这种状况下就或许会呈现 Zookeeper 对 Session 过期的误判,比如客户端呈现长时刻GC Pause 或许网络呈现问题等等,那么就会呈现两个客户端一起获取到锁的场景。

小结

  1. 依据mysql的分布式锁思路简略,可是需求新建独自的表,并且并发高后对数据库压力较大,或许会引发很多失利操作。另外在实施的进程中会遇到各种不同的问题(比如表数据量越来越大等),为了处理这些问题,完成办法将会越来越杂乱。故一般咱们不运用这种完成。
  2. 依据Redis的分布式锁功能高,支撑锁自动过期,不过需求事务设置合理的超时时刻,另外存在锁丢掉的问题。所以适用于并发量大、功能要求很高的场景,其次关于牢靠性需求其他计划确保。
  3. 依据Zookeeper的分布式锁正常状况下支撑客户端持有锁恣意长的时刻,能够确保事务履行完操作后再开释锁。可是因为在加锁和解锁进程中需求创立和销毁节点,所以功能相对Redis较差。故一般运用于高牢靠一起并发量不是很大的场景。

看了这儿咱们能够发现要完成一个肯定安全的分布式锁势必会引进更多的杂乱性,所以咱们在实践事务中需求考虑事务对极点状况的容忍度,寻找成本与收益之间的平衡点。

怎么更好的运用锁

咱们为了功能运用并发履行,因为并发访问同享数据会导致数据的纷歧致性,所以需求锁来确保数据一致性。可是锁会使得本来的并发履行转为串行履行,然而最后锁反过来又约束了功能。所以咱们运用锁时要留意运用办法,在确保数据一致性的一起,尽量削减体系功能的丢失。

削减锁的持有时刻

布景:咱们需求依据职工信息结构事务数据(以下运用BusinessData表明),然后将BusinessData写入DB,要求BusinessData不能呈现重复。为了确保BusinessData不重复,所以咱们写之前需求先查询DB中是否现已存在,不存在才履行写入操作。这样就或许遇到两个协程都查询到DB中不存在某条记载,之后都履行写操作就会呈现插入了重复的BusinessData,故咱们需求加锁来防止这种状况。

注:下面比如中 lock 为分布式锁,不是本地锁

func HandleCreateBusinessData(empID) error {
   // 加锁
   lock.Lock()
   defer lock.Unlock()
   // 1. 获取职工信息
   emp := GetEmpInfo(empID)
   // 2. 依据Emp构建BusinessData
   data := BuildBusinessData(emp)
   // 3. 假如现已存在则无需创立
   if IsExist(data) {
      return nil
   }
   // 4. 不存在则创立BusinessData
   return CreateBusinessData(data)
}
  • 这时分咱们会发现上述 1、2 步在并发场景下也不会导致问题,所以无需在锁的范围内。
func HandleCreateBusinessData(empID) error {
   // 1. 获取职工信息
   emp := GetEmpInfo(empID)
   // 2. 依据Emp构建BusinessData
   data := BuildBusinessData(emp)
   // 加锁
   lock.Lock()
   defer lock.Unlock()
   // 3. 假如现已存在则无需创立
   if IsExist(data) {
      return nil
   }
   // 4. 不存在则创立BusinessData
   return CreateBusinessData(data)
}
  • 可是这种写法还有一个问题,假如后续有人持续添加代码,比如第4步后还有一些操作,那么锁的范围就又会自动扩大了,所以愈加主张将锁的范围抽出独自的函数。
func HandleCreateBusinessData(empID) error {
   // 1. 获取职工信息
   emp := GetEmpInfo(empID)
   // 2. 依据Emp构建BusinessData
   data := BuildBusinessData(emp)
   // 3. 加锁创立BusinessData
   err := CreateBusinessDataWithLock(data)
   if err != nil {
       return err
   }
   // 4. 进行后续操作
   OtherOperation()
   return nil
}
func CreateBusinessDataWithLock(businessData) err {
    // 加锁
   lock.Lock()
   defer lock.Unlock()
    // 假如现已存在则无需创立
   if IsExist(businessData) {
      return nil
   }
   // 不存在则创立BusinessData
   return CreateBusinessData(businessData)
}

优化锁的粒度

上述创立BusinessData事例中要求其不能重复,可是关于SaaS体系而言,不能重复一般是指同一租户内不能重复。那么针对这种场景咱们会发现在上述加锁办法下,该接口同一时刻只能处理一个恳求,可是不同租户间其实是不存在冲突的。所以咱们能够优化为对租户加锁。

func CreateBusinessDataWithLock(businessData) err {
    // 对当时租户加锁
    tenantID := GetCurrentTenantID()
    lockKey := "lock_CreateBusinessDataWithLock_" + tenantID
    lock.Lock(lockKey, uuid, lockTime)
    defer lock.Unlock(lockKey, uuid)
    // 假如现已存在则无需创立
   if IsExist(businessData) {
      return nil
   }
   // 不存在则创立BusinessData
   return CreateBusinessData(businessData)
}

无锁

关于并发操作的场景咱们纷歧定要运用锁来确保安全,能够运用原子操作或许额定空间来确保数据一致。

  • 运用原子操作
// 比如咱们开头的比如能够写成
package main
import (
   "sync"
   "sync/atomic"
)
func main() {
   count := int32(0)
   wg := &sync.WaitGroup{}
   for i := 0; i < 100; i++ {
      wg.Add(1)
      go func() {
         defer wg.Done()
         atomic.AddInt32(&count, 1)
      }()
   }
   wg.Wait()
   println(count)
}
  • 空间换时刻
func ParallelBatchGetGetDataByID(ctx context.Context, idList []string) ([]*Data, error) {
   if len(idList) == 0 {
      return []*Data{}, nil
   }
   goroutineSize := constant.BatchGetGetDataByIDGoroutineSize
   // 将 idList 分红 goroutineSize 组
   idListArray := util.SpitTask(ctx, idList, goroutineSize)
   retVal := make([]*Data, 0, len(idList))
   errList := make([]error, 0, goroutineSize)
   wg := &WaitGroupWrapper{}
   lock := &sync.Mutex{}
   for i := idListArray {
      batchIDList := idListArray[i]
      wg.Wrap(ctx, func() {
         data, err := BatchGetGetDataByID(ctx, batchIDList)
         lock.Lock()
         retVal = append(retVal, data...)
         errList = append(errList, err)
         lock.Unlock()
      })
   }
   wg.Wait()
   for _, err := range errList {
      if err != nil {
         return nil, err
      }
   }
   return retVal, nil
}

上面代码能够优化成

func ParallelBatchGetGetDataByID(ctx context.Context, idList []string) ([]*Data, error) {
   if len(idList) == 0 {
      return []*Data{}, nil
   }
   goroutineSize := constant.BatchGetGetDataByIDGoroutineSize
   // 将 idList 分红 goroutineSize 组
   idListArray := util.SpitTask(ctx, idList, goroutineSize)
   retVal := make([]*Data, 0, len(idList))
   dataList := make([][]*Data, goroutineSize)
   errList := make([]error, goroutineSize)
   wg := &WaitGroupWrapper{}
   for i := idListArray {
      // 需求留意这儿要copy
      idx := i
      wg.Wrap(ctx, func() {
         dataList[idx], errList[idx] = BatchGetGetDataByID(ctx, idListArray[idx])
      })
   }
   wg.Wait()
   for _, err := range errList {
      if err != nil {
         return nil, err
      }
   }
   for _, data := range dataList {
      retVal = append(retVal, data...)
   }
   return retVal, nil
}

尽量防止锁嵌套

实践开发中尽量防止锁嵌套,不然或许呈现死锁

/* 运转成果:
A1 get A lock
B2 get B lock
fatal error: all goroutines are asleep - deadlock!
*/
package main
import (
   "fmt"
   "sync"
)
var lockA = &sync.Mutex{}
var lockB = &sync.Mutex{}
func main() {
   A1()
   go func() {
      A2()
   }()
}
func A1() {
   lockA.Lock()
   fmt.Println("A1 get A lock")
   B2()
   defer lockA.Lock()
}
func A2() {
   lockA.Lock()
   fmt.Println("A2 get A lock")
   defer lockA.Lock()
}
func B1() {
   lockB.Lock()
   fmt.Println("B1 get B lock")
   A2()
   defer lockB.Lock()
}
func B2() {
   lockB.Lock()
   fmt.Println("B2 get B lock")
   defer lockB.Lock()
}

参考资料

  • Go 为什么不支撑可重入锁? –
  • 聊聊分布式锁 –
  • 细说Redis分布式锁 –
  • 深度分析:Redis分布式锁究竟安全吗?看完这篇文章彻底懂了! – 文章概况
  • Redis Redlock 的争论 –
  • Zookeeper 完成分布式锁 –

参加咱们

扫码发现职位&投递简历

聊聊「锁」事

官网投递

job.toutiao.com/s/FyL7DRg