作者:倪涛 MO产品布道师
目录
Part 1.
背景
Part 2.
Shuffle实现原理
Part 3.
Colocate shuffle
本文字数:7000字+ 阅读时间:8分钟+
Part 1
背景
shuffle算法的基本原理,就是给输入的数据进行分桶,每个桶内的数据都可以单独处理并且输出结果。这样可以减少汇总数据时hash表的大小。
以group算子为例,实现方式有sort group和hash group两种。hash group在多并发时,每个并发都会对输入数据进行hash并放到hash表中进行处理。所有hash表需要汇总到一起再做一次merge,以得到最终结果。这种方式叫做merge group。另外一种方式是先将数据进行shuffle分发,每个并发接收并处理不同的数据,并且直接输出结果,不需要将hash表merge到一起。这种方式叫做shuffle group。
为什么需要支持shuffle?原因主要有以下三点。:
解决hash表太大导致分配内存失败的问题:以merge group为例,由于需要将所有数据merge到一起,如果最后merge的hash表基数太大,则需要分配的内存就会非常大,会导致分配内存失败。如果超过单CN(compute node, 表示计算节点)的内存上限,且不考虑spill的情况下,还会导致oom。而shuffle则可以将一个大的hash表切分成多个小的hash表进行处理。
提高计算节点的横向扩展性:仍以merge group为例,由于需要将所有数据merge到一起,在单CN上进行单并发处理。如果hash表基数比较大,这一步处理比较慢,就会成为瓶颈,即使增加再多CN也无法加速这样的query。而shuffle group则不存在这样的单点瓶颈。
提升性能:当hash表太大时,随机访问hash表会导致非常高的cache miss概率,而cache miss会导致性能显著下降。通过shuffle将hash表切分成多个小hash表进行处理,对每个小hash表的随机访问cache命中率大大提高,进而提升整体性能。
Part 2
Shuffle实现原理
shuffle对数据进行分桶,桶的数目在编译pipeline(在执行阶段,算子被组成流水线来执行。一道流水线被称为一个pipeline)时,由运行环境的CN数量,CPU核数动态决定。例如当前租户的环境是3CN,每个CN10核,shuffle算子会自动计算分桶数量为30。
shuffle的分桶算法有两种,range shuffle和hash shuffle。hash shuffle直接通过一个hash函数得到数据的分桶号。range shuffle则需要优化器在编译阶段给出分桶策略。对于均匀分布的数据,分桶策略较为简单。以tpch1T,lineitem表为例。l_orderkey的取值范围是1到60亿,且均匀分布。对该列进行shuffle的策略则是按照最小最大值进行平均分配,1-2亿分到桶1,2亿到4亿分到桶2,依次类推。对于非均匀分布的数据处理算法比较复杂,将在后文详细介绍。
MO在编译阶段由优化器决定使用range shuffle还是hash shuffle,通常优先使用range shuffle,如果出于某些原因无法使用range shuffle,则会使用hash shuffle,保证分桶的均匀性。
在MO Pipeline中对应shuffle算法的算子有两个,shuffle和dispatch。
Shuffle算子的功能是将输入的数据进行分桶,并且输出分桶后的数据。例如输入一个batch(MO是列存数据库,算子对数据的处理以batch为基本单位),batch内包含8192行数据,值为1、2、3 ... 一直到8192。如果需要分两个桶,可能的分桶结果是将1-4096放入一个新batch,4097-8192放入一个新batch内,然后将这两个新的batch再发送出去。
由于并发可能非常多,分桶后每个桶内数据可能很少,所以直接输出是不太合适的。因此shuffle算子需要在内部维护一个内存池,将分桶后的数据存放起来,攒到一定程度的时候再发送出去。或者是当shuffle算子读完所有输入数据后,就需要将内部所有数据都发送出去。
在pipeline里大部分算子都是一个batch输入对应一个batch的输出,但是shuffle算子并不会这样。由于内部维护的内存池,shuffle可能一段时间内没有输出,也可能连续输出多个batch。内存池的策略对shuffle算子的性能有比较重要的影响。
Dispatch算子负责将shuffle分桶后的数据发送到指定的位置。shuffle算子会在batch中设置一个字段shuffleIDX,dispatch算子读取这个字段,并且根据指定的策略找到需要发送的目的地并将batch发送出去。需要注意的是shuffleIDX这个字段不会参与batch的序列化和反序列化,因此shuffle算子和dispatch算子之间一定不能有跨CN的网络传输,否则会导致错误。
dispatch算子有三种发送数据的策略,绝大部分情况下使用第一种策略,后面两种策略在hybrid shuffle中使用,将在后续进行介绍。
直接根据shuffleIDX找到唯一的对应目的地(可能在本地CN,也可能在远端CN),并发送过去。可能会有跨CN的网络传输。
根据shuffleIDX找到当前CN的对应目的地,并发送过去。这种情况下一定只给本地发送一份,不会有跨CN的网络传输。
根据shuffleIDX找到每个CN的对应目的地,并发送过去。这种情况下会给每个CN都发送一份,必定会有跨CN的网络传输。
目前MO支持开启shuffle的算子有四个:load、scan、group和join。不过理论上任何算子对shuffle都是无感知的,以后MO将会支持对更多的算子开启shuffle算法。
Load算子
Load算子中部分复用了shuffle的分发逻辑来保证多并发读以及多并发写的情况下,写入s3的数据能保留数据原始排序。
Scan算子
对scan算子的shuffle,是在编译pipeline的阶段,将所有需要读的block分发到多个CN上。这里的shuffle是以block为单位进行,还没有到实际读取block数据的阶段。如果是hash shuffle,则对blockname做hash。如果是range shuffle,则取block zonemap里min max的中间值进行range计算。
Group算子
对group算子的shuffle,是将group算子在编译pipeline时静态展开成多个pipeline,对scan算子实际读取的block数据进行shuffle,理论上以每行数据为单位。每个pipeline接收shuffle输出的数据。此时group算子不需要merge group,每个单独的pipeline都可以直接输出数据到下一个pipeline。
Join算子
对join算子的shuffle,同样是将join算子在编译pipeline时静态展开成多个pipeline。join的输入分为probe和build两边,对两边的输入数据采用同样的shuffle算法,保证相同数据会分到同一个桶内。每个join算子也可以直接将join结果输出,而不需要做merge。
如果需要查看一条语句的执行计划是否开启shuffle,可以通过explain语句,查看执行计划中是否有shuffle关键字。例如tpch q4,可以看到lineitem表join orders表,join类型是right semi,lineitem是probe表,orders是build表。并且开启了shuffle join,在lineitem表上对l_orderkey这一列做了range shuffle。由于join两表采用同样的shuffle算法,所以在orders表上同样会对o_orderkey这一列做shuffle。
QUERY PLAN
Project
-> Sort
Sort Key: orders.o_orderpriority INTERNAL
-> Aggregate
Group Key: orders.o_orderpriority
Aggregate Functions: starcount(1)
-> Join
Join Type: RIGHT SEMI hashOnPK
Join Cond: (lineitem.l_orderkey = orders.o_orderkey) shuffle: range(lineitem.l_orderkey)
-> Table Scan on tpch_10g.lineitem
Filter Cond: (lineitem.l_commitdate < lineitem.l_receiptdate)
Block Filter Cond: (lineitem.l_commitdate < lineitem.l_receiptdate)
-> Table Scan on tpch_10g.orders
Filter Cond: (orders.o_orderdate >= 1997-07-01), (orders.o_orderdate < 1997-10-01)
Block Filter Cond: (orders.o_orderdate >= 1997-07-01), (orders.o_orderdate < 1997-10-01)
Part 3
Colocate shuffle
shuffle的执行计划在多CN上虽然可以减少hash表的开销,但是同样可能会导致数据在网络传输上传输的开销增加。一个好的shuffle执行计划,必须要尽可能减少数据在网络上的传输。所以接下来要介绍的colocate shuffle优化对性能是至关重要的。
仍以tpch1T,q4为例。lineitem表输出行数大约为38亿行,orders表输出行数大约为5千万行。如果采用broadcast join,hash表太大难以处理,并且需要广播5千万行的hash表,开销也不算低。如果采用普通的shuffle join,在3CN上,可以计算出需要走网络传输的数据量大约为(38亿+5千万)/3*2,大约是26亿行左右。虽然hash表变小了,但是网络传输开销太大,性能会降低到难以接受。此时使用colocate shuffle join是最优选择。
colocate shuffle join的具体原理是,从scan开始,利用block zonemap将数据分布到对应的CN上,并且尽可能保证后续的shuffle算法会将数据shuffle到当前CN上进行处理,最大程度减少数据跨网络发送。在这个例子中,优化器会告诉scan算子,block zonemap处于1-20亿之间的数据在CN1上读取,20亿-40亿之间的数据在CN2上读取,40亿到60亿之间的数据在CN3上读取。同时优化器会告诉shuffle join,1-20亿之间的数据需要shuffle到CN1上进行计算,20亿-40亿之间的数据shuffle到CN2上计算,40亿到60亿之间的数据在CN3上计算。如果每个CN有10核,那么具体到CN1是1-2亿之间在第一个pipeline上处理,2亿-4亿在第二个pipeline上处理,以此类推。具体到CN2是20亿-22亿之间在第一个pipeline上处理,22亿-24亿在第二个pipeline上处理,以此类推。具体到CN3是40亿-42亿之间在第一个pipeline上处理,42亿-44亿在第二个pipeline上处理,以此类推。
此时,只有极少数的block,由于数据正好跨越在shuffle的边界上,需要进行跨网络传输。例如某个block zonemap为19亿-21亿,这个block可能是在CN1上进行读取,读取到的8192行里一部分应该在CN1上计算,一部分应该发送到CN2进行计算。由于l_orderkey列是主键,TAE(MO的存储引擎)会保证主键在S3文件里的的排序,block与block之间应该是重叠非常少,需要跨网络传输的数据应该是极少数。整体流程示意如下图所示。
由此可见,想要开启colocate shuffle ,必须是range shuffle。如果使用hash shuffle,则一定无法开启colocate shuffle。
另外colocate shuffle 还有个优化是,对于绝大多数block,可以直接通过zonemap就判断出这个block可以整体shuffle到某个桶内,而不需要对每一行进行读取,计算,重新组成新的batch。在这种情况下,colocate shuffle join几乎没有引入新的开销,但是比broadcast join减少了广播hash表的开销,显著降低了hash表随机访问导致cache miss的开销。实测多CN场景下性能几乎是broadcast join的十倍以上。并且colocate shuffle join具有良好的多CN扩展性,可以达到线性扩展,甚至某些场景下可以超过线性扩展性。
Colocate shuffle在分布式场景下对性能提升很大,不过具体提升程度是与数据的排序性,block与block之间的重叠度有关的。通常主键或者clusterby列具有良好的排序性,提升最大,其他情况则colocate shuffle可能退化成普通shuffle。另外由于join是对两边做同样的shuffle策略,所以有可能是两边都开启colocate,也有可能只有一边可以开启colocate。由于join order的策略总是将小表放在build端,所以通常是优先对probe端的表开启colocate。
目前MO对colocate shuffle join的支持是优化器自动识别计算的。比起同类数据库的实现,优势在于不依赖手工收集统计信息,不需要修改DDL,对计算节点数量(CN数)以及并发数没有任何限制,扩缩容时也可以自动计算出新的shuffle执行计划,用户无需进行任何额外配置或调整,从而实现了完全透明的操作体验。主要体现在以下几个方面:
首先是统计信息,colocate shuffle join必须使用range shuffle,range shuffle又依赖优化器给出合理的shuffle策略,这就必须依赖准确的stats(统计信息)。而MO的优化器会在访问block zonemap时自动计算统计信息,不依赖任何定时或者人工更新统计信息的策略,可以保证在任何情况下都可以通过准确的stats来计算range shuffle策略。
其次不依赖分区表的分区策略。这里包含几个部分,一是可以支持对任意表进行colocate shuffle join,不需要用户改动任何DDL。二是不限制在特定的列上。例如lineitem join orders on l_orderkey = o_orderkey, 可能应该在l_orderkey 和 o_orderkey这两列上开启shuffle, 但是如果lineitem join part on l_partkey=p_partkey where l_shipdate < xxx, lineitem表过滤后输出行数小于part表输出,此时应该在lineitem表上做build,part表做probe,在p_partkey 列和l_partkey 列上做shuffle,保证part表的数据可以实现colocate。这个策略应当由优化器来进行计算,如果依赖用户指定,就可能无法得到最优执行计划。三是shuffle分桶数量不依赖分区的数量,有些情况下用户在建表时很难立刻给出最合适的分区数,还有些情况下,分区和分区之间的数据可能不是很均衡。MO的实现方式是用户无需感知这一切,全部交给优化器进行计算。
最后是对于serverless数据库,CN的数量随时会发生变化。第一次执行的时候可能是3CN10核的场景,需要shuffle分30个桶,下一次执行可能变成了5CN10核的场景,需要分50个桶。仍以tpch1T lineitem表为例,MO会自动计算出新的shuffle策略为第一个桶处理1-1.2亿,第二个桶处理1.2亿到2.4亿,依次类推。再下一次执行可能又变成了7CN8核,MO依旧会自动计算新的shuffle策略。