CでのPostgreSQL非同期操作

今日、私はCでのPostgreSQLの非同期作業について簡単なメモを書きたかったのです。その動機は単純です。小さなユーティリティの場合、そのような機能を実装する必要がありました。 -クエリのクラス)、この問題に関する公式ドキュメント、非常に詳細ではありますが、構造化されていませんが、非同期モードでlibpqライブラリを操作するアルゴリズムには多くの落とし穴があります。 したがって、質問を整理し、誰かがこれを有用だと思った場合に備えて、結果を公開します。



そのため、PostgreSQLが何であるか、同期(ブロック)モードの操作が非同期モードとどのように異なるかを誰も知る必要がないと想定しています。読者は大まかに理解しています。 ちなみに、非同期呼び出しの最初の明白な利点(I / Oとスレッドの実行をブロックしないため、追加のスレッドを作成したり、それらを同期したりする必要がなくなります)、Postgreの場合には、通常のPQexecメソッドがあります一度に1つのSQLクエリのみを実行した結果を取得できます。非同期libpq関数にはこのような制限はありません。



前にも言ったように、libpqには非同期モードでかなりの落とし穴があります。 非同期モードが美しく完全に実装されているライブラリがあります(開発者はコールバックを割り当てることで絶対に非同期メソッドを呼び出し、その後、ライブラリイベントループを単に「回転」するだけで十分です(メソッドを無限にまたはタイマーで呼び出します)。その後、ライブラリ自体が処理を行います。必要なシーケンスのコマンド、イベントのキャッチ、コールバックの呼び出し)、PostgreSQLには異なる作業モデルがあります。



現在の状態と前の操作の結果に応じて、厳密に定義された順序で呼び出す必要がある非同期接続および要求用のコマンドが多数あります。さらに、ソケットの準備状況を手動で確認する必要があります。 ある場所でミスをして、間違った時間に関数を呼び出したり、その逆を行ったり、ビジーなソケットにアクセスしようとするだけで十分です。これにより、ストリームがブロックされる可能性があります(最悪の場合、無限、つまりフリーズ)。 また、非同期モードのライブラリは、操作のタイムアウトをほとんど制御できません。すべてを自分で処理する必要があります。



公式ドキュメントでは、非同期モードでの作業に関するほとんどの情報は、 1回2回の 2つのセクションに記載されています。



まあ、私たちはポイントに正しくなります。



非同期モードでデータベースへの接続を確立するには、手順は次のようになります。



1.接続構造にメモリを割り当て、PQconnectStart()メソッドを使用して接続を開始します



2.操作のタイムアウトをさらに制御できるように、現在の時刻を覚えておいてください。



3. PQstatus()を呼び出して、接続が成功したことを確認します。 結果がCONNECTION_BADの場合、初期化は失敗しました(たとえば、接続文字列のエラーやメモリ割り当てが失敗した)。そうでない場合は続行できます



4. PQconnectPoll()メソッドを使用して、現在の接続ステータスを確認します。



考えられる結果:



PGRES_POLLING_WRITING -       PGRES_POLLING_READING -       PGRES_POLLING_FAILED -         PGRES_POLLING_OK -   
      
      





5.ステータスがPGRES_POLLING_WRITINGまたはPGRES_POLLING_READINGの場合、PQsocket()メソッドを使用して使用済みの接続ソケットを取得し、システム関数を選択()またはポーリング()する必要があります。 OKまたはFAILEDの結果、またはタイムアウトが期限切れになる前(忘れないでください、タイムアウトは手動でチェックする必要があります)。



PQconnectPoll()への次の呼び出しがソケットを解放する場合、スレッドはブロックされます。これは念頭に置く必要があります。



このすべての後、すべてがうまくいけば、データベースへの確立された接続を取得します。 SQLクエリを実行する手順は次のようになります。



1. PQsendQuery()コマンドを使用して、サーバーに送信するリクエストを準備します。



2. PQsetnonblocking()メソッドを使用してリクエストを送信するための非ブロッキングモードを設定します。これは、デフォルトではlibPqが非同期で読み取りのみを行い、ソケットへの書き込みを行わないためです。



3. 0(リクエストが正常に送信された)または-1(エラー)を返すまでPQflush()を実行します。



4.アクティブなソケットを取得し、操作の準備ができるまでselect()またはpoll()で読み取る準備ができていることを確認します。



5. PQconsumeInput()を実行します。 関数が0を返した場合、エラーが発生しました。



6. PQisBusy()を実行します。 関数が1を返した場合、リクエストの処理またはサーバーの応答の読み取りはまだ完了していないため、ステップ4からアルゴリズムを再度繰り返す必要があります。

もちろん、タイムアウトを制御することを忘れないでください。



上記のすべての操作を実行した後、クエリ結果を通常どおり操作できます-PQgetResult()、PQgetvalue()など。



さあ、練習しましょう。 コードはCですが、C ++プログラムで使用するためにクラスにラップする場合は、必要に応じてすべてが非常に簡単です。



 //   - : gcc pgtest4.c -I/usr/include/postgresql -lpq #include <libpq-fe.h> //<     PostgreSQL #include <sys/socket.h> //< setsockopt()    #include <sys/select.h> //< select() #include <sys/time.h> //< gettimeoftheday() #include <unistd.h> //< usleep()    #define SOCK_POLL_TIMEOUT 100 //     (      ?)   typedef enum { DISCONNECTED = 0, CONN_POLLING, CONN_READING, CONN_WRITING, READY, QUERY_SENT, QUERY_FLUSHING, QUERY_BUSY, QUERY_READING, CLOSING, ERROR } pq_state; typedef enum { NO_ERROR = 0, ALLOCATION_FAIL, POLLING_FAIL, READING_FAIL, WRITING_FAIL, TIMEOUT_FAIL } pq_error; struct pqconn_s{ pq_state state; //<   PGconn* conn; //<        unsigned long start; //<     ( ) long timeout; //<    pq_error error; //<   -  ,     }; /** * @brief    * @return    */ unsigned long time_ms(void) { struct timespec tp; // gettimeoftheday()   ,    clock_gettime(CLOCK_MONOTONIC, &tp); return (tp.tv_sec * 1000 + tp.tv_nsec / 1000000); } /** * @brief   ()   / * @param socket_fd -    * @param rw - 0    , 1    * @return   select(): -1 = , 0 -  (), 1 -  */ int try_socket(int socket_fd, int rw) { fd_set fset; struct timeval sock_timeout; sock_timeout.tv_sec = 0; sock_timeout.tv_usec = SOCK_POLL_TIMEOUT; FD_ZERO(&fset); FD_SET(socket_fd, &fset); setsockopt(socket_fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&sock_timeout, sizeof(struct timeval)); //       SO_SNDTIMEO. . return select(socket_fd + 1, ((!rw) ? &fset : NULL), ((rw) ? &fset : NULL), NULL, &sock_timeout); } /** * @brief       * @param conninfo -     * @param s -    pqconn_s        * @param timeout -     * @return 0 -  (     s->error), 1 -  */ int pgsql_connection_start(const char* conninfo, struct pqconn_s* s, long timeout) { if (!s) return 0; if (!conninfo) { s->error = ALLOCATION_FAIL; return 0; } s->conn = PQconnectStart(conninfo); s->state = CONN_POLLING; s->start = time_ms(); s->timeout = timeout; s->error = NO_ERROR; ConnStatusType status; status = PQstatus(s->conn); if (status == CONNECTION_BAD) { s->state = ERROR; s->error = POLLING_FAIL; return 0; } return 1; } /** * @brief          * @param command - SQL- * @param s -    pqconn_s        * @param timeout -     * @return 0 - , 1 -  */ int pgsql_send_query(struct pqconn_s* s, const char *command, long timeout) { if (s->state != READY) { return 0; } if (!PQsendQuery(s->conn, command)) { return 0; } PQsetnonblocking(s->conn, 0); s->state = QUERY_FLUSHING; s->start = time_ms(); s->timeout = timeout; s->error = NO_ERROR; return 1; } /** * @brief  ,     * @param s -    pqconn_s        */ void pgsql_event_loop(struct pqconn_s* s) { if ((s->state == DISCONNECTED) || (s->state == READY)) return; if ((time_ms() - s->start) > s->timeout) { s->state = CLOSING; s->error = TIMEOUT_FAIL; } if (s->state == CONN_POLLING) { PostgresPollingStatusType poll_result; poll_result = PQconnectPoll(s->conn); if (poll_result == PGRES_POLLING_WRITING) s->state = CONN_WRITING; if (poll_result == PGRES_POLLING_READING) s->state = CONN_READING; if (poll_result == PGRES_POLLING_FAILED) { s->state = ERROR; s->error = POLLING_FAIL; } if (poll_result == PGRES_POLLING_OK) s->state = READY; } if (s->state == CONN_READING) { int sock_state = try_socket(PQsocket(s->conn), 0); if (sock_state == -1) { s->error = READING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = CONN_POLLING; } if (s->state == CONN_WRITING) { int sock_state = try_socket(PQsocket(s->conn), 1); if (sock_state == -1) { s->error = WRITING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = CONN_POLLING; } if (s->state == CLOSING) { PQfinish(s->conn); s->state = ERROR; } if (s->state == QUERY_FLUSHING) { int flush_res = PQflush(s->conn); if (0 == flush_res) s->state = QUERY_READING; if (-1 == flush_res) { s->error = WRITING_FAIL; s->state = CLOSING; } } if (s->state == QUERY_READING) { int sock_state = try_socket(PQsocket(s->conn), 0); if (sock_state == -1) { s->error = READING_FAIL; s->state = CLOSING; } if (sock_state > 0) s->state = QUERY_BUSY; } if (s->state == QUERY_BUSY) { if (!PQconsumeInput(s->conn)) { s->error = READING_FAIL; s->state = CLOSING; } if (PQisBusy(s->conn)) s->state = QUERY_READING; else s->state = READY; } }
      
      





最初に、必要なすべての状態と起こりうるエラーを説明し、接続と実行されたデータが格納される構造を宣言します。ライブラリがサーバーで動作するために必要なPGconn構造へのポインター、マシンの状態、エラーコード(存在する場合)、現在の操作の開始時間(タイムアウトを制御するため)。



2つの小さな関数time_ms()およびtry_socket()は、標準ライブラリの関数のラッパーであり、それぞれミリ秒単位で現在の時刻を取得し、ソケットのビジー状態をチェックします。



これを何らかの方法で次のように使用できます。



 int main(void) { struct pqconn_s s; pgsql_connection_start("dbname=db1 user=user1 password=password1 hostaddr=10.0.0.1 port=5432", &s, 15000); while ((s.state != ERROR) && (s.state != READY)) { pgsql_event_loop(&s); } if (s.state == ERROR) { perror("DB connection failed \n"); return 1; } pgsql_send_query(&s, "SELECT * FROM history;", 50000); while ((s.state != ERROR) && (s.state != READY)) { pgsql_event_loop(&s); } if (s.state == ERROR) { perror("DB query failed \n"); return 1; } PGresult *res; int rec_count; int row; int col; res = PQgetResult(s.conn); if (PQresultStatus(res) != PGRES_TUPLES_OK) { perror("We did not get any data!\n"); return 1; } rec_count = PQntuples(res); printf("Received %d records.\n", rec_count); for (row=0; row<rec_count; row++) { for (col=0; col<3; col++) { printf("%s\t", PQgetvalue(res, row, col)); } puts(""); } PQclear(res); }
      
      





上記の例がブロックモードでも機能することは明らかです(構造体の状態フィールドがERRORまたはREADYに設定されるのを待つ必要があるため)、ご想像のとおり、ケースは小さいです:代わりにpgsql_event_loopを追加する必要があります()接続が成功した場合、データを受信した場合、またはエラーが発生した場合にコールバックを呼び出し、メインプログラムループ内の残りのアクションと共にイベントループをねじるか、タイマーで呼び出してから、データベースを非同期に操作します。



上記が誰かに役立つことを心から願っています。



All Articles