布景

Apache Bookkeeper 是根据日志的一个持久化体系,一切的数据会以日志的形式存储到 Ledger 磁盘的 Entry Log 文件中,之后经过后台异步收回的形式来将 EntryLog 文件收回掉。但是在咱们实际的运用场景中,发现好久之前的 EntryLog 文件无法被删去掉,对 Entry Log 文件存在的时刻进行监控,详细如下:

Apache Pulsar系列 —— 深入理解Bookie GC 回收机制

咱们能够看到,假设从 Broker 侧设置的 Retention 战略最大为 5 天,即好久之前的 EntryLog 文件依然存在于对应的 Ledger 数据盘中,导致磁盘的占用率较高。尽管Bookie 的 GC 收回机制是后台异步收回的,当 Broker 侧以为某条音讯能够删去时,Bookie 并不会立即从磁盘中将该数据删去掉,而是利用 Bookie 的 GC 线程周期性的触发收回的逻辑。但是数据的删去操作居然滞后了半年多,所以萌生了搞懂 Bookie GC 收回机制的主意,究竟是什么原因导致了该现象的产生。

Bookie GC 介绍

在 Apache Bookkeeper 中,数据的写入,读取以及收回(紧缩)操作是相互隔离的。为了防止过多碎片文件的产生,在 Bookies 中不同 Ledgers 中的 Entrys 会聚合存储到一个 EntryLog 文件中。Bookie 能够经过运转 GC 线程(GarbageCollectorThread)来删去未相关的 Entry 条目来抵达收回磁盘空间的目的。在当时的 EntryLog 文件中,假如某一个 Ledger 中包括无法删去的 Entry,那么这个 EntryLog 文件将一向保留在数据盘(Ledger 盘)中无法被删去。由于事务场景的限制,咱们没办法要求一个 EntryLog 文件中一切 Ledgers 的 Entries 都能在近乎相同的时刻内满足可删去的条件。为了防止该现象,Bookie 引入了数据紧缩的概念,即经过扫描 EntryLog 文件断定哪些 Entry 是能够删去的,能够删去的 Entry 持续保留在原始的 EntryLog 文件中,不行删去的 Entry 写入新的 EntryLog 文件中,扫描完结之后将原始的 EntryLog 文件删去掉。

Bookie 紧缩类型

Bookie 的 GC 收回线程并不是一向履行的,而是根据特定的阈值,Bookie 依照一个 EntryLog 文件中有用数据的占比以及数据紧缩被触发的时刻将数据紧缩的操作分为如下两种类型:

Minor GC:

默许触发的时刻为每 1 小时触发一次,能够经过 minorCompactionInterval 来自界说每一次 minor GC 触发的时刻距离。当抵达 Minor GC 触发的时刻阈值之后,会持续查看当时 EntryLog 中有用数据的占比是否超越默许装备的 20%。假如没有超越,则 Minor GC 收效,开端收回并紧缩 EntryLog 中的数据。假如超越阈值,那么 Minor GC 不会被触发。能够经过 minorCompactionThreshold 来自界说 Minor GC 中有用数据的占比抵达多少之后不会持续触发 Minor GC。为了防止 Minor GC 履行占用太多的时刻,也能够经过 minorCompactionMaxTimeMillis 的参数来操控当时 Minor GC 最大答应履行的时刻是多少。当 minorCompactionMaxTimeMillis <= 0 时,废物收回线程会一向履行直到扫描完结当时 Ledger 目录下一切的 Entry Log 文件。

Major GC:

默许触发的时刻为每 24 小时触发一次,能够经过 majorCompactionInterval 来自界说每一次 major GC 触发的时刻距离。当抵达 Major GC 触发的时刻阈值之后,会持续查看当时 EntryLog 中有用数据的占比是否超越默许装备的 80%。假如没有超越,则 Major GC 收效,开端收回并紧缩 EntryLog 中的数据。假如超越阈值,那么 Major GC 不会被触发。能够经过 majorCompactionThreshold 来自界说 Major GC 中有用数据的占比抵达多少之后不会持续触发 Major GC。为了防止 Minor GC 履行占用太多的时刻,也能够经过 majorCompactionMaxTimeMillis 的参数来操控当时 Major GC 最大答应履行的时刻是多少。当 majorCompactionMaxTimeMillis <= 0 时,废物收回线程会一向履行直到扫描完结当时 Ledger 目录下一切的 Entry Log 文件。

留意: minorCompactionThreshold 和 majorCompactionThreshold 的最大值不能够超越 100%,当 minorGC 和 majorGC 一起装备时,MinorGC 的 minorCompactionInterval 和 minorCompactionThreshold 要求有必要小于 MajorGC 中指定的阈值。

为什么需要引入紧缩有用占比阈值?

当做数据紧缩收回时,咱们默许分别为 Minor GC 和 Major GC 引入了数据有用占比的阈值,这样做的目的是为了防止每次废物收回线程运转时,都会去频频的扫描一切的 EntryLog 文件。当一个 EntryLog 文件中有用数据的占比超越 Major GC 指定的阈值,那么能够以为当时 EntryLog 中绝大部分数据依然为有用的数据。这种情况下咱们无需持续为了收回剩下的那一点无效数据,然后将该 EntryLog 中的数据从原始的 EntryLog 文件中再写入新的 EntryLog 文件中,这样能够大幅度的节省磁盘 I/O。

Bookie 紧缩办法

当时,Bookie 提供了如下两种数据紧缩的办法:

依照 Entries 的数量

默许情况下,Bookie 是经过 Entries 的数量进行紧缩,默许值为 1000,即每次最大紧缩 1000 条 Entry。能够经过 compactionRateByEntries 自界说每次紧缩 Entries 的数量。

依照 Entries 巨细

Bookie 依照 Entries 的巨细进行紧缩,能够经过 compactionRateByBytes 自界说每次收回最大答应被收回 Entries 的巨细。当想要运用该紧缩办法时,需要在 Bookie 的装备文件中一起翻开如下装备:isThrottleByBytes=true。

留意:生产环境中建议运用依照 Entries 巨细紧缩的办法,这个取决于 Entry 被打包的办法。关于 Pulsar 来说,普通音讯和 Batch 音讯都会被当作一条 Entry 来看待,这就可能会导致每一条 Entry 的巨细都不相同。假如依照 Entries 的数量来收回,即每次收回的数据巨细是不一致的,假如单个 Entry 过大,有可能导致收回期间占用较大的磁盘 IO,影响正常数据的读写IO,形成抖动的现象产生。

Bookie GC 触发的办法

当时 Bookie 的 GC 操作支撑如下两种触发办法:

主动触发

Bookie 的 GC 收回线程依照 Bookie 紧缩类型小节中介绍的办法,依照特定的时刻距离及阈值周期性的履行数据紧缩收回的操作。

手动触发

Bookie 支撑了 REST API 的 HTTP 服务,答应用户经过手动的办法触发 GC,运用办法如下:

curl -X PUT http://127.0.0.1:8000/api/v1/bookie/gc

  • IP: 即为当时 Bookie 的 IP 地址

  • Port:示例中的 8000 端口为 Bookie 装备文件中 httpServerPort 指定的端口,默以为 8000。

履行完结之后,也能够经过如下请求查看 GC 的状况等信息:

curl http://127.0.0.1:8000/api/v1/bookie/gc_details

Output:

[ {
   "forceCompacting" : false,
   "majorCompacting" : false,
   "minorCompacting" : false,
   "lastMajorCompactionTime" : 1662436000016,
   "lastMinorCompactionTime" : 1662456700007,
   "majorCompactionCounter" : 11,
   "minorCompactionCounter" : 99 
}]

Bookie GC 收回代码分析

Bookie 收回的代码逻辑首要在 org.apache.bookkeeper.bookie.GarbageCollectorThread 类中的 runWithFlags() 办法, 首要的收回逻辑包括如下三个函数:

  • doGcLedgers()

  • doGcEntryLogs()

  • doCompactEntryLogs()

在了解 Bookie GC 的收回逻辑中,咱们首要需要介绍几个关键的调集:

  • LedgerRangeIterator: 该接口为一个 LedgerRange 的迭代器,用来存储从 Meta Store (zookeeper)中存储的一切 Ledgers 的信息。

  • ledgerIndex:是 LedgerStorage (RocksDB)中一切 Ledger 扫描出来的一个调集。

  • ledgersMap:每一个 EntryLog 对应一个 ledgersMap,表示当时 EntryLog 中存储的一切 Ledgers 的调集。

  • entryLogMetaMap:每一个 Ledger 盘拥有一个 entryLogMetaMap 目标,是当时 Ledger 数据盘下一切 EntryLogID -> EntryLogMeta 的一个缓存

当时,Bookie 的索引存储支撑了多种办法,默许运用的是 SortedLedgerStorage,能够在 Bookie 的装备文件中经过 ledgerStorageClass 来指定详细需要运用的索引存储办法,一般推荐运用的装备如下:


ledgerStorageClass=org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage

所以鄙人面的代码详解中,咱们以 DbLedgerStorage 为例。

doGcLedgers()

在 doGcLedgers() 中,代码逻辑首要如下:

  1. 首要从 RocksDB 中获取当时数据盘目录下一切的 Ledgers 数据,并运用 NavigableSet 调集暂存当时活泼的 Ledgers 列表。

NavigableSet bkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
  1. 经过 ledgerManager 目标,获取 meta store (默许:zookeeper)中一切 Ledgers 的 Range,暂存在 LedgerRangeIterator 迭代器中

LedgerRangeIterator ledgerRangeIterator = ledgerManager
.getLedgerRanges(zkOpTimeoutMs);
  1. 界说一个 Set 调集,来缓存从 zookeeper 中获取的 LedgerRangeIterator

if (ledgerRangeIterator.hasNext()) {
LedgerRange lRange = ledgerRangeIterator.next();
ledgersInMetadata = lRange.getLedgers();
// 当第一次进来以后,就能够获取到当时批次中最大的那个 Ledgers 的索引是多少
end = lRange.end();
} else {
// 假如从 zk 中获取到的 Ledgers 迭代器是空的或许现已迭代完一切的 Ledgers,则重置 done 标记,退出循环。
ledgersInMetadata = new TreeSet<>();
end = Long.MAX_VALUE;
done = true;
}

这儿将 LedgerRangeIterator 的迭代器转化为 ledgersInMetadata 的 Set 调集首要是为了第四步能够做 subSet 的操作。

  1. 以 RocksDB 中获取到的 Ledgers 调集为规范,对从 zookeeper 中获取的 Ledgers 列表做 subSet 的操作

Iterable subBkActiveLedgers = bkActiveLedgers.subSet(start, true, end, true);

其中 start 方位为 0,end 的方位为 LedgerRangeIterator 迭代器最终的一个方位。由于上述两个 Set 在预先都是做过排序操作的,所以在这儿能够直接进行 subSet 的操作。

  1. 拿第四步获取到的 subBkActiveLedgers 与 zookeeper 中的 ledgersInMetadata 调集比较,判别 zookeeper 中是否还包括当时 LedgerID,假如不包括代表能够从 Bookie 的 RocksDB 索引中删去当时 LedgerID 的信息。

// 迭代 subBkActiveLedgers 的调集
for (Long bkLid : subBkActiveLedgers) {
// 以 zk 为规范
if (!ledgersInMetadata.contains(bkLid)) {
....
// 清理指定的 Ledger ID
// 这个Ledger在 Bookie 中有,在 zk上没有,则删去。
garbageCleaner.clean(bkLid);
}
}

这儿以 zookeeper 中获取到的 ledgersInMetadata 为基准是由于,在 Pulsar 中当数据写入的时分是先去 zookeeper 节点注册一个暂时的 zk-node 来存储当时 LedgerID 的相关元数据信息,然后再去 RocksDB 中写入 LedgerID 的存储索引信息,然后将 LedgerID 的 Entry 数据写入到 EntryLog 中。删去操作也是同样的道理,当用户在 Pulsar 中运用的 Topic 中,有 Ledger 契合删去条件时,会去调用 ManagedLedger 的接口去 zookeeper 中删去 LedgerID 的 zk-node。能够看到,无论是读写,关于 Bookie 的 Client 来说,都是优先操作 Bookie-Zk 中的 Ledgers 信息。所以关于删去操作而言咱们也是以 zookeeper 中的 Ledgers Set 调集为基准,来查看 RocksDB 的索引存储中有哪些 LedgerID 是能够删去的。

  1. 调用 GarbageCleaner 的接口去 RocksDB 的 ledgerIndex 中删去指定的 LedgerID

初始化 garbageCleaner 接口并完成 clean 办法,在 clean 办法中调用 DbLedgerStorage 的 deleteLedger 接口


this.garbageCleaner = ledgerId -> {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("delete ledger : " + ledgerId);
}
gcStats.getDeletedLedgerCounter().inc();
// 调用 DbLedgerStorage 去删去接口
ledgerStorage.deleteLedger(ledgerId);
} catch (IOException e) {
LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
}
};

去 ledgerIndex 的缓存中删去当时的 LedgerID


@Override
public void deleteLedger(long ledgerId) throws IOException {
...
entryLocationIndex.delete(ledgerId);
ledgerIndex.delete(ledgerId);
....
}

能够看到 doGcLedgers() 函数首要是以 zookeeper 的 Ledgers 调集为基准,去比照 RocksDB 的 ledgerIndex 索引存储中删去待删去的 Ledgers。

doGcEntryLogs()

在 doGcEntryLogs() 中,代码逻辑首要如下:

  1. 迭代 entryLogMetaMap 获取当时数据盘目录下一切的 EntryLog 信息

entryLogMetaMap.forEach((entryLogId, meta) -> {
...
});
  1. 以 RocksDB 中的 ledgerIndex 缓存为基准,判别当时 EntryLog 中是否有能够删去的 Ledger

removeIfLedgerNotExists(meta);

EntryLogMetadata 中的 removeLedgerIf() 办法的参数为 LongPredicate ,本质是经过 ledgerIndex 中是否存在当时 LedgerID,假如不存在则 LongPredicate 的 test() 办法为true,该 Ledger 能够删去。


private void removeIfLedgerNotExists(EntryLogMetadata meta) {
// 这个 ledger 是否能够删去,取决于当时这个 Ledger 是否在 ledgerIndex 的调集中存在
meta.removeLedgerIf((entryLogLedger) -> {
// Remove the entry log ledger from the set if it isn't active.
try {
// ledgerStorage为专门为紧缩定制的 CompactableLedgerStorage,继承了 LedgerStorage 接口
return !ledgerStorage.ledgerExists(entryLogLedger);
} catch (IOException e) {
LOG.error("Error reading from ledger storage", e);
return false;
}
});
}

而 removeLedgerIf() 办法本身操作的是 EntryLogMeta 中 ledgersMap 的这个调集,删去操作也是根据 ledgerIndex 判别是否能够从 ledgersMap 中删去 LedgerID。


public void removeLedgerIf(LongPredicate predicate) {
ledgersMap.removeIf((ledgerId, size) -> {
boolean shouldRemove = predicate.test(ledgerId);
if (shouldRemove) {
remainingSize -= size;
}
return shouldRemove;
});
}
  1. 经过第二步的删去操作,在这儿去判别当时 EntryLog 中是否一切的 LedgerID 都现已被删去,假如都删去了,则咱们能够直接将这个 EntryLog 从数据盘中删去。

// 判别 EntryLog Meta 中的 ledgersMap 目标是否还有元素。
if (meta.isEmpty()) {
// This means the entry log is not associated with any active ledgers anymore.
// We can remove this entry log file now.
LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");
// 当当时的 EntryLog 中没有任何 Ledgers 目标时,直接调用删去 EntryLog 的接口进行删去操作。
removeEntryLog(entryLogId);
gcStats.getReclaimedSpaceViaDeletes().add(meta.getTotalSize());
}

能够看到,在doGcEntryLogs 函数中,首要是以 ledgerIndex 为基准,操作每一个 EntryLog 中的 ledgesMap 目标,判别 Ledger 是否能够删去。假如当时 EntryLog 的一切 Ledger 都能够删去,则直接删去 EntryLog 文件。假如有一部分 Ledger 能够删去,一部分 Ledger 无法删去,则进入 doCompactEntryLogs() 函数的处理逻辑中。

doCompactEntryLogs

在 doCompactEntryLogs() 中,代码首要逻辑如下:

  1. 构造 entryLogMetaMap 的暂时目标 logsToCompact,并依照运用率对其排序:

List logsToCompact = new ArrayList();
// 开端之前首要把本地缓存的 entryLogMetaMap 都添加进来
logsToCompact.addAll(entryLogMetaMap.values());
// 依照运用率做一个排序
logsToCompact.sort(Comparator.comparing(EntryLogMetadata::getUsage));
  1. 迭代entryLogMetaMap的暂时目标:logsToCompact

for (EntryLogMetadata meta : logsToCompact) {
...
// 真实触发收回的核心逻辑
compactEntryLog(meta);
}
  1. 调用 scanEntryLog() 办法,开端扫描 EntryLog 文件

// 扫描指定的 EntryLog 文件
entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(), scannerFactory.newScanner(entryLogMeta));

扫描 EntryLog 文件中能够简略整理为如下三个逻辑:

3.1 怎么扫描 EntryLog 文件

要了解 EntryLog 文件是怎么被扫描出来的,咱们首要要去看 Entry 是怎么被写入 EntryLog 文件中的。首要每一个 EntryLog 都有 1024 个字节 EntryLog Header 信息,首要包括如下内容:

  • Fingerprint(指纹信息): 4 bytes “BKLO”

在预分配 EntryLog 的时分,就固定的将4字节的签名信息写入

  • Log file HeaderVersion: 4 bytes

有两个版本:HEADER_V0 和 HEADER_V1,当时 EntryLog 的版本为 HEADER_V1。

  • Ledger map offset: 8 bytes

  • Ledgers Count: 4 bytes

所以在扫描 EntryLog 文件时,咱们首要越过当时 EntryLog 的 Header 信息:


// Start the read position in the current entry log file to be after
// the header where all of the ledger entries are.
long pos = LOGFILE_HEADER_SIZE;

之后会持续写入 4 字节的 entrySize 以及 8 字节的 LedgerID,所以扫描的时分也需要依照这种格局将 entrySize 和 LedgerID 分别读取出来,然后依据 entrySize 的巨细,持续向后读取出 Entry 真实的内容。


// Buffer where to read the entrySize (4 bytes) and the ledgerId (8 bytes)
ByteBuf headerBuffer = Unpooled.buffer(4 + 8);
while (true) {
...
long offset = pos;
pos += 4;
int entrySize = headerBuffer.readInt();
long ledgerId = headerBuffer.readLong();
headerBuffer.clear();
// 调用 scanner 的 accept() 办法
if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) {
// skip this entry
pos += entrySize;
continue;
}
// read the entry
data.clear();
if (entrySize <= 0) {
LOG.warn("bad read for ledger entry from entryLog {}@{} (entry size {})",
entryLogId, pos, entrySize);
return;
}
data.capacity(entrySize);
// process the entry
// 调用 scanner 的 process() 办法,将 entry 写入新的 EntryLog 中
scanner.process(ledgerId, offset, data);
// Advance position to the next entry
pos += entrySize;
}

如此往复,不断的将 EntryLog 中的每一条 Entry 顺次读取出来。

3.2 accept 接口

accept 接口首要用来判别当时的 LedgerID 是否还在 EntryLog 文件中,即是否还在 ledgersMap 中存在。


@Override
public boolean accept(long ledgerId) {
return meta.containsLedger(ledgerId);
}

3.3 process 接口

process 接口首要用来将无法删去的 Entry 写入到新的 EntryLog 文件中,并记载 Entry 对应的 offset 信息。


@Override
public void process(final long ledgerId, long offset, ByteBuf entry) throws IOException {
...
long newoffset = entryLogger.addEntry(ledgerId, entry);
offsets.add(new EntryLocation(ledgerId, entryId, newoffset));
}
  1. 调用 flush 办法,更新新 EntryLog 文件的索引信息

// 强制把写入的数据改写到磁盘上去,改写的时分会一起更新索引信息,以便 broker 下次读取音讯的时分,能够去新的 EntryLog 中去读取。
scannerFactory.flush();

flush() 办法首要是将上述无法删去的 Entry 写入新 EntryLog 中的位点信息调用 DbLedgerStorage 的接口更新到 RocksDB 中去。


void flush() throws IOException {
if (offsets.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping entry log flushing, as there are no offset!");
}
return;
}
// Before updating the index, we want to wait until all the compacted entries are flushed into the
// entryLog
try {
entryLogger.flush();
// 更新新 EntryLog 文件中 offsets 的信息。
ledgerStorage.updateEntriesLocations(offsets);
ledgerStorage.flushEntriesLocationsIndex();
} finally {
offsets.clear();
}
}
  1. 删去原始的 EntryLog 文件

// 移除去原先旧的EntryLog文件
logRemovalListener.removeEntryLog(entryLogMeta.getEntryLogId());

上述的代码逻辑描述了关于单个数据盘目录下 EntryLog 完整的收回逻辑。关于多个数据盘目录的场景,每一个数据盘目录都会创建一个单独的 GarbageCollectorThread 的线程来运转上述的逻辑。

EntryLog 文件的巨细怎么操控

在 Ledger 的数据盘目录中能够看到,每一个 EntryLog 文件的巨细都固定为 1GB 左右,当抵达这个巨细时,EntryLog 文件就会翻滚创建新的 EntryLog 文件来写入。这是由于默许设置的 EntryLog 巨细为 1GB,详细如下:


/**
* Set the max log size limit to 1GB. It makes extra room for entry log file before
* hitting hard limit '2GB'. So we don't need to force roll entry log file when flushing
* memtable (for performance consideration)
*/
public static final long MAX_LOG_SIZE_LIMIT = 1 * 1024 * 1024 * 1024;

reachEntryLogLimit() 办法用来查看是否 EntryLog 文件抵达指定的巨细:


boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {
if (logChannel == null) {
return false;
}
return logChannel.position() + size > logSizeLimit;
}

用户也能够经过如下参数自界说 EntryLog 文件的巨细:


logSizeLimit

怎么核算 EntryLog 文件的运用率

在 doCompactEntryLogs 章节中能够看到,在迭代 entryLogMetaMap 时,依据 EntryLog 的运用率对 EntryLog 进行了排序。EntryLog 的运用率首要经过 EntryLog Metadata 中的如下两个字段进行核算的:

  • private long totalSize; // 总巨细

  • private long remainingSize; // 剩下巨细

在数据写入 EntryLog (ledgersMap) 的进程中会一起添加 totalSize 和 remainingSize 这两个字段:


// 往 ledgersMap 中新增元素
public void addLedgerSize(long ledgerId, long size) {
totalSize += size;
remainingSize += size;
ledgersMap.addAndGet(ledgerId, size);
}

当在做数据紧缩时,假如判别某一个 LedgerID 能够从 ledgersMap 中删去时,会从 remainingSize 中减去当时 Ledger 的 size:


public void removeLedgerIf(LongPredicate predicate) {
ledgersMap.removeIf((ledgerId, size) -> {
boolean shouldRemove = predicate.test(ledgerId);
if (shouldRemove) {
remainingSize -= size; // 减去当时 ledger 的巨细
}
return shouldRemove;
});
}

所以在核算 EntryLog 的运用率时,拿当时 remainingSize/totalSize 即可核算出 EntryLog 文件中当时剩下的有用数据的比率是多少:


public double getUsage() {
if (totalSize == 0L) {
return 0.0f;
}
return (double) remainingSize / totalSize;
}

minor GC 与 major GC 履行数据收回的逻辑是完全一致的,EntryLog 中有用数据的运用率也是用来区别是否为 minor GC 或许 major GC 的关键点。