在介绍 Scalar Function(给 Databend 增加 Scalar 函数 | 函数开发系例一) 后,咱们来看 Aggregate Function。

Aggregate Function 用于对列的值进行操作并回来单个值。常见的 Agg Function 有 sum, count, avg 等。

函数注册

Agg Function 的 register 函数以小写的函数名和 AggregateFunctionDescription 类型为参数,每个被注册的函数都存入 case_insensitive_desc ( HashMap 结构) 中。

而 case_insensitive_combinator_desc 是为了存储一些组合函数,比方与 _if 组合的 count_if, sum_if 这类函数。

pubstructAggregateFunctionFactory{
case_insensitive_desc:HashMap<String,AggregateFunctionDescription>,
case_insensitive_combinator_desc:Vec<(String,CombinatorDescription)>,
}
implAggregateFunctionFactory{
...
pubfnregister(&mutself,name:&str,desc:AggregateFunctionDescription){
letcase_insensitive_desc=&mutself.case_insensitive_desc;
case_insensitive_desc.insert(name.to_lowercase(),desc);
}
...
}

每个被注册的函数都要实现 trait AggregateFunction 和 AggregateFunctionFeatures。其间 AggregateFunctionFeatures 和 Scalar 中的 FunctionProperty 比较相似,都是存储函数的一些特质。

pubtypeAggregateFunctionRef=Arc<dynAggregateFunction>;
pubtypeAggregateFunctionCreator=
Box<dynFn(&str,Vec<Scalar>,Vec<DataType>)->Result<AggregateFunctionRef>+Sync+Send>;
pubstructAggregateFunctionDescription{
pub(crate)aggregate_function_creator:AggregateFunctionCreator,
pub(crate)features:AggregateFunctionFeatures,
}

首要来看 trait AggregateFunction,这儿面是 Agg Function 的构成。

函数构成

能够看到与 Scalar 直接运用一个 Struct 不同,AggregateFunction 是一个 trait。由于聚合函数是按 block 累加列中的数据,再累加进程中会发生一些中心成果。

因而 Aggregate Function 必须有初始状况,而且聚合进程中生成的成果也要是 mergeable (可合并) 和 serializable (可序列化) 的。

首要函数有:

  • name 表明被注册的函数的名字,比方 avg, sum 等等。

  • return_type 表明被注册的函数回来值的类型,相同的函数回来值可能会由于参数类型的不同而发生变化。比方 sum(int8) 参数为 i8 类型,但是回来回来值可能是 int64。

  • init_state 用来初始化聚合函数状况。

  • state_layout 用来表明 state 在内存中的大小和内存块的摆放方式。

  • accumulate 用于SingleStateAggregator。也就是着整个块能够在单个状况下聚合,没有任何 keys。比方 select count(*) from t 此刻查询中没有任何分组列的聚合,这时会调度 accumulate 函数。

  • accumulate_keys 则是用于 PartialAggregator。这儿需要考虑 key 和 offset ,每个 key 代表一个仅有的内存地址,记为函数参数 place。

  • serialize 将聚合进程中的 state 序列化为二进制

  • deserialize 从二进制反序列化为 state。

  • merge 用于合并其他 state 到当前 state。

  • merge_result 能够将 Aggregate Function state 合并成单个值。

示例

以 avg 为例

详细实现在 aggregate_avg.rs 中。

由于咱们需要累加每个值,并除以非 null 总行数。因而 avg function 被界说为 struct AggregateAvgFunction<T, SumT> 。其间 T 和 SumT 是实现 Number 的逻辑类型。

在聚合进程中 avg 会发生的中心状况值是 已经累加的值的总和 以及 已经扫描过的非 null 的行。因而 AggregateAvgState 能够被界说为如下结构。

#[derive(Serialize,Deserialize)]
structAggregateAvgState<T:Number>{
#[serde(bound(deserialize="T:DeserializeOwned"))]
pubvalue:T,
pubcount:u64,
}
  • return_type 设置为 Float64Type。比方 value = 3, count = 2, avg = value/count。

  • init_state 初始状况设置 value 为 T 的 default 值,count 为 0。

  • accumulate AggregateAvgState 的 count, value 分别对 block 中非 NULL 的行数和值进行累加。

  • accumulate_keys 经过 place.get::<AggregateAvgState>() 获取对应的状况值,并进行更新。

fnaccumulate_keys(
&self,
places:&[StateAddr],
offset:usize,
columns:&[Column],
_input_rows:usize,
)->Result<()>{
letdarray=NumberType::<T>::try_downcast_column(&columns[0]).unwrap();
darray.iter().zip(places.iter()).for_each(|(c,place)|{
letplace=place.next(offset);
letstate=place.get::<AggregateAvgState<SumT>>();
state.add(c.as_(),1);
});
Ok(())
}

相似的聚合函数示例也能够参考

sum,

count。

函数测验

Unit Test

聚合函数相关单元测验在 agg.rs 中。

Logic Test

Functions 相关的 logic 测验在 tests/logictest/suites/base/02_function/ 中。

关于 Databend

Databend 是一款开源、弹性、低成本,基于目标存储也能够做实时剖析的新式数仓。等待您的重视,一起探究云原生数仓解决方案,打造新一代开源 Data Cloud。

  • Databend 文档:databend.rs/

  • Twitter:twitter.com/Datafuse_La…

  • Slack:datafusecloud.slack.com/

  • Wechat:Databend

  • GitHub :github.com/datafuselab…