数据库浅谈之 DuckDB AGG 底层实现
HELLO,各位博友好,我是阿呆 🙈🙈🙈
这里是数据库浅谈系列,收录在专栏 DATABASE 中 😜😜😜
本系列阿呆将记录一些数据库领域相关的知识 🏃🏃🏃
OK,兄弟们,废话不多直接开冲 🌞🌞🌞
一 🏠 概述
DuckDB 关于 AGG 的算子有三个
PhysicalUngroupedAggregate,适用于只聚合无分组,无DISTINCT,且所有行都可被合并
PhysicalPerfectHashAggregate,适用于执行一组分组和聚合,使用一个完美哈希表
PhysicalHashAggregate 是实现分组和聚合的物理算子,哈希表执行分组
本篇是对多数 AGG 场景普遍适用的 PhysicalHashAggregate 源码剖析
1.1 核心类图展示
核心类图一
核心类图二
1.2 核心类成员描述
基类 PhysicalOperator
它是执行计划中物理算子的基类
类成员 | 作用 |
---|---|
PhysicalOperatorType type | 算子类型 |
vector<unique_ptr<PhysicalOperator>> children | 子算子集 |
vector<LogicalType> types | 算子返回类型 |
idx_t estimated_cardinality | 算子预估值 |
unique_ptr<GlobalSinkState> sink_state | 全局接收状态 |
unique_ptr<GlobalOperatorState> op_state | 算子全局状态 |
类函数 | 作用 |
---|---|
virtual SinkResultType Sink(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate,DataChunk &input) const; | 重复调用至无输入,可并行调用,访问 GlobalSinkState 需加锁 |
virtual void Combine(ExecutionContext &context, GlobalSinkState &gstate, LocalSinkState &lstate) const; | 单线程完成其 pipeline 模块部分时调用,可并行调用,最后一次访问 LocalSinkState |
virtual SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,GlobalSinkState &gstate) const; | 当所有线程执行完成时调用,单线程调用,每个pipeline 只调用一次 |
virtual void GetData(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate, LocalSourceState &lstate) const; | Source interface ,向上层 pipeline 发送数据 |
static idx_t GetMaxThreadMemory(ClientContext &context); | 算子每个线程可使用的最大内存 |
void AddPipeline(Executor &executor, shared_ptr<Pipeline> current, PipelineBuildState &state); | 添加 Pipeline |
virtual void BuildPipelines(Executor &executor, Pipeline ¤t, PipelineBuildState &state); | 创建 Pipeline |
void BuildChildPipeline(Executor &executor, Pipeline ¤t, PipelineBuildState &state,PhysicalOperator *pipeline_child); | 创建子 Pipeline |
PhysicalHashAggregate
它是实现分组和聚合的物理算子,哈希表执行分组
类成员 | 作用 |
---|---|
vector<unique_ptr<Expression>> groups | group by 分组项 |
vector<GroupingSet> grouping_sets | grouping set 分组项 |
vector<unique_ptr<Expression>> aggregates | Agg 函数项 |
bool any_distinct | 聚合函数中是否有 DISTINCT |
vector<LogicalType> group_types | 分组项的各个类型 |
vector<LogicalType> payload_types | 聚合函数参数列表的类型 |
vector<LogicalType> aggregate_return_types | 聚合返回的类型 |
vector<RadixPartitionedHashTable> radix_tables | 基数分区哈希表 (一个分组一个) |
vector<BoundAggregateExpression *> bindings | 指向聚合函数的指针集 |
// 分组函数, 类似于 grouping_set 这种
vector<vector<idx_t>> grouping_functions// 聚合函数存在过滤,记录对应的过滤 Expression 和索引
unordered_map<Expression *, size_t> filter_indexes
类函数 |
---|
void GetData(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate,LocalSourceState &lstate) const override; |
SinkResultType Sink(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate,DataChunk &input) const override; |
void Combine(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate) const override; |
SinkFinalizeType Finalize(Pipeline &pipeline, Event &event, ClientContext &context,GlobalSinkState &gstate) const override; |
//不切换此选项,GetData 方法将在扫描哈希表时销毁该哈希表
static void SetMultiScan(GlobalSinkState &state);
PhysicalHashAggregate 的 Sink 过程
HashAggregateGlobalState
类成员 | 作用 |
---|---|
vector<unique_ptr<GlobalSinkState>> radix_states | 记录(监控)基数分区哈希表的GlobalSinkState 状态 |
HashAggregateLocalState
类成员 | 作用 |
---|---|
DataChunk aggregate_input_chunk | 聚合输入数据块 |
vector<unique_ptr<LocalSinkState>> radix_states | 记录(监控)基数分区哈希表的LocalSinkState 状态 |
PhysicalHashAggregate 的 Source 过程
PhysicalHashAggregateGlobalSourceState
类成员 | 作用 |
---|---|
vector<unique_ptr<GlobalSourceState>> radix_states | 记录(监控)基数分区哈希表的GlobalSinkState 状态 |
PhysicalHashAggregateLocalSourceState
类成员 | 作用 |
---|---|
vector<unique_ptr<LocalSourceState>> radix_states | 记录(监控)基数分区哈希表的LocalSinkState 状态 |
RadixPartitionedHashTable
类成员 | 作用 |
---|---|
GroupingSet &grouping_set | group by 分组项 |
vector<idx_t> null_groups | 把不在grouping_set 中的分组项放到这 |
const PhysicalHashAggregate &op | HashAgg 算子 |
idx_t radix_limit | 切换到基数分区前,算子中可有多少分组 |
vector<Value> grouping_values | 属于此哈希表的分组值 |
类函数 | 作用 |
---|---|
void Sink(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate, DataChunk &input,DataChunk &aggregate_input_chunk) const; | 将输入数据拆分为分组和聚合两部分;通过逻辑判断,创建不同的哈希表实例;调用哈希表实例的 AddChunk 方法 |
void Combine(ExecutionContext &context, GlobalSinkState &state, LocalSinkState &lstate) const; | 把各个哈希表的哈希桶释放掉,然后把RadixHTLocalState 的成员PartitionableHashTable 推到全局 |
bool Finalize(ClientContext &context, GlobalSinkState &gstate_p) const; | 未分区时就是把分区哈希表的数据和全局哈希表进行依次合并,释放分区哈希表 |
void ScheduleTasks(Executor &executor, const shared_ptr<Event> &event, GlobalSinkState &state,vector<unique_ptr<Task>> &tasks) const; | 未分区时,不会有并行任务队列;分区时可并行计算 |
void GetData(ExecutionContext &context, DataChunk &chunk, GlobalSinkState &sink_state, GlobalSourceState &gstate_p,LocalSourceState &lstate_p) const; | 向上输出数据的接口,调用 GroupedAggregateHashTable 的 Scan 接口 |
unique_ptr<GlobalSourceState> GetGlobalSourceState(ClientContext &context) const; | 因此在这里会进行聚合函数的计算?构造并获取 GlobalSourceState |
unique_ptr<GlobalSinkState> GetGlobalSinkState(ClientContext &context) const; | 还是说在 sink 已经完成了计算?构造并获取 GlobalSinkState |
unique_ptr<LocalSinkState> GetLocalSinkState(ExecutionContext &context) const; | 构造并获取 LocalSinkState |
unique_ptr<LocalSourceState> GetLocalSourceState(ExecutionContext &context) const; | 构造并获取 LocalSourceState |
RadixPartitionedHashTable 的 source 过程
RadixHTGlobalSourceState
类成员 | 作用 |
---|---|
mutex lock | 互斥锁,用于成员共享变量 |
idx_t ht_index | 哈希表索引 |
idx_t ht_scan_position | 哈希表扫描位置,相当于数据拷贝时的列偏移 |
atomic<bool> finished | 原子变量,是否数据拷贝完成 |
RadixHTLocalSourceState
类成员 | 作用 |
---|---|
DataChunk scan_chunk | 向上输出的数据 |
RadixPartitionedHashTable 的 sink 过程
RadixHTGlobalState
类成员 | 作用 |
---|---|
vector<unique_ptr<PartitionableHashTable>> intermediate_hts | Combine 阶段,接受推向全局的分区哈希表 |
vector<unique_ptr<GroupedAggregateHashTable>> finalized_hts | 最终的结果哈希表 |
bool is_empty | 哈希表是否为空 |
bool multi_scan | 哈希表是否支持多次扫描 |
mutex lock | 用于更新全局聚合状态锁 |
atomic<idx_t> total_groups | 聚合数 |
bool is_finalized = false | 是否结束 |
bool is_partitioned = false | 是否分区 |
RadixPartitionInfo partition_info | 分区数,分区掩码和前缀等 |
RadixHTLocalState
类成员 | 作用 |
---|---|
DataChunk group_chunk | 分组数据 |
unique_ptr<PartitionableHashTable> ht | 用于单个分组的哈希表 |
bool is_empty | 哈希表是否有数据 |
PartitionableHashTable
类成员 | 作用 |
---|---|
Allocator &allocator | STL 类似,申请空间但并不实例化 |
BufferManager &buffer_manager | Block 管理类 |
vector<LogicalType> group_types | 分组类型 |
vector<LogicalType> payload_types | 聚合函数返回的类型 |
vector<BoundAggregateExpression *> bindings | 聚合函数 |
bool is_partitioned | 是否分区 |
RadixPartitionInfo &partition_info | 当做分区时确定桶落在X分区 |
HashTableList unpartitioned_hts | 未分区哈希表集合 |
unordered_map<hash_t, HashTableList> radix_partitioned_hts | 分区哈希表集合 |
类函数 | 作用 |
---|---|
idx_t AddChunk(DataChunk &groups, DataChunk &payload, bool do_partition) | 为哈希表插入数据 |
void Partition() | 将哈希表分区 |
bool IsPartitioned() | 判断是否需要分区 |
HashTableList GetPartition(idx_t partition) | 获取分区哈希表 |
HashTableList GetUnpartitioned() | 获取未分区哈希表 |
void Finalize() | 遍历将哈希表的哈希桶销毁 |
GroupedAggregateHashTable
类成员 | 作用 |
---|---|
HtEntryType entry_type | 哈希值类型是 32/64 bit |
idx_t entries | 哈希桶数(元素数) |
vector<BufferHandle> payload_hds | 哈希表数据块 |
vector<data_ptr_t> payload_hds_ptrs | 上个成员的 data 指针 |
BufferHandle hashes_hdl | 哈希桶数据块 |
data_ptr_t hashes_hdl_ptr | 上个成员的 data 指针 |
idx_t hash_offset | 内存布局中哈希值的偏移量 |
hash_t hash_prefix_shift | 取高十六位前缀 |
hash_t bitmask | 哈希掩码 |
类函数 | 作用 |
---|---|
idx_t AddChunk(DataChunk &groups, DataChunk &payload) | 列式计算哈希值 |
idx_t FindOrCreateGroups(DataChunk &groups, Vector &group_hashes, Vector &addresses_out, SelectionVector &new_groups_out) | 创建哈希桶和插入分组数据,初始化聚合函数 |
idx_t AddChunk(DataChunk &groups, Vector &group_hashes, DataChunk &payload) | 更新聚合函数项 |
void Combine(GroupedAggregateHashTable &other) | 合并哈希表 |
idx_t Scan(idx_t &scan_position, DataChunk &result) | 进行数据拷贝,向上输出结果 DataChunk |
BaseAggregateHashTable
类成员 | 作用 |
---|---|
Allocator &allocator | STL 类似,申请空间但并不实例化 |
BufferManager &buffer_manager | Block 管理类 |
RowLayout layout | 行内存布局 |
RowLayout
类成员 | 作用 |
---|---|
Aggregates aggregates | 聚合函数 |
idx_t flag_width | 头部校验位的宽度 |
idx_t data_width | 数据宽度 |
idx_t aggr_width | 聚合函数宽度 |
idx_t row_width | 行宽 |
vector<idx_t> offsets | 偏移量 |
bool all_constant | 是否存在变长数据 |
idx_t heap_pointer_offset | 堆指针偏移 |
1.3 哈希表
哈希表结构
GroupedAggregateHashTable 是 PhysicalHashAggregate 用于计算的线性探测哈希表,使用开放寻址解决哈希冲突。输入分组和聚合函数,将计算结果存储在哈希表中,它由两部分组成,分别是 hashes(哈希部分) 和 payload(聚合部分)
HASHES LAYOUT
[SALT][PAGE_NR][PAGE_OFFSET]
内存布局 | 作用 |
---|---|
SALT | 哈希值高位,例对于 64 位哈希,为 16 |
PAGE_NR | 缓冲区管理的 payload 页面索引 |
PAGE_OFFSET | 指向 payload page 的逻辑条目偏移量 |
PAYLOAD LAYOUT
[VALIDITY][GROUPS][HASH][PADDING][PAYLOAD]
内存布局 | 作用 |
---|---|
VALIDITY | 数据列的有效位 |
GROUPS | 分组数据,大小固定,可为多列 |
HASH | 分组的哈希数据 |
PADDING | 对齐数据 |
PAYLOAD | 聚合数据 |
哈希表结构图
桶和数据拆开原因
时间角度:哈希桶和数据块绑定导致无法使用列连续特性(指针 + 偏移量),快速定位到某行的哈希桶
空间角度:在所有哈希表 Sink 阶段完成时(所有输入行计算完毕时),在 combine 阶段即可销毁哈希桶,释放掉
Block
块内存
哈希表实现原理
在哈希表创建时,申请一个 block 块(一段连续内存)作为哈希桶,在计算哈希值时通过输入分组数据,列计算和与操作计算出每行哈希值,哈希构建过程如下图所示
DuckDB 列计算哈希
如下图所示,输入数据可理解为 MDP (列存),哈希值批计算,充分利用列连续特性
DuckDB 构建哈希表
1、列算哈希
2、开放寻址
3、数据拷贝
遍历桶个数,拷贝分组输入 Trunk 填充数据块
所有数据拷贝操作,均调用 C 函数库,效率高
4、初化聚合
初始化 PayLoad 部分,遍历桶和聚合函数项,按对应的聚合函数类型去构筑空间
struct AvgState { uint64_t count; double value; //func... };
5、数据比对
遍历命中哈希表的行,数据比较(指针+偏移,类型强转)
如果数据不一致(即两行数据不完全相同,不落在一个分组),标记该行;比较结束后,被标记行再次重新往后寻找哈希桶,直到所有的行都被聚合
6、更新聚合
遍历行,更新 PayLoad 部分,各函数哈希函数内的 count 和 value 值
二 🏠 核心
2.1 HT 优化策略
HT 选取
哈希冲突的解决 链接 或 线性探测 ,使用链接,不直接将聚合值保留在哈希表中,而是保留一组值和聚合列表。如果分组值指向具有空列表的哈希表条目,则只需添加新组和聚合。如果分组值指向现有列表,检查每个列表条目分组值是否匹配。匹配则更新该组聚合。不匹配,我们创建一个新的列表条目。
在线性探测中没有这样的列表,在找到现有条目时,将比较分组值,匹配则将更新条目。不匹配则在哈希表中向下移动一个条目并重试。当找到匹配的组条目或找到空的哈希表条目时,此过程完成。
虽然理论上等效,由于缓存局部性,计算机硬件架构将倾向于线性探测。因为线性探测遍历哈希表条目线性地,下一个条目很可能在 CPU 缓存中,因此访问速度更快。在现代硬件架构上,链接通常会导致随机访问和更差的性能。因此,我们对聚合哈希表采用了线性探测
HT优化
当冲突太多,即太多的组散列到同一个散列表条目,链接和线性探测都会在理论上从 O(1) 到 O(n) 降低散列表大小的查找性能。这个问题的一个常见解决方案是在 填充率 超过某个阈值时调整哈希表大小,例如 75% 是 Java 的默认值HashMap
. 这一点特别重要,因为在开始聚合之前我们不知道结果中的组数量。我们也不假设知道输入表中的行数
因此,我们从一个相当小的哈希表开始,并在填充率超过阈值时调整它的大小。基本的哈希表结构如下图所示,该表有四个槽 0-4。表中已经存在三个组,组键为 12、5 和 2。每个组的聚合值(例如来自 a SUM
)为 43 等
在调整大小后调整部分填充的哈希表的大小是一个很大的挑战,所有组都在错误的位置,我们必须移动所有内容,这将非常昂贵
为了有效地支持调整大小,我们实现了一个由单独分配的指针数组组成的两部分聚合哈希表,该指针数组指向包含分组值和每个组的聚合状态的有效负载块。指针不是实际指针而是符号指针,它们指的是块 ID 和所述块内的行偏移量。如上图所示,哈希表条目分为两个有效负载块。在调整大小时,我们丢弃指针数组并分配一个更大的数组。然后,我们再次读取所有有效负载块,对组值进行哈希处理,并将指向它们的指针重新插入到新的指针数组中。组数据因此保持不变,这大大降低了调整哈希表大小的成本
这可以在下图中看到,我们将指针数组大小加倍,但有效负载块保持不变
简单的两部分哈希表设计需要在调整大小时重新哈希 所有 组值,这可能非常昂贵,尤其是对于字符串值。为了加快速度,我们还将组值的原始哈希写入每个组的有效负载块。然后,在调整大小期间,我们不必重新散列组,而是可以从有效负载块中读取它们,计算新的偏移量到指针数组中,然后插入那里
两部分哈希表在查找条目时有一个很大的缺点:指针数组和有效负载块中的组条目之间没有排序。因此,跟随指针会在内存层次结构中创建随机访问。这将导致计算中不必要的停顿。为了缓解这个问题,我们扩展了指针数组的内存布局,除了指向有效负载值的指针之外,还包括来自组哈希的一些(1 或 2)字节。这样,线性探测可以首先将指针数组中的哈希位与当前组哈希进行比较,并决定是否值得跟踪有效负载指针。对于指针链中的每个组,这可能会继续。只有当哈希位匹配时,我们才必须实际跟随指针并比较实际的组。这种优化大大减少了必须遵循指向有效负载块的指针的次数,从而减少了与整体性能直接相关的随机访问内存的次数。它还具有很好的副作用,即也大大减少了也可能很昂贵的完整组比较,例如在包含字符串的组上进行聚合时
这里的另一个(较小的)优化涉及指针数组条目的宽度。对于具有很少条目的小型哈希表,我们不需要很多位来编码有效负载块偏移指针。DuckDB 支持 4 字节和 8 字节指针数组条目。
对于大多数聚合查询,绝大多数查询处理时间都花在查找哈希表条目上,这就是为什么值得花时间优化它们的原因。如果你很好奇,所有这些的代码都在 DuckDB 存储库中,aggregate_hashtable.cpp
. 当我们知道列统计信息中只有几个不同的组时,还有另一个优化,完美的哈希聚合,但那是另一篇文章。但我们还没有在这里完成。
2.2 并行分组聚合
相关论文
随着现代计算机体系结构的发展,两个问题共同阻碍了最先进的并行查询执行方法:
(i) 为了利用多核的优势,所有的查询工作必须均匀地分布在(很快)数百个线程中,以实现良好的加速
(ii) 由于现代乱序核的复杂性,即使使用精确的数据统计也很难平均分配工作
因此,现有的 plandriven 并行方法会遇到负载平衡和上下文切换瓶颈,因此不再具有可伸缩性。
多核体系结构面临的第三个问题是 内存控制器的去中心化,这将导致非统一内存访问(NUMA)
作为回应,我们提出了 “片段驱动的” 查询执行框架,其中调度变成了支持 NUMA 的细粒度运行时任务。morsel 驱动的查询处理获取输入数据的小片段(“morsels”),并将它们调度到运行整个操作符 pipeline 的工作线程,直到下一个管道中断。并行度没有嵌入到计划中,但是可以在查询执行期间弹性地改变,因此调度程序可以对不同片段的执行速度做出反应,也可以动态地调整资源,以响应工作负载中新到达的查询。此外,调度程序知道 NUMA 本地片段和操作符状态的数据局部性,因此绝大多数执行都发生在 NUMA 本地内存上。我们对TPC-H和SSB基准测试的评估显示,在32核的情况下,它的绝对性能非常高,平均加速超过30
并行分组聚合
每个线程从下层算子读取数据并构建单独的本地哈希表,然后从单个线程将它们合并在一起。在分组列很少时,这将非常有效。如果组很少,单个线程可以合并许多线程本地哈希表,而不会产生瓶颈。但是完全有可能有与输入行一样多的组,例在可能成为主键候选的列上进行分组时(KEY 值极高场景),DuckDB 解决方式是采用 相关论文 的 4.4 Grouping/Aggregation 中提到的并行聚合方式
若两个组具有不同的哈希值,它们不可能相同。因此所有线程使用相同的分区方式,就可以使用哈希值创建组的完全独立分区,而无需线程之间的任何通信(参见上图中的阶段 1)【即各线程的分区逻辑(计算哈希值)一致】
在构建完所有本地哈希表之后,为每个工作线程分配单独的分区,并将该分区内的哈希表合并在一起(参见上图中的阶段 2)。因为分区是在散列上使用基数分区方案创建的,所以所有工作线程都可以独立地合并各自分区内的散列表。结果是正确的,因为每个组都进入一个分区并且仅该分区 【即将各个线程的 HT 条目所对应的分区聚合】
不需要构建一个包含所有组的最终(可能是巨大的)哈希表,因为基组分区确保每个组都本地化到一个分区 【即各分区数据可以直接输出】
2.3 计算流程
逻辑走向
构建哈希表
Combine
Finalize
GetData
分区逻辑
未分区哈希表
当哈希桶达到上限,当前值 *2 的方式进行分区
计算桶落在哪个分区,FlushMoveState(allocator, layout)
拷贝数据
将桶数据插入新的哈希表,参考哈希表实现流程
最后调用 RowOperations::CombineStates
将原哈希表的 PayLoad 的数值赋值给 分区的哈希表
2.4 核心优点
计算哈希值
DuckDB 算子间用 DataChunk 传递数据,在 HashAgg 计算哈希值时,采用列式计算,后续列和前列哈希值按行与操作
哈希值使用高十六位的比较方式(哈希值撒盐+右移)
数据比对
不用构造迭代器,直接指针偏移 + 格式转换
数据拷贝
数据拷贝过程,不用迭代器,C 风格 memcopy ,效率很高
哈希表结构
开放寻址法,桶和数据分开构筑,参考 4.1 提到的时间和空间效率
聚合函数
所有的聚合函数均使用函数指针以执行绑定操作,HashAgg 算子本身不关注聚合类型。在向上传输数据时,分组列和聚合函数值,本身就在同一块内存区域,利于数据拷贝
由聚合函数本身提供相关的具体实现,以 AVG 举例
PlayLoad 初始化和合并
在分组结束后,按行依次更新聚合函数值
在 GetData 时调用 Finalize ,完成最后的聚合计算
哈希表分区
在当前哈希表桶数超过默认最大限制(1024)且支持分区时,将当前哈希表进行拆分成多个哈希表,由 RadixPartitionedHashTable 负责开线程(走 PipelineTask 的资源管控)
这种哈希表的分区方式,使每一个哈希表桶基数不大,减轻行数据的寻址成本
三 🏠 结语
身处于这个浮躁的社会,却有耐心看到这里,你一定是个很厉害的人吧 👍👍👍
各位博友觉得文章有帮助的话,别忘了点赞 + 关注哦,你们的鼓励就是我最大的动力
博主还会不断更新更优质的内容,加油吧!技术人! 💪💪💪