部门老大:redis 分布式锁再这么用,我就劝退你


-如有不谨慎或许过错之处,还望不吝赐教,轻点怼,人家仍是个孩子,嘤嘤嘤~

导言

最近项目上线的频率颇高,连着几天加班熬夜,身体有点吃不消精神也有些萎靡,无奈事务方催的紧,工期就在眼前只l b _ E o能硬着头皮上了。脑子浑浑噩噩的时分,写的就不能叫代码,能够直接叫做Bug。我就熬夜写了一个bug被骂惨了。

部门老大:redis 分布式锁再这么用,我就劝退你

由所以做商城事务,要频频的对h * M x r 7商品库存进行扣减,应用是集群布置,为防止并发造成库存超买超卖等问题,采用 redis 分布式锁加以控制。本认为给扣库存的代码加上M m ; (lock.tryLock就万事大吉了

    /**
* @author xiaofu
* @descrip4 ^ [ @ o h dtion 扣减库存
* @dates + X [ } ; 2020/4/21 12:10
*/
public String stockLock() {
RLoh - 9 3 ;ck lock = redissonClient.getL) - + # v I G 9ock("stockLock");
try {
/**
* 获取锁
*/
if (lock.tryLock(10, TimeUnit.SECONDS)) {
/**
* 扣减库存
*/
。。。。。。
} else {
LOGGER.info; a - N t C Y + I("未获取到锁事务完毕..");
}
} catch (Exception e) {
LOGGER.i} g snfo("处理2 h n Q U 4反常", e);
}
return "ok";
}

结果# ^ v E 7事务代码履行完以后我忘了开释锁lock.unlock(),导致redis线程池被打满,redis服务大面积毛病,造成库存数据扣减紊乱,被领导一顿臭骂,这个月绩效~ 哎~。

随着 运用rK : Fedis 锁的时刻越长,我发现 redis 锁的坑远比幻想中要多。就算在面试题傍边redis分布式锁的出镜率也比较高,比方:“用锁遇到过哪些问题?a E U #” ,“又是如何处理的?” 基本都是一套连招问出来的。

今天就共享一下我用redis 分布式锁的踩坑日记,以及一些处理方案,和咱们一同共勉。

一、锁未被开释

这种状况是一种初级过错,便是我上边犯的错,因为当时线程* H 9 ! – 获取到redis 锁,处理完事务后未及时开释锁,导致其它) h [线程会一向测验获取锁阻塞,例如:用JedisE 9 | _ X a | k户端X R a a y会报如下的过错信息

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resouu l k n ? G e 4rce# N ) ` @ ~ e ] from the pool

redis线程池现已没有闲暇线程来处理客户端指令。

处理的办法也很简单,只需咱们细心一点,拿到 ` ) w s锁的线程处理完事务及时开释锁Q f K,假设是重入锁未拿到锁后,线程能够开释当时衔接而且sleep一段时刻。

  public void lock() {
while (true) {
boop ? 4 6 @lean flag = this: h F f h 2 E.getLock(key);
if (flag) {
TODO .........
} else {
// 开释当时reO i x ydis衔接
redis.close();
// 休眠1000毫秒
sleep(1000);
}
}
}

二、B的锁被A给开释了

咱们知道Redis完结锁的原理在于 SETNX指令。% a f 4 p ! 5 Ckey不存在时将 key的值设为 value ,回来值为 1;若给定6 / _ & + g ? key 现已存在,则 SETNX不做任何动作,回来值为 0

SETNX key vaQ V l R * E z -lue

咱们来设想一下这个场景:AB两个线程5 S – T y R G来测验给key myLock加锁,A线程先拿到锁(假设锁3秒后过期),B线程就在等候测验获取锁,到这一点毛病没有。

那假设此刻事务逻辑比较耗时,履行时刻7 2 = x a 8现已超过redis锁过期时刻,这时A线程的锁主动开释(删去keyS D b f),B线程检测到myLock这个key不存在,履行 SETNX指令也拿到了锁。

可是,此刻A线程履行完事务逻辑之后,, o u &仍是会去开释锁(删去key),这就导致B线程的锁被A线程给开释了。

为防止上边的状况,一般J M + y :咱们在每个线程加锁时要带上自己独有的value值来标识,只开释指定valuekey,不然就会呈现开释锁紊乱的场景。

三、数据库事务超时

emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:

   @Transaction
pW y [ O F c Kublic void lock() {
while (true) {
boolean flag = this.getLock(key);
if (flag) {
insert();
}
}
}

给这个办法添加一个@Transaction注解开启事务,如代码中抛出反常进行回滚,要知道数据库事务可是有超时时刻限制的,并不会无条件的一向D a c j r等一个耗时的数据库操作。

比方:咱们解析一个大文件– A } + C,再将数据存入到数据库,假设履行时刻太长,就会导致事务超时主4 n t动回滚。

一旦你的key长时刻获取不到锁,获取{ L ? Z等候的时刻远超过数据库事务超时3 r 2 # ! C _时刻,程序就会报反常。

一般为处理这种问题,咱H { h F们就需要将数据库事务改为手动提o @ x + M X k交、回滚事务。

    @Autowi[ S t g W t x 0red
DataSourceTransactionManager dataSourceTransactionManager;
@Transaction
public void lock()= # ` ; {
//手动开启事务
TransactionStatus trang O 3 FsactionStatus = dataSourceTra! [ D h A Vnsactioh c c n pnManager.getTransaction(transactionDefinition);
try {
while (true) {
boolean flag = this.getd  x C mLock(key);
if. W J m (flag) {
insert();4 Z X b - f
//手动提交事务
dataSourceTransactionManager.commit(transactionStatus);b k 3
}
}
} catch (Exception e) {
//手动回滚e X p y W _ ; } y事务
dataSourceTransactionManager.rollback(transactionStatus);
}
}

四、锁4 j V过期了,事务还没履行B u 1 O ~

这种状况和咱们上边说到的第二种比较类似,但处理思路上略有不同。

同样是redis分布式锁过期x 3 h z O,而事务逻辑没履行完的场景,不过,这儿换一种思路想问题,redis锁的过期时刻再弄长点不就处d 0 E u ^ ; Q b理了吗?

那仍是有问题,咱们能够在加锁的时分,– h r r 0 ~ @手动调Z B +redis锁的过期时刻,c u x c可这个时刻多长适宜?事务逻辑的履行时刻a : w y W o $ T是不可控S I q的,调的过长又会影响操作功能a J _

要是redis锁的过期时刻能够主动续期就好了。

为了处理这个问题咱们运用red3 F =is客户端redissonredi1 5 + m csson很好的处@ , R 8理了redD L q q v ? {is在分布式环境下的一些棘手问题,它的主旨便是让运3 e v J ] L p用者削减对Redis的重视,将更多精力用在处理事务逻辑上。

rediss? W [ 0on对分布式锁做了很好封装,只需调用API即可。

  RLock lock = redissonClient.getLock(") _ I ZstockLock");

redisson在加锁成功后,会注册一个守时使命监听这个锁,每隔10秒就去查看这个锁,假设还持有锁,就对过期时刻进行续期。默许过期时刻30秒。这个机制也被叫做:“看门狗”,这姓名。。。

举例子:假设加锁的时刻是30秒,过10秒查b – z D看一次,一旦加锁的事务没有履行完,就会进行一次Q n 0 m V d续期,把锁的过期时刻再次重置成30秒。

经过分析下边redisson的源码完结能够发现,不管是加锁解锁续约都是客户端把一些杂乱的事务逻辑,经过封装在Lua脚本中发送给re3 T ;dis,保证这段杂乱事务逻辑履行的原子性


@Slf4j
@Service
public class RedisV l ( f W pDistributY , yionLockPlL 3 mus {
/**
* 加锁超时时刻,单位毫秒, 即:: W 2 P ~ s 9 s 4加锁时刻内履行完操作,假设未完结会有并 5 P 7发现象
*/
private st| ~ y N ? satic final long DEFAULT_LOCKW C @ ` B p U  ._TT c ; & 1 +IMEOUT = 30;
private static final long- Z T TIMEv e Y D M *_SECONDS_FIVn : { i | m hE =[ g j K 0 B X 5 ;
/**
* 每个key的过期时刻 {@link LockContent}
*/
private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
/**
* redis履行O o r成功的回来
*/
private static final Long EXEC_SUCCESS = 1L;
/**
* 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时= 3 u ) 8 ~ e 5时刻
*/
priv4 3 = : a N # j cate static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[) D ] p 6 B W B z2]) == 1 then ARGV[2] = m# v ? 1 K ( [ 7 cath.floor(redis.call('get', KEYS[2]) + 10) end " +
"if redis.call('exists', KEYSO / U }[1]) == 0 th/ 9 d i [en " +
"local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
"for k, v in1 6 W w 7 H ] ! pairs(t)2 U i c a p t do " +
"if v.  3 == 'OK' then return tonumber(ARGV[2]) end " +
"end " +1 x i C 1 4 R
"return 0 end";
/**
* 开释锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:事务耗时 arg3: 事务开端设置的timeout
*/
private static fina2 @ e a ! ql String UNLOCK_SCRIV ^ O q g PPT = "if redis.call('get', KEYS[1])( Y ; . . G e == ARGV[1] then " +
"local ctime = tou ) A G ?number(A { a -RGV[2]) D h v P + W T" +
"loca; n k *  A h J @l biz_timeout = tonumber(ARGV[3]) " +
"if ctime > 0 then  " +
"iE 6 q ( Mf redis.ci s u P w v T * =all('exists', KEYS[2]) == 1 then " +
"locN { T yal avg_time = rec w 6 Bdis.call('get'6 V w E P + J, KEYS[2]) " +
"avg_time = (tonumber6 A r P B Z ? J D(avg_time)( | , l * 8 + ctime * 2)/10 " +
"if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'E7 E ( nX', 24*60*60) " +
"else redis.call('del', KEYS[2]) end " +
"elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGR s {  # mV[2], 'EX', 24*60*60) end " +
"end " +
"return redis.call('del', KEYS[1]) "V L K R b ` v +
"else return 0 end";
/**8 A b | Y
* 续约lua脚本
*/
private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
private final StringRedisTemplateZ r a v V O H ) redisTemplate;
public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
this.redis@ % i X Z w 6Template = redisTemplate;
Scheu m [ B eduleTask tx H 8 8 V Oask = new ScheduleTask(thiR B 3 k W % m ]s, lockContentMap);
// 发动守时使命
ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
}
/**
* 加锁
* 取到锁加锁,取不到锁一向等候知道取得锁
*
* @param lockKey
* @param requestId 全局唯一
* @R r c a T : ( {param expire   锁过期时刻, 单位秒
* @return
*/
public boolean lock(String lockKey, String requestId, long expireK m a % , g o Q) {
log.Q o 8 n : X + )info("开端履行加锁, lockKeyC l q ( I 4 F ~ ={},a 7 U ? requestId={}", lockKey, requestId);h u 2 i @ 9 ` b c
for (; ; ) {
// 判别是否现已有线程持有锁,削减redis的压力
LockContent lockContentOld = lockContentMap.get(lockKeyt * @);
boolean unLocJ + 2ked = null =U o f 6 S R ^ B 4= lockContentOldZ g L o C =;
// 假设没有被锁,就获取锁b % c * D
if (unLocked) {
lonP 7 A S / V z ^g startTime = System.currentTimeMillis();
// 核算超时时刻1 U # ! 7 M d O 9
long bizExpire = expire == 0L ? DEFAUQ F Y $ 1 cLT_LOCK_TIMEOUT : expire;
String lockKeyRenew = lockKey| , B % _ + "_renew";
RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
List<String&8 h , a ( : O D |gt; keys| ; 9 3 = new ArrayList<>();
keys.add+ ; $ : +(lockKey);
keys.add(loH P f + 6 ~ y U `ckKeyRenew);
Long lockExpire = redisTemplate.execute(script, kP K D Reys, requestId, Longc ? d.toString(bizExpire));
if (null != l_ , x C dockExpire && lE ; G OockExpire > 0) {
// 将锁放入map
LockContent lockContentQ 2 j H e  = new LockContent();
lockContent.o 6 ! 8setStartTime(startTime);e @ O 3 1 a Z V K
lockContent.setLockExpirX Z E / * # J Xe(lockExpire);
lockContenP u wt.setExpireTN O N 5 7 eime(startTime +% i Z t ? t d lockExpi: f % U x ; & }re * 1000);
lockContent.setRequestId(reqv  NuestId);
loci / Z Q z kCont7 B ^ent.setThread(Thread.currentThread());
lockContent.t ^ _ { asetBizExpire(bizExV J @ 6pire);
lockContent.setLockCount(1^ : ,);
lockContentMap.put(lockKey, lockContent);
log.info("加锁成功, lockK` L W | A 7 C $ Sey ={}, requestId={}", lockKey, requestId)s U ; ^ $ v;
returU x . . En true;
}
}
// 重复获取锁,在线程池中因为线程复用,线程持平并不能确定是该线程的锁
if (Thread.currentTG C Ohrh ~ z u D Wead() == lockCq 8 u montentOld.getThread()
&& requestId.equals(lockConx 3 P ! h ^tentOld.getRequestId())){
// 计数 +1
lockContentOg p [ ) u Vld.setLockCount(lockContentOld.getLockCount()+1);
return true;
}
// 假设被锁或获取锁 z V  N失利,则等候100毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// 这儿用lombok 有问题
log.v c p 6 }error("获取redis 锁失利, lockKey ={}, requestId={}", lockKey, reqE a [ X Q =uestId, e);
return false;
}
}
}
/**
*K B t 解锁
*
* @param lockKey
* @param lO O N ; k X B Z gockValue
*/
public booleanc S q _ 0 t ^ unlock(String lockKey8 s A 9, StrinV V V A a } }g lockValue) {
String lockKeyg | 4 C   W /Renew = li % VockKey + "_renew";
LockContent lockContent = lockContentMap.gN J ! 4 % D $et(lockKej I Zy);
long consumeTim^ Q e Q ne;
if (null == lockContg O # O P J (ent) {
con5 / 1sumeTime = 0L;
} el5 x % n 5 ` C } Cse if (lockValue.equals(lock0 K & : E _ #Content.getRequestId())) {
int lockCount = lockContent.getLockCount();
// 每次开释锁, 计数 -1,减到0时删去redn /  4is上的key
if (--lockCount > 0) {
lockContent.setLockCount(ls 1 E .ockCount);
r_ 2 :eturn false;
}
consumeTime = (System.currentD 0 fTime@ m ` # . @ UMillis() - lockContent.gv 4 v getSD 5 0tartTime()) / 1000;
} else {
log.info("开释h I k } w @ w锁失利{ q v ^ ? B,不是自己的锁。t 8 A P : E = V :");
return false;
}
// 删去已完结key,先删去本地缓存,削减redis压力, 分布式锁,只有一个,所以9 $ m I , r ; N这儿不a $ }加锁
lockContentMap.remov A : 9 e S P /e(lockKey);
RedisScript<Long> script = RedisScript.of(UR h @NLOCK_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
k; & @ Reys.add(lockKey);
keys.add(lockKeyRm e m Senew);
LonN 7 2 B 5 G l dg resN o 7ult = redisTemplate.execute(script, k{ % S .eys, lockValue, Long.toString(consumeTime),
Long.toString(lockContent.getBizExpire()));
return EXEC_SUCCESS.equals(result);
}
/**
* 续约
*
* @param lockKey
* @param lockContent
* @return true:续约成功,false:续约失利(1、续约期间履行完结,锁被开释 2、不是自己的锁,3、续约期间锁过期了(未处理))
*/
public boolean renew~ X / . 5 M Q(String lockKey, LockContenti n - lockContent) {
// 检测履- 4 J行事务线程的状态
Thread.State state = lockContent.getThrea_ ^ / 0 S )d().ge. ! K 1 etState();
if (4 P ~ )Thread.State.TERMIN8 Z I 0 s )AV j Z D O 9 z aTED == state) {
log.info("履行事务的V C B D o 9 W线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockConteI G Q ^ w 7nt);
return false;
}i j % C f 5 ) J
String_ - T ~ R v k C requestId = lockContent.4 E $ { O ggetRequeD h r I W r h BstId();
long timeOut = (lockCoj 2 )ntent.get7 O / . / ;ExpireTime() - lockContent.getStartTime()) / 1000;
RedisScript<Long> scriptv h L G # n = RedisScript.of(RENEW_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
log.info("续约结果,True成A # X U Z (功,F0 l t * I Z % Yalse失利 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
ret[ f w # D %urn EXEC_SUCCESS.equals(result);
}
sy D # X S W  Ftatic class ScheduleExecuto: B G 4  $ .r {
public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
long delay = unit.toMillis(initialDelay);
long period_ = unit.toMillis(period);
// 守时履行
new Timer("Lock-Renew-Task").schedule(tas/ R ! M H k, delay, period_);
}
}
static class ScheduleTask extends Tiv ; & PmJ 9 G /  8erTask {
private final RedisDistributionLockPlus redisDistril b N Z !butionLock;
private final Map<String, LockCont( ; 6 d 5 w @ ment> lockContentMap;
public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockCont=  H u a / ] Oent> lockContentMap) {
this.redisDistributionLock = redisDistributionLock;
this.lockContentMap = lockContentMap;
}
@Override
public void run() {
if (lockContentMap.isEmpty()) {
return;
}
Set<MapK L _.Entry<c c b ^ ? @ )String, LockContent>> entries = lockContentMap.entN K BrySet();
for (Map.Entry<String, LockConX | T Z _ m Itent> entry : entries) {q & , h Z
String lockKey = entry.getKey();
LockContent lockContent = entry.getValue();
long expireTime = lockContent.getExpireTime();
// 削k T ? } a 2 | L `减线程池中使命数量
if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
//线程池异步续约
ThreadPool.submit(() -> {
boolean renew = redisDistributionLock.renI } o | V j % 5 =ew(lockKey, lockContent);
if (renew)C B V ; e 4 u I {
long expi! = ~ @reTimeNew = lockw v 5Content.getStartTime() + (expireTime - lockC{ , T ;ontent.getStartTime()) * 2 - TIME_SECOND+ 3 P a ^ I @ a wS_FIi R 4VE * 1000;
lockCS E vonteng / 5 L /t.setExpireTime(expireTimeNew);
} else {
//l k L W = 续约失利,阐明现已履行P g H 6 q ( Y r S完 OR redis 呈现问题
lockF 6 2 ; 4 UContentMap.remove(lo: J k Z B _ckKey);
}
});
}
}
}
}
}

五、redis主从复制的坑

redis高可用最常见的方案便是主从复制(master-slave),这种形式也给redis分布式锁挖了一坑。

redis cluster集群环境下,假设现在A客户端想要加锁,它{ @ a a w F o 7 ^会根据路由规矩选择一台master节点写入key mylock,在加锁成功后,master节点会把key异步复制给对应的slave节点。

假设此刻redis master节点宕机,为保证集群可用性,会进行主备切换{ r I c S 7 dslave变为了redis masterB客户端在新的master节点上加锁成j z 8 | ; 5功,而A客户端也认为自己仍是成功加了锁的。

T F e # b w } W刻就会导` 5 8 0致同一时刻内多个客户端对一个分布式锁完结了加锁,导致各种脏数据的发生。

至于处理办法嘛,现在看还没有什么根治的办法,只能尽量保证机器的y , S $ E稳定性,削减发生此事件的概率。

总结

上面便是6 4我在运用Redis 分布式锁时遇到的一些坑,有5 9 I |点小慨叹,经常用一个办法填上这个坑,没多久就发现另一个坑又出来了,其实根本没有什么十全十美的处理方案,哪有什么银弹,只不过是在权衡利弊J 3 G t v K ? w后,选一个E # 6 D n g f 0在承受范围内的折中 5 M k方案而已。


小福利:

重视我的公号,回复【666】,几百本各类技术电子书相送,嘘~免费 送给咱们6 [ u S P [ z,无B v q : }套路自行收取

部门老大:redis 分布式锁再这么用,我就劝退你

发表评论

提供最优质的资源集合

立即查看 了解详情