PostgreSQL 9.4以降での論理レプリケーション用のプラグインの作成

多くの興味のある人が知っているように、バージョン9.4のPostgreSQLは(最終的に)論理デコードを導入しました。 ここで、独自の複製を作成するために、バイナリwalファイルの形式を処理したり、トリガーを書き込んだり(まだ方法があるかもしれません)、データを便利な形式に変換する必要はありません。 これを行うには、これを行うPostgreSQLプラグインを作成するだけです。 この記事では、データをJSONに変換するプラグインについて説明します。





プラグインコードはgithub-github.com/ildus/decoder_jsonにあります。 プルリクエストは、改善(特に、型サポートの改善という点)、バグ修正、および見た目の改善で歓迎されます。 JSONは単純化のために選択されました。 これは最終バージョンではありません。おそらく実際のデータでテストした後、より生産的なフォーマットが必要であることが判明し、やり直す必要があります。 この記事では、すべてのプラグインコードではなく、伝える必要があると思われる部分のみを提供します。



プラグインを作成するために必要な要件:Cの知識、インストール済みビルドツール(gcc、cmake)、インストール済みパッケージ(Debianシステム)postgresql-9.4、postgresql-server-dev-9.4、および他のシステムでの類似。 postgresqlをインストールした後、postgresql.confでmax_replication_slots = 1(またはそれ以上)およびwal_level = logicalを設定する必要があります。



プラグイン自体はCライブラリで、そこからpostgresqlイベントのコールバック関数が呼び出されます。 初期化中に、 _PG_output_plugin_init



関数は、フィールドにフィールドに関数を割り当てる必要がある構造で_PG_output_plugin_init



れます。





構造を満たす関数:



 void _PG_output_plugin_init(OutputPluginCallbacks *cb) { cb->startup_cb = decoder_json_startup; cb->begin_cb = decoder_json_begin_txn; cb->change_cb = decoder_json_change; cb->commit_cb = decoder_json_commit_txn; cb->shutdown_cb = decoder_json_shutdown; }
      
      





これらの5つの機能を決定するのは今です。 decoder_json_startup



はデコードの開始時に呼び出され、デコードオプションを設定し、独自のメモリコンテキストを作成するために使用されます。



Decoder_json_startup関数
 /* initialize this plugin */ static void decoder_json_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { ListCell *option; DecoderRawData *data; data = palloc(sizeof(DecoderRawData)); data->context = AllocSetContextCreate(ctx->context, "Raw decoder context", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); data->include_transaction = false; data->sort_keys = false; ctx->output_plugin_private = data; /* Default output format */ opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; foreach(option, ctx->output_plugin_options) { DefElem *elem = lfirst(option); Assert(elem->arg == NULL || IsA(elem->arg, String)); if (strcmp(elem->defname, "include_transaction") == 0) { /* if option does not provide a value, it means its value is true */ if (elem->arg == NULL) data->include_transaction = true; else if (!parse_bool(strVal(elem->arg), &data->include_transaction)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } else if (strcmp(elem->defname, "sort_keys") == 0) { /* if option does not provide a value, it means its value is true */ if (elem->arg == NULL) data->sort_keys = true; else if (!parse_bool(strVal(elem->arg), &data->sort_keys)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } else { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)"))); } } }
      
      





ここでは、プラグインに渡されたパラメーターが解析され、構造に保存されます。 作成されたメモリコンテキストは、その後、 decoder_json_change



使用され、使用されたリソースが正しく消去されます。 重要なポイント:





decoder_json_shutdown



は、デコードの終了時に呼び出され、リソースをクリーンアップするために使用されます。



Decoder_json_shutdown関数
 /* cleanup this plugin's resources */ static void decoder_json_shutdown(LogicalDecodingContext *ctx) { DecoderRawData *data = ctx->output_plugin_private; /* cleanup our own resources via memory context reset */ MemoryContextDelete(data->context); }
      
      







その後、最も興味深い。 pg_logical_slot_peek_changes



およびpg_logical_slot_get_changes



受信した行を実際に生成する関数decoder_json_begin_txn



decoder_json_commit_txn



decoder_json_change



を定義する必要がありpg_logical_slot_get_changes



。 生成された行をスロットに追加する必要があります。これは次のコマンドで実行されます。



 OutputPluginPrepareWrite(ctx, true); appendStringInfoString(ctx->out, "some string"); OutputPluginWrite(ctx, true);
      
      





関数decoder_json_begin_txn



およびdecoder_json_commit_txn



は、トランザクションの開始および終了コマンドをスロットに書き込みます(または、それぞれ行 'begin'および 'commit')。



decoder_json_change



関数decoder_json_change



、データ変更イベントで呼び出されます。 この関数では、発生したイベント(INSERT、UPDATE、DELETE)が正確に判断され、それぞれのイベントに対して独自の構造が作成されます。 UPDATEおよびDELETEの場合、テーブルに一意の(nullではない)キーまたは主キーを含めることが重要です。そうしないと、変更可能な(削除された)行を定義することができません。 これは、テーブルのREPLICA IDENTITYパラメーターの値に依存します。



この関数は4つのパラメーターを取ります。





関数について簡単に説明すると、操作のタイプはchange->action



によって決定されると言えます。 さらに、 change



のデータ( change->data.tp.newtuple



およびchange->data.tp.oldtuple



change->data.tp.oldtuple



、JSON構造が作成されます。 JSONはlibjanssonライブラリを使用して生成されます。



これが困難の始まりです。 主キーが欠落している場合、表のREPLICA IDENTITYがNOTHINGまたはDEFAULTに設定されている場合、変更可能な行を定義することは不可能であり、追加レコードのみがログに記録されます。 DEFAULT、FULL、INDEXを使用してテーブルのデータを更新または削除し、一意のキーがある場合、その値はnewtupleまたはoldtupleから取得されます(キー値がクエリによって変更された場合)。 一意のキーがなく、FULLの場合、oldtupleのすべての値が識別に使用されます。



その結果、 {"a": 0, "r": "public.some_table", "c": {"id": 1}, "d": {"a": 2}}



の形式のJSON構造が構築されます。 aはアクションのタイプ、rはテーブルの名前、cは行を識別するための値、dは実際のデータです。



作業を確認してください。 プラグインをビルドし、テストを実行します。



 git clone https://github.com/ildus/decoder_json.git cd decoder_json #         -         sudo chmod a+rw `pg_config --pkglibdir` chmod a+rwx ./ #    libjansson,   JSON make deps #    postgres sudo su postgres make make test
      
      





手動プラグインテスト:



 #    postgres,          sudo su postgres createdb test_db psql test_db # psql  test_db=# create table test1 (id serial primary key, name varchar); test_db=# SELECT * FROM pg_create_logical_replication_slot('custom_slot', 'decoder_json'); slot_name | xlog_position -------------+--------------- custom_slot | 0/4D9F870 (1 row)
      
      





ここで、スロットとプラグインの名前を示します。 応答では、スロットの名前と、スロット内のデータの記録が実際に始まる場所(xlogの位置)が表示されます。 プラグインを指定したという事実は、プラグインが既に機能していることを意味するものではなく、データを取得したときにのみデコード自体が開始されます。 このために、関数pg_logical_slot_peek_changes



およびpg_logical_slot_get_changes



ます。 get関数がデータを受信した後にキューを消去するという点で異なります。



データの追加:



 test_db=# insert into test1 values (1, 'bb'); INSERT 0 1 test_db=# insert into test1 values (2, 'bb'); INSERT 0 1 test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on'); location | xid | data -----------+-------+----------------------------------------------------- 0/BAB0968 | 48328 | begin 0/BAB0968 | 48328 | {"a":0,"r":"public.test1","d":{"id":1,"name":"bb"}} 0/BAB09F0 | 48328 | commit 0/BAB09F0 | 48329 | begin 0/BAB09F0 | 48329 | {"a":0,"r":"public.test1","d":{"id":2,"name":"bb"}} 0/BAB0A78 | 48329 | commit (6 rows)
      
      





データ変更



 test_db=# update test1 set name = 'dd' where id=2; UPDATE 1 test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on'); location | xid | data -----------+-------+------------------------------------------------------------------ 0/BB4C700 | 48338 | begin 0/BB4C700 | 48338 | {"c":{"id":2},"a":1,"r":"public.test1","d":{"id":2,"name":"dd"}} 0/BB4C798 | 48338 | commit (3 rows)
      
      





データ削除:



 test_db=# delete from test1 where id=2; DELETE 1 test_db=# select * from pg_logical_slot_get_changes('custom_slot', NULL, NULL, 'include_transaction', 'on'); location | xid | data -----------+-------+----------------------------------------- 0/BB4C8A8 | 48339 | begin 0/BB4C8A8 | 48339 | {"c":{"id":2},"a":2,"r":"public.test1"} 0/BB4C9C8 | 48339 | commit (3 rows)
      
      





使用済みの有用なリソース:






All Articles