Kylin 查询的基本流程

1.查询入口

Kylin server 接到用户端的查询后, 会使用 JDBC 与 Calcite 连接, 并告诉 Calcite 要查询的表的 schema 信息, 并且注册一系列转化的 RULE 到 calcite.

Calcite 作为一个通用的 SQL 框架, 他出发点是希望能为不同计算存储引擎提供统一的 SQL 查询引擎, 它本身不知道也不关心底层的 storage, 所以使用 Calcite 框架, 需要注意两点:

  1. schema 信息(通过 JDBC 的 Properties 传入)
  2. TableScan 如何读取数据(需要框架使用方自己实现, 后面讲代码生成时会提到)

image-20200207232940630

image-20200207232956828

2.定义规则 Hook 到 Calcite

Kylin 使用上面注册进去的这些 Rule 把各个 Calcite 逻辑执行计划节点转换成 Kylin 的逻辑执行计划节点(OLAP*Rel)

  1. OLAPRel 也继承自RelNode(Calcite 的逻辑执行计划), 简单来说每个OLAP*Rel只是包了下 Calcite 的逻辑执行计划节点, 只是为了多加一些自己的抽象方法, 需要各个子类去实现, 后面会详细介绍
  2. 这些抽象方法是用来完成[查询信息收集/选 Cube/rewrite 执行计划/生成具体的物理执行计划]

image-20200207233022958

  1. 可以看到这个 Rule 当遇到 LogicalFilter 时, 会把它转换成 OLAPFilterRel, 他们都是 RelNode 的子类.

image-20200207233039360

  1. 这里有一个在 calcite 代码里 Hack 的点, 就是所有生成的查询计划树, 头结点一定会是 OLAPToEnumerableConverter, 如果不是, 会抛错.

image-20200207233054038

这个是串起整个代码流程的关键位置, 下面会详细讲解:

OLAPToEnumerableConverter.implement

image-20200207233114376

3.切分 OLAPContext与选择 Cube

OLAPContext 与 Cube 一一对应, 它定义了很多属性, 帮助我们定位一个 OLAPContext 是否有 Cube 与其对应.

所以当我们把一颗执行计划树分解成一个个 OLAPContext, 并且找到这一个个 OLAPContext 有哪些对应的 cube 的时候, 我们就知道了这个查询中, 哪些部分能够用 Cube 来回答, 哪些部分不能.

OLAPContext记录的信息:

1
2
3
4
5
6
Aggregations
Filter columns
Group by columns
Joins
Tables
...

3.1 如何切分 OLAPContext

1
2
OLAPRel.OLAPImplementor olapImplementor = new OLAPRel.OLAPImplementor();
olapImplementor.visitChild(getInput(), this);

OLAPRel 接口有个方法: implementOLAP,之前通过 Rule 转化成的各种 OLAP*Rel 都实现了这个方法, 从根节点OLAPToEnumerableConverter 开始, 使用 Visitor 模式访问, 当如果遇到 aggregation 就需要划分一个新的 OLAPContext.

对于 case1 , 在 Visitor 模式下, 会访问根节点 agg, 这需要先访问它的⼦子节点 filter, 类似的, 访问 filter 节点需要先访问其⼦子节点 join节点 …, 当所 有的⼦子节点全部访问完重新回到 agg时, 这时候需要划分出⼀一个 OLAPContext.同理理, case2 和 case3 会 被拆成两个 OLAPContext.

注意, case2 虽然有两个 OLAPContext, 但是左边那个 OLAPContext 无法对应一个 cube, 真正 cube 能加速的部分是右下角红色的那个OLAPContext, 同理 case3, 上面白色的两个算子都需要现算.

image-20200207233136751

3.2 选择 cube

根据 context 选择 Cube 的逻辑在: RealizationChooser.selectRealization(contexts);其实现比较简单, 就是遍历得到的所有 contexts, 再遍历cube, 看看其度量和维度是不是和我们提前定义的 cube 中的是一致的, 如果一致, 则表示这个 OC 可以用 cube 来回答.

1
2
Collection unmatchedDimensions = unmatchedDimensions(dimensionColumns, cube);
Collection unmatchedAggregations = unmatchedAggregations(aggrFunctions, cube);

4. Rewrite 执行计划

1
2
3
// rewrite query if necessary
OLAPRel.RewriteImplementor rewriteImplementor = new OLAPRel.RewriteImplementor();
rewriteImplementor.visitChild(this, getInput());

经过上面那步, Kylin 已经对用户的查询选出 cube 去回答了, 但是用户 SQL的语义到这里还并没有改变.

假设用户想查商品的 PV: SELECT item, COUNT(user_id) FROM stock GROUP BY item, 定义的 Cube 维度为 item, date, 度量为 COUNT(user_id), 到这一步, 执行计划的 agg 的节点里, 还是一个count() 的函数, 如果我们想用已经预计算过后的 cube 来回答这个SQL, 就会导致问题:

  1. 根本没有 user_id 这个列, 预计算过的 Cube 有三个列: user/date 以及一个 M_开头的度量列, 比如 M_C
  2. 即使是对这个度量列求 COUNT, 结果也不对, 因为这是预计算过后的条数, 最终改写的 SQL 应该是 SELECT item, SUM(M_C) FROM cuboid GROUP BY item, 为什么还要保留这个 group by 呢? 对于上文定义的 cube, 不是精确回答这句查询, 还需要从 item, date 聚合出 item_name 来, 但是相比较于从源表聚合, input 以及少了很多了

同理, rewrite 这步也是一个 visitor 模式, 每个算子都实现了 implementRewrite, 由每个算子自行决定要不要 rewrite 执行计划, 像 filter/limit 等算子, 是啥都不干的.

5.生成物理执行计划

注意: 这是老的实现, 新架构用 Spark 进行计算, 不走这边了

1
2
3
// implement as EnumerableRel
OLAPRel.JavaImplementor impl = new OLAPRel.JavaImplementor(enumImplementor);
EnumerableRel enumerable = impl.visitChild((OLAPRel) getInput());

同上面两步一样, 这里同样是一个 visitor 方法: EnumerableRel implementEnumerable(List inputs), 每个 OLAP*Rel 都实现了这个方法, 将每一个 OLAP*Rel 根据本次查询的参数, 生成 Calcite 自身的 EnumerableXXX 算子执行, 即逻辑节点转为物理节点.

这里用到的 Calcite 的 Enumerable 的机制, 这是 Calcite 默认的物理算子, 可以生成执行的代码. 等于说刚刚从正常的 Calite 逻辑执行计划转到 Kylin 的逻辑执行计划, 再进行改写, 都是 Kylin 为了预计算叉出去的, 现在又叉回到了 Calcite 本来的轨道上来, 但是这时候你的执行计划已经是改变过后的了, 对应生成的物理执行计划也可以用于查询 Cube 的数据.

OLAPSortRel#implementEnumerable

image-20200207233214957

注意

特别需要注意的是: OLAPTableScan 这个算子很特殊, 他们没有转回到 Calcite, 而是自己实现了EnumerableRel, 直接返回 this.

原因是因为这个算子是真正读取数据的地方, Kylin 需要在这个算子中, 接入 Cube 数据, 由于涉及到一些代码生成的东西, 所以下节介绍.

6.物理执行计划生成代码真正执行

1
2
3
OLAPRel.JavaImplementor impl = new OLAPRel.JavaImplementor(enumImplementor);
this.replaceInput(0, enumerable);
return impl.visitChild(this, 0, enumerable, pref);

同样还是 visitor 方法: Result implement(EnumerableRelImplementor implementor, Prefer pref);,每个 EnumerableRel 都实现了这个方法

截图是 EnumerableSort 生成代码的使用的代码, 用了 linq4j, 仿照的是 C# 的 linq, 也是 Calcite 作者 Julian 的大作, 大家有兴趣的可以自行研究下.

EnumerableSort#implement

image-20200207233235160

上文提到使用 Calcite 框架, 需要注意的第二点: TableScan 如何读取数据 接下来介绍, 这个很重要:

6.1 TableScan 如何读取数据

上文提到 OLAPTableScan 没有转换回 Calcite 的物理执行计划, 而是自己实现了EnumerableRel 的接口,也就说他自己实现了 implement 方法来读取数据:

OLAPTableScan#implement

image-20200207233314381

我们只需要关注 execFunction, 这个是最后生产代码取数据会执行的函数, 如果击中cube, 会走到 executeOLAPQuery, 这个方法在:OLAPTable#executeOLAPQuery

1
2
3
public Enumerable executeOLAPQuery(DataContext optiqContext, int ctxSeq) {
return new OLAPQuery(optiqContext, EnumeratorTypeEnum.OLAP, ctxSeq);
}

一路追下去, 会调用到 OLAPEnumerator.queryStorage, 这里就是具体 query storage 的地方:

1
2
3
4
5
6
7
private Iterator queryStorage() {
logger.debug("query storage...");
// query storage engine
IStorageQuery storageEngine = StorageFactory.createQuery(olapContext.realization);
ITupleIterator iterator = storageEngine.search(olapContext.storageContext, sqlDigest, olapContext.returnTupleInfo);
return iterator;
}

再追下去就是具体从 Hbase 扫数据等流程了, 这里就先不看了, 感兴趣的同学可以自己看, 欢迎随时交流

6.2 生成代码摘录

摘一个生成的代码, 注意这个不是 spark 的全阶段代码生成, 是一个算子自己的一份代码, 还是传统的火山模型, 所以还会有迭代器的开销.

整个计算过程迭代的读取指定 cuboid 数据,并执行相应的计算逻辑,是一个基于内存的单机计算过程(calcite 的计算是单机的! 大数据时代你敢信?).

可以看到整个数据是从 executeOLAPQuery 读上来, 假设后面有了新的 storage, 我们只需要做在 executeOLAPQuery 中就可以了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
*// _inputEnumerable 为 OLAPQuery 类型,OLAPQuery*
final org.apache.calcite.linq4j.Enumerable _inputEnumerable = ((org.apache.kylin.query.schema.OLAPTable) root.getRootSchema().getSubSchema("DEFAULT").getTable("KYLIN_SALES")).executeOLAPQuery(root, 0);
final org.apache.calcite.linq4j.AbstractEnumerable child = new org.apache.calcite.linq4j.AbstractEnumerable(){
public org.apache.calcite.linq4j.Enumerator enumerator() {
return new org.apache.calcite.linq4j.Enumerator(){
*// 类型,OLAPQuery.enumerator() 得到的 inputEnumerator 为 OLAPEnumerator 类型*
*// inputEnumerator 会调用 StorageEngine 去 HBase 中查询指定 cube、指定 cuboid(及可能的 filter 下推)数据*
public final org.apache.calcite.linq4j.Enumerator inputEnumerator = _inputEnumerable.enumerator();
public void reset() {
inputEnumerator.reset();
}
public boolean moveNext() {
while (inputEnumerator.moveNext()) {
final Integer inp4_ = (Integer) ((Object[]) inputEnumerator.current())[4];
if (inp4_ != null && inp4_.intValue() != 1000) {
return true;
}
}
return false;
}
public void close() {
inputEnumerator.close();
}
public Object current() {
final Object[] current = (Object[]) inputEnumerator.current();
return new Object[] {
current[0],
current[5],
current[13],
current[11],
current[10]};
}
};
}
};
return child.groupBy(new org.apache.calcite.linq4j.function.Function1() {
public Long apply(Object[] a0) {
return (Long) a0[0];
}
public Object apply(Object a0) {
return apply(
(Object[]) a0);
}
}
, new org.apache.calcite.linq4j.function.Function0() {
public Object apply() {
java.math.BigDecimal a0s0;
boolean a0s1;
a0s1 = false;
a0s0 = new java.math.BigDecimal(0L);
long a1s0;
a1s0 = 0;
Record3_0 record0;
record0 = new Record3_0();
record0.f0 = a0s0;
record0.f1 = a0s1;
record0.f2 = a1s0;
return record0;
}
}
, new org.apache.calcite.linq4j.function.Function2() {
public Record3_0 apply(Record3_0 acc, Object[] in) {
final java.math.BigDecimal inp4_ = in[4] == null ? (java.math.BigDecimal) null : org.apache.calcite.runtime.SqlFunctions.toBigDecimal(in[4]);
if (inp4_ != null) {
acc.f1 = true;
acc.f0 = acc.f0.add(inp4_);
}
acc.f2 = acc.f2 + org.apache.calcite.runtime.SqlFunctions.toLong(in[3]);
return acc;
}
public Record3_0 apply(Object acc, Object in) {
return apply(
(Record3_0) acc,
(Object[]) in);
}
}
, new org.apache.calcite.linq4j.function.Function2() {
public Object[] apply(Long key, Record3_0 acc) {
return new Object[] {
key,
acc.f1 ? acc.f0 : (java.math.BigDecimal) null,
acc.f2};
}
public Object[] apply(Object key, Object acc) {
return apply(
(Long) key,
(Record3_0) acc);
}
}
).orderBy(new org.apache.calcite.linq4j.function.Function1() {
public Long apply(Object[] v) {
return (Long) v[0];
}
public Object apply(Object v) {
return apply(
(Object[]) v);
}
}
, org.apache.calcite.linq4j.function.Functions.nullsComparator(false, false)).take(10);

Ref

Kylin执行查询流程分析