PythonのRabbitMQでの作業について簡単に説明します

KDPV







そのため、RabbitMQを使用する場合、MegaFonで作業するプロセスで同じタスクに直面する必要がありました。 「このようなタスクの実装を単純化および自動化する方法は?」という疑問が自然に生じます。







頭に浮かぶ最初の解決策は、HTTPインターフェイスを使用することです。もちろん、RabbitMQにはすぐに使用できる優れたWebインターフェイスとHTTP APIがあります。 それでも、HTTP APIの使用は常に便利であるとは限らず、AMQPプロトコルを使用して作業する必要が生じるような瞬間には不可能な場合もあります(十分なアクセス権がないが、メッセージを公開したい場合など)。







ネットワークのオープンスペースで自分に適した既成のソリューションが見つからないため、AMQPプロトコルを使用してRabbitMQを操作するための小さなアプリケーションを作成することにしました コマンドラインを介して起動パラメータを転送し、最低限必要な機能セットを提供する機能:









Pythonは、このようなタスクを実装するための最も単純な(そして私の意見では美しい)ツールとして選ばれました。 (ここで議論することができますが、何が変わるのでしょうか?)







RabbitMQの公式ガイドの翻訳( 1回2回 )はハブに表示されますが、練習からの簡単な例が役立つ場合があります。 この記事では、小さなアプリケーションの例を使用して、PythonのAMQPチャネルを使用してウサギを操作するときに生じる主な問題を説明しようとします。 アプリケーション自体はGitHubで入手できます







AMQPプロトコルとRabbitMQメッセージブローカーについて簡単に説明します



AMQPは、分散システムのコンポーネント間で最も一般的なメッセージングプロトコルの1つです。 このプロトコルの主な特徴は、 キュー交換ポイントという2つの主要な構造要素を含むメッセージルートを構築する概念です。 キューは、受信されるまでメッセージを蓄積します。 交換ポイントは、目的のキューまたは別の交換ポイントにルーティングするメッセージディストリビューターです。 交換ポイントがメッセージの送信先を決定する配布ルール(バインディング)は、指定されたマスクに準拠しているかどうかメッセージのルーティングキーをチェックすることに基づいています 。 AMQPの仕組みについては、 こちらをご覧ください







RabbitMQは、AMQPを完全にサポートし、多くの追加機能を提供するオープンソースアプリケーションです。 RabbitMQを使用するために、Pythonを含むさまざまなプログラミング言語で多数のライブラリが記述されています。







Pythonの実装



いつでも個人使用のためにいくつかのスクリプトを投げることができ、それらのトラブルを知らないことがあります。 同僚にそれらを広めることになると、すべてがより複雑になります。 誰もがどのように何を起動するのか、何をどこで変更するのか、どこで最新バージョンを入手するのか、そして何が変更されたのかを示す必要があります。 使いやすさのために、アプリケーションを4つのモジュールに分割することが決定されました。







  1. 投稿を担当するモジュール
  2. キューからメッセージを減算するモジュール
  3. RabbitMQブローカーの構成を変更するように設計されたモジュール
  4. 以前のモジュールに共通のパラメーターとメソッドを含むモジュール


このアプローチにより、一連の起動パラメーターが簡素化されます。 目的のモジュールを選択し、その動作モードの1つを選択し、必要なパラメーターを渡しました(動作モードとパラメーターの詳細については、-helpヘルプを参照)。







MegaFonの「ウサギ」の構造は十分な数のノードで構成されているため、使用の便宜上、ノードに接続するためのデータは、一般的なパラメーターとメソッドrmq_common_tools.pyを使用してモジュールに転送されます。







PythonでAMQPを処理するには、 Pikaライブラリを使用します。







import pika
      
      





このライブラリを使用して、RabbitMQでの作業は3つの主要な段階で構成されます。







  1. 接続を確立する
  2. 必要な操作を実行する
  3. 接続を閉じる


最初と最後のステージはすべてのモジュールで同じであり、 rmq_common_tools.pyで実装されます







接続を確立するには:







 rmq_parameters = pika.URLParameters(rmq_url_connection_str) rmq_connection = pika.BlockingConnection(rmq_parameters) rmq_channel = rmq_connection.channel()
      
      





Pikaライブラリを使用すると、RabbitMQに接続するためのさまざまな設計オプションを使用できます。 この場合、最も便利なオプションは、パラメーターをURL文字列の形式で次の形式で渡すことです。







 'amqp://rabbit_user:rabbit_password@host:port/vhost'
      
      





接続を閉じるには:







 rmq_connection.close()
      
      





転記



メッセージの公開はおそらく最も簡単ですが、同時にウサギを操作する際の最も一般的な操作です。







rmq_publish.pyでコンパイルされたポストパブリッシングツール







メッセージを投稿するには、メソッドを使用します







 rmq_channel.basic_publish(exchange = params.exch, routing_key = params.r_key, body = text)
      
      





ここで:

exchange-メッセージが発行される交換ポイントの名前

routing_key-メッセージの公開に使用されるルーティングキー

body-メッセージ本文







rmq_publish.pyは、公開用に2つのメッセージ入力モードをサポートしています。







  1. メッセージは、コマンドライン(from_console)を介してパラメーターとして入力されます
  2. メッセージはファイル(from_file)から読み取られます


私の意見では、2番目のモードは、大きなメッセージまたはメッセージ配列を処理する場合により便利です。 最初の方法では、追加のファイルなしでメッセージを送信できます。これは、モジュールを他のシナリオに統合するときに便利です。







メッセージを受信する



メッセージを受信するという問題は、もはや発行のような些細なことではありません。 メッセージの読み取りに関しては、次のことを理解する必要があります。









rmq_consume.pyファイルに実装されたメッセージリーダー







次の2つの動作モードが提供されます。







  1. 既存のキューからメッセージを読み取る
  2. このキューからメッセージを読み取るためのタイムキューとルートの作成


キューとルートを作成する問題については、以下で検討します。







直接校正は次のように実装されます。







 channel.basic_consume(on_message, queue=params.queue) try: channel.start_consuming() except KeyboardInterrupt: channel.stop_consuming() except Exception: channel.stop_consuming() rmq_tools.console_log(":\n", traceback.format_exc())
      
      





どこで

on_message-メッセージハンドラプロシージャ

params.queue-減算が行われるキューの名前







メッセージハンドラーは、読み取られたメッセージを使用して何らかの操作を実行し、メッセージ配信を確認する(または必要に応じて確認しない)必要があります。







 def on_message(channel, method_frame, header_frame, body): global all_cnt, lim if all_cnt >= lim: rmq_tools.console_log('   .') raise KeyboardInterrupt body_str = body.decode("utf-8")[:4000] rk = method_frame.routing_key rmq_params.file.write(rk + '\n') rmq_params.file.write(body_str + '\n\n') all_cnt = all_cnt + 1 if (lim != 0) and (rmq_params.file == sys.stdout): sys.stdout.write(f'[{rmq_tools.time_now()}] - {all_cnt} of {lim} messages consumed.\r') channel.basic_ack(delivery_tag=method_frame.delivery_tag)
      
      





どこで

all_cnt-グローバルカウンター

lim-読み取られるメッセージの数







このようなハンドラーの実装では、特定の数のメッセージが減算され、ファイルで記録が行われた場合に減算の進行状況に関する情報がコンソールに出力されます。







読み取りメッセージをデータベースに書き込むこともできます。 現在の実装では、このような機会は提示されていませんが、追加することは難しくありません。







DBに記録する

Oracleデータベースおよびcx_oracleライブラリのデータベースにメッセージを書き込む例を検討します。







データベースに接続する







 ora_adress = 'host:port/dbSID' ora_creds = 'user/pass' connection_ora = cx_Oracle.connect(ora_creds + '@' + ora_address) ora_cursor = connection_ora.cursor()
      
      





on_messageハンドラーに追加します







 global cnt, commit_int insert_rec = 'insert into ' + tab_name + '(routing_key, text) values (:rkey, :text)' ora_cursor.execute(insert_rec, text = body_str, rkey = rk) if cnt > commit_int : ora_cursor.execute('commit') cnt = 1 cnt = cnt + 1
      
      





どこで

cntは別のカウンターです

commit_int-データベースへの挿入数。その後、「コミット」する必要があります。 このようなパラメーターが存在するのは、データベースの負荷を軽減したいためです。 ただし、インストールはそれほど大きくありません。 障害が発生した場合、最後の正常なコミット後に読み取られたメッセージを失う可能性があります。







そして、予想どおり、作業の最後に最終コミットを行い、接続を閉じます







 ora_cursor.execute('commit') connection_ora.close()
      
      





このような何かがメッセージを読んでいます。 既読メッセージの数の制限を削除すると、「ウサギ」からのメッセージを継続的に読むためのバックグラウンドプロセスを作成できます。







構成



AMQPプロトコルは主にメッセージの公開と読み取りを目的としていますが、ルートの構成を使用して簡単な操作を実行することもできます(ネットワーク接続やその他のRabbitMQ設定をアプリケーションとして構成することについては話していません)。







基本的な構成操作は次のとおりです。







  1. キューまたは交換ポイントの作成
  2. 転送ルールの作成(バインド)
  3. キューまたは交換ポイントの削除
  4. 転送ルールの削除(バインド)
  5. キューのクリア


それらのそれぞれについて、pikaライブラリに既製の手順があるため、起動の便宜上、それらはrmq_setup.pyファイルに単純にコンパイルされます。 次に、パラメーターに関するいくつかのコメントとともに、pikaライブラリーからの手順をリストします。







キューを作成する







 rmq_channel.queue_declare(queue=params.queue, durable = params.durable)
      
      





ここではすべてが簡単です

queue-作成するキューの名前

耐久性 -論理パラメーター。値がTrueの場合、ウサギが再起動してもキューは存在し続けます。 Falseの場合、キューは再起動時に削除されます。 通常、2番目のオプションは、将来必要とされないことが保証されている一時キューに使用されます。







交換ポイントの作成(交換)







 rmq_channel.exchange_declare(exchange=params.exch, exchange_type = params.type, durable = params.durable)
      
      





ここに、新しいexchange_typeパラメーターが表示されます-交換ポイントのタイプ。 交換ポイントの種類については、 こちらをご覧ください

exchange-作成された交換ポイントの名前







キューまたは交換ポイントの削除







 rmq_channel.queue_delete(queue=params.queue) rmq_channel.exchange_delete(exchange=params.exch)
      
      





転送ルールの作成(バインド)







 rmq_channel.queue_bind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
      
      





exchange-転送が行われる交換ポイントの名前

queue-転送されるキューの名前

routing_key-転送に使用されるルーティングキーのマスク。







有効なエントリは次のとおりです。









転送ルールの削除(バインド)







 rmq_channel.queue_unbind(exchange=params.exch, queue=params.queue, routing_key=params.r_key)
      
      





すべてが転送ルールの作成に似ています。







キューのクリア







 rmq_channel.queue_purge(queue=params.queue)
      
      





queue-クリアするキューの名前







Pythonアプリケーションでのコマンドラインインターフェイスの使用について

起動オプションは、人生をずっと楽にします。 各起動前にコードを編集しないために、起動時にパラメーターを渡すためのメカニズムを提供することは論理的です。 この目的のためにargparseライブラリが選択されました。 その使用の複雑さについては詳しく説明しません。このテーマに関する十分なガイドがあります(1、2、3)。 このツールは、アプリケーションを使用するプロセスを大幅に簡素化するのに役立ちました(呼び出し可能な場合)。 単純なコマンドシーケンスをスローして同様のインターフェイスにラップしたとしても、本格的で使いやすいツールを入手できます。







日常生活での応用。 最も便利になったもの。



さて、今では日常生活でのAMQPの使用について少し印象を受けました。







最も人気のあった機能は、メッセージの公開です。 特定のユーザーのアクセス権では、必ずしもWebインターフェイスを使用できるとは限りませんが、特定のサービスをテストするだけでよい場合もあります。 ここで、このチャネルを使用するサービスに代わってAMQPと承認が支援に渡されます。







2番目に人気があったのは、タイムキューからメッセージを読み取る機能です。 この機能は、新しいルートとメッセージフローの設定、および事故の防止に役立ちます。







他の可能性も、さまざまなタスクに適用されます。








All Articles