PostgreSQLの並列クエリ







最近のCPUには多くのコアがあります。 長年、アプリケーションはクエリをデータベースに並行して送信してきました。 これがテーブル内の複数の行のレポートクエリである場合、複数のCPUを使用するとより高速に実行され、PostgreSQLではバージョン9.6以降で可能です。







並列クエリ機能の実装には3年かかりました。クエリ実行のさまざまな段階でコードを書き換える必要がありました。 PostgreSQL 9.6は、コードをさらに改善するためのインフラストラクチャを導入しました。 それ以降のバージョンでは、他の種類のクエリが並行して実行されます。







制限事項





テスト環境



PostgreSQL開発者は、TPC-Hベンチマーククエリの応答時間を短縮しようとしました。 ベンチマークをダウンロードして、 PostgreSQLに適合させます 。 これはTPC-Hベンチマークの非公式な使用法であり、データベースやハードウェアの比較用ではありません。







  1. オフサイトのTPCから TPC-H_Tools_v2.17.3.zip(または新しいバージョン) ダウンロードします。
  2. makefile.suiteの名前をMakefileに変更し、 https//github.com/tvondra/pg_tpchの説明に従って変更します 。 makeコマンドでコードをコンパイルします。
  3. データの生成: ./dbgen -s 10



    は23 GBのデータベースを作成します。 これは、並列クエリと非並列クエリのパフォーマンスの違いを確認するのに十分です。
  4. csv for



    およびsed



    csv for



    tbl



    ファイルをcsv for



    変換します。
  5. pg_tpchリポジトリのクローンを作成し、 csv



    pg_tpch/dss/data



    コピーしpg_tpch/dss/data



  6. qgen



    コマンドでクエリを作成します。
  7. ./tpch.sh



    コマンドを使用して、データベースにデータをアップロードします。


並列順次スキャン



並列読み取りではなく、多くのCPUコアにデータが分散しているため、高速になる可能性があります。 最新のオペレーティングシステムでは、PostgreSQLデータファイルは適切にキャッシュされます。 先読みを使用すると、PGデーモンが要求するよりも多くをストレージから取得できます。 したがって、クエリのパフォーマンスはディスクI / Oによって制限されません。 以下のためにCPUサイクルを消費します。









簡単なselect



クエリを実行してみましょう。







 tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN -------------------------------------------------------------------------------------------------------------------------- Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 1146337 Planning Time: 0.203 ms Execution Time: 19035.100 ms
      
      





順次スキャンでは、集約せずに行が多すぎるため、要求は単一のCPUコアによって実行されます。







SUM()



を追加すると、2つのワークフローがリクエストの高速化に役立つことがわかります。







 explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1) -> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1) Workers Planned: 2 Workers Launched: 2 -> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3) -> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 382112 Planning Time: 0.241 ms Execution Time: 8555.131 ms
      
      





並列集約



Parallel Seq Scanノードは、部分集約用の行を生成します。 部分集約ノードは、 SUM()



を使用してこれらの行を切り捨てます。 最後に、各ワークフローのSUMカウンターがGatherノードによって収集されます。







最終結果は、「Finalize Aggregate」ノードによって計算されます。 独自の集計関数を持っている場合は、それらを「並列安全」としてマークしてください。







ワークフローの数



サーバーを再起動せずに、ワークフローの数を増やすことができます。







 alter system set max_parallel_workers_per_gather=4; select * from pg_reload_conf();
      
      





これで、Explain出力に4人のワーカーが表示されます。







 tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1) -> Gather (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial Aggregate (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5) -> Parallel Seq Scan on lineitem (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 229267 Planning Time: 0.218 ms Execution Time: 5153.967 ms
      
      





ここで何が起こっていますか? ワークフローは2倍あり、リクエストは1.6599倍しか速くありませんでした。 計算は興味深いです。 2つの作業プロセスと1人のリーダーがいました。 変更後、4 + 1になりました。







並列処理からの最大加速:5/3 = 1.66(6)回。







どのように機能しますか?



プロセス



リクエストの実行は常に先頭のプロセスから始まります。 リーダーは、非並列処理と並列処理の一部をすべて行います。 同じリクエストを実行する他のプロセスは、ワークフローと呼ばれます。 並列処理では、 動的バックグラウンドワークフローのインフラストラクチャを使用します (バージョン9.4以降)。 PostgreSQLの他の部分はスレッドではなくプロセスを使用するため、3つのワークフローを使用したクエリは、従来の処理よりも4倍高速になる場合があります。







相互作用



ワークフローは、メッセージキュー(共有メモリに基づく)を介してリーダーと通信します。 各プロセスには、エラー用とタプル用の2つのキューがあります。







いくつの作業プロセスが必要ですか?



最小制限は、 max_parallel_workers_per_gather



パラメーターによって設定されます。 次に、クエリ実行max_parallel_workers size



は、 max_parallel_workers size



パラメータによって制限されたプールからワークフローをmax_parallel_workers size



ます。 最後の制限はmax_worker_processes



、つまりバックグラウンドプロセスの総数です。







ワークフローを割り当てることができなかった場合、処理は単一プロセスになります。







クエリプランナーは、テーブルまたはインデックスのサイズに応じてワークフローを短縮できます。 これにはmin_parallel_table_scan_size



およびmin_parallel_index_scan_size



パラメーターがあります。







 set min_parallel_table_scan_size='8MB' 8MB table => 1 worker 24MB table => 2 workers 72MB table => 3 workers x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker
      
      





テーブルがmin_parallel_(index|table)_scan_size



3倍になるmin_parallel_(index|table)_scan_size



、Postgresはワークフローを追加します。 ワークプロセスの数はコストベースではありません。 循環依存関係は複雑な実装を複雑にします。 代わりに、スケジューラは単純なルールを使用します。







実際には、これらのルールは本番環境に常に適しているわけではないため、特定のテーブルのワークフローの数を変更できます:ALTER TABLE ... SET( parallel_workers = N



)。







並列処理が使用されないのはなぜですか?



制限の長いリストに加えて、コストチェックもあります。







parallel_setup_cost



短いリクエストの並列処理なしで実行します。 このパラメーターは、メモリーの準備、プロセスの開始、および初期データ交換の時間を推定します。







parallel_tuple_cost



:リーダーとワーカー間の通信は、ワークプロセスからのタプルの数に比例して遅延させることができます。 このパラメーターは、データ交換コストを計算します。







入れ子ループ結合



 PostgreSQL 9.6+      —   . explain (costs off) select c_custkey, count(o_orderkey) from customer left outer join orders on c_custkey = o_custkey and o_comment not like '%special%deposits%' group by c_custkey; QUERY PLAN -------------------------------------------------------------------------------------- Finalize GroupAggregate Group Key: customer.c_custkey -> Gather Merge Workers Planned: 4 -> Partial GroupAggregate Group Key: customer.c_custkey -> Nested Loop Left Join -> Parallel Index Only Scan using customer_pkey on customer -> Index Scan using idx_orders_custkey on orders Index Cond: (customer.c_custkey = o_custkey) Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
      
      





収集は最後の段階で行われるため、ネストされたループの左結合は並列操作です。 Parallel Index Only Scanはバージョン10でのみ登場しました。パラレルシリアルスキャンと同様に機能します。 条件c_custkey = o_custkey



は、クライアント行ごとに1つの注文を読み取ります。 したがって、平行ではありません。







ハッシュ結合-ハッシュ結合



各ワークフローは、PostgreSQL 11までの独自のハッシュテーブルを作成します。また、これらのプロセスが4つ以上ある場合、パフォーマンスは向上しません。 新しいバージョンでは、ハッシュテーブルが一般的です。 各ワークフローは、WORK_MEMを使用してハッシュテーブルを作成できます。







 select l_shipmode, sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count from orders, lineitem where o_orderkey = l_orderkey and l_shipmode in ('MAIL', 'AIR') and l_commitdate < l_receiptdate and l_shipdate < l_commitdate and l_receiptdate >= date '1996-01-01' and l_receiptdate < date '1996-01-01' + interval '1' year group by l_shipmode order by l_shipmode LIMIT 1; QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1) -> Finalize GroupAggregate (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1) Group Key: lineitem.l_shipmode -> Gather Merge (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial GroupAggregate (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5) Group Key: lineitem.l_shipmode -> Sort (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5) Sort Key: lineitem.l_shipmode Sort Method: external merge Disk: 2304kB Worker 0: Sort Method: external merge Disk: 2064kB Worker 1: Sort Method: external merge Disk: 2384kB Worker 2: Sort Method: external merge Disk: 2264kB Worker 3: Sort Method: external merge Disk: 2336kB -> Parallel Hash Join (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5) Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) -> Parallel Seq Scan on lineitem (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5) Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone)) Rows Removed by Filter: 11934691 -> Parallel Hash (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5) Buckets: 65536 Batches: 256 Memory Usage: 3840kB -> Parallel Seq Scan on orders (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5) Planning Time: 0.977 ms Execution Time: 7923.770 ms
      
      





TPC-Hからの要求12は、並列ハッシュ接続を示しています。 各ワークフローは、共有ハッシュテーブルの作成に関与します。







結合を結合



マージ結合は本質的に並列ではありません。 これがリクエストの最後の段階であるかどうか心配しないでください-まだ並行して実行できます。







 -- Query 2 from TPC-H explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from part, supplier, partsupp, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 36 and p_type like '%BRASS' and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' and ps_supplycost = ( select min(ps_supplycost) from partsupp, supplier, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' ) order by s_acctbal desc, n_name, s_name, p_partkey LIMIT 100; QUERY PLAN ---------------------------------------------------------------------------------------------------------- Limit -> Sort Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey -> Merge Join Merge Cond: (part.p_partkey = partsupp.ps_partkey) Join Filter: (partsupp.ps_supplycost = (SubPlan 1)) -> Gather Merge Workers Planned: 4 -> Parallel Index Scan using <strong>part_pkey</strong> on part Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36)) -> Materialize -> Sort Sort Key: partsupp.ps_partkey -> Nested Loop -> Nested Loop Join Filter: (nation.n_regionkey = region.r_regionkey) -> Seq Scan on region Filter: (r_name = 'AMERICA'::bpchar) -> Hash Join Hash Cond: (supplier.s_nationkey = nation.n_nationkey) -> Seq Scan on supplier -> Hash -> Seq Scan on nation -> Index Scan using idx_partsupp_suppkey on partsupp Index Cond: (ps_suppkey = supplier.s_suppkey) SubPlan 1 -> Aggregate -> Nested Loop Join Filter: (nation_1.n_regionkey = region_1.r_regionkey) -> Seq Scan on region region_1 Filter: (r_name = 'AMERICA'::bpchar) -> Nested Loop -> Nested Loop -> Index Scan using idx_partsupp_partkey on partsupp partsupp_1 Index Cond: (part.p_partkey = ps_partkey) -> Index Scan using supplier_pkey on supplier supplier_1 Index Cond: (s_suppkey = partsupp_1.ps_suppkey) -> Index Scan using nation_pkey on nation nation_1 Index Cond: (n_nationkey = supplier_1.s_nationkey)
      
      





Merge JoinノードはGather Mergeの上にあります。 したがって、マージでは並列処理は使用されません。 ただし、Parallel Index Scanノードは引き続きpart_pkey



セグメントに役立ちます。







セクション接続



PostgreSQL 11では、パーティション分割はデフォルトで無効になっています。非常に高価なスケジューリングが必要です。 同様のパーティショニングを持つテーブルは、セクションごとに結合できます。 したがって、Postgresはより小さなハッシュテーブルを使用します。 各セクション接続は並列にできます。







 tpch=# set enable_partitionwise_join=t; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN --------------------------------------------------- Append -> Hash Join Hash Cond: (t2.b = t1.a) -> Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p1 t1 Filter: (b = 0) -> Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p2 t1_1 Filter: (b = 0) tpch=# set parallel_setup_cost = 1; tpch=# set parallel_tuple_cost = 0.01; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN ----------------------------------------------------------- Gather Workers Planned: 4 -> Parallel Append -> Parallel Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Parallel Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p2 t1_1 Filter: (b = 0) -> Parallel Hash Join Hash Cond: (t2.b = t1.a) -> Parallel Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p1 t1 Filter: (b = 0)
      
      





主なことは、セクション内の接続は、これらのセクションが十分に大きい場合にのみ並列であるということです。







並列追加-並列追加



異なるワークフローの異なるブロックの代わりに、 並列追加を使用できます。 これは通常、UNION ALLクエリで発生します。 欠点は、各ワークフローが1つの要求のみを処理するため、並列処理が少なくなることです。







ここには2つのワークフローが実行されていますが、4つが含まれています。







 tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day; QUERY PLAN ------------------------------------------------------------------------------------------------ Gather Workers Planned: 2 -> Parallel Append -> Aggregate -> Seq Scan on lineitem Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone) -> Aggregate -> Seq Scan on lineitem lineitem_1 Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
      
      





最も重要な変数





まとめ



バージョン9.6以降、並列処理により、多くの行またはインデックスをスキャンする複雑なクエリのパフォーマンスが大幅に向上します。 PostgreSQL 10では、デフォルトで並列処理が有効になっています。 OLTPワークロードが大きいサーバーでは忘れずに無効にしてください。 順次スキャンまたはインデックススキャンは多くのリソースを消費します。 データセット全体をレポートしていない場合は、欠落しているインデックスを追加するか、正しいパーティションを使用するだけで、クエリを効率化できます。







参照資料






All Articles