ツイストを例として使用した非同期アプリケーションの競争力

理論的には、競合するアクセスの問題は非同期アプリケーションでは一般的ではありません。 ある種の共有リソースのふりをする複数のタスクをいつでも実行できる並列アーキテクチャを備えたアプリケーションとは対照的に、非同期アプリケーションでは、一度に1つのアクティビティのみが実行されます。



しかし実際には、すべてが少し異なって見えます:



次の状況を予測しています。 このサービスは、リクエストを並行して処理するリモートサービス(S)と連携します。 Sのデータに依存するリソースAがあります。リクエストR1はAからSにデータを要求します。この時点で、R2が到着し、Sの対応するデータを変更します。以前に処理されました(図1を参照)。



画像

図1 クエリが実行される順序。



現在、R1はR2の変更を考慮に入れたデータを受け取りますが、リソースAは、これが操作R2の結果であることをまだ知りません。 そこには競合状態があります。



例:


Kolyaには妻と銀行口座があります。 Kolyaと彼の妻は、家族に平等な権利と共通の予算があると判断したため、それぞれにこのアカウントにカードが添付されています。 Kolyaはフリーランスで働いており、最後のプロジェクトの支払いが必要になる時期が来ています。

妻はKolyaを喜ばせ、DRに新しいiPad2を提供することにしました。 早朝から上昇し、彼女は彼女の口座でN $を見つけました! 彼女は夫にiPad2を注文し、自分用の新しい時計を注文します。 Kolyaが銀行口座を確認すると、「まだ」ゼロがあります。 彼は、最新の操作をチェックすることも、顧客と長い間呪いをかけることも考えていません。



Kolya-リクエスト1、妻-リクエスト2、銀行口座-リモートサービスのリソース、銀行カード-リモートリソースのプロキシ。



Kolyaの代わりに、彼の妻と銀行口座には、金融サービス、プロセス制御システムなどがあります。 NDA以来、問題の本質を具体的に開示することはできません。



戦う方法。


ツイストを使用して戦います。 一般的な概念は非同期フレームワークには当てはまりますが。



明らかに、解決策はリソースAをロックすることです。理論的には、並列アーキテクチャと同じ構造がロックに適しています:セマフォ、ミューテックス、条件変数。 問題は実装です。 したがって、ミューテックスを検討してください(なぜミューテックスなのか?条件変数が自明なので、セマフォについてはまだアプリケーションを見つけることができませんでした)。



ミューテックスの作り方


オプションが機能していません。 並列アーキテクチャでは、ストリームはスリープするだけで、何もありません。 非同期アーキテクチャでは-スリープ状態になった場合(たとえば、time.sleep(100))-すべてが賭けになり、リソースをブロックしたリクエストの処理に切り替えるためにeventloopが必要になるため、スリープ状態になります-これは起こりません。



オプションが間違っています。 それは、reactor.callLater(1、self.some_method、* args)を介して実装できます。self.some_methodは、ロックが終了するのを待つメソッドです。



欠点は次のとおりです。





オプション。 そして最後に、適切なオプション。 Deferredsで構築された非同期アプリケーションがあります。 その中で何かが起こるためには、Deferredが動作する必要があります。 結論-ロックはDeferred'ahで実行する必要があります。



class Mutex(object): def __init__(self): self.locked = False self.waiters = list() def acquire(self): d = Deferred() if self.locked: self.waiters.append(d) else: self.locked = True d.callback(True) return d def release(self): self.locked = False if self.waiters: self.locked = True d = self.waiters.pop() d.callback(True)
      
      





設計は単純です。リソースにアクセスしたい場合、Deferredを取得します。 Deferredがトリガーされた後にのみ、リソースの使用を開始します。 クラス自体はDeferredの開発を監視し、そのうちの1つが満たされるとすぐに、キューから次のものを取り出して開始します。



指定された実装例は完全ではありません。twisted(DeferredLock)のmutex実装は、 http//twistedmatrix.com/trac/browser/tags/releases/twisted-11.0.0/twisted/internet/defer.pyで見ることができます。 DeferredSemaphoreもあります。



使用例。


さまざまな方法で使用できます。オブジェクト自体がそのアクセスを監視するように作成できます。



 class ImportantObject(object): def __init__(self): self.lock = defer.DeferredLock() def get_lock(self): return self.lock.acquire() def release_lock(self): return self.lock.release()
      
      







または、オブジェクトがデータベースから選択され、セッションに格納されている場合(つまり、リクエストごとにid 1のImportantObjectが異なる場合)、ロックプールを作成できます。



 class Pool(object): __metaclass__ = Singleton def __init__(self, objects_list): self.__objects = dict() for o in objects_list: self.__objects[o.id] = defer.DeferredLock() def acquire(self, o): if o.id not in self.__objects: self.__objects[o.id] = defer.DeferredLock() return self.__objects[o.id].acquire() def release(self, o): self.__objects[o.id].release()
      
      







ここでのプールはわずかに単純化されています。記事が煩雑にならないように、監視対象オブジェクトのリストを更新する方法を「忘れました」。 また、Singletonの実装も見逃しましたが、初心者のPythonistにとっても、それを見つけること/作ることは難しくありません。



そして最後に:



 def multiplex(self, a): def get_value_from_remote_service(skipped, a): d = some_service.do_long_boring_call(a) return d def multiply(result, a): return result*a d = Pool().acquire(a) d.addCallback(get_value_from_remote_service, a) d.addCallback(power,a) return d
      
      







PS:残念なことに、ねじれたドキュメントは完全なものではありません。 さらに、このフレームワークの約30パーセントであれば十分にカバーします。 そのため、競争力の問題を解決したとき、3日以内にさまざまな自転車を発明しました。 ねじれたソースを見ることはまだ推測していません。 したがって、一般的なアドバイス-ねじれた状態での作業-情報源をもっと読んでください。自転車はあらゆる場面で隠れています。



ソース:

公式のねじれたドキュメント。

ねじれたソース



All Articles