    PgSQL Parallel Query Series: Introduction [Translation]

    Modern CPUs have a large number of cores, and for years database applications have been sending queries to the database concurrently. When a query processes rows from many tables, it can noticeably improve performance by using multiple cores. PostgreSQL 9.6 introduced the new parallel query feature, and enabling it can greatly improve performance.

    1. Limitations

    1) Do not enable parallel execution if all CPU cores are already saturated. Parallel execution steals CPU time from other queries and increases their response time.

    2) Also be aware that parallel execution significantly increases memory usage (watch your work_mem value): each hash join or sort operation uses up to work_mem of memory.

    3) Low-latency OLTP queries gain nothing from parallel execution. In particular, a query that returns a single row performs terribly with parallelism enabled.

    4) Parallel execution supports only SELECT queries without lock predicates.

    5) Cursors and queries that can be suspended do not support parallel execution.

    6) Windowed functions and ordered-set aggregate functions are non-parallel.

    7) There is no benefit for workloads that are already IO-bound.

    8) There are no parallel sort algorithms; however, queries with sorts can still be parallel in some respects.

    9) Replace a CTE (WITH ...) with a sub-select to enable parallel execution.

    10) Foreign data wrappers (FDW) do not support parallel execution yet (later versions may; check which version adds support).

    11) FULL OUTER JOIN is not supported.

    12) A client setting max_rows disables parallel execution.

    13) If a query uses a function that is not marked PARALLEL SAFE, it will be executed single-threaded (see the sketch after this list).

    14) The SERIALIZABLE transaction isolation level disables parallel execution.
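
    To illustrate item 13: a function's parallel safety is declared when it is created and is recorded in the proparallel column of pg_proc. A minimal sketch (add_one is a made-up function, not from the original article):

        -- declare a trivial SQL function safe for parallel plans
        CREATE OR REPLACE FUNCTION add_one(i integer) RETURNS integer
        AS $$ SELECT i + 1 $$
        LANGUAGE sql IMMUTABLE PARALLEL SAFE;

        -- inspect the marking: 's' = safe, 'r' = restricted, 'u' = unsafe
        SELECT proname, proparallel FROM pg_proc WHERE proname = 'add_one';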

    2. Parallel Sequential Scan

    A parallel sequential scan is fast not because of parallel reads, but because the data access is spread across many CPU cores. Modern operating systems cache PostgreSQL data files well, and read-ahead fetches more blocks from storage than PgSQL actually requests. As a result, query performance is usually not limited by IO; it burns CPU cycles reading rows one by one from table pages and comparing row values against WHERE conditions.

    Let's run a simple query:

        tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
        QUERY PLAN
        --------------------------------------------------------------------------------------------------------------------------
        Seq Scan on lineitem  (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1)
          Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
          Rows Removed by Filter: 1146337
        Planning Time: 0.203 ms
        Execution Time: 19035.100 ms

    This is a plain sequential scan without aggregation that produces a huge number of rows, so it is executed by a single CPU core. After adding an aggregate, SUM(), it is clear that two worker processes help the query:

        explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
        QUERY PLAN
        ----------------------------------------------------------------------------------------------------------------------------------------------------
        Finalize Aggregate  (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1)
          ->  Gather  (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1)
                Workers Planned: 2
                Workers Launched: 2
                ->  Partial Aggregate  (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3)
                      ->  Parallel Seq Scan on lineitem  (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3)
                            Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
                            Rows Removed by Filter: 382112
        Planning Time: 0.241 ms
        Execution Time: 8555.131 ms

    Performance improved 2.2x.

    3. Parallel Aggregation

    The "Parallel Seq Scan" node produces rows for partial aggregation, and the "Partial Aggregate" node reduces them with SUM(). At the end, the "Gather" node collects the partial sum from each worker process, and "Finalize Aggregate" computes the final result. If you use your own aggregate functions, don't forget to mark them "parallel safe".
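
    For instance, a user-defined aggregate needs a combine function (so per-worker partial states can be merged) in addition to the PARALLEL = SAFE marking before the planner will parallelize it. A hedged sketch (my_sum is hypothetical; numeric_add is the built-in function behind numeric addition):

        CREATE AGGREGATE my_sum (numeric) (
            SFUNC = numeric_add,        -- per-row transition function
            STYPE = numeric,
            COMBINEFUNC = numeric_add,  -- merges partial sums from workers
            INITCOND = '0',
            PARALLEL = SAFE
        );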

    4. Number of Processes

    The number of parallel worker processes can be increased without a server restart:

        alter system set max_parallel_workers_per_gather=4;
        select * from pg_reload_conf();

    Now there are 4 workers in the explain output:

        tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
        QUERY PLAN
        ----------------------------------------------------------------------------------------------------------------------------------------------------
        Finalize Aggregate  (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1)
          ->  Gather  (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1)
                Workers Planned: 4
                Workers Launched: 4
                ->  Partial Aggregate  (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5)
                      ->  Parallel Seq Scan on lineitem  (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5)
                            Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
                            Rows Removed by Filter: 229267
        Planning Time: 0.218 ms
        Execution Time: 5153.967 ms

    We changed the number of workers from 2 to 4, but the query became only 1.6599x faster. This is actually the expected scaling: before the change we had 2 worker processes plus one leader, and afterwards we have 4 + 1. The maximum speedup we could get from the change is 5/3 = 1.66x.

    5. How Does It Work?

    Query execution always starts in the "leader" process. The leader performs all non-parallel work. Other processes running the same query are called "worker" processes. Parallel execution is built on the dynamic background workers infrastructure (introduced in 9.4). Thus a query that launches 3 worker processes can be up to 4 times faster than traditional execution.

    Worker processes communicate with the leader through message queues based on shared memory. Each worker process has two queues: one for errors and one for tuples.
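
    While a parallel query is running, the extra processes can be observed in pg_stat_activity. A sketch (the backend_type column assumes PostgreSQL 10+; the leader_pid column assumes 13+):

        -- run from another session while a parallel query is executing
        SELECT pid, leader_pid, backend_type, state
        FROM pg_stat_activity
        WHERE backend_type IN ('client backend', 'parallel worker');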

    6. How Many Processes Are Used?

    1) max_parallel_workers_per_gather is the lowest-level limit: the number of worker processes per Gather node.

    2) The number of workers a query execution can use is then limited by max_parallel_workers.

    3) The topmost limit is max_worker_processes: the total number of background processes.

    If a worker process cannot be allocated, execution falls back to single-process mode. The query planner increases the number of workers with the size of the table or index; min_parallel_table_scan_size and min_parallel_index_scan_size control this behavior:

        set min_parallel_table_scan_size='8MB'
        8MB table  => 1 worker
        24MB table => 2 workers
        72MB table => 3 workers
        x          => log(x / min_parallel_table_scan_size) / log(3) + 1 workers

    Every time the table is 3 times bigger than min_parallel_(index|table)_scan_size, Postgres adds one more worker; for example, a 216MB table gets log(216 / 8) / log(3) + 1 = 4 workers. The number of workers is not cost-based: a circular dependency would make a complex implementation difficult, so the planner uses simple rules instead.

    You can also specify the number of parallel workers for a particular table with ALTER TABLE ... SET (parallel_workers = N).
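
    For example, to pin the worker count for the lineitem table used above and later return to the size-based default:

        ALTER TABLE lineitem SET (parallel_workers = 4);  -- override the size-based rule
        ALTER TABLE lineitem RESET (parallel_workers);    -- back to the default behavior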

    7. Why Isn't Parallel Execution Used?

    Besides the list of limitations above, PostgreSQL also checks costs:

    parallel_setup_cost: avoids parallel execution of short queries. It models the time spent on memory setup, process startup, and initial communication.

    parallel_tuple_cost: communication between the leader and its workers can take a long time, proportional to the number of tuples the workers send. This parameter models the communication cost.
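
    A common way to check whether these two costs are what suppresses a parallel plan is to lower them in the current session and re-run EXPLAIN (illustrative values, not a production setting):

        SET parallel_setup_cost = 0;   -- pretend worker startup is free
        SET parallel_tuple_cost = 0;   -- pretend leader/worker tuple transfer is free
        EXPLAIN SELECT count(*) FROM lineitem;  -- does a Gather node appear now?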

    8. Nested Loop Join

    PostgreSQL 9.6+ can execute a "Nested loop" join in parallel form.

        explain (costs off) select c_custkey, count(o_orderkey)
        from customer left outer join orders on
            c_custkey = o_custkey and o_comment not like '%special%deposits%'
        group by c_custkey;
        QUERY PLAN
        --------------------------------------------------------------------------------------
        Finalize GroupAggregate
          Group Key: customer.c_custkey
          ->  Gather Merge
                Workers Planned: 4
                ->  Partial GroupAggregate
                      Group Key: customer.c_custkey
                      ->  Nested Loop Left Join
                            ->  Parallel Index Only Scan using customer_pkey on customer
                            ->  Index Scan using idx_orders_custkey on orders
                                  Index Cond: (customer.c_custkey = o_custkey)
                                  Filter: ((o_comment)::text !~~ '%special%deposits%'::text)

    Gather happens at the last stage, so "Nested Loop Left Join" is a parallel operation. "Parallel Index Only Scan" is only available from version 10; it behaves similarly to a parallel sequential scan. The inner side, the c_custkey = o_custkey lookup, reads orders for each customer row individually, so it is not parallel.

    9. Hash Join

    Up to PostgreSQL 11, each worker built its own copy of the inner hash table, so four or more workers could not improve hash join performance. The new implementation uses a single shared hash table, and each worker can use work_mem to help build it:

        select
            l_shipmode,
            sum(case
                    when o_orderpriority = '1-URGENT'
                        or o_orderpriority = '2-HIGH'
                    then 1
                    else 0
                end) as high_line_count,
            sum(case
                    when o_orderpriority <> '1-URGENT'
                        and o_orderpriority <> '2-HIGH'
                    then 1
                    else 0
                end) as low_line_count
        from
            orders,
            lineitem
        where
            o_orderkey = l_orderkey
            and l_shipmode in ('MAIL', 'AIR')
            and l_commitdate < l_receiptdate
            and l_shipdate < l_commitdate
            and l_receiptdate >= date '1996-01-01'
            and l_receiptdate < date '1996-01-01' + interval '1' year
        group by
            l_shipmode
        order by
            l_shipmode
        LIMIT 1;
        QUERY PLAN
        -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
        Limit  (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1)
          ->  Finalize GroupAggregate  (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1)
                Group Key: lineitem.l_shipmode
                ->  Gather Merge  (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1)
                      Workers Planned: 4
                      Workers Launched: 4
                      ->  Partial GroupAggregate  (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5)
                            Group Key: lineitem.l_shipmode
                            ->  Sort  (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5)
                                  Sort Key: lineitem.l_shipmode
                                  Sort Method: external merge  Disk: 2304kB
                                  Worker 0:  Sort Method: external merge  Disk: 2064kB
                                  Worker 1:  Sort Method: external merge  Disk: 2384kB
                                  Worker 2:  Sort Method: external merge  Disk: 2264kB
                                  Worker 3:  Sort Method: external merge  Disk: 2336kB
                                  ->  Parallel Hash Join  (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5)
                                        Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                        ->  Parallel Seq Scan on lineitem  (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5)
                                              Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone))
                                              Rows Removed by Filter: 11934691
                                        ->  Parallel Hash  (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5)
                                              Buckets: 65536  Batches: 256  Memory Usage: 3840kB
                                              ->  Parallel Seq Scan on orders  (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5)
        Planning Time: 0.977 ms
        Execution Time: 7923.770 ms

    Query 12 from TPC-H is a good illustration of a parallel hash join: every process helps build the shared hash table.

    10. Merge Join

    Due to its nature, a merge join cannot be made parallel. Don't worry if it is the last stage of query execution: queries with merge joins can still use parallel execution.

        -- Query 2 from TPC-H
        explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
        from part, supplier, partsupp, nation, region
        where
            p_partkey = ps_partkey
            and s_suppkey = ps_suppkey
            and p_size = 36
            and p_type like '%BRASS'
            and s_nationkey = n_nationkey
            and n_regionkey = r_regionkey
            and r_name = 'AMERICA'
            and ps_supplycost = (
                select
                    min(ps_supplycost)
                from partsupp, supplier, nation, region
                where
                    p_partkey = ps_partkey
                    and s_suppkey = ps_suppkey
                    and s_nationkey = n_nationkey
                    and n_regionkey = r_regionkey
                    and r_name = 'AMERICA'
            )
        order by s_acctbal desc, n_name, s_name, p_partkey
        LIMIT 100;
        QUERY PLAN
        ----------------------------------------------------------------------------------------------------------
        Limit
          ->  Sort
                Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey
                ->  Merge Join
                      Merge Cond: (part.p_partkey = partsupp.ps_partkey)
                      Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
                      ->  Gather Merge
                            Workers Planned: 4
                            ->  Parallel Index Scan using part_pkey on part
                                  Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36))
                      ->  Materialize
                            ->  Sort
                                  Sort Key: partsupp.ps_partkey
                                  ->  Nested Loop
                                        ->  Nested Loop
                                              Join Filter: (nation.n_regionkey = region.r_regionkey)
                                              ->  Seq Scan on region
                                                    Filter: (r_name = 'AMERICA'::bpchar)
                                              ->  Hash Join
                                                    Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                                    ->  Seq Scan on supplier
                                                    ->  Hash
                                                          ->  Seq Scan on nation
                                        ->  Index Scan using idx_partsupp_suppkey on partsupp
                                              Index Cond: (ps_suppkey = supplier.s_suppkey)
                      SubPlan 1
                        ->  Aggregate
                              ->  Nested Loop
                                    Join Filter: (nation_1.n_regionkey = region_1.r_regionkey)
                                    ->  Seq Scan on region region_1
                                          Filter: (r_name = 'AMERICA'::bpchar)
                                    ->  Nested Loop
                                          ->  Nested Loop
                                                ->  Index Scan using idx_partsupp_partkey on partsupp partsupp_1
                                                      Index Cond: (part.p_partkey = ps_partkey)
                                                ->  Index Scan using supplier_pkey on supplier supplier_1
                                                      Index Cond: (s_suppkey = partsupp_1.ps_suppkey)
                                          ->  Index Scan using nation_pkey on nation nation_1
                                                Index Cond: (n_nationkey = supplier_1.s_nationkey)

    The "Merge Join" node sits above the "Gather Merge", so the merge itself does not use parallel execution. However, the "Parallel Index Scan" node still helps with the part_pkey scan.

    11. Partition-Wise Join

    PostgreSQL 11 disables the partition-wise join feature by default because of its high planning cost. Tables partitioned in the same way can be joined partition by partition, which lets Postgres use smaller hash tables. Each per-partition join operation can be executed in parallel:

        tpch=# set enable_partitionwise_join=t;
        tpch=# explain (costs off) select * from prt1 t1, prt2 t2
        where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
        QUERY PLAN
        ---------------------------------------------------
        Append
          ->  Hash Join
                Hash Cond: (t2.b = t1.a)
                ->  Seq Scan on prt2_p1 t2
                      Filter: ((b >= 0) AND (b <= 10000))
                ->  Hash
                      ->  Seq Scan on prt1_p1 t1
                            Filter: (b = 0)
          ->  Hash Join
                Hash Cond: (t2_1.b = t1_1.a)
                ->  Seq Scan on prt2_p2 t2_1
                      Filter: ((b >= 0) AND (b <= 10000))
                ->  Hash
                      ->  Seq Scan on prt1_p2 t1_1
                            Filter: (b = 0)

        tpch=# set parallel_setup_cost = 1;
        tpch=# set parallel_tuple_cost = 0.01;
        tpch=# explain (costs off) select * from prt1 t1, prt2 t2
        where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
        QUERY PLAN
        -----------------------------------------------------------
        Gather
          Workers Planned: 4
          ->  Parallel Append
                ->  Parallel Hash Join
                      Hash Cond: (t2_1.b = t1_1.a)
                      ->  Parallel Seq Scan on prt2_p2 t2_1
                            Filter: ((b >= 0) AND (b <= 10000))
                      ->  Parallel Hash
                            ->  Parallel Seq Scan on prt1_p2 t1_1
                                  Filter: (b = 0)
                ->  Parallel Hash Join
                      Hash Cond: (t2.b = t1.a)
                      ->  Parallel Seq Scan on prt2_p1 t2
                            Filter: ((b >= 0) AND (b <= 10000))
                      ->  Parallel Hash
                            ->  Parallel Seq Scan on prt1_p1 t1
                                  Filter: (b = 0)

    A partition-wise join can use parallel execution only if the partitions are large enough.

    12. Parallel Append

    "Parallel Append" is typically used for UNION ALL. Its drawback is lower parallelism: every worker ultimately serves a single subplan. Even with 4 processes enabled, only 2 are launched here:

        tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day;
        QUERY PLAN
        ------------------------------------------------------------------------------------------------
        Gather
          Workers Planned: 2
          ->  Parallel Append
                ->  Aggregate
                      ->  Seq Scan on lineitem
                            Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone)
                ->  Aggregate
                      ->  Seq Scan on lineitem lineitem_1
                            Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)

    13. Important Variables

    work_mem: limits the memory each process can use. A single query can consume up to work_mem * processes * joins, which can add up to very high memory usage.

    max_parallel_workers_per_gather: how many worker processes the executor uses to run a given plan node in parallel.

    max_worker_processes: adjust the total number of background processes to the number of CPU cores on the server.

    max_parallel_workers: the same idea, but for the number of parallel worker processes.
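
    Putting these together, a starting point in postgresql.conf might look like the sketch below (illustrative values for a hypothetical 12-core server, not a recommendation):

        # postgresql.conf -- illustrative values, tune for your hardware
        max_worker_processes = 12            # total background processes (restart required)
        max_parallel_workers = 8             # of those, how many may serve parallel queries
        max_parallel_workers_per_gather = 4  # workers per Gather / Gather Merge node
        work_mem = '64MB'                    # per sort/hash operation, per process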

    14. Summary

    Starting with the parallel query execution introduced in 9.6, the performance of complex queries that scan many rows or index records can improve significantly. Don't forget to disable parallel execution on servers with a heavy OLTP workload. Sequential scans and index scans still consume a lot of resources: if you are not running reports over the whole data set, simply adding missing indexes or using proper partitioning can improve query performance.

    Original article:

    https://www.percona.com/blog/parallel-queries-in-postgresql/
