本文主要介绍在Presto中orderby 算子是如何通过代码实现的。Presto中orderby会经过局部排序+全部Merge来实现列的所有数据排序,如下所示:
PartialSort的流程大概如下所示:
Presto的OrderByOperator算子在addPage的时候,首先会对位置进行编码,如下所示:
- public void addPage(Page page)
- {
- ....
- for (int position = 0; position < page.getPositionCount(); position++) {
- long sliceAddress = encodeSyntheticAddress(pageIndex, position);
- valueAddresses.add(sliceAddress);
- }
- ..
- }
当所有Page都add完毕后,上游的Operator会调用OrderByOperator的finish方法,OrderByOperator的finish方法将对valueAddresses的所有数据进行排序和重构一个排序后的Page,代码流程如下所示:
- finish()
- pageIndex.sort
- createPagesIndexComparator.sort
- quickSort(PagesIndex pagesIndex, int from, int to)
- comparator.compareTo
- // 在compareTo当中,from和to其实是位置编码数组valueAddresses上的下标
- // Presto读取位置编码数据上的编码,然后还原成Block的地址,读取Block的数据进行对比
- // 然后比较结果来决定是否交换位置编码上的数据
- // 也就是说,在Presto中,数据排序交换都是基于位置编码进行的,并不是Block内部的数据的排序
- pagesIndex.swap // valueAddresses位置上数据的交换
- pageIndex.getSortedPages
- buildPage
- // 重构排序后的Page的时候,也是根据位置编码的顺序进行的
- long pageAddress = valueAddresses.getLong(position);
- Block block = this.channels[outputChannel].get(blockIndex);
Presto中,所有Task经过PartialSort排序后,会变成局部有序,但是总体还是无序的,因此生成最终结果前,还需要经过MergeOperator将局部有序归并为总体有序。MergeOperator是通过一个类似归并算法将各个Task的数据进行总体排序的,其大概流程如下所示:
step1:首先需要将各个Task 传上来的所有Pages,按照Page + positition的结构映射成PageWithPosition,其中position指的是该Page的第几行数据。如下所示,2个Task的4个Page,将按照每一行一个PageWithPosition映射成12个PageWithPosition的列表(我们依次命名为Task0Page0Pos0,Task0Page0Pos1,Task0Page0Pos2....):
step2: presto 会根据order by的字段动态编译出一个比较器,这块的实现是在orderingCompiler.compilePageWithPositionComparator(types, sortChannels, sortOrder)中实现的,里边的实现原理和Presto Aggregation Function算子代码生成器代码走读_王飞活的博客-CSDN博客类似,此处非本文重点,不再重复描述。
step3:构造一个PriorityQueue,优先级队列的比较器为step2动态生成的比较器。Presto会首先将所有Task的队首元素add到PriorityQueue中,然后从PriorityQueue中poll出队首元素,在将poll出来元素所在的task的下一个元素add到PriorityQueue中,直到所有PageWithPosition都处理完毕。可以知道,poll元素是总体有序的,因此只有按照poll出来的元素顺序,重新组成新的Page,新的Page即为总体有序的Page。
以上述的案例举例,PriorityQueue中add和poll元素的顺序依次是:
add Task0Page0Pos0 -> add Task1Page0Pos0 -> poll Task0Page0Pos0 -> add Task0Page0Pos1 -> poll Task1Page0Pos0 -> add Task1Page0Pos1 -> poll Task0Page0Pos1 -> add Task0Page0Pos2 -> poll Task0Page0Pos2 -> add Task0Page1Pos0 -> poll Task0Page1Pos0 -> add Task0Page1Pos1 > poll Task1Page0Pos1 -> add Task1Page0Pos2 -> poll Task0Page1Pos1 -> add Task0Page1Pos2 -> Task1Page0Pos2 -> add Task1Page1Pos0 -> poll Task0Page1Pos2 -> add Task1Page1Pos1 -> poll Task1Page1Pos0 -> add Task1Page1Pos2 -> poll Task1Page1Pos1 -> poll Task1Page1Pos2
可以看出,poll该列的是总体排序的(1 -> 1 -> 4 ->6 -> 7 ->8 ->10 .....),因此实现了总体排序的功能,排序的代码重点部分为:
- while (true) {
- if (processor.process()) {
- if (!processor.isFinished()) {
- queue.add(new ElementAndProcessor<>(processor.getResult(), processor)); //将各个task的队首的PageWithPosition元素推到PriorityQueue队列中
- }
- }
- else if (processor.isBlocked()) {
- return ProcessState.blocked(processor.getBlockedFuture());
- }
- else {
- return ProcessState.yield();
- }
-
- if (processorIterator.hasNext()) {
- processor = requireNonNull(processorIterator.next()); // 初始的时候,会将所有Task元素add到队列中,processorIterator是指所有task的Page数据列表
- continue;
- }
-
- if (queue.isEmpty()) {
- return ProcessState.finished();
- }
-
- ElementAndProcessor<T> elementAndProcessor = queue.poll(); //poll出队首元素,PriorityQueue队首元素总体有序
- processor = elementAndProcessor.getProcessor(); // poll 了哪个task的元素,下次就将task元素add到PriorityQueue中
- return ProcessState.ofResult(elementAndProcessor.getElement());
- }