ReactiveX for Pythonを使用したシンプルなRSSアグリゲーターを使用したリアクティブプログラミングの原則







近年、一般に事後対応型プログラミング、特にReactiveXテクノロジーは、開発者の間でますます一般的になっています。 すでにこのアプローチのすべての利点を積極的に使用している人もいれば、「聞いたことがある」だけの人もいます。 私の側では、リアクティブプログラミングのいくつかの概念が、馴染みのあるものの見方をどのように変化させるかを想像できるように支援します。



大規模なシステムを編成するには、2つの根本的に異なる方法があります。システム内に存在するオブジェクトと状態に応じて、およびシステムを通過するデータフローに応じて。 リアクティブプログラミングパラダイムは、データストリームの表現の容易さと、これらのストリームを介した変更の伝播を意味します。 たとえば、命令型プログラミングでは、割り当て操作は結果の有限性を意味しますが、リアクティブプログラミングでは、新しい入力を受け取ったときに再計算されます。 値のストリームは、特定の問題を解決するために必要なシステム内の一連の変換を受けます。 スレッドで操作すると、システムを拡張可能かつ非同期にすることができ、発生したエラーに対する正しい応答はフォールトトレラントです。



ReactiveXは、監視可能なシーケンスを使用して非同期のイベント指向プログラムを作成できるライブラリです。 Observerテンプレートを拡張してデータシーケンスをサポートし、演算子を追加して宣言的に結合することで、同期とスレッドセーフティ、共有データ構造、ノンブロッキングI / Oを処理する必要がなくなります。



ReactiveXライブラリと関数型リアクティブプログラミングの主な違いの1つは、継続的に変化するのではなく、長時間「放出」される離散値で動作することです。



オブザーバー、オブザーバブル、サブジェクトとは何かについて少し話す価値があります。 Observableモデルはデータソースであり、配列などのデータコレクションに使用するのと同様の方法で非同期イベントのストリームを処理できます。 コールバックの代わりにこれらすべてを使用すると、コードが読みやすくなり、エラーが発生しにくくなります。



ReactiveXでは、ObserverはObservableにサブスクライブし、その後、送信する要素または要素のシーケンスに応答します。 Observableにサブスクライブしている各Observerについて、Observer.on_next()メソッドがデータストリームの各要素で呼び出され、その後Observer.on_complete()とObserver.on_error()の両方を呼び出すことができます。 多くの場合、Observableは、誰かがサブスクライブするまでデータの提供を開始しないような方法で使用されます。 これらはいわゆる「遅延計算」です。値は、必要な場合にのみ計算されます。



オブザーバー






イベントに関するメッセージを受信し、サブスクライバーに報告するために、ObserverとObservableを接続する必要があるタスクがあります。 このために、Subjectがあります。Subjectには、標準に加えて、さらにいくつかの実装があります。





ObservableとObserverは、ReactiveXの始まりにすぎません。 それらは、Observableを提供する要素のシーケンスを変換、結合、操作できる演算子の力をすべて備えているわけではありません。



ReactiveXのドキュメントでは、 オペレーターの説明に大理石図の使用が含まれています。 たとえば、これらの図がObservableとその変換を表す方法は次のとおりです。



観測可能



以下の図を見ると、マップ演算子がObservableによって返される要素を、それぞれに関数を適用することで変換していることが簡単にわかります。



地図



ReactiveXの機能の良い例は、RSSアグリゲーターアプリケーションです。 ここでは、値の非同期データのロード、フィルタリング、変換が必要になり、定期的な更新を通じて現在の状態を維持します。



この記事では、ReactiveXの基本原則を表す例が、Pythonプログラミング言語のrxライブラリを使用して記述されています。 ここでは、例えば、抽象的なオブザーバーの実装のように見えます:



class Observer(metaclass=ABCMeta): @abstractmethod def on_next(self, value): return NotImplemented @abstractmethod def on_error(self, error): return NotImplemented @abstractmethod def on_completed(self): return NotImplemented
      
      





リアルタイムのアプリケーションは、Webソケットを介してブラウザとメッセージを交換します。 これを簡単に実装する機能は、 Tornadoによって提供されます。



サーバーを起動すると、プログラムが起動します。 ブラウザがサーバーにアクセスすると、Webソケットが開きます。



コード
 import json import os import feedparser from rx import config, Observable from rx.subjects import Subject from tornado.escape import json_decode from tornado.httpclient import AsyncHTTPClient from tornado.platform.asyncio import AsyncIOMainLoop from tornado.web import Application, RequestHandler, StaticFileHandler, url from tornado.websocket import WebSocketHandler asyncio = config['asyncio'] class WSHandler(WebSocketHandler): urls = ['https://lenta.ru/rss/top7', 'http://wsrss.bbc.co.uk/russian/index.xml'] def open(self): print("WebSocket opened") #       def on_message(self, message): obj = json_decode(message) #  ,   user_input self.subject.on_next(obj['term']) def on_close(self): #   Observable;      observable self.combine_latest_sbs.dispose() print("WebSocket closed") class MainHandler(RequestHandler): def get(self): self.render("index.html") def main(): AsyncIOMainLoop().install() port = os.environ.get("PORT", 8080) app = Application([ url(r"/", MainHandler), (r'/ws', WSHandler), (r'/static/(.*)', StaticFileHandler, {'path': "."}) ]) print("Starting server at port: %s" % port) app.listen(port) asyncio.get_event_loop().run_forever()
      
      







ユーザーが入力したリクエストを処理するために、サブジェクトが作成され、サブスクリプションでデフォルト値(この場合は空の文字列)を送信し、その後、1秒に1回ユーザーが入力したものを送信し、条件を満たす:長さ0または2以上の値が変更されました。

 # Subject   Observable,  Observer self.subject = Subject() user_input = self.subject.throttle_last( 1000 #        ).start_with( '' #         ).filter( lambda text: len(text) == 0 or len(text) > 2 ).distinct_until_changed() #    
      
      





また、定期的なニュースの更新には、60秒ごとに値を返すObservableが提供されます。



 interval_obs = Observable.interval( 60000 #     60 (  ) ).start_with(0)
      
      





これらの2つのストリームはcombine_latest演算子によって接続されており、Observableはニュースのリストを取得するためにチェーンに組み込まれています。 このObservableのサブスクリプションが作成された後、この時点でのみチェーン全体が機能し始めます。



 # combine_latest  2       # ,        self.combine_latest_sbs = user_input.combine_latest( interval_obs, lambda input_val, i: input_val ).do_action( #      #        lambda x: send_response('clear') ).flat_map( #    Observable    self.get_data ).subscribe(send_response, on_error) #  ;        
      
      





「ニュースのリストを取得するために観測可能」とは何かについて、より詳細に説明する必要があります。 ニュースを受信するためのurlリストから、関数に要素が入るデータストリームを作成します。TornadoAsyncHTTPClient HTTPクライアントを使用して、urlsリストの各要素のデータを非同期にダウンロードします。 また、ユーザーが入力したリクエストによってフィルタリングされるデータストリームも作成します。 各ストリームから、フロントエンドへの送信に必要な形式につながる5つのニュースアイテムを取得します。



コード
 def get_rss(self, rss_url): http_client = AsyncHTTPClient() return http_client.fetch(rss_url, method='GET') def get_data(self, query): # Observable    url return Observable.from_list( self.urls ).flat_map( #   url  Observable,    lambda url: Observable.from_future(self.get_rss(url)) ).flat_map( #   ,    Observable lambda x: Observable.from_list( feedparser.parse(x.body)['entries'] ).filter( #          lambda val, i: query in val.title or query in val.summary ).take(5) #    5    url ).map(lambda x: {'title': x.title, 'link': x.link, 'published': x.published, 'summary': x.summary}) #      
      
      







出力データストリームが形成された後、そのサブスクライバは要素ごとにデータを受信し始めます。 send_response関数は受信した値をフロントエンドに送信し、フロントエンドはニュースをリストに追加します。



 def send_response(x): self.write_message(json.dumps(x)) def on_error(ex): print(ex)
      
      





フィーダー.jsファイル内



コード
  ws.onmessage = function(msg) { var value = JSON.parse(msg.data); if (value === "clear") {$results.empty(); return;} // Append the results $('<li><a tabindex="-1" href="' + value.link + '">' + value.title +'</a> <p>' + value.published + '</p><p>' + value.summary + '</p></li>' ).appendTo($results); $results.show(); }
      
      







したがって、データをサーバーからフロントエンドに送信するプッシュテクノロジが実装され、フロントエンドはニュースを検索するためにユーザーが入力したリクエストのみを送信します。



結論として、Observableの代わりにコールバックを使用する通常のアプローチ、データストリームを簡単に結合する機能、フロントエンドコンシューマに即座にデータを送信する機能、クエリ文字列の変更を追跡する必要性を使用して、どのような実装が実現するかを考えることを提案します。 Python開発者の間では、この技術は実際のところあまり普及していませんが、現在のプロジェクトに適用するいくつかの可能性をすでに見ています。



RSSアグリゲーターデモプロジェクトのあるgithub リポジトリで、ReactiveX for Pythonを使用する例を見つけることができます。



All Articles