シンプルなチャットでaiohttpをテストする

目次
  • はじめに
  • 構造
  • ルート
  • ハンドラー、リクエストおよびレスポンス
  • 構成設定
  • ミドルウェア
  • データベース
  • パターン
  • セッション、認可
  • 静的
  • Webソケット
  • Herokuにアップロードする





はじめに







昨年の秋、私はキエフでいくつかのpythonミートアップを訪問しました。

Nikolai Novikはそのうちの1つで話をし、Pythonインタープリターのバージョン3で非同期呼び出しのためにasyncioライブラリで実行される新しいaiohttp非同期フレームワークについて話しました。 このフレームワークは、コアpython開発者によって作成され、Web用のpythonフレームワークの概念として位置付けられているという事実に興味を持ちました。







現在、膨大な数の異なるフレームワークがあり、それぞれに独自の哲学があります。

一般的なWebテンプレートの構文と実装。 時間が経つにつれて、このすべての多様性が

aiohttp-1つの基準になります。







構造







aiohttpのすべての機能を最大限にテストするために、Webソケットで簡単なチャットを開発しようとしました。 aiohttpの中核は、ハンドラーがスピンする無限ループです。 ハンドラー-いわゆるコルーチン、入力/出力(I / O)をブロックしないオブジェクト。 このタイプのオブジェクトは、Python 3.4のasyncioライブラリに登場しました。 与えられたオブジェクトのすべての計算が行われるまで、眠りに落ちたように見え、この時点でインタープリターは他のオブジェクトを処理できます。 明確にするために、例を挙げます。 多くの場合、サーバーからの応答はデータベースからの応答を待機し、この応答が到着して処理されるまで、他のオブジェクトが順番に待機するときにすべてのサーバー遅延が発生します。 この場合、他のオブジェクトは、データベースからの応答が到着するまで処理されます。 ただし、これを実装するには、非同期ドライバーが必要です。

現在、aiohttpには、ほとんどの一般的なデータベース( postgresqlmysqlredis )用の非同期ドライバーとラッパーがあります

mongodbには、チャットで使用されるモーターがあります。







チャットのエントリポイントはapp.pyファイルです。 アプリオブジェクトが作成されます。







import asyncio from aiohttp import web loop = asyncio.get_event_loop() app = web.Application(loop=loop, middlewares=[ session_middleware(EncryptedCookieStorage(SECRET_KEY)), authorize, db_handler, ])
      
      





ご覧のとおり、初期化中に、ループはアプリとミドルウェアリストに渡されます。ミドルウェアリストについては後述します。







ルート







aiohttpが非常によく似ているフラスコとは異なり、ルートは既に初期化されたアプリに追加されます。







 app.router.add_route('GET', '/{name}', handler)
      
      





ところで、 Andrei Svetlov説明があります。







ルートは別のroutes.pyファイルに記入されます。







 from chat.views import ChatList, WebSocket from auth.views import Login, SignIn, SignOut routes = [ ('GET', '/', ChatList, 'main'), ('GET', '/ws', WebSocket, 'chat'), ('*', '/login', Login, 'login'), ('*', '/signin', SignIn, 'signin'), ('*', '/signout', SignOut, 'signout'), ]
      
      





最初の要素はhttpメソッド、次にurlが配置され、ハンドラーオブジェクトはタプルの3番目、最後にルート名なので、コードで呼び出すのが便利です。







次に、ルートリストがapp.pyにインポートされ、アプリケーションへの単純なループで埋められます。







 from routes import routes for route in routes: app.router.add_route(route[0], route[1], route[2], name=route[3])
      
      





すべてがシンプルで論理的です。







ハンドラー、リクエストおよびレスポンス







Djangoフレームワークの例に従って、リクエスト処理を行うことにしました。 authフォルダーには、ユーザー、承認、ユーザー作成とログインの処理に関連するすべてが含まれています。 また、 チャットフォルダーには、それぞれチャットのロジックがあります。 aiohttpでは、 ハンドラーを関数またはクラスとして実装できます。

クラスを通じて実装を選択します。







 class Login(web.View): async def get(self): session = await get_session(self.request) if session.get('user'): url = request.app.router['main'].url() raise web.HTTPFound(url) return b'Please enter login or email'
      
      





セッションについては以下に書かれます、そして私が考える他のすべては明確です。 リダイレクトは、戻る(戻る)か、パラメーターがパスに渡されるweb.HTTPFound()オブジェクトの形式で例外をスローすることで発生することに注意してください。 クラスのHttpメソッドは、非同期関数get、postなどを通じて実装されます。 クエリパラメータを使用する必要がある場合、いくつかの機能があります。







 data = await self.request.post()
      
      





構成設定







すべての設定はsettings.pyファイルに保存されます。 機密データの保存には、 envparseを使用します 。 このユーティリティを使用すると、環境変数からデータを読み取り、これらの変数が保存されている特別なファイルを解析できます。







 if isfile('.env'): env.read_envfile('.env')
      
      





第一に、Herokuでプロジェクトを上げる必要があり、第二に、非常に便利でした。 最初はローカルデータベースを使用してから、リモートデータベースでテストし、切り替えは.envファイルの1行のみを変更することで構成されました。







ミドルウェア







アプリケーションを初期化するときに、ミドルウェアを指定できます。 ここでは、それらは別のファイルに配置されます 。 標準実装はデコレータ関数です。この関数では、リクエストを使用してチェックまたはその他のアクションを実行できます。







許可チェックの例







 async def authorize(app, handler): async def middleware(request): def check_path(path): result = True for r in ['/login', '/static/', '/signin', '/signout', '/_debugtoolbar/']: if path.startswith(r): result = False return result session = await get_session(request) if session.get("user"): return await handler(request) elif check_path(request.path): url = request.app.router['login'].url() raise web.HTTPFound(url) return handler(request) else: return await handler(request) return middleware
      
      





データベースを接続するためのミドルウェアもあります。







 async def db_handler(app, handler): async def middleware(request): if request.path.startswith('/static/') or request.path.startswith('/_debugtoolbar'): response = await handler(request) return response request.db = app.db response = await handler(request) return response return middleware
      
      





接続の詳細は次のとおりです。







データベース







チャットは、MongodbとMotorの非同期ドライバーを使用します。 データベースへの接続は、アプリケーションの初期化中に発生します。







 app.client = ma.AsyncIOMotorClient(MONGO_HOST) app.db = app.client[MONGO_DB_NAME]
      
      





そして、特別なシャットダウン機能で接続が閉じられます。







 async def shutdown(server, app, handler): server.close() await server.wait_closed() app.client.close() # database connection close await app.shutdown() await handler.finish_connections(10.0) await app.cleanup()
      
      





非同期サーバーの場合、すべての並列タスクを正しく完了する必要があることに注意してください。







イベントループの作成についてもう少し。







 loop = asyncio.get_event_loop() serv_generator, handler, app = loop.run_until_complete(init(loop)) serv = loop.run_until_complete(serv_generator) log.debug('start server', serv.sockets[0].getsockname()) try: loop.run_forever() except KeyboardInterrupt: log.debug(' Stop server begin') finally: loop.run_until_complete(shutdown(serv, app, handler)) loop.close() log.debug('Stop server end')
      
      





ループ自体はasyncioから作成されます。







 serv_generator, handler, app = loop.run_until_complete(init(loop))
      
      





run_until_completeメソッドは、ループにcorutinesを追加します。 この場合、アプリケーション初期化関数が追加されます。







 try: loop.run_forever() except KeyboardInterrupt: log.debug(' Stop server begin') finally: loop.run_until_complete(shutdown(serv, app, handler)) loop.close()
      
      





実際には、例外の場合に中断される無限ループ自体の実装。 終了する前に、すべての接続を終了し、サーバーを正しく停止するシャットダウン関数が呼び出されます。







次に、クエリの作成方法、データの取得および変更方法を理解する必要があります







 class Message(): def __init__(self, db, **kwargs): self.collection = db[MESSAGE_COLLECTION] async def save(self, user, msg, **kw): result = await self.collection.insert({'user': user, 'msg': msg, 'time': datetime.now()}) return result async def get_messages(self): messages = self.collection.find().sort([('time', 1)]) return await messages.to_list(length=None)
      
      





ORMは使用していませんが、データベースクエリは別のクラスで行う方が便利です。 チャットフォルダーに、Messageクラスが置かれているmodels.pyファイルが作成されました。 get_messagesメソッドでは、保存されたすべてのメッセージを時間でソートして取得するリクエストが作成されます。 saveメソッドでは、メッセージをデータベースに保存するリクエストが作成されます。







パターン







人気のあるテンプレートエンジン、特にaiohttp_jinja2aiohttp_makoの aiohttp用に、いくつかの非同期ラッパーが記述されています。 私はjinja2をチャットに使用しています。







 aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'))
      
      





これは、テンプレートサポートがアプリケーションで初期化される方法です。

FileSystemLoader( 'templates')は、テンプレートがテンプレートフォルダーにあることをjinja2に伝えます。







 class ChatList(web.View): @aiohttp_jinja2.template('chat/index.html') async def get(self): message = Message(self.request.db) messages = await message.get_messages() return {'messages': messages}
      
      





デコレータを使用して、ビューで使用するテンプレートを指定し、コンテキストを埋めるために、変数で辞書を返し、テンプレートで使用します。







セッション、認可







セッションの操作には、 aiohttp_sessionライブラリがあります。 暗号化を使用して、セッションをRedisまたはCookieに暗号化された形式で保存できます。 ライブラリをインストールする場合でも、保存方法が示されます。







 aiohttp_session[secure]
      
      





セッションを初期化するには、ミドルウェアに追加します。







 session_middleware(EncryptedCookieStorage(SECRET_KEY)),
      
      





セッションで値を取得または配置するには、最初にリクエストから値を抽出する必要があります。







 session = await get_session(request)
      
      





ユーザーを認証するには、ユーザーのIDをセッションに追加し、ミドルウェアでその可用性を確認します。 もちろん、セキュリティにはさらにチェックが必要ですが、コンセプトをテストするにはこれで十分です。







静的







静的コンテンツを含むフォルダーは、アプリケーションの初期化中に別のルートで接続されます。







 app.router.add_static('/static', 'static', name='static')
      
      





テンプレートで使用するには、アプリから取得する必要があります。







 <script src="{{ app.router.static.url(filename='js/main.js') }}"></script>
      
      





すべてが単純で、複雑なものは何もありません。







Webソケット







最後に、aiohttpの最もおいしい部分に到達しました)。 ソケットの実装は非常に簡単です。 javascriptでは、動作するために最低限必要な機能を追加しました。







 try{ var sock = new WebSocket('ws://' + window.location.host + '/ws'); } catch(err){ var sock = new WebSocket('wss://' + window.location.host + '/ws'); } // show message in div#subscribe function showMessage(message) { var messageElem = $('#subscribe'), height = 0, date = new Date(); options = {hour12: false}; messageElem.append($('<p>').html('[' + date.toLocaleTimeString('en-US', options) + '] ' + message + '\n')); messageElem.find('p').each(function(i, value){ height += parseInt($(this).height()); }); messageElem.animate({scrollTop: height}); } function sendMessage(){ var msg = $('#message'); sock.send(msg.val()); msg.val('').focus(); } sock.onopen = function(){ showMessage('Connection to server started') } // send message from form $('#submit').click(function() { sendMessage(); }); $('#message').keyup(function(e){ if(e.keyCode == 13){ sendMessage(); } }); // income message handler sock.onmessage = function(event) { showMessage(event.data); }; $('#signout').click(function(){ window.location.href = "signout" }); sock.onclose = function(event){ if(event.wasClean){ showMessage('Clean connection end') }else{ showMessage('Connection broken') } }; sock.onerror = function(error){ showMessage(error); }
      
      





サーバー側を実装するには、WebSocketクラスを使用します







 class WebSocket(web.View): async def get(self): ws = web.WebSocketResponse() await ws.prepare(self.request) session = await get_session(self.request) user = User(self.request.db, {'id': session.get('user')}) login = await user.get_login() for _ws in self.request.app['websockets']: _ws.send_str('%s joined' % login) self.request.app['websockets'].append(ws) async for msg in ws: if msg.tp == MsgType.text: if msg.data == 'close': await ws.close() else: message = Message(self.request.db) result = await message.save(user=login, msg=msg.data) log.debug(result) for _ws in self.request.app['websockets']: _ws.send_str('(%s) %s' % (login, msg.data)) elif msg.tp == MsgType.error: log.debug('ws connection closed with exception %s' % ws.exception()) self.request.app['websockets'].remove(ws) for _ws in self.request.app['websockets']: _ws.send_str('%s disconected' % login) log.debug('websocket connection closed') return ws
      
      





ソケット自体は、WebSocketResponse()関数を使用して作成されます。 調理する前に必ず使用してください。 開いているソケットのリストは、アプリケーションに保存されます(サーバーが閉じられたときに正しく閉じることができるように)。 新しいユーザーが接続すると、すべての参加者は、新しい参加者がチャットに参加したという通知を受け取ります。 次に、ユーザーからのメッセージを期待します。 有効な場合、データベースに保存し、他のチャット参加者に送信します。

ソケットが閉じると、リストからソケットを削除し、参加者の1人がソケットを離れたことをチャットに通知します。 非常に単純な実装で、視覚的に同期スタイルで、たとえばTornadoのように多くのコールバックがありません。 使用してください。







Herokuにアップロードする







視覚的なデモンストレーションのために、Herokuにテストチャットを投稿しました。 インストール中にいくつかの問題がありました。特に、内部mongodbデータベースを使用するために、クレジットカード情報を入力する必要がありましたが、これはしたくないので、 MongoLabのサービスを使用してデータベースを作成しました。 さらに、アプリケーション自体のインストールに問題がありました。 暗号化をインストールするには、requirements.txtで明示的に指定する必要がありました。 また、pythonのバージョンを示すには、プロジェクトルートにruntime.txtファイルを作成する必要があります。







結論







一般に、チャットの作成、aiohttpの学習、ソケットやその他のテクノロジーの分析は、これまで一緒に行ったことがなかったため、夕方や週末にはめったに3週間かかりました。

aiohttpのドキュメントは非常に優れており、多くの非同期ドライバーとラッパーがテストの準備ができています。

おそらく、すべての製品の準備が整っているわけではありませんが、開発は非常に活発です(aiohttpは3週間でバージョン0.19から0.21に更新されました)。

プロジェクトにソケットを追加する必要がある場合、このオプションは、重い竜巻を追加しないように最適です。







参照資料









すべてのエラーと欠点をPMに送信してください:)







UPD







記事が公開されてから少し時間が経ちました。残念ながら、Herokuのアプリケーションは長い間利用できなかったため、デモのテストはもう機能しません。 コードと依存関係を更新する時間はほとんどありませんでしたが、記事にはまだ古いコードがあります。








All Articles