• Presto 中orderby 算子的实现过程


    一. 前言

           本文主要介绍在Presto中orderby 算子是如何通过代码实现的。Presto中orderby会经过局部排序+全部Merge来实现列的所有数据排序,如下所示:

     

    二. orderby算子中PartialSort功能实现

            PartialSort的流程大概如下所示:

     Presto的OrderByOperator算子在addPage的时候,首先会对位置进行编码,如下所示:

    1. public void addPage(Page page)
    2. {
    3. ....
    4. for (int position = 0; position < page.getPositionCount(); position++) {
    5. long sliceAddress = encodeSyntheticAddress(pageIndex, position);
    6. valueAddresses.add(sliceAddress);
    7. }
    8. ..
    9. }

          当所有Page都add完毕后,上游的Operator会调用OrderByOperator的finish方法,OrderByOperator的finish方法将对valueAddresses的所有数据进行排序和重构一个排序后的Page,代码流程如下所示:

    1. finish()
    2. pageIndex.sort
    3. createPagesIndexComparator.sort
    4. quickSort(PagesIndex pagesIndex, int from, int to)
    5. comparator.compareTo
    6. // 在compareTo当中,fromto其实是位置编码数组valueAddresses上的下标
    7. // Presto读取位置编码数据上的编码,然后还原成Block的地址,读取Block的数据进行对比
    8. // 然后比较结果来决定是否交换位置编码上的数据
    9. // 也就是说,在Presto中,数据排序交换都是基于位置编码进行的,并不是Block内部的数据的排序
    10. pagesIndex.swap // valueAddresses位置上数据的交换
    11. pageIndex.getSortedPages
    12. buildPage
    13. // 重构排序后的Page的时候,也是根据位置编码的顺序进行的
    14. long pageAddress = valueAddresses.getLong(position);
    15. Block block = this.channels[outputChannel].get(blockIndex);

     三. Orderby 算子中Merge功能实现

            Presto中,所有Task经过PartialSort排序后,会变成局部有序,但是总体还是无序的,因此生成最终结果前,还需要经过MergeOperator将局部有序归并为总体有序。MergeOperator是通过一个类似归并算法将各个Task的数据进行总体排序的,其大概流程如下所示:

         step1:首先需要将各个Task 传上来的所有Pages,按照Page + positition的结构映射成PageWithPosition,其中position指的是该Page的第几行数据。如下所示,2个Task的4个Page,将按照每一行一个PageWithPosition映射成12个PageWithPosition的列表(我们依次命名为Task0Page0Pos0,Task0Page0Pos1,Task0Page0Pos2....):

       step2:  presto 会根据order by的字段动态编译出一个比较器,这块的实现是在orderingCompiler.compilePageWithPositionComparator(types, sortChannels, sortOrder)中实现的,里边的实现原理和Presto Aggregation Function算子代码生成器代码走读_王飞活的博客-CSDN博客类似,此处非本文重点,不再重复描述。

        step3:构造一个PriorityQueue,优先级队列的比较器为step2动态生成的比较器。Presto会首先将所有Task的队首元素add到PriorityQueue中,然后从PriorityQueue中poll出队首元素,在将poll出来元素所在的task的下一个元素add到PriorityQueue中,直到所有PageWithPosition都处理完毕。可以知道,poll元素是总体有序的,因此只有按照poll出来的元素顺序,重新组成新的Page,新的Page即为总体有序的Page。

           以上述的案例举例,PriorityQueue中add和poll元素的顺序依次是:

    add Task0Page0Pos0 -> add Task1Page0Pos0 -> poll Task0Page0Pos0 -> add Task0Page0Pos1 -> poll Task1Page0Pos0 -> add Task1Page0Pos1 -> poll Task0Page0Pos1 -> add Task0Page0Pos2 -> poll Task0Page0Pos2 -> add Task0Page1Pos0 -> poll Task0Page1Pos0 -> add Task0Page1Pos1 > poll  Task1Page0Pos1 -> add Task1Page0Pos2 -> poll Task0Page1Pos1 -> add Task0Page1Pos2 -> Task1Page0Pos2 -> add Task1Page1Pos0 -> poll Task0Page1Pos2 -> add Task1Page1Pos1 -> poll Task1Page1Pos0 -> add Task1Page1Pos2 -> poll Task1Page1Pos1 -> poll Task1Page1Pos2

           可以看出,poll该列的是总体排序的(1 -> 1 -> 4 ->6 -> 7 ->8 ->10 .....),因此实现了总体排序的功能,排序的代码重点部分为:

    1. while (true) {
    2. if (processor.process()) {
    3. if (!processor.isFinished()) {
    4. queue.add(new ElementAndProcessor<>(processor.getResult(), processor)); //将各个task的队首的PageWithPosition元素推到PriorityQueue队列中
    5. }
    6. }
    7. else if (processor.isBlocked()) {
    8. return ProcessState.blocked(processor.getBlockedFuture());
    9. }
    10. else {
    11. return ProcessState.yield();
    12. }
    13. if (processorIterator.hasNext()) {
    14. processor = requireNonNull(processorIterator.next()); // 初始的时候,会将所有Task元素add到队列中,processorIterator是指所有task的Page数据列表
    15. continue;
    16. }
    17. if (queue.isEmpty()) {
    18. return ProcessState.finished();
    19. }
    20. ElementAndProcessor<T> elementAndProcessor = queue.poll(); //poll出队首元素,PriorityQueue队首元素总体有序
    21. processor = elementAndProcessor.getProcessor(); // poll 了哪个task的元素,下次就将task元素add到PriorityQueue中
    22. return ProcessState.ofResult(elementAndProcessor.getElement());
    23. }

  • 相关阅读:
    【ESP32_8266_WiFi (十二)】ESP8266客户端HTTP API应用实例
    【爬虫】requests 结合 BeautifulSoup抓取网页数据
    Vue3的12种组件通信方式(附代码)
    【c#】反射
    Navicat使自增主键归1
    C语言的静态库和的动态库
    强推这款键盘利器(Keychron),这次我彻底入坑了
    工商银行新一代银行移动开发平台建设研究
    15【react-Hook (下)】
    SpringBoot war包部署到tomcat上无法访问的异常处理
  • 原文地址:https://blog.csdn.net/wangfeihuo/article/details/127815247