分布式体系较为杂乱,不管写入还是查询,都需求多个节点的配合才干完结操作。本教程以一个分布式 SQL 查询为例,介绍 DolphinDB 分布式数据库的数据流以及其中阅历的各类线程池。经过了解 SQL 查询的全进程,也能够帮助我们更好地优化 DolpinDB 的装备和功能。

1. DolphinDB 线程类型

woker
惯例交互作业的作业线程,用于接纳客户端恳求,将使命分解为多个小使命,依据使命的粒度自己履行或许发送给 local executor 或 remote executor 履行。

local executor
本地履行线程,用于履行 worker 分配的子使命。每个本地履行线程一次只能处理一个使命。所有作业线程同享本地履行线程。[ploop](https://link.zhihu.com/?target=https%3A//gitee.com/link%3Ftarget%3Dhttps%253A%252F%252Fwww.dolphindb.cn%252Fcn%252Fhelp%252F200%252FFunctionalprogramming%252FTemplateFunctions%252FloopPloop.html)[peach](https://link.zhihu.com/?target=https%3A//gitee.com/link%3Ftarget%3Dhttps%253A%252F%252Fwww.dolphindb.cn%252Fcn%252Fhelp%252F200%252FFunctionalprogramming%252FTemplateFunctions%252Feach.html) 等并行核算函数的核算使命分配在本地履行线程完结。

remote executor
长途履行线程,将长途子使命发送到长途节点的独立线程。

batch job worker
运用 [submitJob](https://link.zhihu.com/?target=https%3A//gitee.com/link%3Ftarget%3Dhttps%253A%252F%252Fwww.dolphindb.cn%252Fcn%252Fhelp%252FFunctionsandCommands%252FFunctionReferences%252Fs%252FsubmitJob.html)[submitJobEx](https://link.zhihu.com/?target=https%3A//gitee.com/link%3Ftarget%3Dhttps%253A%252F%252Fwww.dolphindb.cn%252Fcn%252Fhelp%252FFunctionsandCommands%252FFunctionReferences%252Fs%252FsubmitJobEx.html) 创立批处理作业的作业线程。该线程在使命履行完后若搁置 60 秒则会被体系主动回收,不再占用体系资源。

web worker
处理 HTTP 恳求的作业线程。DolphinDB 供给了基于 web 的集群管理界面,用户能够经过 web 与 DolphinDB 节点进行交互,提交的恳求由该线程处理。

secondary worker
次级作业线程。当前节点产生的长途子使命,会在长途节点的次级作业线程上履行,用于防止作业环,解决节点间的使命循环依靠而导致的死锁问题。

dynamic worker
动态作业线程。当所有的作业线程被占满且有新使命时,体系会主动创立动态作业线程来履行使命。依据体系并发使命的繁忙程度,总共能够创立三个等级的动态作业线程,每一个等级能够创立 maxDymicWorker 个动态作业线程。该线程在使命履行完后若搁置 60 秒则会被体系主动回收,不再占用体系资源。

infra worker
基础设施处理线程。当敞开元数据高可用或流数据高可用的时分,体系会主动创立基础设施处理线程,用于处理集群节点间的 raft 信息同步作业。

urgent worker
紧急作业线程,只接纳一些特殊的体系级使命,如登录 [login](https://link.zhihu.com/?target=https%3A//gitee.com/link%3Ftarget%3Dhttps%253A%252F%252Fwww.dolphindb.cn%252Fcn%252Fhelp%252F200%252FFunctionsandCommands%252FCommandsReferences%252Fl%252Flogin.html) ,撤销作业 [cancelJob](https://link.zhihu.com/?target=https%3A//gitee.com/link%3Ftarget%3Dhttps%253A%252F%252Fwww.dolphindb.cn%252Fcn%252Fhelp%252FFunctionsandCommands%252FCommandsReferences%252Fc%252FcancelJob.html)[cancelConsoleJob](https://link.zhihu.com/?target=https%3A//gitee.com/link%3Ftarget%3Dhttps%253A%252F%252Fwww.dolphindb.cn%252Fcn%252Fhelp%252FFunctionsandCommands%252FCommandsReferences%252Fc%252FcancelConsoleJob.html) 等。

diskIO worker
磁盘数据读写线程,经过参数 diskIOConcurrencyLevel 操控。假如 diskIOConcurrencyLevel = 0,表示直接用当前线程来读写磁盘数据。假如 diskIOConcurrencyLevel > 0,则会创立相应个数的指定线程来读写磁盘数据。

2. 不同类型线程与装备参数的关系

线程类型

装备参数

默许装备

woker

wokerNum

默许值是 CPU 的内核数

local executor

localExecutors

默许值是 CPU 内核数减1

remote executor

remoteExecutors

默许值是1

batch job worker

maxBatchJobWorker

默许值是 workerNum 的值

web worker

webWorkerNum

默许值是1

secondary worker

secondaryWorkerNum

默许值是 workerNum 的值

dynamic worker

maxDynamicWorker

默许值是 workerNum 的值

infra worker

infraWorkerNum

默许值是2

urgent worker

urgentWorkerNum

默许值是1

diskIO worker

diskIOConcurrencyLevel

默许值是1

3. API 主张一次 SQL 查询的线程阅历

DolphinDB 的主要节点类型:

  • controller
    操控节点,担任搜集署理节点和数据节点的心跳,监控每个节点的作业状况,管理分布式文件体系的元数据和事务
  • data node
    数据节点,既能够存储数据,也能够完结查询和杂乱的核算。
  • compute node
    核算节点,只用于核算,应用于包括流核算、分布式相关、机器学习、数据剖析等场景。核算节点不存储数据,故在该节点上不能建库建表,但能够经过 [loadTable](https://link.zhihu.com/?target=https%3A//gitee.com/link%3Ftarget%3Dhttps%253A%252F%252Fwww.dolphindb.cn%252Fcn%252Fhelp%252F200%252FFunctionsandCommands%252FFunctionReferences%252Fl%252FloadTable.html) 加载数据进行核算。能够经过在集群中装备核算节点,将写入使命提交到数据节点,将所有核算使命提交到核算节点,实现存储和核算的分离。2.00.1版别开端支持核算节点。

综上,API 主张的 SQL 查询能够提交到一个协调节点(coordinator) 来完结数据的查询和核算恳求。 coordinator 能够是集群中的 data node 或许 compute node。当运用 2.00.1 及以上版别时,用户能够经过在集群中装备 compute node,将 SQL 查询使命悉数提交到 compute node,实现存储和核算的分离。下面以 API 向 coordinator 主张一次 SQL 查询为例,叙述整个进程中所调度的所有线程。

从一次 SQL 查询的全过程看 DolphinDB 的线程模型

step1:DolphinDB 客户端向 coordinator 主张数据查询恳求

以 coordinator 为 data node 为例,例如主张一次聚合查询,查询句子如下:

select avg(price) from loadTable("dfs://database", "table") where year=2021 group by date

假定上述聚合查询句子总共触及 300 个分区的数据,且正好平均分配在三个数据节点。

data node1:100 chunks;data node2:100 chunks;data node3:100 chunks

DolphinDB 客户端将查询恳求进行二进制序列化后经过 tcp 协议传输给 data node1。

step2:data node1 收到查询恳求

data node1 收到客户端的查询恳求后,将分配 1 个 worker 线程对内容进行反序列化和解析。当发现内容是 SQL 查询时,会向 controller 主张恳求,获取跟这个查询相关的所有分区的信息。整个 SQL 查询未履行结束时,当前的 worker 线程会被一向占用。

step3:controller 收到 data node1 的恳求

controller 收到 data node1 的恳求后,将分配 1 个 worker 线程对内容进行反序列化和解析,准备好本次 SQL 查询触及的数据分区信息后,由该 worker 线程序列化后经过 tcp 协议传输给 data node1。controller 的 worker 完结该作业后将从行列中获取下一个恳求。

data node1:100 chunks;data node2:100 chunks;data node3:100 chunks

step4:data node1 收到 controller 回来的信息

data node1 收到 controller 回来的信息后,由一向占用的 worker 线程对内容进行反序列化和解析,得知本次 SQL 查询触及的数据分区信息后,将坐落本节点的分区数据核算使命发送到本地使命行列,此刻本地使命行列会产生 100 个子使命。一起,将在长途节点 data node2、data node3 的分区数据核算使命以 group task 的方法发送到长途使命行列,所以长途使命行列会被添加2个长途使命,分别打上 data node2 和 data node3 的标志。

step5(1):本地 worker 和 local executor 消费本地使命行列

此刻,一向占用的 worker 线程和 local executor 线程会一起并行消费本地使命行列的子使命。所以装备项中的 wokerNum 和 localExecutors 很大程度上决定了体系的并发核算才能。

step5(2)(3):本地 remote executor 发送长途使命至长途节点

一起,remote executor 线程将长途使命行列的内容序列化后经过 tcp 协议分别发送到 data node2 和 data node3。

step6(1)(2):长途节点收到长途使命

data node2 和 data node3 收到长途使命后,将分配 1 个 secondary worker 线程对内容进行反序列化和解析,并将核算使命发送到本地使命行列,此刻 data node2 和 data node3 的本地使命行列都会产生 100 个子使命。

step7(1)(2):长途节点 secondary worker 和 local executor 消费本地使命行列

此刻,data node2 和 data node3 上一向占用的 secondary worker 线程和 local executor 线程会一起并行消费本地使命行列的子使命。所以装备项中的 secondaryWorkerNum 对体系的并发核算才能也有必定影响。

step8(1)(2):长途节点回来中心核算成果至 data node1

当 data node2 和 data node3 触及的核算使命完结后,分别得到了本次 SQL 查询的中心核算成果,由一向占用的 secondary worker 线程对内容进行序列化后经过 tcp 协议传输给 data node1。

step9:data node1 核算终究成果并回来给客户端

data node1 接纳到 data node2 和 data node3 回来的中心核算成果后,由一向占用的 worker 线程对内容进行反序列化,然后在该线程上核算出终究成果,并在序列化后经过 tcp 协议传输给客户端。

DolphinDB 客户端接纳到 data node1 回来的信息后,经过反序列化显现本次 SQL 查询的成果。

coordinator 为 data node 和 compute node 的差异

  • compute node 不存储数据,所以 compute node 解析客户端的 SQL 查询后,从 controller 拿到本次 SQL 查询触及的数据分区信息,会将所有数据查询使命都分配到 data node 履行,得到每个 data node 回来的中心成果,终究调度 compute node 的本地资源核算终究成果并回来给客户端。
  • 将所有 SQL 查询都提交到 compute node 后,能够实现存储和核算的分离,减轻 data node 的核算作业担负。当实时写入的数据量非常大时,主张装备 compute node,将所有 SQL 查询都提交到 compute node,实现存储和核算的分离。2.00.1版别开端支持核算节点。

4. SQL 查询进程剖析

经过对 API 主张一次 SQL 查询的线程阅历统计剖析能够发现,本次 SQL 查询总共发生了 8 次 tcp 传输,其中 2 次是 DolphinDB server 和 DolphinDB client 之间的传输。假如查询成果的数据量比较大,但对查询成果的延时性又比较敏感,能够优化的方向主要有以下几个:

  • 集群节点间为内网通讯,引荐万兆以太网。
  • DolphinDB server 和 DolphinDB client 间为内网通讯。
  • API 指定查询成果回来进行数据压缩。
  • 线程装备参数优化。
  • SQL 句子优化:where 条件添加分区字段的信息过滤,起到分区剪枝的目的,防止全表扫描,这样能够大大减少子使命的数目。
  • 添加每个节点的磁盘卷的数量。这样更多的磁盘能够并行读取分区的数据。
  • 添加license 约束的 CPU 核心数和内存大小,提升体系的并行或并发处理才能。

5. 线程装备参数优化

wokerNum
假如 license 约束的 CPU 核心数大于物理机 CPU 核心数,引荐 wokerNum 等于物理机 CPU 核心数;假如 license 约束的 CPU 核心数小于等于物理机 CPU 核心数,引荐 wokerNum 等于 license 约束的 CPU 核心数。

localExecutors
引荐 localExecutors = wokerNum – 1

remoteExecutors
引荐 remoteExecutors = n – 1,n 表示集群的节点数。假如是单节点 single 形式或许是单数据节点集群,不需求装备 remoteExecutors 的值。

maxBatchJobWorker
引荐 maxBatchJobWorker = wokerNum,选用默许值。

webWorkerNum
引荐操控节点 webWorkerNum 能够装备为 4,数据节点 webWorkerNum 能够装备为 1。由于正常情况下很少经过 web 与 DolphinDB 节点交互的方法提交查询使命。

secondaryWorkerNum
引荐 secondaryWorkerNum = wokerNum,选用默许值。

maxDynamicWorker
引荐 maxDynamicWorker = wokerNum,选用默许值。

infraWorkerNum
引荐 infraWorkerNum = 2,选用默许值。

urgentWorkerNum
引荐 urgentWorkerNum = 1,选用默许值。

diskIOConcurrencyLevel
关于 hdd 磁盘,引荐 diskIOConcurrencyLevel 等于对应节点下经过 volumes 参数装备的磁盘个数。关于 ssd 磁盘,引荐 diskIOConcurrencyLevel = 0。