先日、単純なRedisのようなデータベースサーバーを作成するのがいいと思いました。 私はWSGIアプリケーションを使用したかなりの経験がありますが、データベースサーバーは新たな課題を提示し、Pythonでソケットを操作することを学ぶ上で優れた実践であることが証明されています。 この記事では、研究の過程で学んだことをお伝えします。
私のプロジェクトの目標は、 hueyというタスクキューで使用できる簡単なサーバーを作成することでした 。 HueyはRedisをデフォルトのストレージエンジンとして使用して、キュー内のジョブ、結果などを追跡します。 この記事では、プロジェクトのソースコードを水なしで実行するように短縮しました。 不足しているコードを自分で簡単に追加できますが、興味がある場合は、 最終結果を確認できます。
作成するサーバーは、次のコマンドに応答できます。
GET <key>
-
SET <key> <value>
-
DELETE <key>
-
FLUSH
-
MGET <key1> ... <keyn>
-
MSET <key1> <value1> ... <keyn> <valuen>
また、次のデータ型もサポートします。
- 文字列とバイナリデータ。
- 数字。
- NULL
- 配列(ネスト可能)。
- 辞書(ネストすることができます)。
- エラーメッセージ。
複数のクライアントを非同期的に処理するには、 geventを使用しますが、 ForkingMixin
またはThreadingMixin
を使用して標準ライブラリのSocketServerモジュールを使用することもできます。
プロジェクトのスケルトン
サーバーのスケルトンを作成しましょう。 サーバー自体と、新しいクライアントが接続したときに実行されるコールバック関数が必要です。 さらに、クライアントからのリクエストを処理し、クライアントにレスポンスを送信するためのロジックが必要です。
始めましょう:
from gevent import socket from gevent.pool import Pool from gevent.server import StreamServer from collections import namedtuple from io import BytesIO from socket import error as socket_error # We'll use exceptions to notify the connection-handling loop of problems. class CommandError(Exception): pass class Disconnect(Exception): pass Error = namedtuple('Error', ('message',)) class ProtocolHandler(object): def handle_request(self, socket_file): # Parse a request from the client into it's component parts. pass def write_response(self, socket_file, data): # Serialize the response data and send it to the client. pass class Server(object): def __init__(self, host='127.0.0.1', port=31337, max_clients=64): self._pool = Pool(max_clients) self._server = StreamServer( (host, port), self.connection_handler, spawn=self._pool) self._protocol = ProtocolHandler() self._kv = {} def connection_handler(self, conn, address): # Convert "conn" (a socket object) into a file-like object. socket_file = conn.makefile('rwb') # Process client requests until client disconnects. while True: try: data = self._protocol.handle_request(socket_file) except Disconnect: break try: resp = self.get_response(data) except CommandError as exc: resp = Error(exc.args[0]) self._protocol.write_response(socket_file, resp) def get_response(self, data): # Here we'll actually unpack the data sent by the client, execute the # command they specified, and pass back the return value. pass def run(self): self._server.serve_forever()
上記のコードがうまくいけば十分です。 プロトコル処理が、 handle_request
とwrite_response
2つのパブリックメソッドを持つ独自のクラスになるように、タスクを分割しwrite_response
。 サーバー自体は、プロトコルハンドラーを使用してクライアント要求を解凍し、クライアントサーバーへのサーバー応答を繰り返します。 get_response()
メソッドは、クライアントによって開始されたコマンドを実行するために使用されます。
connection_handler()
メソッドを詳細に見ると、ファイルと同様に、ソケットオブジェクトのラッパーが取得されていることがわかります。 このシェルを使用すると、クリーンソケットを操作するときに通常発生する機能の一部を無視できます。 関数は無限ループに入り、クライアントからの要求を読み取り、応答を送信し、クライアントが切断するとループを終了します(空の文字列を返すread()
メソッドでマークされます)。
型付き例外を使用して、クライアントが切断したときに処理し、ユーザーにエラー処理コマンドを通知します。 たとえば、ユーザーが誤って構成されたリクエストをサーバーに送信すると、エラーに応じてシリアル化されてクライアントに送信されるCommandError
をスローします。
先に進む前に、クライアントとサーバーがどのように相互作用するかについて説明しましょう。
有線通信プロトコル
私が最初に遭遇したタスクは、有線通信プロトコルを介したバイナリデータの送信を処理する方法でした。 私がインターネットで見つけた例のほとんどは、ソケットをファイルのようなオブジェクトに変換し、単にreadline()
と呼ばれる無意味なエコーサーバーでした。 いくつかのピクルデータまたは新しい行を含む行を格納する場合、何らかのシリアル化形式が必要になります。
適切なものを考え出そうと時間を費やしたので、Redisプロトコルに関するドキュメントを読むことにしました。
Redisプロトコルは、クライアントの要求/応答関係テンプレートを使用します。 サーバーからの応答は、最初のバイトを使用してデータ型を示し、次にデータがキャリッジリターン/ラインリターンで完了します。
データ型 | プレフィックス | 構造 | 例 |
---|---|---|---|
シンプルライン | + | + {文字列データ} \ r \ n | +これは単純な文字列です\ r \ n |
エラー | - | -{エラーメッセージ} \ r \ n | -ERR不明なコマンド "FLUHS" \ r \ n |
全体 | : | :{数字} \ r \ n | :1337 \ r \ n |
バイナリデータ | $ | $ {バイト数} \ r \ n {データ} \ r \ n | $ 6 \ r \ n
foobar \ r \ n |
配列 | * | * {要素数} \ r \ n {上記の0以上} \ r \ n | * 3 \ r \ n
+単純な文字列要素\ r \ n :12345 \ r \ n $ 7 \ r \ n テスト\ r \ n |
語彙 | % | %{キーの数} \ r \ n {上記の0以上} \ r \ n | %3 \ r \ n
+ key1 \ r \ n + value1 \ r \ n + key2 \ r \ n * 2 \ r \ n + value2-0 \ r \ n + value2-1 \ r \ n :3 \ r \ n $ 7 \ r \ n テスト\ r \ n |
ヌル | $ | $ -1 \ r \ n(長さ-1の文字列) | $ -1 \ r \ n |
プロトコルハンドラクラスを作成して、Redisプロトコルを実装しましょう。
class ProtocolHandler(object): def __init__(self): self.handlers = { '+': self.handle_simple_string, '-': self.handle_error, ':': self.handle_integer, '$': self.handle_string, '*': self.handle_array, '%': self.handle_dict} def handle_request(self, socket_file): first_byte = socket_file.read(1) if not first_byte: raise Disconnect() try: # Delegate to the appropriate handler based on the first byte. return self.handlers[first_byte](socket_file) except KeyError: raise CommandError('bad request') def handle_simple_string(self, socket_file): return socket_file.readline().rstrip('\r\n') def handle_error(self, socket_file): return Error(socket_file.readline().rstrip('\r\n')) def handle_integer(self, socket_file): return int(socket_file.readline().rstrip('\r\n')) def handle_string(self, socket_file): # First read the length ($<length>\r\n). length = int(socket_file.readline().rstrip('\r\n')) if length == -1: return None # Special-case for NULLs. length += 2 # Include the trailing \r\n in count. return socket_file.read(length)[:-2] def handle_array(self, socket_file): num_elements = int(socket_file.readline().rstrip('\r\n')) return [self.handle_request(socket_file) for _ in range(num_elements)] def handle_dict(self, socket_file): num_items = int(socket_file.readline().rstrip('\r\n')) elements = [self.handle_request(socket_file) for _ in range(num_items * 2)] return dict(zip(elements[::2], elements[1::2]))
プロトコルのシリアル化の部分では、逆を行います。Pythonオブジェクトをシリアル化されたコピーに移動します!
class ProtocolHandler(object): # ... above methods omitted ... def write_response(self, socket_file, data): buf = BytesIO() self._write(buf, data) buf.seek(0) socket_file.write(buf.getvalue()) socket_file.flush() def _write(self, buf, data): if isinstance(data, str): data = data.encode('utf-8') if isinstance(data, bytes): buf.write('$%s\r\n%s\r\n' % (len(data), data)) elif isinstance(data, int): buf.write(':%s\r\n' % data) elif isinstance(data, Error): buf.write('-%s\r\n' % error.message) elif isinstance(data, (list, tuple)): buf.write('*%s\r\n' % len(data)) for item in data: self._write(buf, item) elif isinstance(data, dict): buf.write('%%%s\r\n' % len(data)) for key in data: self._write(buf, key) self._write(buf, data[key]) elif data is None: buf.write('$-1\r\n') else: raise CommandError('unrecognized type: %s' % type(data))
独自のクラスでプロトコルを処理することのもう1つの利点は、 handle_request
およびwrite_response
を再利用してクライアントライブラリを作成できることです。
コマンドの実装
これで、設計するServer
クラスにget_response()
メソッドが必要になります。 コマンドはクライアントから送信された単純な文字列またはコマンド引数の配列と見なされるため、 get_response()
渡されるデータパラメータはバイト文字列またはリストのいずれかになります。 処理を簡素化するために、データが単純な文字列であることが判明した場合は、スペースに分割してリストに変換します。
最初の引数は、指定されたコマンドに属する追加の引数を持つコマンドの名前になります。 最初のバイトのProtocolHandler
のハンドラーへのマッピングと同様に、 Server
クラスのコールバック関数へのコマンドのマッピングを作成しましょう。
class Server(object): def __init__(self, host='127.0.0.1', port=31337, max_clients=64): self._pool = Pool(max_clients) self._server = StreamServer( (host, port), self.connection_handler, spawn=self._pool) self._protocol = ProtocolHandler() self._kv = {} self._commands = self.get_commands() def get_commands(self): return { 'GET': self.get, 'SET': self.set, 'DELETE': self.delete, 'FLUSH': self.flush, 'MGET': self.mget, 'MSET': self.mset} def get_response(self, data): if not isinstance(data, list): try: data = data.split() except: raise CommandError('Request must be list or simple string.') if not data: raise CommandError('Missing command') command = data[0].upper() if command not in self._commands: raise CommandError('Unrecognized command: %s' % command) return self._commands[command](*data[1:])
サーバーの準備がほぼ完了しました! get_commands()
メソッドで定義されたコマンドに6つのメソッドを実装するだけです。
class Server(object): def get(self, key): return self._kv.get(key) def set(self, key, value): self._kv[key] = value return 1 def delete(self, key): if key in self._kv: del self._kv[key] return 1 return 0 def flush(self): kvlen = len(self._kv) self._kv.clear() return kvlen def mget(self, *keys): return [self._kv.get(key) for key in keys] def mset(self, *items): data = zip(items[::2], items[1::2]) for key, value in data: self._kv[key] = value return len(data)
彼がいる! これでサーバーはリクエストの処理を開始する準備ができました。 次のセクションでは、クライアントを使用してサーバーと対話します。
お客様
サーバーとやり取りするために、 ProtocolHandler
クラスを再利用して簡単なクライアントを実装しましょう。 クライアントはサーバーに接続し、リストとしてエンコードされたコマンドを送信します。 サーバー応答のエンコードおよび処理のリクエストに対して、それぞれwrite_response()
およびhandle_request()
ロジックの両方を再利用します。
class Client(object): def __init__(self, host='127.0.0.1', port=31337): self._protocol = ProtocolHandler() self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.connect((host, port)) self._fh = self._socket.makefile('rwb') def execute(self, *args): self._protocol.write_response(self._fh, args) resp = self._protocol.handle_request(self._fh) if isinstance(resp, Error): raise CommandError(resp.message) return resp
execute()
メソッドを使用して、配列としてエンコードされ、サーバーに送信されるパラメーターの任意のリストを渡すことができます。 サーバーの応答は解析され、Pythonオブジェクトとして返されます。 便宜上、個々のコマンドのクライアントメソッドを作成できます。
class Client(object): # ... def get(self, key): return self.execute('GET', key) def set(self, key, value): return self.execute('SET', key, value) def delete(self, key): return self.execute('DELETE', key) def flush(self): return self.execute('FLUSH') def mget(self, *keys): return self.execute('MGET', *keys) def mset(self, *items): return self.execute('MSET', *items)
クライアントをテストするには、コマンドラインからサーバーを直接起動するようにPythonスクリプトを構成しましょう。
# Add this to bottom of module: if __name__ == '__main__': from gevent import monkey; monkey.patch_all() Server().run()
サーバーチェック
サーバーをテストするには、コマンドラインからサーバー側のPythonモジュールを起動するだけです。 別のターミナルで、Pythonインタープリターを開き、サーバーモジュールからClient
クラスをインポートします。 クライアントインスタンスを作成すると、接続が開かれ、コマンドを実行できるようになります!
>>> from server_ex import Client >>> client = Client() >>> client.mset('k1', 'v1', 'k2', ['v2-0', 1, 'v2-2'], 'k3', 'v3') 3 >>> client.get('k2') ['v2-0', 1, 'v2-2'] >>> client.mget('k3', 'k1') ['v3', 'v1'] >>> client.delete('k1') 1 >>> client.get('k1') >>> client.delete('k1') 0 >>> client.set('kx', {'vx': {'vy': 0, 'vz': [1, 2, 3]}}) 1 >>> client.get('kx') {'vx': {'vy': 0, 'vz': [1, 2, 3]}} >>> client.flush() 2
この記事で紹介するコードは、デモンストレーションのみを目的としています。 私がこのプロジェクトについて書くのが好きな方法で、このプロジェクトについて読んで楽しんだことを願っています。 ここで、ソースコードの完全なコピーを見つけることができます。
プロジェクトを拡張する場合は、次のことを考慮してください。
- さらにチームを追加してください!
- プロトコルハンドラーを使用して、追加専用モードでコマンドログを実装します。
- より信頼性の高いエラー処理。
- クライアントが接続を閉じて再接続できるようにします。
- ロギング
- 標準ライブラリおよび
ThreadingMixin
からSocketServer
を使用するようにコードを再配置します。
この出版物の著者はチャールズ・ライファーです。 翻訳-エフゲニー・ザヤテフ。
オリジナルはチャールズのブログで入手できます。