作者:倪涛 MO产品布道师
目录
Part 1.
如何处理不均匀数据?
Part 2.
Hybrid shuffle
Part 3.
Shuffle resue
Part 4.
Join reorder
Part 5.
总结
本文字数:7000字+ 阅读时间:8分钟+
Part 1
如何处理不均匀数据?
之前一直用tpch数据集进行举例,但是tpch是一个比较理想的场景,所有数据都是比较均匀分布的。实际生产环境中,很多数据是不均匀分布的。对于不均匀数据,比较简单的做法是使用hash shuffle来保证分桶后的数据是均匀的。由于hash shuffle无法开启colocate优化,因此优化器还是尽可能使用range shuffle,这就需要对数据的分布有比较好的掌握,从而给出更好的分桶算法。
MO的优化器,在计算stats阶段,同时也会根据数据的zonemap来计算数据的分布情况。具体做法是每访问一个zonemap,就假定这个object中的数据在这个zonemap内均匀分布。通过遍历所有的zonemap来计算数据分布,并将计算结果保存在ShuffleRange这个数据结构中。
这里有个细节是对数值类型,假设每个zonemap在自己的min-max区间内数据均匀分布,直接按max从小到大排序。对字符类型,由于每个字符占一个字节,但一个字节的大部分值不会出现在字符串中,对每个zonemap数据均匀分布的假设不成立,所以对每种出现过的字符映射到数字,重新计算每个字符串对应的新值后再按max从小到大排序。
ShuffleRange这个结构里有几个关键的值:
ShuffleRange.Overlap
是一个0到1之间的float64类型变量,表示zonemap之间的重叠度,Overlap越大重叠的部分越多,具体定义为:任意两个zonemap之间重叠的比例的平均值的平方根,取平方根是为了更方便区分重叠度较低的部分数据集。
ShuffleRange.Uniform
是一个0到1之间的float64类型变量,表示数据的平均度,Uniform越大,数据更平均,具体定义为:数据整体平均的密度除以数据最密集处的密度。Uniform接近于1时,考虑直接对整体最大最小值平均划分值来分桶。
ShuffleRange.Result
是一个[]float64,长度为默认分桶数1024,表示相邻两个桶之间的划分值。Result的计算方法为:假设每个zonemap在自己的min-max区间内数据均匀分布,将zonemap排序后计算每一小段的密度并计算划分值。
计算完ShuffleRange之后,优化器会根据overlap,uniform等指标决定是否采用range shuffle。如果数据分布不是很理想,算法无法给出合适的分桶方法,那么只能使用hash shuffle。如果指标合适,说明可以给出合理的分桶方案,则会默认保留1024个桶的分布方法。之后在编译pipeline时,再根据运行时信息,决定实际的分桶数后,重新分出N个桶。限于篇幅具体的算法细节也不再做进一步介绍,感兴趣的同学可以通过MO的源码直接查看相关细节。
Part 2
Hybrid shuffle
对于某些query来说,不满足colocate shuffle join的条件,此时无论使用broadcast join,还是使用shuffle join都达不到很好的性能。例如tpch1T的q9,查询中有个join是(lineitem.l_suppkey = supplier.s_suppkey), (lineitem.l_partkey = part.p_partkey),其中左边lineitem表输出60亿行数据,右边是一个join子树,输出大约四千万行数据。优化器选择l_suppkey这一列作为shuffle列,由于这一列没有排序,所以必然会有大部分数据需要跨CN进行传输。同样以3CN10核为例,需要跨CN传输的数据量为40亿行。会导致性能非常差。此时可以观察到build端只有四千万行数据,传输四千万行的代价远远小于40亿行。
在这种情况下优化器对shuffle策略做了一个优化。首先保证probe端所有数据一定shuffle到本地,其次需要让build端数据shuffle到每个CN,以保证每个CN都能访问到完整的hash表。
具体算法如下:
首先对每个pipeline,包括build端和probe端都有一个shuffle编号,从0到30,并且一定要保证一一对应。
对于probe端,例如shuffle算子告诉dispatch算子某个batch需要发送到18号。18%10=8,则8号、18号以及28号算子中,找到哪一个在本地,就发送给对应算子。对应dispatch算子的策略2。
对于build端,仍然以18号为例,则dispatch算子会将数据发送给8、18以及28这三个算子,确保3个CN都有完整的hash表。对应dispatch算子的策略
这样probe端60亿行全部发往本地,build端四千万行需要复制到3个CN上,发送代价仅有八千万行,远远小于40亿。与普通shuffle join相比,大大减少了跨网络数据传输。而与broadcast join相比,跨网络传输代价相同,但是每个hash表的大小只有broadcast join的十分之一。此时可以得到最佳的性能。由于兼顾了shuffle join和broadcast join的有点,所以这种执行计划叫做hybrid shuffle join。
但是这种策略下,每个CN都有完整的hash表,所以如果单CN内存不够大,不足以存放完整hash表时,可能需要spill,此时hybrid shuffle join就未必比普通shuffle join更优。另外如果probe端比build端并没有大很多,而CN数量又很多,hybrid shuffle同样也未必由于普通shuffle join。需要优化器和pipeline综合考虑得到最优计划。在explain结果中,通过查看是否存在HYBRID关键字,可以看到优化器是否为当前的shuffle join开启这一策略。例如tpch1T q3、orders join customer就采用了hybrid shuffle。
QUERY PLAN
Project
-> Sort
Sort Key: sum(lineitem.l_extendedprice * (1 - lineitem.l_discount)) DESC, orders.o_orderdate INTERNAL
Limit: 10
-> Aggregate
Group Key: lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority shuffle: REUSE
Aggregate Functions: sum((cast(lineitem.l_extendedprice AS DECIMAL128(38, 2)) * (1 - cast(lineitem.l_discount AS DECIMAL128(38, 2)))))
-> Join
Join Type: INNER hashOnPK
Join Cond: (lineitem.l_orderkey = orders.o_orderkey) shuffle: range(lineitem.l_orderkey)
-> Table Scan on tpch_1000g.lineitem
Filter Cond: (lineitem.l_shipdate > 1995-03-29)
Block Filter Cond: (lineitem.l_shipdate > 1995-03-29)
-> Join
Join Type: INNER hashOnPK
Join Cond: (orders.o_custkey = customer.c_custkey) shuffle: range(orders.o_custkey) HYBRID
-> Table Scan on tpch_1000g.orders
Filter Cond: (orders.o_orderdate < 1995-03-29)
Block Filter Cond: (orders.o_orderdate < 1995-03-29)
-> Table Scan on tpch_1000g.customer
Filter Cond: (customer.c_mktsegment = 'HOUSEHOLD')
Part 3
Shuffle reuse
某些场景下,shuffle join的结果又需要进行shuffle group。如果选择的shuffle列相同,shuffle策略也相同,那么shuffle group实际不需要再进行一次shuffle,直接在pipeline中让join的结果输出给group算子,group算子再输出结果即可。此时性能最佳。
以tpch1T,q13为例,right join和group使用了相同的分组策略,优化器也选择了相同的shuffle策略。此时probe结果可以直接给group算子,避免重复的shuffle。可以在explain语句中查看是否存在REUSE关键字,来查看优化器是否开启这一优化。
QUERY PLAN
Project
-> Sort
Sort Key: count(*) DESC, c_orders.c_count DESC
-> Aggregate
Group Key: count(orders.o_orderkey)
Aggregate Functions: starcount(1)
-> Aggregate
Group Key: customer.c_custkey shuffle: REUSE
Aggregate Functions: count(orders.o_orderkey)
-> Join
Join Type: RIGHT hashOnPK
Join Cond: (orders.o_custkey = customer.c_custkey) shuffle: range(orders.o_custkey)
-> Table Scan on tpch_10g.orders
Filter Cond: (not (orders.o_comment like '%pending%accounts%'))
-> Table Scan on tpch_10g.customer
在优化器中,考虑到shuffle reuse之后,搜索空间会变得很大,策略会变得非常复杂。例如某个join原本不适合走shuffle,但是考虑到子节点或者父节点走了shuffle,当前节点可以直接reuse,就应该走shuffle的执行计划。即使对于确定了应该走shuffle的执行计划,也要考虑是否支持colocate,是否应该使用hybrid shuffle,应该选择range shuffle还是hash shuffle。这里对优化器是非常大的挑战。目前MO优化器通过多遍搜索的方式来寻找最优执行计划,具体细节限于篇幅不在这里介绍,感兴趣的同学可以通过MO的源码直接查看相关细节。
Part 4
Join reorder
在查询优化中,代价是衡量一个执行计划好坏的标准,通常代价代表了一个执行计划的执行时间或者对数据库系统资源的占用量,包括 CPU 资源、IO 资源、网络资源等。在单机执行中,代价模型通常只需要考虑 CPU 和 IO 就可以。但是在分布式的场景中,除了考虑 CPU 和 IO 的代价之外,还需要考虑网络传输代价、查询的并行度以及一些分布式特定优化场景的代价,比如 bloom filter 的代价计算等。这些因素从根本上提升了分布式代价模型设计和拟合的复杂性,也从一定程度上增加了整个分布式查询优化的复杂性。
从本质上来说,一个单机最优的join order,在分布式状态下未必是最优。为了解决分布式查询优化带来的复杂性,跟业界的大部分解决方案类似,MO的优化器采用二阶段的分布式查询优化方法。首先使用单机的join order算法,求解出一个单机最优的执行计划。然后对执行计划进行二次扫描,为每个算子确定其分布式执行计划,是应该使用merge group还是shuffle group,是应该用broadcast join还是shuffle join。在这个过程中,还会经历多次递归扫描,判断是否可以开启colocate shuffle,是否需要使用hybrid shuffle,是否满足shuffle reuse的条件等等。
以tpch1T Q10为例,在单机环境下得到的最优执行计划,和加入shuffle执行计划后,分布式场景下搜索到的执行计划分别为:
左边是单机场景下最优的执行计划。关键是lineitem最大的表,其他表全部join好之后最后再和lineitem表做join,可以最大程度降低lineitem数据量,避免先做大表的join。但是加入shuffle的搜索空间后,右边才是更优的执行计划。一是直接让lineitem和orders 做join,这个join可以对两侧同时启用colocate shuffle join,性能达到最好。二是让customer和nation的join结果集出现在父节点join的左边,输出结果可以保留customer的排序。这时group节点可以直接reuse join节点的shuffle,避免了重复shuffle。实际测试新的执行计划在分布式场景下比原执行计划性能提升一倍左右。
QUERY PLAN
Project
-> Sort
Sort Key: sum(lineitem.l_extendedprice * (1 - lineitem.l_discount)) DESC
Limit: 20
-> Aggregate
Group Key: customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment shuffle: REUSE
Aggregate Functions: sum((cast(lineitem.l_extendedprice AS DECIMAL128(38, 2)) * (1 - cast(lineitem.l_discount AS DECIMAL128(38, 2)))))
-> Join
Join Type: INNER
Join Cond: (customer.c_custkey = orders.o_custkey) shuffle: range(customer.c_custkey)
-> Join
Join Type: INNER hashOnPK
Join Cond: (customer.c_nationkey = nation.n_nationkey)
-> Table Scan on tpch_10g.customer
-> Table Scan on tpch_10g.nation
-> Join
Join Type: INNER hashOnPK
Join Cond: (lineitem.l_orderkey = orders.o_orderkey) shuffle: range(lineitem.l_orderkey)
-> Table Scan on tpch_10g.lineitem
Filter Cond: (lineitem.l_returnflag = 'R')
-> Table Scan on tpch_10g.orders
Filter Cond: (orders.o_orderdate < 1993-06-01), (orders.o_orderdate >= 1993-03-01)
Block Filter Cond: (orders.o_orderdate < 1993-06-01), (orders.o_orderdate >= 1993-03-01)
Part 5
总结
Shuffle的执行计划在优化器中是非常重要的一块,限于篇幅这里只介绍了其中比较重要的一部分,更多细节和相关实现代码欢迎直接查看MO源代码。
About
MatrixOne
MatrixOne 是一款基于云原生技术,可同时在公有云和私有云部署的多模数据库。该产品使用存算分离、读写分离、冷热分离的原创技术架构,能够在一套存储和计算系统下同时支持事务、分析、流、时序和向量等多种负载,并能够实时、按需的隔离或共享存储和计算资源。 云原生数据库MatrixOne能够帮助用户大幅简化日益复杂的IT架构,提供极简、极灵活、高性价比和高性能的数据服务。
MatrixOne企业版和MatrixOne云服务自发布以来,已经在互联网、金融、能源、制造、教育、医疗等多个行业得到应用。得益于其独特的架构设计,用户可以降低多达70%的硬件和运维成本,增加3-5倍的开发效率,同时更加灵活的响应市场需求变化和更加高效的抓住创新机会。在相同硬件投入时,MatrixOne可获得数倍以上的性能提升