做过企业微信开发的同学应该知道,企业微信有一个很厌烦的报错–接口并发超越约束(45033)。报错的原因呢便是由于有多个线程在一起调用企业微信的接口,为了不让接口调用一向报错,我的服务就也要有一个接口并发操控体系。

首要想到的便是用线程池来完成

只用了半个Redisson的Semaphore实现并发控制

刚开始看起来好像没什么问题,可是企业微信的接口之间并发约束是互相阻隔的,那就意味着,我需要创立几十个线程池。。。你以为这就完了?其实不同企业用户调用的时分又是互相阻隔的,所以,我需要为每个企业。。。。

只用了半个Redisson的Semaphore实现并发控制

线程池肯定是用不了了,我猛然想起前天晚上做梦,大师好像跟我讲了信号量能够用来操控并发,考虑到线上环境有多个节点,一起Redisson也供给了Semaphore,打算用一下试试。想想间隔上次运用信号量已经过去了∞天(究竟我从来都没有用过),我直接翻开IDEA,先用java的试试水,public static void main一气呵成

public static void main(String[] args) {
    //创立一个信号量
    Semaphore semaphore = new Semaphore(1);
    //假如没获取到就抛出反常
    if (!semaphore.tryAcquire()) {
        throw new RuntimeException();
    }
    //try不能括到上面,获取不到的时分是不需要开释的
    try{
        //做一些事务
        System.out.println("do something");
    }finally {
        //开释掉
        semaphore.release();
    }
}

又检查了一下逻辑,看起来好像没什么问题,然后我突发奇想,假如我直接release()会怎么样

Semaphore semaphore = new Semaphore(1);
semaphore.release();
System.out.println(semaphore.availablePermits());

只用了半个Redisson的Semaphore实现并发控制

。。。0.0

此刻我的心态出现了亿点变化,我意识到,其实初始化的new Semaphore(1)仅仅设置了一个初始值,并不是一个最大值,直接调用release(),可获取信号量的值会直接+1,这时我赶忙用Redisson的api redissonClient.getSemaphore("test"); (其他代码和上面的相同)试了一下,假如呢

果然作用和Java的是一模相同。

不过,问题好像也不是很大?假如我确保事务在拿到信号后必定会在finally{..}中开释掉,并且semaphore.trySetPermits(1);这个办法只初始化一次,并且必定是单线程,那其实就没有问题。

只用了半个Redisson的Semaphore实现并发控制

企业微信的这个并发约束,没有明文规定,那上线之后,大概是要在违法的边缘多试探试探的,所以就涉及到更新信号量的问题了

只用了半个Redisson的Semaphore实现并发控制

能够看到,假如在多线程的情况下,轻率更新信号量,由于模型没有最大值的概念,更新后的信号量总量比起期望值是只多不少的。

问题首要还是出在release()这个办法没有上限概念上,那我来弄一个有上限的release办法就好了。此刻第一个思路便是继承RedissonSemaphore这个类重写一下release(),可是后来发现自己的子类无法用redissonClient初始化出来,计划就pass掉了。所谓工作时间长了,什么都敢干,我直接翻开源码,我只需仿照着写个相同的是不就行了!

只用了半个Redisson的Semaphore实现并发控制

public RFuture<Void> releaseAsync(int permits) {
    ...
    RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                    "redis.call('publish', KEYS[2], value); ",
            Arrays.asList(getRawName(), getChannelName()), permits);
    ...
}

好家伙,核心逻辑其实就一行lua脚本redis.call('incrby', KEYS[1], ARGV[1]);,我只需一个判别上限的就好了。虽然之前也没写过lua脚本,可是好在逻辑不杂乱,只需要一个if判别就行了,redisson源码中有许多这样的判别,现学现卖就能够了。

另外,鉴于面向目标的思想,假如我自己工具类写一个release()办法,并让这个办法直接出现在事务中多少有一些不妥。所以新建了一个目标,封装一下获取和开释两个办法,之后项目中所有信号量的相关事务,都用这个目标,redisson的信号量概念就不会再出现在具体的事务中了,能够进步事务代码的可读性,也会信号量相关操作的修改操控在这个类中。

经过了一番折腾,总算是把功能完成了。部分代码如下,tryAcquire()办法还是运用redisson供给的,可是release()办法用了自己完成的~

public class SemaphoreModel {
    /**
     * 信号量本体
     */
    private RedissonSemaphore semaphore;
    private RedissonClient redissonClient;
    /**
     * 获取到信号的mark
     */
    private boolean mark = false;
    public void release() {
         //获取信号量的值
        String script = "local value = redis.call('get', '" + semaphore.getRawName() + "'); " +
                //假如没有设置过(false意味着undefind)或者当时值小于5(我设置的最大值)就做release的逻辑
                "if (value ~= false and tonumber(value) < 5) then " +
                //信号量加一
                "local val = redis.call('incrby', '" + semaphore.getRawName() + "', 1); " +
                "redis.call('publish', '" + RedissonSemaphore.getChannelName(semaphore.getRawName()) + "', val); " +
                "return 1; " +
                "end; " +
                "return 0;";
        redissonClient.getScript().eval(RScript.Mode.READ_WRITE, script, RScript.ReturnType.VALUE);
    }
    /**
     * 获取一个信号量
     */
    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }
}

经过了这样的规划,就造出了一个有上限的信号量,问题也都处理了。

本文的内容到这里就结束了,假如有不对的地方,欢迎在评论区指出,觉得有用的话别忘了点个赞哦~