for update

select column from table where column = ... for update

在select的sql上加上for update会对此记录加上行级锁,在超时,提交,回滚会进行开释。

缺陷

  1. 当恳求等候锁开释时,不能灵敏的操控加锁时刻、等候锁的时刻
  2. 假如在一个事务中,开始的时分就运用for update的话,则需要这个事务履行完提交或回滚才能够解锁,不能很好的操控锁的粒度,并发性会降低。
  3. 在Repeatable Read的隔离级别下有可能会产生死锁。www.cnblogs.com/micrari/p/8…

项目中的redis锁

public ResultMap<IDCardOCRVo> IDCardOCR(IDCardOCRDto dto){
  //部分省掉。。。
  //经过redis防重提交
		Boolean ifAbsent = stringRedisTemplate.opsForValue().setIfAbsent(userId, "1");
		if (ifAbsent) {
			stringRedisTemplate.expire(userId, 15, TimeUnit.SECONDS);
		}else {
			throw new BusinessException(ResultCode.NOT_FREQUENTLY_OPERATE);
		}
}

假如履行到if (ifAbsent)服务挂掉,那么这个userId就会一向存在redis中,其他恳求一向获取不到,相当于死锁。

Redisson

地址

github.com/redisson/re…

特点

Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格结构, 充分利用 Redis 键值数据库供给的一系列优势, 基于 Java 实用工具包中常用接口, 为运用者供给了 一系列具有分布式特性的常用工具类

  1. 指定一个 key 作为锁标记,存入 Redis 中,指定一个 仅有的用户标识 作为 value。
  2. 当 key 不存在时才能设置值,确保同一时刻只有一个客户端进程取得锁,满意 互斥性 特性。
  3. 设置一个过期时刻,防止因体系反常导致没能删除这个 key,满意 防死锁 特性。
  4. 当处理完事务之后需要铲除这个 key 来开释锁,铲除 key 时需要校验 value 值,需要满意 只有加锁的人才能开释锁。
  5. WatchDog 机制 能够很好的解决锁续期的问题,防备死锁。
  6. 能够灵敏的设置加锁时刻,等候锁时刻,开释锁失利后锁的存在时刻。

流程图

超详细的Redisson实现分布式锁原理解析

原理

构建进程

org.redisson.Redisson#getLock

public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        //异步处理的指令履行器
        this.commandExecutor = commandExecutor;
        //生成仅有id
        this.id = commandExecutor.getConnectionManager().getId();
        //锁存活时刻,默许30s
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        //将id和事务key拼接,作为实际的key
        this.entryName = id + ":" + name;
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

加锁进程

org.redisson.RedissonLock#lock()

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }
    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }
            // waiting for message
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

咱们直接调用的lock办法,这时leaseTime为-1,不履行if分支。

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

这时leaseTime为默许的30s,这段lua的履行是要点:

  1. 首要呢,他先用exists指令判别了待获取锁的key anyLock 存不存在,假如不存在,就运用hset指令将锁key testlock作为key的map结构中存入一对键值对,4afd01d9-48e8-4341-9358-19f0507a9dcc:397 1
  2. 一起还运用了pexpire指令给anyLock设置了过期时刻30000毫秒,然后回来为空;
  3. 假如anyLock已经存在了,会走另一个分支,此时会判别anyLock Map中是否存在37f75873-494a-439c-a0ed-f102bc2f3204:1,假如存在的话,就调用hincrby指令自增这个key的值,而且将anyLock的过期时刻设置为30000毫秒,而且回来空。
  4. 假如上面俩种情况都不是,那么就回来这个anyLock的剩余存活时刻。

脚本也能够确保履行指令的原子性。然后呢就直接回来了一个RFuture ttlRemainingFuture,而且给他加了一个监听器,假如当时的这个异步加锁的过程完结的时分调用,假如履行成功,就直接同步获取一个Long类型的ttlRemaining。经过加锁的lua脚本可知,假如加锁或许重入锁成功的话会发现TTLRemaining是为null的,那么就会履行下面的这一行代码,咱们能够看到注释 锁已取得。

// lock acquired
if (ttlRemaining == null) {
  scheduleExpirationRenewal(threadId);
}

以上咱们剖析了redisson加锁的进程,总结来说,流程不复杂,代码也很直观,首要是异步经过lua脚本履行了加锁的逻辑。

看门狗机制

其间,咱们注意到了一些细节,比如 RedissonLock中的变量internalLockLeaseTime,默许值是30000毫秒,还有调用tryLockInnerAsync()传入的一个从连接管理器获取的getLockWatchdogTimeout(),他的默许值也是30000毫秒,这些都和redisson官方文档所说的watchdog机制有关,看门狗,还是很形象的描绘这一机制,那么看门狗到底做了什么,为什么怎样做呢?下面咱们就来剖析和探讨一下。

加锁成功后的问题

  1. 假设在一个分布式环境下,多个服务实例恳求获取锁,其间服务实例1成功获取到了锁,在履行事务逻辑的进程中,服务实例忽然挂掉了或许hang住了,那么这个锁会不会开释,什么时分开释?
  2. 回答这个问题,天然想起来之前咱们剖析的lua脚本,其间第一次加锁的时分运用pexpire给锁key设置了过期时刻,默许30000毫秒,由此来看假如服务实例宕机了,锁终究也会开释,其他服务实例也是能够持续获取到锁履行事务。可是要是30000毫秒之后呢,要是服务实例1没有宕机可是事务履行还没有完毕,所开释掉了就会导致线程问题,这个redisson是怎样解决的呢?这个就一定要完结主动延伸锁有效期的机制。

之前,咱们剖析到异步履行完lua脚本履行完结之后,设置了一个监听器,来处理异步履行完毕之后的一些作业

private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
}
  1. 首要,会先判别在expirationRenewalMap中是否存在了entryName,这是个map结构,首要还是判别在这个服务实例中的加锁客户端的锁key是否存在,假如已经存在了,就直接回来;第一次加锁,肯定是不存在的。
  2. 接下来便是搞了一个TimeTask,推迟internalLockLeaseTime/3之后履行,这儿就用到了文章一开始就提到奇妙的变量,算下来便是大约10秒钟履行一次,调用了一个异步履行的办法,renewExpirationAsync办法,也是调用异步履行了一段lua脚本
private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
}

首要判别这个锁key的map结构中是否存在对应的4afd01d9-48e8-4341-9358-19f0507a9dcc:397,假如存在,就直接调用pexpire指令设置锁key的过期时刻,默许30000毫秒。

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
}
  1. 在上面使命调度的办法中,也是异步履行而且设置了一个监听器,在操作履行成功之后,会回调这个办法,假如调用失利会打一个错误日志并回来,更新锁过期时刻失利;
  2. 然后获取异步履行的结果,假如为true,就会调用自身,如此说来又会推迟10秒钟去履行这段逻辑,所以,这段逻辑在你成功获取到锁之后,会每隔十秒钟去履行一次,而且,在锁key还没有失效的情况下,会把锁的过期时刻持续延伸到30000毫秒,也便是说只要这台服务实例没有挂掉,而且没有主动开释锁,看门狗都会每隔十秒给你续约一下,确保锁一向在你手中。完美的操作。

其他实例没有取得锁的进程

这时假如有其他服务实例来测验加锁又会产生什么情况呢?或许当时客户端的其他线程来获取锁呢?很显然,肯定会阻塞住,咱们来经过代码看看是怎样做到的。还是把眼光放到之前剖析的那段加锁lua代码上。

当加锁的锁key存在的时分而且锁key对应的map结构中当时客户端的仅有key也存在时,会去调用hincrby指令,将仅有key的值自增一,而且会pexpire设置key的过期时刻为30000毫秒,然后回来nil,能够幻想这儿也是加锁成功的,也会持续去履行守时调度使命,完结锁key过期时刻的续约,这儿呢,就完结了锁的可重入性。

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

那么当以上这种情况也没有产生呢,这儿就会直接回来当时锁的剩余有效期,相应的也不会去履行续约逻辑。此时一向回来到上面的办法:

假如加锁成功就直接回来,不然就会进入一个死循环,去测验加锁,而且也会在等候一段时刻之后一向循环测验加锁,阻塞住,直到第一个服务实例开释锁。对于不同的服务实例测验会获取一把锁,也和上面的逻辑相似,都是这样完结了锁的互斥。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }
    try {
        while (true) {
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }
            // waiting for message
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}

开释锁

public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
}
public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        RFuture<Boolean> future = unlockInnerAsync(threadId);
        future.onComplete((opStatus, e) -> {
            cancelExpirationRenewal(threadId);
            if (e != null) {
                result.tryFailure(e);
                return;
            }
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            result.trySuccess(null);
        });
        return result;
}

判别当时客户端对应的仅有key的值是否存在,假如不存在就会回来nil;不然,值自增-1,判别仅有key的值是否大于零,假如大于零,则回来0;不然删除当时锁key,并回来1。

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

回来到上一层办法,也是针对回来值进行了操作,假如回来值是1,则会去撤销之前的守时续约使命,假如失利了,则会做一些相似设置状态的操作。

void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        if (threadId != null) {
            task.removeThreadId(threadId);
        }
        if (threadId == null || task.hasNoThreads()) {
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                timeout.cancel();
            }
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
}

现在来说,redis分布式锁,redisson去加锁,也便是去redis集群中挑选一台master实例去完结锁机制,而且能由于一台master可能会挂载多台slave实例,这样也就完结了高可用性。可是呢,不得不去考虑,假如master和salve同步的进程中,master宕机了,偏偏在这之前某个服务实例刚刚写入了一把锁,这时分就为难了,salve还没有同步到这把锁,就被切换成了master,那么这时分能够说就有问题了,另一个服务实例在新的master上获取到一把新锁,这时分就会呈现俩台服务实例都持有锁,履行事务逻辑的场景,这个是有问题的。也是在出产环境中咱们需要去考虑的一个问题。

参考资料

  • blog.csdn.net/ice24for/ar…
  • blog.csdn.net/ice24for/ar…
  • mp.weixin.qq.com/s?__biz=MzU…