RubyでのWebsocketのスケーリング

少し前に、著者がRuby、Sinatra、websoketを使用してアプリケーションを作成するためのフレームワークを説明した記事がありました。 しかし、その決定において、水平スケーリングの問題は扱われませんでした。 したがって、ノードの1つに接続すると、ユーザーは同じノードのユーザーによって引き起こされたイベント/変更に関する通知/データのみを受信でき、別のノードを介して変更が行われた場合、ユーザーは認識しません。 この問題を解決するには、共通のデータバスを編成する必要があります。 このタスクは、クライアントとクライアントのメッセージングのコンテキストで検討します。



データバス



タイヤに課せられる要件は次のとおりです。



定期的なポーリングを使用してリポジトリを介して、またはキューサーバーを介してバスを編成できます。

最初のオプションは2番目の条件を満たしていません。 伝送の遅延は、ストレージの調査期間と等しくなります。 期間を短くすると、負荷が増えます。 したがって、このオプションはすぐに破棄されます。



2番目のオプションが最適です。 この場合、 RabbitMQActiveMQに似た特殊なソリューションを使用できます。 これらの製品は両方とも、多くの機能、優れたスケーラビリティを備えた深刻なソリューションです。 それらを使用できますが、これがスズメの大砲になるかどうかを評価する必要があります。 そのようなソリューションに加えて、 Redisはキュー機能を提供し、さらにキー値ストレージも取得しますが、これも必要になります。



Redisは最も単純なPub-Subエンジンを提供しますが、これはタスクに十分です。 十分に高速で使いやすく、伝送遅延が低いです。



解決策



私たちのシステムには次のスキームがあります。







1つのノードのユーザー間のメッセージは直接送信され、ノード間のメッセージはバスを介して送信されます。

これを行うには:

  1. ノードは一意の名前を生成します。
  2. Redisのメッセージを購読します。
  3. このノードに接続されているすべてのクライアントは、クライアント識別子と接続先のノードの識別子の形式でキーと値のペアを記録します。
  4. 別のクライアントにメッセージを送信するとき、ノードの名前を見つけ、メッセージを処理のためにそのキューに転送します。


そして今、私たちは実装しています



websocketのライブラリとして、 faye-websocket-rubyが選択されています。 Redisを使用するには、標準のgem redis(hiredis)+ PubSubのサンプルコードはEventMachineを使用します 。gemからの実装はブロッキングモードで動作し、Webサーバーと同じスレッドで作業する場合、これは無効です。



module App class << self def configuration yield(config) if block_given? config.sessions = Metriks.counter('total_sessions') config.active = Metriks.counter('active_sessions') end def config @config ||= OpenStruct.new( redis: nil, root: nil ) end def id @instance_id ||= SecureRandom.hex end def logger @logger ||= Logger.new $stderr end def register config.redis.multi do config.redis.set "node_#{App.id}", true config.redis.expire "node_#{App.id}", 60*10 end if config.redis EM.next_tick do config.sub = PubSub.connect config.sub.subscribe App.id do |type, channel, message| case type when 'message' begin json = Oj.load(message, mode: :compat) WS::Base.remote_messsage json rescue => ex App.logger.error "ERROR: #{message.class} #{message} #{ex.to_s}" end else App.logger.debug "(#{type}) #{channel}:: #{message}" end end @pingpong = EM.add_periodic_timer(30) do App.config.redis.expire "node_#{App.id}", 60 end end rescue config.redis = nil end end end
      
      





このモジュールの主な機能はregisterメソッドです。このメソッドは、バスに自身を登録し、着信メッセージを予期します。 監視のために、 node_%node_id%の形式のキーが作成されます。TTLは60秒、更新期間は30秒で、ノードが落ちた場合に備えて作成されます。 したがって、現在ネットワーク上にあるノードの数とその名前をいつでも確認できます。



 module WS class Base NEXT_RACK = [404, {}, []].freeze def self.call(*args) instance.call(*args) end def self.instance @instance ||= self.new end def self.remote_messsage(json) user = User.get json['from'] instance.send :process, user, json if user rescue => ex user.error( { error: ex.to_s } ) end def initialize @ws_cache = {} end def call(env) return NEXT_RACK unless Faye::WebSocket.websocket?(env) ws = Faye::WebSocket.new(env, ['xmpp'], ping: 5) user = User.register(ws) ws.onmessage = lambda do |event| json = Oj.load(event.data, mode: :compat) process(user, json ) end ws.onclose = lambda do |event| App.logger.info [:close, event.code, event.reason] user.unregister user = nil end ws.rack_response rescue WS::User::NotUnique => ex ws.send Oj.dump({ action: :error, data: { error: 'not unique session' } }) ws.close ws.rack_response end private def process(user, json) action = json['action'].to_s data = json['data'] return App.logger.info([:message, 'Empty action']) if action.empty? return App.logger.info([:message, "Unknown action #{json['action']}"]) unless user.respond_to? "on_#{action}" user.send "on_#{action}", data rescue => ex user.error({ error: ex.to_s }) puts ex.to_s puts ex.backtrace end end end
      
      





このクラスは、接続の確立とメッセージの処理を担当します。 呼び出しメソッドで新しいクライアントが作成され、ハンドラーがハングアップします。 remote_messsageクラスのメソッドは、(バスから)外部メッセージを受信するために使用されます。 プロセスメソッドは、クライアントから直接送信されるメッセージと、バスで送信されるメッセージの単一ポイントです。

お客さま
 module WS class User include UserBehavior attr_reader :id class Error < StandardError; end class RoomFull < Error; end class NotFound < Error attr_reader :id def initialize(id); @id = id end def to_s; "User '@#{id}' not found" end end class NotUnique < Error; end class << self def cache @ws_cache ||= {} end def get(id) fail NotFound.new(id) if id.to_s.empty? @ws_cache.fetch(id) rescue KeyError WS::RemoteUser.new(id) end def register(ws) self.new(ws) end def unregister(ws) url = URI.parse(ws.url) id = url.path.split('/').last get(id).unregister end end def initialize(ws) @ws = ws register @pingpong = EM.add_periodic_timer(5) do @ws.ping('') do App.config.redis.expire @id, 15 if App.config.redis end end end def unregister on_close if respond_to? :on_close App.config.active.decrement App.config.redis.del @id if App.config.redis User.cache.delete(@id) @pingpong.cancel @pingpong = nil @ws = nil @id = nil end def send_client(from, action, data) return unless @ws data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat) @ws.send(data) end private def register url = URI.parse(@ws.url) @id = url.path.split('/').last if App.config.redis App.config.redis.multi do App.config.redis.set @id, App.id App.config.redis.expire @id, 15 end App.config.sessions.increment App.config.active.increment end User.cache[@id] = self App.logger.info [:open, @ws.url, @ws.version, @ws.protocol] on_register if respond_to? :on_close self end end class RemoteUser include UserBehavior attr_reader :id attr_reader :node def initialize(id) @id = id.to_s fail WS::User::NotFound.new(id) if @id.empty? @node = App.config.redis.get(@id).to_s fail WS::User::NotFound.new(id) if @node.empty? end def send_client(from, action, data) return if node.to_s.empty? App.logger.info ['REMOTE', self.id, from.id, action] data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat) App.config.redis.publish node, data end end end
      
      







registerメソッドは、ユーザーのIDをユーザーが接続されているノードのIDと照合することでリポジトリに登録し、ローカルリストにキャッシュします。 これに対して、 unregisterメソッドは、すべてのクライアントレコードを削除し、タイマーを削除します。 タイマーは、クライアントのステータスを定期的にチェックし、TTLを更新してRedisに死んだ魂がいないように記録するために使用されます。

クライアントIDは、接続要求が行われたURLから取得されます。 形式はws://%hostname%/ ws /%user_id%です 。user_idはランダムに生成された一意のシーケンスです。



send_clientメソッドは、クライアント自体にデータを送信します。



別の場所はgetクラスメソッドによって占有されます。 このメソッドは、IDでWS :: Userクラスのインスタンスを返すか、ユーザーがローカルキャッシュに見つからない場合、 WS :: RemoteUserクラスのインスタンスを作成します。 作成されると、リポジトリにそのようなIDが存在するかどうか、およびそれが属するノードがチェックされます。 IDが見つからない場合、例外がスローされます。



WS :: Userとは異なり、 WS :: RemoteUserクラスにはsend_clientメソッドが1つしかなく、生成されたメッセージをバス経由で必要なノードに転送します。



したがって、クライアントの場所に関係なく、 send_clientメソッド呼び出すと、データが宛先に配信されます。



 module UserBehavior module ClassMethods def register_action(action, params = {}) return App.logger.info ['register_action', "Method #{action} already defined"] if respond_to? action block = lambda do |*args | if block_given? data, from = yield(self, *args) send_client from || self, action, data else send_client self, action, args.first end end define_method action, &block define_method "on_#{action}" do |data| self.send action, data end if params[:passthrough] end end def self.included(base) base.instance_exec do extend ClassMethods register_action :message do |user, from, text| [{ to: user.id, text: text }, from] end register_action :error, passthrough: true end end def on_message(data) App.logger.info ['MESSAGE', id, data.to_s] to_user_id = data['to'] to_user = WS::User.get(to_user_id) to_user.message self, data['text'] rescue WS::User::NotFound => ex error({ error: ex.to_s }) end end
      
      





イベント自体の処理は、別のUserBehaviorモジュールで実行されます。このモジュールは、メッセージに応答するためのメソッドで以前の2つのクラスを拡張します。 各メッセージにはFROMACTION、およびDATAフィールドがあります。 最初は、それが誰から来たかを識別し、2番目はメソッドを識別し、3番目は関連データを識別します。 そのため、値が "message"のACTIONの場合、 on_messageメソッドが呼び出され、 DATAフィールドの値が渡されます。



このアプローチを使用して、接続されたクライアント間で透過的なメッセージ転送を実装することが可能であり、それらが同じノード上にあるか、異なるノード上にあるかは関係ありません。 テストのために、異なるポートでいくつかのインスタンスを起動しました。メッセージは正しく送受信されました。



試したい人のために、私はgithubに動作中のアプリケーションコードを投稿しました。 単にラックアップから始まります



PS



この解決策は完全ではありません。改善し、不要なものを削除する方法があると思いますが、出発点としては問題なく機能します。



All Articles