广度成就多维视角,深度利于快速定位。 — 微微一笑
引言
上一篇《Redis分布式锁运用及问题解决》咱们经过场景切入,首要介绍了Redis分布式锁在并发场景下的运用。这一章从多种常用分布式锁的解决方案下手,深化探讨原理及运用场景。在技术广度上参加一些思考。首要内容如下:

分布式锁特性
分布式锁作为一种数据一致性同步机制,在规划和运用时需求具有哪特性来保证其可靠性和有用性?
- 原子性(Atomicity) : 分布式锁的获取和开释操作应该是原子的,保证在任何时刻只有一个节点能够成功获取锁,防止竞态条件。
- 可靠性(Reliability) : 分布式锁应该在各种反常情况下都能够可靠地工作,包含网络分区、节点毛病等。即便在某个节点出现问题时,其他节点依然能够正常运用锁。
- 互斥性(Mutual Exclusion) : 分布式锁应该保证在任何时刻只有一个节点能够持有锁,其他节点必须等候或者测验从头获取锁。
- 超时机制(Timeout) : 分布式锁通常应该支撑超时机制,防止死锁情况的产生。假如一个节点获取锁后在必守时刻内没有开释锁,体系应该能够自动开释锁。
- 可重入性(Reentrancy) : 有些分布式锁允许同一个节点屡次获取同一把锁,即可重入性。这关于某些场景(比如递归调用)或许是有用的。
常见分布式锁解决方案
在分布式体系中,咱们需求经过分布式锁操控多节点并发导致的数据一致问题。常见的分布式锁解决方案如下,咱们经过具体的介绍,清晰的比照,简易的了解办法为读者打开新的视角。
分布式锁一: 数据库达观锁(CAS思想)
先来大致了解下失望锁和达观锁的差异:

在分布式体系中失望锁易形成功能瓶颈。这里只介绍达观锁的运用。还是从前的场景,用一张商品库存表,说明下怎么了解数据库达观锁。内容如下:
mysql>select*fromproduct_stock;
+----+------------+-------+---------+
|id|product_id|stock|version|
+----+------------+-------+---------+
|1|1|100|1|
+----+------------+-------+---------+
1rowinset(0.00sec)
多线程环境下运用version版别号的机制,履行扣库存操作,根本流程如下:

流程描绘:
- 事务A履行
START TRANSACTION;
-- 查询当时版别号:1
SELECT version FROM product_stock WHERE product_id = 1;
-- 履行操作,更新版别号
UPDATE product_stock SET stock = stock - 1, version = version + 1 WHERE product_id = 1 AND version = <当时查询到的版别号>;
-- 提交事务
COMMIT;
- 事务B履行
START TRANSACTION;
-- 查询当时版别号:1
SELECT version FROM product_stock WHERE product_id = 1;
-- 履行操作,更新版别号,此处A履行完version=2,更新失利
UPDATE product_stock SET stock = stock - 1, version = version + 1 WHERE product_id = 1 AND version = <当时查询到的版别号>;
-- 提交事务
COMMIT;
由此可见,达观锁的办法能有用解决并发安全问题,适用于一些读多写少的场景,尤其是关于短事务、容忍性好的运用场景,它能够提高体系的并发性和吞吐量。但是并发量大的时分会导致很多的update失利。
分布式锁二: RedisTemplate: setNX expire
关于运用和常见问题,在上一篇《Redis分布式锁运用及问题解决》现已具体介绍,在运用中,咱们曾经埋雷:redis分布式锁过期时刻怎么设置?接下来咱们就聊聊Redission一些原理和运用中的问题。


分布式锁三: Redission结构
咱们就上一篇中扣库存的事例,运用Redission结构再次升级。建立一个简易版的单机环境,具体进程:
- 添加 Maven 依靠:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.5</version><!--最新版别请参考官方文档-->
</dependency>
- 配置Redission
@Configuration
publicclassRedisConfig{
@Value("${config.redis.host}")
privateStringhost;
@Value("${config.redis.port}")
privateStringport;
@Value("${config.redis.password}")
privateStringpassword;
@Bean
publicRedissonredisson(){
//此为单机形式
Configconfig=newConfig();
config.useSingleServer()
.setAddress("redis://"+host+":"+port)
.setPassword(password)
.setDatabase(1);
return(Redisson)Redisson.create(config);
}
}
- 事务逻辑
@RequestMapping("/redissonLock/reduceStock")
publicStringreduceStock(){
StringlockKey="product_01";
RLocklock=redisson.getLock(lockKey);
Stringmsg="";
try{
//加锁
lock.lock();
intstock=Integer.parseInt(redisTemplate.opsForValue().get("stock"));
if(stock>0){
intrealStock=stock-1;
redisTemplate.opsForValue().set("stock",realStock+"");
msg="减库存成功,剩下:"+realStock;
System.out.println(msg);
}else{
msg="减库存失利,库存缺乏";
System.out.println(msg);
}
}finally{
//开释锁
lock.unlock();
}
returnmsg;
}
- 测验成果
Pod:8080

Pod:8081

- 成果分析
明显,Redission结构确实解决了分布式环境并发产生的数据问题。咱们提炼出中心逻辑:
//创立分布式锁
RLocklock=redisson.getLock("myLock");
try{
//测验获取锁
lock.lock();
//事务逻辑
System.out.println("Businesslogicinsidethelock.");
}finally{
//开释锁
lock.unlock();
}
十分简略易用。但世间蛮多简略的工作其实蛮杂乱。正如:哪有什么年月静好,只由于有人为你负重前行。 那么强壮的背后是怎样的支撑?
咱们先看下大致的流程,稍后经过源码深化原理:

流程描绘:
– 客户端A
1、获取到锁;
2、判别是否设置过期时刻,假如未指定锁的持续时刻,则运用内部默认的持续时刻
3、fork后台线程,守时使命续期;
4、履行完成,开释锁
– 客户端B
1、获取锁失利,由于锁被A持有;
2、while循环,以自旋办法不断测验获取锁;
3、等候A开释锁,直到获取锁,否则继续进程 2;
下面经过中心深化源码,具体了解上述流程,并思考几个问题.
这段代码是 Redisson 中用于测验获取分布式锁的中心事务逻辑。
private<T>RFuture<Long>tryAcquireAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId){
RFuture<Long>ttlRemainingFuture;
//假如指定了加锁时刻,加锁运用指守时刻
if(leaseTime!=-1){
ttlRemainingFuture=tryLockInnerAsync(waitTime,leaseTime,unit,threadId,RedisCommands.EVAL_LONG);
}else{
//假如未指定锁的持续时刻,则运用内部默认的持续时刻
ttlRemainingFuture=tryLockInnerAsync(waitTime,internalLockLeaseTime,
TimeUnit.MILLISECONDS,threadId,RedisCommands.EVAL_LONG);
}
//异步回调,处理锁获取成果
ttlRemainingFuture.onComplete((ttlRemaining,e)->{
//省掉部分代码...
//锁获取成功
if(ttlRemaining==null){
if(leaseTime!=-1){
//更新内部锁持续时刻
internalLockLeaseTime=unit.toMillis(leaseTime);
}else{
//发动守时使命,定期续约锁的过期时刻
scheduleExpirationRenewal(threadId);
}
}
});
returnttlRemainingFuture;
}
1)加锁逻辑具体说明tryLockInnerAsync
<T>RFuture<T>tryLockInnerAsync(longwaitTime,longleaseTime,TimeUnitunit,longthreadId,RedisStrictCommand<T>command){
returnevalWriteAsync(getRawName(),LongCodec.INSTANCE,command,
//运用Lua脚本履行原子性操作获取锁
"if(redis.call('exists',KEYS[1])==0)then"+//假如锁不存在
"redis.call('hincrby',KEYS[1],ARGV[2],1);"+//将线程ID作为哈希字段,并递增其值
"redis.call('pexpire',KEYS[1],ARGV[1]);"+//设置锁的过期时刻
"returnnil;"+//回来nil表明锁获取成功
"end;"+
"if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+//假如线程ID已存在
"redis.call('hincrby',KEYS[1],ARGV[2],1);"+//递增线程ID的值
"redis.call('pexpire',KEYS[1],ARGV[1]);"+//设置锁的过期时刻
"returnnil;"+//回来nil表明锁获取成功
"end;"+
"returnredis.call('pttl',KEYS[1]);",//回来锁的剩下过期时刻
Collections.singletonList(getRawName()),unit.toMillis(leaseTime),getLockName(threadId));
}
标示:
在这段 Lua 脚本中,KEYS[1]、ARGV[1] 和 ARGV[2] 是 Lua 脚本中的参数,对应 evalWriteAsync 办法中传递的参数。
- KEYS[1]: 代表第一个参数,对应 Collections.singletonList(getRawName()),即 Redis 锁的称号,也就是键值(key)。
- ARGV[1]: 代表第一个额定参数,对应 unit.toMillis(leaseTime),即以毫秒为单位的锁的持续时刻。
- ARGV[2]: 代表第二个额定参数,对应 getLockName(threadId),即获取锁的线程ID。
2)锁续期:scheduleExpirationRenewal(threadId).办法调用链路:scheduleExpirationRenewal –> renewExpiration。供给简化代码:
//开启一个守时使命履行续期逻辑
privatevoidrenewExpiration(){
Timeouttask=commandExecutor.getConnectionManager().newTimeout(newTimerTask(){
@Override
publicvoidrun(Timeouttimeout)throwsException{
//省掉部分代码...
//异步履行续约操作
RFuture<Boolean>future=renewExpirationAsync(threadId);
future.onComplete((res,e)->{
//省掉部分代码...
if(res){
//假如续约成功,从头调本身履行续约操作
renewExpiration();
}else{
cancelExpirationRenewal(null);
}
});
}
//留意:盯梢代码发现:这里守时使命:lockWatchdogTimeout的1/3=10S时刻去履行
},internalLockLeaseTime/3,TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
3)几个细节
- 加锁逻辑中的参数
Collections.singletonList(getRawName())=LockKey
getLockName(threadId)==UUID:threadId
- WatchDog默认时刻30s,每隔:lockWatchdogTimeout/3 = 10s履行一次守时续期。
privatelonglockWatchdogTimeout=30*1000; internalLockLeaseTime/3
- 异步思想:经过Future异步线程回调,主线程履行事务逻辑,后台线程履行续期逻辑,削减堵塞

就上述流程,咱们做个小总结盘下几个问题:
Q1:怎么保证加锁操作原子性?
运用Lua脚本(天然生成原子)履行加锁逻辑,Redis-Server端自己保护。例如,上述代码中运用Lua脚本履行判别+加锁+超时设置。
if(redis.call('hexists',KEYS[1],ARGV[2])==1)then"+
"redis.call('pexpire',KEYS[1],ARGV[1]);"+
"return1;"+
"end;"+
"return0;
Q2:怎么自动续期?
获取锁成功后,运用看门狗机制每隔10s中履行一次续期。
privatevoidrenewExpiration(){
Timeouttask=commandExecutor.getConnectionManager().newTimeout(newTimerTask(){
@Override
publicvoidrun(Timeouttimeout)throwsException{
//异步履行续约操作
future.onComplete((res,e)->{
if(res){
//假如续约成功,回调本身履行续约操作
renewExpiration();
}
});
}
//守时使命:lockWatchdogTimeout的1/3=10S时刻去履行
},internalLockLeaseTime/3,TimeUnit.MILLISECONDS);
}
Q3:可重入怎么了解?
先看下可重入锁的界说,咱们知道synchronized 关键字在 Java 中天然生成支撑可重入锁。
简略的示例代码如下:
publicclassReentrantLockDemo{
publicstaticvoidmain(String[]args){
//创立一个示例目标
ReentrantObjectreentrantObject=newReentrantObject();
//发动一个线程
newThread(()->{
try{
reentrantObject.performTask();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}).start();
}
}
classReentrantObject{
//界说一个可重入锁
privatefinalObjectlock=newObject();
publicvoidperformTask()throwsInterruptedException{
//第一次获取锁
synchronized(lock){
System.out.println(Thread.currentThread().getName()+"第一次获取锁");
//在持有锁的情况下再次获取锁
synchronized(lock){
System.out.println(Thread.currentThread().getName()+"第2次获取锁");
//履行使命,模仿工作
Thread.sleep(1000);
}
}
//开释第一次获取的锁
System.out.println(Thread.currentThread().getName()+"第一次开释锁");
}
}
测验成果:

当一个线程持有一个目标的锁时,它能够再次获取相同目标的锁,而不会被堵塞。可重入锁会保护一个持有锁的计数器。当线程第一次获取锁时,计数器值加一;当线程开释锁时,计数器值减一。只有当计数器值为零时,表明锁被完全开释,其他线程才有机会获取锁。
那么在分布式环境中,怎么完成可重入?
在 Redisson 的源码中,可重入锁的完成首要涉及到 Redis 的 Lua 脚本和底层的 Redis 命令。(上面已介绍,看看是那段,哈哈~~~)
简略归纳一下 Redisson 中可重入锁的首要完成进程:

1、创立锁目标: 当用户恳求创立一个可重入锁时,Redisson 会在 Redis 中创立一个相应的数据结构表明该锁。这个数据结构通常是一个哈希表。
2、线程标识: 每个恳求锁的线程都有一个仅有的标识符。Redisson 运用线程 ID 来标识不同的线程。
3、计数器: 在 Redisson 中,可重入锁的计数器用于记录某个线程持有锁的次数。当线程首次恳求锁时,计数器被设置为 1。每次重入恳求都会添加计数器。
4、Lua 脚本: Redisson 运用 Lua 脚原本完成原子性的获取锁和开释锁的操作。Lua 脚本能够保证这两个操作是原子的,从而防止了并发问题。
5、获取锁: 获取锁时,Lua 脚本会查看当时线程是否现已持有锁。假如是,则添加计数器。假如不是,则测验获取锁。获取锁的进程包含判别是否锁现已被其他线程持有,假如没有,则设置锁的所有者和计数器。
6、开释锁: 开释锁时,Lua 脚本会查看当时线程是否持有锁。假如是,则削减计数器。假如计数器减为 0,表明锁能够被开释。开释锁的进程包含判别是否锁的所有者是当时线程,假如是,则开释锁。
Q4:Redis主从架构中锁失效问题?
如,出产环境redis集群主节点宕机,如下场景:
- Client-A在master节点获取锁成功。还没有把获取锁的信息同步到 slave 的时分,master 宕机。
- slave 被选举为新 master,这时分没有线程A获取锁的数据。
- Client-B 就能成功的取得客户端A持有的锁,违背分布式锁互斥性。
为保证数据强一致性,通常有些运用Zookeeper完成分布式锁
分布式锁四: Zookeeper + Curator完成分布式锁
标示:Curator是Zookeeper一个客户端,类似于Redission和Redis的关系。
首要运用Zookeeper的节点仅有途径去完成.其首要原理:

逻辑参考ZK原理和网络文章:juejin.cn/post/703859…
具体描绘:
(1)线程到 /locks 途径下面创立一个带序号的临时节点。
(2)判别自己创立的这个节点是不是/locks途径下序号最小的节点,假如是,则获取锁;假如不是最小节点,则监听自己的前一个节点。
(3)获取到锁后,履行事务逻辑,然后删去自己创立的节点。监听它的后一个节点收到告诉后,履行进程(2)
运用ZooKeeper和Curator完成分布式锁的根本进程:
1、引进Curator依靠
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version><!--请查看最新版别-->
</dependency>
2、创立Curator客户端: 创立一个CuratorFramework实例来连接到ZooKeeper服务器
CuratorFrameworkclient=CuratorFrameworkFactory.newClient("your_zookeeper_connection_string",newExponentialBackoffRetry(1000,3));
client.start();
3、运用分布式锁: Curator供给了InterProcessMutex类来完成分布式锁
InterProcessMutexlock=newInterProcessMutex(client,"/locks/path");
try{
//获取锁
lock.acquire();
//履行其它
}catch(Exceptione){
//处理反常
}finally{
//开释锁
lock.release();
}
在上述代码中,”/locks/path”是ZooKeeper中的节点途径,用于存储锁信息
4、处理锁超时: 在获取锁时,你能够选择传递超时时刻。
//这样能够防止死锁,假如在指守时刻内无法获取锁,能够选择履行其他逻辑。
if(lock.acquire(5,TimeUnit.SECONDS)){
try{
//履行其它
}finally{
//开释锁
lock.release();
}
}else{
//获取锁超时的处理逻辑
}
5、关闭Curator客户端: 在运用程序关闭时,保证关闭CuratorFramework客户端
client.close();
总结
综上,对各种分布式锁的运用咱们给出如下建议:
- 数据库达观锁
-
长处
- 经过版别号或时刻戳等字段的比对,完成相对简略
- 支撑跨进程,适用于分布式环境,能够跨多个数据库实例运用
-
缺陷
- 功能: 由于需求比对版别号或时刻戳,并发量大会导致很多update失利
- 死锁: 假如运用层未正确处理死锁,或许出现死锁问题。
- Redis的setNX+EXPIRE
-
长处
- Redis的原子性操作,完成简略而高效
- 支撑分布式: 能够轻松在分布式环境中运用
-
缺陷
- 单点毛病
- 当锁的持有者因某种原因宕机,锁或许长时刻无法开释
- Redisson
- 长处
- 支撑可重入锁、公平锁、读写锁等
- ZooKeeper
-
长处
- 强一致性,适用于对一致性要求较高的场景。
- 高可用性,支撑主从架构,半数以上选举机制保证在主节点失效时仍能供给服务。
-
缺陷
- 功能相对较低: 相比于Redis等内存数据库,ZooKeeper的功能或许相对较低。CP原理。功能和一致性只能取其一。
- 杂乱性: 部署和保护ZooKeeper集群相对杂乱。
值得留意的是:他们都有共性的问题,就是 死锁: 所有分布式锁的完成都需求留意死锁的问题,即锁被永久性地占用。
结尾
感知日子:一半有用,一半有趣。 感谢耐性的你阅读到最后。期望本篇文章对你有协助,也欢迎你参加咱们,大众号【码易有道】。一起做长期且正确的工作!!!