前言

最近在接手的某项目中,主管给我发来了一个留传以久的 BUG,让我看看排查一下,把它修复了。

项意图问题大概是在某项事务中,需求向数据库刺进数据,而且需求确保同种类型的数据只被刺进一次,可是现在却呈现了数据被重复刺进的状况。

我点开代码一看,上一个跑路的老哥写的十分慎重啊,判别重复的逻辑嵌套了一层又一层,先在本地数据库查询一次没有重复后又恳求服务器查询一次,最后在刺进前再查询本地数据库一次。一共写了三层判重逻辑。可是为什么仍是重复了呢?

再细看,哦,原来是用了协程异步查询啊,怪不得。

可是,不对啊,你不是用 Mutex 上锁了吗?怎样还会重复?

Mutex 你在干什么?你锁了什么?你看看你都守护了什么啊。

此刻的 Mutex 就像我一般,什么都守护不住。

可是,真的怪 Mutex 吗?这篇文章咱们就来浅析一下运用 Mutex 实现协程的并发或许导致失效的问题,为咱们厚道本份的 Mutex 洗清冤屈。

前置知识:关于协程和并发

众所周知,关于多线程程序,或许会呈现同步问题,例如,下面这个经典的比如:

fun main() {
    var count = 0
    runBlocking {
        repeat(1000) {
            launch(Dispatchers.IO) {
                count++
            }
        }
    }
    println(count)
}

你们说,以上代码会输出什么?

我不知道,我也无法知道,没错,的确是这样的。

由于在上面的代码中,咱们循环 1000 次,每次都发动一个新的协程,然后在协程中对 count 进行自增操作。

问题就在于,咱们无法确保对 count 的操作是同步的,由于咱们不知道这些协程何时会被履行,也无法确保这些协程在履行时 count 的值没有被其他协程修正过。

这就导致,count 值终究会是不确定的。

另一个众所周知,kotlin 中的协程其实可以简略了解成对线程的封装,所以实践上不同的协程或许运转在同一个线程也或许运转在不同的线程。

咱们给上面的代码加一个打印地点线程:

fun main() {
    var count = 0
    runBlocking {
        repeat(1000) {
            launch(Dispatchers.IO) {
                println("Running on ${Thread.currentThread().name}")
                count++
            }
        }
    }
    println(count)
}

截取其间一部分输出:

Running on DefaultDispatcher-worker-1
Running on DefaultDispatcher-worker-4
Running on DefaultDispatcher-worker-3
Running on DefaultDispatcher-worker-2
Running on DefaultDispatcher-worker-5
Running on DefaultDispatcher-worker-5
Running on DefaultDispatcher-worker-2
Running on DefaultDispatcher-worker-6
Running on DefaultDispatcher-worker-2
Running on DefaultDispatcher-worker-2
Running on DefaultDispatcher-worker-7
Running on DefaultDispatcher-worker-7
Running on DefaultDispatcher-worker-7
Running on DefaultDispatcher-worker-7
……

可以看到,不同的协程或许运转在不同的线程上,也或许同一个线程会被用来运转不同的协程。由于这个特性,所以协程也会存在多线程的并发问题。

那么,什么是并发呢?

简略了解,便是在同一个时间段内履行多个使命,此刻为了实现这个意图,不同的使命或许会被拆分开来穿插着履行。

与之对应的,还有一个并行的概念,简略说便是多个使命在同一个时间点一同履行:

Kotlin 协程中的并发问题:我明明用 mutex 上锁了,为什么没有用?

总归,不论是并行仍是并发,都会涉及到对资源的“抢夺”问题,由于在同一时间或许会有多个线程需求对同一个资源进行操作。此刻就会呈现上面举例的状况,由于多个线程都在对 count 进行操作,所以导致终究 count 的值会小于 1000,这也很好了解,比方此刻 count 是 1,被线程 1 读取到之后,线程 1 开端对它进行 +1 操作,可是在线程1还没写完的时候,来了个线程2,也读了一下 count 发现它是1,也对它进行 +1 操作。此刻,不论线程1和2谁先写完,终究 count 也只会是 2,显然,依照咱们的需求,应该是想让它是 3 才对。

那处理这个也简略啊,咱们就不要让有这么多线程不就行了,只需只有一个线程不就行了?

的确,咱们指定所有协程只在一个线程上履行:

fun main() {
    // 创建一个单线程上下文,并作为发动调度器
    val dispatcher = newSingleThreadContext("singleThread")
    var count = 0
    runBlocking {
        repeat(1000) {
            // 这儿也可以直接不指定调度器,这样就会运用默认的线程履行这个协程,换言之,都是在同一个线程履行
            launch(dispatcher) {
                println("Running on ${Thread.currentThread().name}")
                count++
            }
        }
    }
    println(count)
}

截取最后的输出成果如下:

……
Running on singleThread
Running on singleThread
Running on singleThread
Running on singleThread
Running on singleThread
Running on singleThread
Running on singleThread
Running on singleThread
Running on singleThread
Running on singleThread
Running on singleThread
1000
Process finished with exit code 0

可以看到,输出的 count 成果终所以正确了,那么,为什么还会有我这篇文章的问题呢?

哈哈,其实你被我绕进去了。

咱们用协程(线程)的意图是什么?不便是为了可以履行耗时使命或者可以让多个使命一起履行,削减履行时间吗?即然你都用单线程了,那么有什么含义?

究竟这儿咱们举例的代码只对 count 这一个变量进行操作,的确没有开多线程的必要,可是实践工作中肯定不止这么一个操作啊,莫非咱们要由于某个变量被其他线程占用了就不持续往下走了?就这么呆呆的堵塞住原地等待?显然不现实,醒醒吧,国际不是只有 count ,还有许多数据等待咱们处理。所以咱们用多线程的意图便是为了可以在某个变量(资源)不可用的时候可以去处理其他未被占用的资源,然后缩短总的履行时间。

可是,假如其他的代码履行到必定程度,绕不开有必要要运用被占用的资源怎样办?

不论正在占用的线程是否免除占用直接硬去拿这个资源持续处理?显然不现实,由于这样就会形成咱们前言中所述的状况发生。

所以假如咱们遇到需求运用被占用的资源时,应当暂停当时线程,直至占用被免除。

在 java 中通常有三种办法处理这个问题:

  1. synchronized
  2. AtomicInteger
  3. ReentrantLock

可是在 kotlin 的协程中运用它们不太适宜,由于协程对错堵塞式的,当咱们需求协程“暂停”的时候(如 delay(1000)),协程通常是被挂起,挂起的协程并不会堵塞它地点的线程,此刻这个线程就可以腾出身去履行其他的使命。

而在 java 中需求线程暂停时(如 Thread.sleep(1000)),通常便是直接堵塞这个线程,此刻这个线程就会被限制,直到堵塞结束。

在 kotlin 中,提供了一个轻量级的同步锁:Mutex

什么是 Mutex

Mutex 是在 kotlin 协程中用于替代 java 线程中 synchronizedReentrantLock 的类,用于为不应该被多个协程一起履行的代码上锁,例如为前面比如中的 count 自增代码上锁,这样可以确保它在同一时间点只会被一个协程履行,然后避免了由于多线程导致的数据修正问题。

Mutex 有两个核心办法: lock()unlock() ,别离用于上锁和解锁:

fun main() {
    var count = 0
    val mutex = Mutex()
    runBlocking {
        repeat(1000) {
            launch(Dispatchers.IO) {
                println("Running on ${Thread.currentThread().name}")
                mutex.lock()
                count++
                mutex.unlock()
            }
        }
    }
    println(count)
}

上述代码输出截取如下:

……
Running on DefaultDispatcher-worker-47
Running on DefaultDispatcher-worker-20
Running on DefaultDispatcher-worker-38
Running on DefaultDispatcher-worker-15
Running on DefaultDispatcher-worker-14
Running on DefaultDispatcher-worker-19
Running on DefaultDispatcher-worker-48
1000
Process finished with exit code 0

可以看到,尽管协程运转在不同的线程,可是仍然可以正确的对 count 进行修正操作。

这是由于咱们在修正 count 值时调用了 mutex.lock() 此刻确保了之后的代码块仅答应被当时协程履行,直至调用 mutex.unlock() 免除了锁定,其他协程才能持续履行这个代码块。

Mutex 的 lockunlock 原理可以简略的了解成,当调用 lock 时,假如这个锁没有被其他协程持有则持有该锁,并履行后边的代码;假如这个锁已经被其他协程持有,则当时协程进入挂起状态,直至锁被开释,并拿到了锁。当被挂起时,它地点的线程并不会被堵塞,而是可以去履行其他的使命。详细的原理可以看看参考资料2。

在实践运用中,咱们一般不会直接运用 lock()unlock() ,由于假如在上锁后履行的代码中呈现异常的话,将会形成持有的锁永久不会被开释,此刻就会形成死锁,其他的协程将永久等待不到这个锁被开释,然后永久被挂起:

fun main() {
    var count = 0
    val mutex = Mutex()
    runBlocking {
        repeat(1000) {
            launch(Dispatchers.IO) {
                try {
                    mutex.lock()
                    println("Running on ${Thread.currentThread().name}")
                    count++
                    count / 0
                    mutex.unlock()
                } catch (tr: Throwable) {
                    println(tr)
                }
            }
        }
    }
    println(count)
}

上述代码输出:

Running on DefaultDispatcher-worker-1
java.lang.ArithmeticException: / by zero

而且程序将会一向履行下去,无法停止。

其实要处理这个问题也很简略,咱们只需求加上 finally ,使这段代码不管是否履行成功都要开释掉锁即可:

fun main() {
    var count = 0
    val mutex = Mutex()
    runBlocking {
        repeat(1000) {
            launch(Dispatchers.IO) {
                try {
                    mutex.lock()
                    println("Running on ${Thread.currentThread().name}")
                    count++
                    count / 0
                    mutex.unlock()
                } catch (tr: Throwable) {
                    println(tr)
                } finally {
                    mutex.unlock()
                }
            }
        }
    }
    println(count)
}

上述代码输出成果截取如下:

……
Running on DefaultDispatcher-worker-45
java.lang.ArithmeticException: / by zero
Running on DefaultDispatcher-worker-63
java.lang.ArithmeticException: / by zero
Running on DefaultDispatcher-worker-63
java.lang.ArithmeticException: / by zero
Running on DefaultDispatcher-worker-63
java.lang.ArithmeticException: / by zero
1000
Process finished with exit code 0

可以看到,尽管每个协程都报错了,可是程序是能履行结束的,不会被完全挂起不动。

其实这儿咱们可以直接运用 Mutex 的扩展函数 withLock

fun main() {
    var count = 0
    val mutex = Mutex()
    runBlocking {
        repeat(1000) {
            launch(Dispatchers.IO) {
                mutex.withLock {
                    try {
                        println("Running on ${Thread.currentThread().name}")
                        count++
                        count / 0
                    } catch (tr: Throwable) {
                        println(tr)
                    }
                }
            }
        }
    }
    println(count)
}

上述代码输出内容截取如下:

……
Running on DefaultDispatcher-worker-31
java.lang.ArithmeticException: / by zero
Running on DefaultDispatcher-worker-31
java.lang.ArithmeticException: / by zero
Running on DefaultDispatcher-worker-51
java.lang.ArithmeticException: / by zero
Running on DefaultDispatcher-worker-51
java.lang.ArithmeticException: / by zero
Running on DefaultDispatcher-worker-51
java.lang.ArithmeticException: / by zero
1000

可以看到,运用 withLock 后就不需求咱们自己处理上锁和解锁了,只需求把要确保只被一起履行一次的代码放进它的参数中的高阶函数里就行。

这儿看一下 withLock 的源码:

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    // ……
    lock(owner)
    try {
        return action()
    } finally {
        unlock(owner)
    }
}

其实也十分的简略,便是在履行咱们传入的 action 函数前调用 lock() 履行结束后在 finally 中调用 unlock()

说了这么多,或许读者想问了,你在这讲了半天,是不是偏题了啊?你的标题呢?怎样还不说?

别急别急,这不就来了吗?

为什么我都 mutex.withLock 了却没用呢?

回到咱们的标题和前言中的场景,为什么项目中分明运用了 mutex.Unlock 将查重代码上锁了,仍是会呈现重复刺进的状况?

我知道你很急,可是你别急,容我再给你看个比如:

fun main() {
    var count = 0
    val mutex = Mutex()
    runBlocking {
        mutex.withLock {
            repeat(10000) {
                launch(Dispatchers.IO) {
                    count++
                }
            }
        }
    }
    println(count)
}

你猜这段代码能输出 10000 吗?再看一段代码:

fun main() {
    var count = 0
    val mutex = Mutex()
    runBlocking {
        mutex.withLock {
            repeat(100) {
                launch(Dispatchers.IO) {
                    repeat(100) {
                        launch(Dispatchers.IO) {
                            count++
                        }
                    }
                }
            }
        }
    }
    println(count)
}

这段呢?你们猜能输出 10000 吗?

其实只需咱们略微想一想就知道,这个显然不或许输出 10000 啊。

尽管咱们在最顶层加了 mutex.lockWith 。可是,咱们却在其间新开了许多新的协程,这就意味着,事实上这个锁约等于没有加。

还记得咱们上面看过的 mutex.lockWith 的源码吗?

此处相当于刚 lock 上,发动了一个新协程,直接 unlock 了,可是实践需求上锁的代码应该是新发动的协程里边的代码啊。

所以,咱们在上锁时应该尽或许的缩小上锁的粒度,只给需求的代码上锁:

fun main() {
    var count = 0
    val mutex = Mutex()
    runBlocking {
        repeat(100) {
            launch(Dispatchers.IO) {
                repeat(100) {
                    launch(Dispatchers.IO) {
                        mutex.withLock {
                            count++
                        }
                    }
                }
            }
        }
    }
    println(count)
}

这儿,咱们需求上锁的其实便是对 count 的操作,所以咱们只需求把上锁代码加给 count++ 即可,运转代码,完美输出 10000 。

有了上面的铺垫,咱们再来看看我接手项意图简化代码原型:

fun main() {
    val mutex = Mutex()
    runBlocking { 
        mutex.withLock {
        	// 模拟一起调用了许屡次刺进函数
            insertData("1")
            insertData("1")
            insertData("1")
            insertData("1")
            insertData("1")
        }
    }
}
fun insertData(data: String) {
    CoroutineScope(Dispatchers.IO).launch {
        // 这儿写一些无关数据的事务逻辑
        // xxxxxxx
        // 这儿进行查重 查重成果 couldInsert
        if (couldInsert) {
            launch(Dispatchers.IO) { 
                // 这儿将数据刺进数据库
            }
        }
    }
}

你们猜,此刻数据库会被刺进几个 1

答案显然是无法预知,一二三四五次都有或许。

咱们来猜一猜,这哥们在写这段代码时的心路历程:


产品:这儿的刺进数据需求注意一个类型只让刺进一个数据啊
开发:好嘞,这还不简略,我在刺进前加个查重就行了
提测后
测试:开发兄弟,你这儿有问题啊,这个数据可以被重复刺进啊
开发:哦?我看看,哦,这儿查询数据库用了协程异步履行,那不便是并发问题吗?我搜搜看 kotlin 的协程这么处理并发,哦,用 mutex 啊,那简略啊。
所以开发一顿操作,直接在调用查重和刺进数据的最上级函数中加了个 mutex.withlock 将整个处理逻辑全部上锁。而且觉得这样就满有把握了,无忧无虑了,末端还不忘给 kotlin 点个赞,加锁居然这么方便,不像 java 还得自己写一堆处理代码。

那么,我是这么处理这个问题的呢?最好的处理方案,其实应该是可以将上锁粒度细化到详细的数据库操作的当地,可是还记得我上面说的吗,这个项目中嵌套了一层又一层的查询代码,想要在这其间刺进上锁代码显然不容易,我可不想由于往里边插一个锁直接导致整座大山坍毁。

所以我的选择是给每个 launch 了新协程的当地又加了一堆锁……

这座山,由于我,变得更高了,哈哈哈哈哈。

所以,其实并不是 mutex 有问题,有问题的仅仅运用的人算了。

参考资料

  1. Kotlin协程-并发安全的几种处理方案与功能对比
  2. Kotlin协程中的并发问题处理方案
  3. 协程 并发 同步 Mutex Actor
  4. 一文读懂并发与并行
  5. Shared mutable state and concurrency

本文正在参加「金石计划」