引子
假如设计一个客户端埋点上报库,日志的完整性、高效传输、日志的及时性都是需求考量的点。
其间“高效传输”除了采用更高效的序列化计划、压缩日志、还包括减少通信次数。若每产生一条日志就上报一次就糟蹋流量了。一般的做法是“批量上报”,即先将日志堆积在内存中,数量抵达阈值时才触发一次上报。
批量上传 V1.0
假定埋点上报的完成如下:
object EasyLog {
var maxSize = 50
// 用于堆积日志的列表
private val logs = mutableListOf<Any>()
fun log(any: Any){
logs.add(any)
if(logs.size() >= maxSize) {
uploadLogs(logs)
logs.clear()
}
}
}
这样完成存在多线程安全问题,当log()被多线程并发拜访时,共享变量logs并不是线程安全的。在多线程环境下调用 ArrayList.add() 会产生数据损坏,由于 ArrayList.add() 完成如下:
public boolean add(E e) {
ensureCapacityInternal(size + 1);
elementData[size++] = e;
return true;
}
其间的前两句都不是线程安全的。
榜首句是扩容,重新申请内存并将原数组复制至新数组。假如两个线程发现容量缺乏一起进行扩容,由于复制数组进程不是原子的,若被打断,则已复制的内容或许会被掩盖。
第二句是索引自增,++ 操作也不是原子的,多线程环境下或许产生现有数据被掩盖。
运用@Synchronized
注解能够处理该问题:
object EasyLog {
@Synchronized
fun log(any: Any){}
}
相当于为整段代码套上 synchronized,同一瞬间只要一个线程能够输出日志。
性能更好的做法是运用线程安全的容器,比如ConcurrentLinkedQueue
,它运用无锁机制完成线程安全的并发读写,关于它源码等级的分析能够点击面试题 | 徒手写一个非堵塞线程安全行列。
批量上传 V2.0 —— 调控日志出产消费速度
埋点上报场景中,日志的出产的速度远大于消费速度(上传是耗时操作)。这样用于堆积日志的容器或许无限增长,内存有爆破的风险。
得运用一种机制调控日志出产和消费速度。比如,丢掉新/旧日志、暂停出产。
Kotlin 中的 Channel 提供了需求的一切功用,包括线程安全以及调控出产消费速度。
运用 Channel 重构如下:
object EasyLog {
// 容量为50的 Channel
private val channel = Channel<Any>(50)
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private var maxSize = 50
private val logs = mutableListOf<Any>()
init {
//新起协程作为日志顾客
scope.launch { channel.consumeEach { innerLog(it) } }
}
fun log(any: Any){
// 新起协程作为日志出产者
scope.launch { channel.send(any) }
}
private fun innerLog(any: Any){
logs.add(any)// 堆积日志
if(logs.size() >= maxSize) {// 日志数量超阈值后上传
uploadLogs(logs)
logs.clear()
}
}
}
每一条新日志都会转发到 Channel 上,在 Channel 另一头有一个独自的协程消费日志。
Channel 就像一个行列,出产者队尾刺进,顾客队头取出。它是一个线程安全的容器,多线程并发写没有问题,并且现在只要一个顾客,所以消费日志的代码不会有线程安全问题。就比如四面八方涌入的购票者只能在一个窗口前排队。将并行问题串行化是完成线程安全的一种办法。
Channel 的结构办法可传入三个参数:
public fun <E> Channel(
// 缓存巨细
capacity: Int = RENDEZVOUS,
// 溢出战略
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
// 如何处理未传递元素
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
缓冲巨细表明出产速度大于消费速度时最多缓存的元素数。当缓存满后持续产生的元素会触发溢出战略,默许战略是BufferOverflow.SUSPEND
,表明挂起出产者,而不是堵塞,出产者线程能够持续运转。这是 Channel 相较于 Java 堵塞行列的优势。
批量上传 V3.0 —— 推迟去小尾巴
若客户端产生了49条日志,运用被杀,那这49条日志就丢了,由于还未抵达50条上传阈值。为了确保日志的完整性,不得不对每一条日志进行耐久化。然后在下一次运用发动时从磁盘读取并上传之。
这样仍然不能满足日志的及时性,比如该用户一周之后才发动运用。
需求一种机制及时处理日志的小尾巴(未达批量阈值的日志):当每一条日志抵达时,敞开倒计时,假如倒计时归零前无新日志请求,则将已堆积日志批量上传,不然关闭前一个倒计时,敞开新的倒计时。
日志&埋点&上报(一)中运用 Handler 完成了这套机制。
这次换用协程完成:
object EasyLog {
// 容量为50的 Channel
private val channel = Channel<Any>(50)
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private var maxSize = 50
private val logs = mutableListOf<Any>()
// 冲刷job
private var flushJob: Job? = null
init {
scope.launch { channel.consumeEach { innerLog(it) } }
}
fun log(any: Any){
scope.launch { channel.send(any) }
}
private fun innerLog(any: Any){
logs.add(log)
flushJob?.cancel() // 撤销上一次倒计时
// 若日志数量抵达阈值,则直接冲刷,不然推迟冲刷
if (logs.size() >= maxSize) {
flush()
} else {
flushJob = delayFlush()
}
}
// 冲刷:上传内存中堆积的批量日志
private fun flush() {
uploadLogs(logs)
logs.clear()
}
// 推迟冲刷
private fun delayFlush() = scope.launch {
delay(5000)// 推迟5秒,假如没有新日志产生,则冲刷
flush()
}
}
批量上报 V4.0 —— 并行问题串行化
上述代码存在多线程安全问题。
由于推迟冲刷时新起一个协程,导致 logs.clear() 和 logs.add() 或许并发履行。
为了复现这个多线程问题,写了一个压测demo:
// 线程安全的自增 Int 值
val count = AtomicInteger(0)
// 用于验证多线程问题的 Int 数组,初始值都是 -1
private val testArray = Array(100) { -1 }
val list = mutableListOf<Int>()
var flushJob: Job? = null
// 模仿并产出产日志
repeat(100) {
scope.launch(Dispatchers.Default) {
delay((50L..10_00L).random())
channel.send(count.getAndIncrement())
}
}
// 模仿串行消费日志
scope.launch {
channel.consumeEach {
delay((50L..200L).random())
log(it)
}
}
//模仿日志库进口办法
private fun log(value: Int) {
list.add(value)
flushJob?.cancel()
if (list.size >= 5) {// 5条为一批
flush()
} else {
flushJob = delayFlush()
}
}
// 推迟冲刷
fun delayFlush() = scope.launch {
delay(100)
flush()
}
// 冲刷
private fun flush() {
// 以批量日志的值赋值测验数组
list.forEach { testArray[it.value] = it.value }
list.clear()
}
检测办法是并发输出100条日志(内容为一个自增序号0-99),一起声明一个巨细为100的数组,一切元素初始化为-1,每当日志被批量处理时将序号对应的数组索引值置为序号值。一切日志处理完毕后,检测数组中是否有-1,若有则表明复现多线程问题,检测代码如下:
val isOk = testArray.any { it == -1 }
若数组中一切的值都不是 -1,则表明 0-100 的日志都被正确地处理了。
运转 demo,isOk 是 false。。。。持续运转了 n 次,居然全都是 false。。。。开端置疑我对多线程的认知,然后就问 ChatGPT:“理论上这段代码会产生线程安全问题,但是我的测验代码没有复现这个问题,如何才能复现?”
这段代码中的线程安全问题或许不容易复现,由于涉及到多个线程和协程一起运转,并且推迟时刻是随机的。要复现这个问题,能够尝试以下办法:
- 添加重复次数:添加 repeat(100) 中的数字,这将添加线程安全问题产生的或许性。
- 减小推迟时刻规模:将 delay((50L..10_00L).random()) 和 delay((50L..200L).random()) 中的推迟时刻规模减小,以便让线程和协程更密集地运转。
虽然这些办法能够添加线程安全问题产生的或许性,但并不能保证一定能复现问题。线程安全问题的复现一般取决于操作系统、硬件和运转时环境等多种要素。
听了他的建议调整了 delay 的时刻间隔,并添加次数到1000次,多线程问题显现了:
java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.next(ArrayList.java:860)
at test.taylor.com.taylorcode.concurrent.ConcurrentInitActivity.flush(ConcurrentInitActivity.kt:134)
at test.taylor.com.taylorcode.concurrent.ConcurrentInitActivity.log(ConcurrentInitActivity.kt:66)
在遍历列表的一起增删列表元素就会产生这个问题。即便没有产生上述 crash,也或许会打印出 isOk=true,表明array中有元素为-1。
运用线程安全的ConcurrentLinkedQueue
作为容器,仍然无法处理问题,由于它的clear()
办法是弱一致性的,它需求遍历整个行列,但遍历是基于快照机制的。
最终还是运用 “将并行问题串行化” 来处理这个多线程问题:
// 单线程 Dispatcher
val logDispatcher = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
// 在单线程上推迟冲刷
fun delayFlush() = scope.launch(logDispatcher) {
delay(50)
flush()
}
// 在单线程上消费日志
scope.launch(logDispatcher) {
channel.consumeEach {
delay((25L..100L).random())
log(it)
}
}
构建一个独自的线程,使得日志的消费和冲刷都在该线程进行。
单线程会降低性能吗? 不会,由于推迟冲刷是挂起剩下的代码,而不会堵塞线程。在单线程上推迟冲刷就比如运用 Handler.postDelay() 将冲刷逻辑排到主线程音讯行列的末尾。
引荐阅览
每次调试打印日志都很头痛
客户端日志&埋点&上报的接口设计
客户端日志&埋点&上报的性能优化