引子

假如设计一个客户端埋点上报库,日志的完整性、高效传输、日志的及时性都是需求考量的点。

其间“高效传输”除了采用更高效的序列化计划、压缩日志、还包括减少通信次数。若每产生一条日志就上报一次就糟蹋流量了。一般的做法是“批量上报”,即先将日志堆积在内存中,数量抵达阈值时才触发一次上报。

批量上传 V1.0

假定埋点上报的完成如下:

object EasyLog {
    var maxSize = 50
    // 用于堆积日志的列表
    private val logs = mutableListOf<Any>()
    fun log(any: Any){
        logs.add(any)
        if(logs.size() >= maxSize) {
            uploadLogs(logs)
            logs.clear()
        }
    }
}

这样完成存在多线程安全问题,当log()被多线程并发拜访时,共享变量logs并不是线程安全的。在多线程环境下调用 ArrayList.add() 会产生数据损坏,由于 ArrayList.add() 完成如下:

public boolean add(E e) {
    ensureCapacityInternal(size + 1);
    elementData[size++] = e;
    return true;
}

其间的前两句都不是线程安全的。

榜首句是扩容,重新申请内存并将原数组复制至新数组。假如两个线程发现容量缺乏一起进行扩容,由于复制数组进程不是原子的,若被打断,则已复制的内容或许会被掩盖。

第二句是索引自增,++ 操作也不是原子的,多线程环境下或许产生现有数据被掩盖。

运用@Synchronized注解能够处理该问题:

object EasyLog {
    @Synchronized
    fun log(any: Any){}
}

相当于为整段代码套上 synchronized,同一瞬间只要一个线程能够输出日志。

性能更好的做法是运用线程安全的容器,比如ConcurrentLinkedQueue,它运用无锁机制完成线程安全的并发读写,关于它源码等级的分析能够点击面试题 | 徒手写一个非堵塞线程安全行列。

批量上传 V2.0 —— 调控日志出产消费速度

埋点上报场景中,日志的出产的速度远大于消费速度(上传是耗时操作)。这样用于堆积日志的容器或许无限增长,内存有爆破的风险。

得运用一种机制调控日志出产和消费速度。比如,丢掉新/旧日志、暂停出产。

Kotlin 中的 Channel 提供了需求的一切功用,包括线程安全以及调控出产消费速度。

运用 Channel 重构如下:

object EasyLog {
    // 容量为50的 Channel
    private val channel = Channel<Any>(50)
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    private var maxSize = 50
    private val logs = mutableListOf<Any>()
    init {
        //新起协程作为日志顾客
        scope.launch { channel.consumeEach { innerLog(it) } }
    }
    fun log(any: Any){
        // 新起协程作为日志出产者
        scope.launch { channel.send(any) }
    }
    private fun innerLog(any: Any){
        logs.add(any)// 堆积日志
        if(logs.size() >= maxSize) {// 日志数量超阈值后上传
            uploadLogs(logs)
            logs.clear()
        }
    }
}

每一条新日志都会转发到 Channel 上,在 Channel 另一头有一个独自的协程消费日志。

Channel 就像一个行列,出产者队尾刺进,顾客队头取出。它是一个线程安全的容器,多线程并发写没有问题,并且现在只要一个顾客,所以消费日志的代码不会有线程安全问题。就比如四面八方涌入的购票者只能在一个窗口前排队。将并行问题串行化是完成线程安全的一种办法。

Channel 的结构办法可传入三个参数:

public fun <E> Channel(
    // 缓存巨细
    capacity: Int = RENDEZVOUS,
    // 溢出战略
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    // 如何处理未传递元素
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

缓冲巨细表明出产速度大于消费速度时最多缓存的元素数。当缓存满后持续产生的元素会触发溢出战略,默许战略是BufferOverflow.SUSPEND,表明挂起出产者,而不是堵塞,出产者线程能够持续运转。这是 Channel 相较于 Java 堵塞行列的优势。

批量上传 V3.0 —— 推迟去小尾巴

若客户端产生了49条日志,运用被杀,那这49条日志就丢了,由于还未抵达50条上传阈值。为了确保日志的完整性,不得不对每一条日志进行耐久化。然后在下一次运用发动时从磁盘读取并上传之。

这样仍然不能满足日志的及时性,比如该用户一周之后才发动运用。

需求一种机制及时处理日志的小尾巴(未达批量阈值的日志):当每一条日志抵达时,敞开倒计时,假如倒计时归零前无新日志请求,则将已堆积日志批量上传,不然关闭前一个倒计时,敞开新的倒计时。

日志&埋点&上报(一)中运用 Handler 完成了这套机制。

这次换用协程完成:

object EasyLog {
    // 容量为50的 Channel
    private val channel = Channel<Any>(50)
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    private var maxSize = 50
    private val logs = mutableListOf<Any>()
    // 冲刷job
    private var flushJob: Job? = null
    init {
        scope.launch { channel.consumeEach { innerLog(it) } }
    }
    fun log(any: Any){
        scope.launch { channel.send(any) }
    }
    private fun innerLog(any: Any){
        logs.add(log)
        flushJob?.cancel() // 撤销上一次倒计时
        // 若日志数量抵达阈值,则直接冲刷,不然推迟冲刷
        if (logs.size() >= maxSize) {
            flush()
        } else {
            flushJob = delayFlush()
        }
    }
    // 冲刷:上传内存中堆积的批量日志
    private fun flush() {
        uploadLogs(logs)
        logs.clear()
    }
    // 推迟冲刷
    private fun delayFlush() = scope.launch {
        delay(5000)// 推迟5秒,假如没有新日志产生,则冲刷
        flush()
    }
}

批量上报 V4.0 —— 并行问题串行化

上述代码存在多线程安全问题。

由于推迟冲刷时新起一个协程,导致 logs.clear() 和 logs.add() 或许并发履行。

为了复现这个多线程问题,写了一个压测demo:

// 线程安全的自增 Int 值
val count = AtomicInteger(0)
// 用于验证多线程问题的 Int 数组,初始值都是 -1
private val testArray = Array(100) { -1 }
val list = mutableListOf<Int>()
var flushJob: Job? = null
// 模仿并产出产日志
repeat(100) {
    scope.launch(Dispatchers.Default) {
        delay((50L..10_00L).random())
        channel.send(count.getAndIncrement())
    }
}
// 模仿串行消费日志
scope.launch {
    channel.consumeEach {
        delay((50L..200L).random())
        log(it)
    }
}
//模仿日志库进口办法
private fun log(value: Int) {
    list.add(value)
    flushJob?.cancel()
    if (list.size >= 5) {// 5条为一批
        flush()
    } else {
        flushJob = delayFlush()
    }
}
// 推迟冲刷
fun delayFlush() = scope.launch {
    delay(100)
    flush()
}
// 冲刷
private fun flush() {
    // 以批量日志的值赋值测验数组
    list.forEach { testArray[it.value] = it.value }
    list.clear()
}

检测办法是并发输出100条日志(内容为一个自增序号0-99),一起声明一个巨细为100的数组,一切元素初始化为-1,每当日志被批量处理时将序号对应的数组索引值置为序号值。一切日志处理完毕后,检测数组中是否有-1,若有则表明复现多线程问题,检测代码如下:

val isOk = testArray.any { it == -1 }

若数组中一切的值都不是 -1,则表明 0-100 的日志都被正确地处理了。

运转 demo,isOk 是 false。。。。持续运转了 n 次,居然全都是 false。。。。开端置疑我对多线程的认知,然后就问 ChatGPT:“理论上这段代码会产生线程安全问题,但是我的测验代码没有复现这个问题,如何才能复现?”

这段代码中的线程安全问题或许不容易复现,由于涉及到多个线程和协程一起运转,并且推迟时刻是随机的。要复现这个问题,能够尝试以下办法:

  1. 添加重复次数:添加 repeat(100) 中的数字,这将添加线程安全问题产生的或许性。
  2. 减小推迟时刻规模:将 delay((50L..10_00L).random()) 和 delay((50L..200L).random()) 中的推迟时刻规模减小,以便让线程和协程更密集地运转。

虽然这些办法能够添加线程安全问题产生的或许性,但并不能保证一定能复现问题。线程安全问题的复现一般取决于操作系统、硬件和运转时环境等多种要素。

听了他的建议调整了 delay 的时刻间隔,并添加次数到1000次,多线程问题显现了:

java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.next(ArrayList.java:860)
    at test.taylor.com.taylorcode.concurrent.ConcurrentInitActivity.flush(ConcurrentInitActivity.kt:134)
    at test.taylor.com.taylorcode.concurrent.ConcurrentInitActivity.log(ConcurrentInitActivity.kt:66)

在遍历列表的一起增删列表元素就会产生这个问题。即便没有产生上述 crash,也或许会打印出 isOk=true,表明array中有元素为-1。

运用线程安全的ConcurrentLinkedQueue作为容器,仍然无法处理问题,由于它的clear()办法是弱一致性的,它需求遍历整个行列,但遍历是基于快照机制的。

最终还是运用 “将并行问题串行化” 来处理这个多线程问题:

// 单线程 Dispatcher
val logDispatcher = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
// 在单线程上推迟冲刷
fun delayFlush() = scope.launch(logDispatcher) {
    delay(50)
    flush()
}
// 在单线程上消费日志
scope.launch(logDispatcher) {
    channel.consumeEach {
        delay((25L..100L).random())
        log(it)
    }
}

构建一个独自的线程,使得日志的消费和冲刷都在该线程进行。

单线程会降低性能吗? 不会,由于推迟冲刷是挂起剩下的代码,而不会堵塞线程。在单线程上推迟冲刷就比如运用 Handler.postDelay() 将冲刷逻辑排到主线程音讯行列的末尾。

引荐阅览

每次调试打印日志都很头痛

客户端日志&埋点&上报的接口设计

客户端日志&埋点&上报的性能优化