PostgreSQLのデータのパーティション分割とライブスナップショット

セクショニングのトピックはすでにに取り上げましたが 、ここに戻って、大量のデータの分析処理の必要性に関連して生じたこの問題を解決した私の経験についてお話したいと思います。 セクショニングに加えて、ソースデータが変更されたときに自動的に更新される、集約されたクエリの「スナップショット」の非常に簡素化された実装を検討します。



開発したシステムの主な要件の1つはフリーソフトウェアの使用でした。そのため、PostgreSQLを選択しました。 プロジェクトの作業を開始したとき、私はPostgreSQLを非常に表面的には知っていましたが、Oracle Databaseの機能にはかなり精通していました。 分析処理に関するものであったため、 PartitioningMaterialized ViewsなどのOracleオプションの類似物が欲しかったです。 PostgreSQLの機能に精通した後、この機能を何らかの方法で手動で記述する必要があることが明らかになりました。



もちろん、 クエリの書き換えを提供するマテリアライズドビューの本格的な実装については話していませんでした。 私のニーズに合わせて、自動更新された集約された単一テーブルのサンプルを作成する機能で十分でした(テーブルの結合のサポートは近い将来追加される可能性があります)。 パーティション化のために、トリガーによって制御されるデータ挿入を使用て、 継承されたテーブルを使用して繰り返し説明されるアプローチを使用することを計画しました。 パーティション分割を制御するためにルールを使用することを考えていましたが、私の場合は単一のレコードによるデータの挿入が優先されたため、それを拒否しました。



もちろん、メタデータを保存するためのテーブルから始めました。



ps_tables.sql
create sequence ps_table_seq; create table ps_table ( id bigint default nextval('ps_table_seq') not null, name varchar(50) not null unique, primary key(id) ); create sequence ps_column_seq; create table ps_column ( id bigint default nextval('ps_column_seq') not null, table_id bigint not null references ps_table(id), name varchar(50) not null, parent_name varchar(50), type_name varchar(8) not null check (type_name in ('date', 'key', 'nullable', 'sum', 'min', 'max', 'cnt')), unique (table_id, name), primary key(id) ); create table ps_range_partition ( table_id bigint not null references ps_table(id), type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')), start_value date not null, end_value date not null, primary key(table_id, start_value) ); create table ps_snapshot ( snapshot_id bigint not null references ps_table(id), table_id bigint not null references ps_table(id), type_name varchar(10) not null check (type_name in ('day', 'week', 'month', 'year')), primary key(snapshot_id) );
      
      







ここではすべてが明らかです。 言及する価値があるのは、列タイプのみです。

種類

説明

日付

データのパーティション化および集約に使用されるカレンダー日付を含む列(PostgreSQLタイプの日付とタイムスタンプがサポートされています)



データ集約のgroup by句で使用されるキー(すべてのPostgreSQL整数型がサポートされています)

nullable

データ集約で使用されるキー。nullを含む可能性があります

合計

値の合計



最小値

最大

最大値

cnt

NULL以外の値のカウント



ソリューション全体の基本は、ソースデータを含むテーブルのトリガー関数の再構築を実行する関数です。



ps_trigger_regenerate(bigint)
 create or replace function ps_trigger_regenerate(in p_table bigint) returns void as $$ declare l_sql text; l_table_name varchar(50); l_date_column varchar(50); l_flag boolean; tabs record; columns record; begin select name into l_table_name from ps_table where id = p_table; l_sql := 'create or replace function ps_' || l_table_name || '_insert_trigger() returns trigger ' || 'as $'|| '$ ' || 'begin '; for tabs in select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type from ps_snapshot a, ps_table b where a.table_id = p_table and b.id = a.snapshot_id loop l_flag = FALSE; l_sql := l_sql || 'update ' || tabs.table_name || ' set '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and not type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'sum' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' + coalesce(NEW.' || columns.parent_name || ', 0) '; end if; if columns.type_name = 'min' then l_sql := l_sql || columns.name || ' = least(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) '; end if; if columns.type_name = 'max' then l_sql := l_sql || columns.name || ' = greatest(coalesce(' || columns.name || ', NEW.' || columns.parent_name || '), coalesce(NEW.' || columns.parent_name || ', ' || columns.name || ')) '; end if; if columns.type_name = 'cnt' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end '; end if; end loop; l_flag = FALSE; l_sql := l_sql || 'where '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || 'and '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') '; end if; if columns.type_name = 'key' then l_sql := l_sql || columns.name || ' = NEW.' || columns.parent_name || ' '; end if; if columns.type_name = 'nullable' then l_sql := l_sql || columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)'; end if; end loop; l_sql := l_sql || '; ' || 'if not FOUND then ' || 'insert into ' || tabs.table_name || '('; l_flag = FALSE; for columns in select name, type_name from ps_column where table_id = tabs.id loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; l_sql := l_sql || columns.name; end loop; l_sql := l_sql || ') values ('; l_flag = FALSE; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || 'date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ')'; elsif columns.type_name = 'cnt' then l_sql := l_sql || 'case when NEW.' || columns.parent_name || ' is null then 0 else 1 end'; elsif columns.type_name in ('nullable', 'sum') then l_sql := l_sql || 'coalesce(NEW.' || columns.parent_name || ', 0)'; else l_sql := l_sql || 'NEW.' || columns.parent_name; end if; end loop; l_sql := l_sql || '); ' || 'end if; '; end loop; select name into l_date_column from ps_column where table_id = p_table and type_name = 'date'; for tabs in select to_char(start_value, 'YYYYMMDD') as start_value, to_char(end_value, 'YYYYMMDD') as end_value, type_name from ps_range_partition where table_id = p_table order by start_value desc loop l_sql := l_sql || 'if NEW.' || l_date_column || ' >= to_date(''' || tabs.start_value || ''', ''YYYYMMDD'') and NEW.' || l_date_column || ' < to_date(''' || tabs.end_value || ''', ''YYYYMMDD'') then ' || 'insert into ' || l_table_name || '_' || tabs.start_value || ' values (NEW.*); ' || 'return null; ' || 'end if; '; end loop; l_sql := l_sql || 'return NEW; '|| 'end; '|| '$'||'$ language plpgsql'; execute l_sql; l_sql := 'create or replace function ps_' || l_table_name || '_raise_trigger() returns trigger ' || 'as $'|| '$ ' || 'begin ' || 'raise EXCEPTION ''Can''''t support % on MIN or MAX aggregate'', TG_OP;' || 'end; '|| '$'||'$ language plpgsql'; execute l_sql; l_sql := 'create or replace function ps_' || l_table_name || '_delete_trigger() returns trigger ' || 'as $'|| '$ ' || 'begin '; for tabs in select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type from ps_snapshot a, ps_table b where a.table_id = p_table and b.id = a.snapshot_id loop l_flag = FALSE; l_sql := l_sql || 'update ' || tabs.table_name || ' set '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('sum', 'cnt') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'sum' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' '; end if; if columns.type_name = 'cnt' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end '; end if; end loop; l_flag = FALSE; l_sql := l_sql || 'where '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || 'and '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') '; end if; if columns.type_name = 'key' then l_sql := l_sql || columns.name || ' = NEW.' || columns.parent_name || ' '; end if; if columns.type_name = 'nullable' then l_sql := l_sql || columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)'; end if; end loop; l_sql := l_sql || '; '; end loop; l_sql := l_sql || 'return null; '|| 'end; '|| '$'||'$ language plpgsql'; execute l_sql; l_sql := 'create or replace function ps_' || l_table_name || '_update_trigger() returns trigger ' || 'as $'|| '$ ' || 'begin '; for tabs in select a.snapshot_id as id, b.name as table_name, a.type_name as snapshot_type from ps_snapshot a, ps_table b where a.table_id = p_table and b.id = a.snapshot_id loop l_flag = FALSE; l_sql := l_sql || 'update ' || tabs.table_name || ' set '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('sum', 'cnt') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'sum' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' - OLD.' || columns.parent_name || ' + NEW.' || columns.parent_name || ' '; end if; if columns.type_name = 'cnt' then l_sql := l_sql || columns.name || ' = ' || columns.name || ' - case when OLD.' || columns.parent_name || ' is null then 0 else 1 end ' || ' + case when NEW.' || columns.parent_name || ' is null then 0 else 1 end '; end if; end loop; l_flag = FALSE; l_sql := l_sql || 'where '; for columns in select name, parent_name, type_name from ps_column where table_id = tabs.id and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || 'and '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || columns.name || ' = date_trunc(lower(''' || tabs.snapshot_type || '''), NEW.' || columns.parent_name || ') '; end if; if columns.type_name = 'key' then l_sql := l_sql || columns.name || ' = NEW.' || columns.parent_name || ' '; end if; if columns.type_name = 'nullable' then l_sql := l_sql || columns.name || ' = coalesce(NEW.' || columns.parent_name || ', 0)'; end if; end loop; l_sql := l_sql || '; '; end loop; l_sql := l_sql || 'return null; '|| 'end; '|| '$'||'$ language plpgsql'; execute l_sql; end; $$ language plpgsql;
      
      







その素晴らしい外観にもかかわらず、この機能は非常にシンプルです。 そのタスクは、トリガーの構築に使用される4つの関数を(利用可能なメタデータに基づいて)形成することです:





ここでは、TABLEの代わりに、ソースデータを含むテーブルの名前が置き換えられます。 ps_TABLE_insert_trigger()の典型的な定義は次のようになります。



 create or replace function ps_data_insert_trigger() returns trigger as $$ begin update data_month set sum_field = sum_field + NEW.sum_field , min_field = least(min_field, NEW.min_field) where date_field = date_trunc('month', NEW.date_field) and key_field = NEW.key_field; if not FOUND then insert into data_month(date_field, key_field, sum_field, min_field) values (date_trunc('month', NEW.date_field), NEW.key_field, NEW.sum_field, NEW.min_field); end if; if NEW.date_field >= to_date('20130101', 'YYYYMMDD') and NEW.date_field < to_date('20130201', 'YYYYMMDD') then insert into data_20130101 values (NEW.*); return null; end if; return NEW; end; $$ language plpgsql;
      
      





実際、null値は特別な方法で処理されるため、関数はもう少し複雑に見えます。 しかし、例として、上記の例は非常に適切です。 このコードのロジックは明らかです。







最後の段落は、適切なセクションが見つからない場合、データがメインテーブルに追加されるという事実につながります。 実際には、これは非常に便利です。 事前にセクションを作成しなかったり、日付が正しくないデータを受信した場合でも、データの挿入は成功します。 その後、クエリを実行してメインテーブルの内容を分析できます。



 select * from only data
      
      





その後、不足しているセクションを作成します(以下に示すように、データはメインテーブルから作成されたセクションに自動的に転送されます)。 このような場合、セクションに分類されなかったレコードの数は通常大きくなく、データ転送のコストは無視できます。



今ではハーネスを作ることが残っています。 新しいセクションを作成する関数から始めましょう:



ps_add_range_partition(varchar、varchar、varchar、date)
 create or replace function ps_add_range_partition(in p_table varchar, in p_column varchar, in p_type varchar, in p_start date) returns void as $$ declare l_sql text; l_end date; l_start_str varchar(10); l_end_str varchar(10); l_table bigint; l_flag boolean; columns record; begin perform 1 from ps_table a, ps_column b where a.id = b.table_id and lower(a.name) = lower(p_table) and b.type_name = 'date' and lower(b.name) <> lower(p_column); if FOUND then raise EXCEPTION 'Conflict DATE columns'; end if; l_end := p_start + ('1 ' || p_type)::INTERVAL; perform 1 from ps_table a, ps_range_partition b where a.id = b.table_id and lower(a.name) = lower(p_table) and (( p_start >= b.start_value and p_start < b.end_value ) or ( b.start_value >= p_start and b.start_value < l_end )); if FOUND then raise EXCEPTION 'Range intervals intersects'; end if; perform 1 from ps_table where lower(name) = lower(p_table); if not FOUND then insert into ps_table(name) values (lower(p_table)); end if; select id into l_table from ps_table where lower(name) = lower(p_table); perform 1 from ps_column where table_id = l_table and type_name = 'date' and lower(name) = lower(p_column); if not FOUND then insert into ps_column(table_id, name, type_name) values (l_table, lower(p_column), 'date'); end if; insert into ps_range_partition(table_id, type_name, start_value, end_value) values (l_table, p_type, p_start, l_end); l_start_str = to_char(p_start, 'YYYYMMDD'); l_end_str = to_char(l_end, 'YYYYMMDD'); l_sql := 'create table ' || p_table || '_' || l_start_str || '(' || 'check (' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' || p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')), ' || 'primary key ('; l_flag := FALSE; for columns in select f.name as name from ( select ps_array_to_set(a.conkey) as nn from pg_constraint a, pg_class b where b.oid = a.conrelid and a.contype = 'p' and b.relname = p_table ) c, ( select d.attname as name, d.attnum as nn from pg_attribute d, pg_class e where e.oid = d.attrelid and e.relname = p_table ) f where f.nn = c.nn order by f.nn loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; l_sql := l_sql || columns.name; end loop; l_sql := l_sql || ')) inherits (' || p_table || ')'; execute l_sql; l_sql := 'create index ' || p_table || '_' || l_start_str || '_date on ' || p_table || '_' || l_start_str || '(' || p_column || ')'; execute l_sql; perform ps_trigger_regenerate(l_table); execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table; l_sql := 'insert into ' || p_table || '_' || l_start_str || ' ' || 'select * from ' || p_table || ' where ' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' || p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')'; execute l_sql; l_sql := 'delete from only ' || p_table || ' where ' || p_column || ' >= to_date(''' || l_start_str || ''', ''YYYYMMDD'') and ' || p_column || ' < to_date(''' || l_end_str || ''', ''YYYYMMDD'')'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_before_insert ' || 'before insert on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_insert_trigger()'; execute l_sql; perform 1 from ps_snapshot a, ps_column b where b.table_id = a.snapshot_id and a.table_id = l_table and b.type_name in ('min', 'max'); if FOUND then l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' || 'after update on ' || p_table || '_' || l_start_str || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' || 'after delete on ' || p_table || '_' || l_start_str || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; else l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_update_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_delete_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_update ' || 'after update on ' || p_table || '_' || l_start_str || ' for each row ' || 'execute procedure ps_' || p_table || '_update_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_' || l_start_str || '_after_delete ' || 'after delete on ' || p_table || '_' || l_start_str || ' for each row ' || 'execute procedure ps_' || p_table || '_delete_trigger()'; execute l_sql; end if; end; $$ language plpgsql;
      
      







ここでは、入力データの正確性を確認した後、必要なメタデータを追加し、その後、継承されたテーブルを作成します。 次に、ps_trigger_regenerateを呼び出してトリガーの関数を再作成します。その後、動的クエリを使用してセクショニング条件に該当するデータを作成済みセクションに転送し、トリガー自体を再作成します。



2点で問題が発生しました。



  1. 開始日に月、日、または年を追加することで少し苦労しました(入力パラメーターp_typeに応じて:



     l_end := p_start + ('1 ' || p_type)::INTERVAL;
          
          





  2. 主キーは継承されないため、 システムカタログへの要求を作成して、ソーステーブルの主キーの列のリストを取得する必要がありました(メタデータで主キーを記述することも不適切であることがわかりました)。



      select f.name as name from ( select ps_array_to_set(a.conkey) as nn from pg_constraint a, pg_class b where b.oid = a.conrelid and a.contype = 'p' and b.relname = p_table ) c, ( select d.attname as name, d.attnum as nn from pg_attribute d, pg_class e where e.oid = d.attrelid and e.relname = p_table ) f where f.nn = c.nn order by f.nn
          
          







また、インデックスを作成する前に、(作成されたパーティションの)パーティションキーで、プライマリキーの先頭列であるかどうかを確認する価値があることに注意してください(重複インデックスを作成しないように)。



セクションを削除するための機能ははるかに単純で、特別なコメントは必要ありません。



ps_del_range_partition(varchar、date)
 create or replace function ps_del_range_partition(in p_table varchar, in p_start date) returns void as $$ declare l_sql text; l_start_str varchar(10); l_table bigint; begin select id into l_table from ps_table where lower(name) = lower(p_table); l_start_str = to_char(p_start, 'YYYYMMDD'); delete from ps_range_partition where table_id = l_table and start_value = p_start; perform ps_trigger_regenerate(l_table); l_sql := 'insert into ' || p_table || ' ' || 'select * from ' || p_table || '_' || l_start_str; execute l_sql; perform 1 from ( select 1 from ps_range_partition where table_id = l_table union all select 1 from ps_snapshot where table_id = l_table ) a; if not FOUND then execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table; execute 'drop function ps_' || p_table || '_insert_trigger() cascade'; execute 'drop function ps_' || p_table || '_raise_trigger() cascade'; execute 'drop function ps_' || p_table || '_update_trigger() cascade'; execute 'drop function ps_' || p_table || '_delete_trigger() cascade'; delete from ps_column where table_id = l_table; delete from ps_table where id = l_table; end if; perform 1 from ps_range_partition where table_id = l_table; if not FOUND then delete from ps_column where table_id = l_table and type_name = 'date'; end if; execute 'drop table ' || p_table || '_' || l_start_str; end; $$ language plpgsql;
      
      







セクションが削除されると、データはもちろん失われませんが、メインテーブルに転送されます(トリガーは事前に削除されます。これは、挿入ステートメントで唯一のキーワードが機能しないためです。



「ライブ」データスナップショットを管理するための機能を追加する必要があります。



ps_add_snapshot_column(varchar、varchar、varchar、varchar)
 create or replace function ps_add_snapshot_column(in p_snapshot varchar, in p_column varchar, in p_parent varchar, in p_type varchar) returns void as $$ declare l_table bigint; begin perform 1 from ps_table where lower(name) = lower(p_snapshot); if not FOUND then insert into ps_table(name) values (lower(p_snapshot)); end if; select id into l_table from ps_table where lower(name) = lower(p_snapshot); insert into ps_column(table_id, name, parent_name, type_name) values (l_table, lower(p_column), lower(p_parent), p_type); end; $$ language plpgsql;
      
      







ps_add_snapshot(varchar、varchar、varchar)
 create or replace function ps_add_snapshot(in p_table varchar, in p_snapshot varchar, in p_type varchar) returns void as $$ declare l_sql text; l_table bigint; l_snapshot bigint; l_flag boolean; columns record; begin select id into l_snapshot from ps_table where lower(name) = lower(p_snapshot); perform 1 from ps_column where table_id = l_snapshot and type_name in ('date', 'key'); if not FOUND then raise EXCEPTION 'Key columns not found'; end if; perform 1 from ps_column where table_id = l_snapshot and not type_name in ('date', 'key', 'nullable'); if not FOUND then raise EXCEPTION 'Aggregate columns not found'; end if; perform 1 from ps_table where lower(name) = lower(p_table); if not FOUND then insert into ps_table(name) values (lower(p_table)); end if; select id into l_table from ps_table where lower(name) = lower(p_table); insert into ps_snapshot(table_id, snapshot_id, type_name) values (l_table, l_snapshot, p_type); perform ps_trigger_regenerate(l_table); l_sql := 'create table ' || p_snapshot || ' ('; l_flag := FALSE; for columns in select name, type_name from ps_column where table_id = l_snapshot loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || columns.name || ' date not null'; else l_sql := l_sql || columns.name || ' bigint not null'; end if; end loop; l_sql := l_sql || ', primary key ('; l_flag := FALSE; for columns in select name from ps_column where table_id = l_snapshot and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; l_sql := l_sql || columns.name; end loop; l_sql := l_sql || '))'; execute l_sql; execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table; l_sql := 'create trigger ps_' || p_table || '_before_insert ' || 'before insert on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_insert_trigger()'; execute l_sql; perform 1 from ps_snapshot a, ps_column b where b.table_id = a.snapshot_id and a.table_id = l_table and b.type_name in ('min', 'max'); if FOUND then l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; else l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_update_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_delete_trigger()'; execute l_sql; end if; l_sql := 'insert into ' || p_snapshot || '('; l_flag := FALSE; for columns in select name from ps_column where table_id = l_snapshot loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; l_sql := l_sql || columns.name; end loop; l_sql := l_sql || ') select '; l_flag := FALSE; for columns in select parent_name as name, type_name from ps_column where table_id = l_snapshot loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')'; end if; if columns.type_name = 'key' then l_sql := l_sql || columns.name; end if; if columns.type_name = 'nullable' then l_sql := l_sql || 'coalesce(' || columns.name || ', 0)'; end if; if columns.type_name = 'sum' then l_sql := l_sql || 'sum(' || columns.name || ')'; end if; if columns.type_name = 'min' then l_sql := l_sql || 'min(' || columns.name || ')'; end if; if columns.type_name = 'max' then l_sql := l_sql || 'max(' || columns.name || ')'; end if; if columns.type_name = 'cnt' then l_sql := l_sql || 'count(' || columns.name || ')'; end if; end loop; l_sql := l_sql || 'from ' || p_table || ' group by '; l_flag := FALSE; for columns in select parent_name as name, type_name from ps_column where table_id = l_snapshot and type_name in ('date', 'key', 'nullable') loop if l_flag then l_sql := l_sql || ', '; end if; l_flag := TRUE; if columns.type_name = 'date' then l_sql := l_sql || 'date_trunc(lower(''' || p_type || '''), ' || columns.name || ')'; else l_sql := l_sql || columns.name; end if; end loop; execute l_sql; end; $$ language plpgsql;
      
      







ps_del_snapshot(varchar)
 create or replace function ps_del_snapshot(in p_snapshot varchar) returns void as $$ declare l_sql text; p_table varchar(50); l_table bigint; l_snapshot bigint; begin select a.table_id, c.name into l_table, p_table from ps_snapshot a, ps_table b, ps_table c where b.id = a.snapshot_id and c.id = a.table_id and lower(b.name) = lower(p_snapshot); select id into l_snapshot from ps_table where lower(name) = lower(p_snapshot); delete from ps_snapshot where snapshot_id = l_snapshot; delete from ps_column where table_id = l_snapshot; delete from ps_table where id = l_snapshot; execute 'drop trigger if exists ps_' || p_table || '_before_insert on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_update on ' || p_table; execute 'drop trigger if exists ps_' || p_table || '_after_delete on ' || p_table; perform 1 from ( select 1 from ps_range_partition where table_id = l_table union all select 1 from ps_snapshot where table_id = l_table ) a; if not FOUND then execute 'drop function if exists ps_' || p_table || '_insert_trigger() cascade'; execute 'drop function if exists ps_' || p_table || '_raise_trigger() cascade'; execute 'drop function if exists ps_' || p_table || '_update_trigger() cascade'; execute 'drop function if exists ps_' || p_table || '_delete_trigger() cascade'; else perform ps_trigger_regenerate(l_table); l_sql := 'create trigger ps_' || p_table || '_before_insert ' || 'before insert on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_insert_trigger()'; execute l_sql; perform 1 from ps_snapshot a, ps_column b where b.table_id = a.snapshot_id and a.table_id = l_table and b.type_name in ('min', 'max'); if FOUND then l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_raise_trigger()'; execute l_sql; else l_sql := 'create trigger ps_' || p_table || '_after_update ' || 'after update on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_update_trigger()'; execute l_sql; l_sql := 'create trigger ps_' || p_table || '_after_delete ' || 'after delete on ' || p_table || ' for each row ' || 'execute procedure ps_' || p_table || '_delete_trigger()'; execute l_sql; end if; end if; execute 'drop table if exists ' || p_snapshot; end; $$ language plpgsql;
      
      







ここでも、根本的に新しいものは何もありません。注意したいのは、 'min'または 'max'集計を使用する場合、トリガーを作成するときにps_TABLE_raise_trigger()関数を使用することです。スナップショットを構築しました。 これは、ソーステーブルでupdateステートメントとdeleteステートメントを実行するときにこれらの集計を更新するための適切なパフォーマンス実装を思い付かなかったためです。



すべてがどのように機能するかを見てみましょう。 テストテーブルを作成します。



 create sequence test_seq; create table test ( id bigint default nextval('test_seq') not null, event_time timestamp not null, customer_id bigint not null, value bigint not null, primary key(id) );
      
      





ここで、セクションを追加するには、次のクエリを実行するだけで十分です。



 select ps_add_range_partition('test', 'event_time', 'month', to_date('20130501', 'YYYYMMDD'))
      
      





その結果、継承テーブルtest_20130501が作成され、5月のすべてのレコードが自動的に含まれます。



セクションを削除するには、次のクエリを実行できます。



 select ps_del_range_partition('test', to_date('20130501', 'YYYYMMDD'))
      
      







最初に興味のある列を決定する必要があるため、スナップショットの作成はもう少し複雑です。



 select ps_add_snapshot_column('test_month', 'customer_id', 'key') select ps_add_snapshot_column('test_month', 'event_time', 'date') select ps_add_snapshot_column('test_month', 'value_sum', 'value', 'sum') select ps_add_snapshot_column('test_month', 'value_cnt', 'value', 'cnt') select ps_add_snapshot_column('test_month', 'value_max', 'value', 'max') select ps_add_snapshot('test', 'test_month', 'month')
      
      







その結果、次のクエリに基づいて自動的に更新されるテーブルが作成されます。



 select customer_id, date_trunc('month', event_time), sum(value) as value_sum, count(value) as value_cnt, max(value) as value_max from test group by customer_id, date_trunc('month', event_time)
      
      





次のクエリを実行して、スナップショットを削除できます。



 select ps_del_snapshot('test_month')
      
      





今日は以上です。スクリプトはGitHubで取得できます




All Articles