Angular2 + Websocket + RxJS + Rails5

画像



みなさんこんにちは! この記事では、Websocketを使用して、Angular2クライアントアプリケーションをRails 5サーバーに接続する方法について説明します。



レール



たとえば、最も単純なApplicationCable :: Channelが必要です。



class ChatChannel < ApplicationCable::Channel def subscribed stream_from 'chat_channel' end def index ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) } end def create( data ) Message.create( name: data[ 'name' ], text: data[ 'text' ] ); ActionCable.server.broadcast 'chat_channel', { messages: Message.serialize_all( Message.all ) } end def unsubscribed end end
      
      







Angular2



WebSocketService



まず、アプリケーションにサーバーとのデータ交換を提供するサービスが必要です。



 import { Injectable } from "@angular/core"; import { Subject, Observable, Subscription } from 'rxjs/Rx'; import { WebSocketSubject } from "rxjs/observable/dom/WebSocketSubject"; @Injectable() export class WebSocketService { private ws: WebSocketSubject<Object>; private socket: Subscription; private url: string; public message: Subject<Object> = new Subject(); public opened: Subject<boolean> = new Subject(); public close():void{ this.socket.unsubscribe(); this.ws.complete(); } public sendMessage( message:string ):void{ this.ws.next( message ); } public start( url: string ):void{ let self = this; this.url = url; this.ws = Observable.webSocket( this.url ); this.socket = this.ws.subscribe( { next: ( data:MessageEvent ) => { if( data[ 'type' ] == 'welcome' ){ self.opened.next( true ); } this.message.next( data ); }, error: () => { self.opened.next( false ); this.message.next( { type: 'closed' } ); self.socket.unsubscribe(); setTimeout( () => { self.start( self.url ); }, 1000 ); }, complete: () => { this.message.next( { type: 'closed' } ); } } ); } }
      
      





このサービスには、3つのプライベート変数と2つのパブリック変数、および3つのパブリック関数があります。



 private ws: WebSocketSubject<Object>; private socket: Subscription; private url: string;
      
      





wsはWebSocketSubject監視可能変数です。

socket-wsにサブスクライブするための変数。

url-ソケット参照。



 public message: Subject<Object> = new Subject(); public opened: Subject<boolean> = new Subject();
      
      





メッセージ-ソケットからのすべてのデータが変換される監視可能な変数。

オープン-ソケット接続の開始/終了を監視する監視可能な変数。



 public close():void{ this.socket.unsubscribe(); this.ws.complete(); }
      
      





ソケットを閉じる関数。



 public sendMessage( message:string ):void{ this.ws.next( message ); }
      
      





ソケットにデータを送信する関数。



 public start( url: string ):void{ let self = this; this.url = url; this.ws = Observable.webSocket( this.url ); this.socket = this.ws.subscribe( { next: ( data:MessageEvent ) => { if( data[ 'type' ] == 'welcome' ){ self.opened.next( true ); } this.message.next( data ); }, error: () => { self.opened.next( false ); this.message.next( { type: 'closed' } ); self.socket.unsubscribe(); setTimeout( () => { self.start( self.url ); }, 1000 ); }, complete: () => { this.message.next( { type: 'closed' } ); } } ); }
      
      





この関数は、ソケットへの接続を開き、そのオブジェクトを監視可能な変数に書き込み、そこからブロードキャストをサブスクライブします。 接続が失われた場合、毎秒、接続の復元が試行されます。



ChannelWebsocketService



継承されたRails5アクションケーブルチャネルサブスクリプションサービス:



 import { Injectable } from "@angular/core"; import { Subject } from "rxjs/Subject"; import { WebSocketService } from "./websocket.service"; @Injectable() export class ChannelWebsocketService { private socketStarted: boolean; public observableData: Subject<Object> = new Subject(); public identifier:Object = {}; public identifierStr: string; public subscribed: Subject<boolean> = new Subject(); constructor( private websocketService: WebSocketService ){ this.observeOpened(); this.observeMessage(); } private static encodeIdentifier( identifier:string ):Object{ return JSON.parse( identifier ); } private static getDataString( parameters:Object ):string{ let first = true, result = ''; for ( let key in parameters ){ if( first ){ first = false; result += `\"${ key }\":\"${ parameters[ key ] }\"`; } else { result += `, \"${ key }\":\"${ parameters[ key ] }\"`; } } return `{ ${ result } }`; } private getSubscribeString():string{ this.identifierStr = ChannelWebsocketService.getDataString( this.identifier ); return JSON.stringify( { command: 'subscribe', identifier: this.identifierStr } ); }; private isThisChannel( data:Object ):boolean { if( data[ 'identifier' ] ){ let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] ); if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){ return true; } } return false; } private observeMessage(){ let self = this; this.websocketService.message.subscribe( ( data: Object ) => { if( self.isThisChannel( data ) ){ if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){ this.subscribed.next( true ); } else if ( data[ 'message' ] ){ this.observableData.next( data[ 'message' ] ); } } } ); } private observeOpened(){ let self = this; this.websocketService.opened.subscribe( ( data: boolean ) => { self.socketStarted = data; if( data ){ self.subscribe(); } } ); } private subscribe(){ this.websocketService.sendMessage( this.getSubscribeString() ); } public send( data: Object ){ this.websocketService.sendMessage( JSON.stringify( { command:'message', identifier: this.identifierStr, data: ChannelWebsocketService.getDataString( data ) } ) ); } public unsubscribe(){ this.websocketService.sendMessage( JSON.stringify( { command: 'unsubscribe', identifier: this.identifierStr } ) ); this.subscribed.next( false ); } }
      
      





このサービスには、2つのプライベート変数と3つのパブリック変数、および7つのプライベート関数と2つのパブリック関数があります。



 private socketStarted: boolean; private identifierStr: string;
      
      





socketStarted-ソケットへのサブスクリプションのステータスが変換される変数。

identifierStr-Rails5アクションケーブルチャネル用に特別に準備された識別子文字列。



 public observableData: Subject<Object> = new Subject(); public identifier:Object = {}; public subscribed: Subject<boolean> = new Subject();
      
      





observableData-チャネルのソケットからのメッセージが書き込まれる監視対象変数。

identifier-Rails5アクションケーブルチャネルの識別子オブジェクト。

subscribed-サブスクリプション状態が書き込まれる監視変数。



 constructor( private websocketService: WebSocketService ){ this.observeOpened(); this.observeMessage(); } ... private observeMessage(){ let self = this; this.websocketService.message.subscribe( ( data: Object ) => { if( self.isThisChannel( data ) ){ if( data[ 'type' ] && data[ 'type' ] == 'confirm_subscription' ){ this.subscribed.next( true ); } else if ( data[ 'message' ] ){ this.observableData.next( data[ 'message' ] ); } } } ); } private observeOpened(){ let self = this; this.websocketService.opened.subscribe( ( data: boolean ) => { self.socketStarted = data; if( data ){ self.subscribe(); } } ); }
      
      





このサービスのコンストラクターで、2つの関数observeMessageとobserveOpenedを呼び出します。これらの関数は、それぞれソケットによって送信されたデータとソケットの状態を監視します。



 private static encodeIdentifier( identifier:string ):Object{ return JSON.parse( identifier ); } private static getDataString( parameters:Object ):string{ let first = true, result = ''; for ( let key in parameters ){ if( first ){ first = false; result += `\"${ key }\":\"${ parameters[ key ] }\"`; } else { result += `, \"${ key }\":\"${ parameters[ key ] }\"`; } } return `{ ${ result } }`; }
      
      





encodeIdentifier-静的プライベート関数は、ソケットによって返された識別子文字列をデコードして、メッセージがチャネルに属していることを識別します。

getDataString-オブジェクトをRails5アクションケーブルが受け入れる文字列形式に変換します。



 private getSubscribeString():string{ this.identifierStr = ChannelWebsocketService.getDataString( this.identifier ); return JSON.stringify( { command: 'subscribe', identifier: this.identifierStr } ); };
      
      





Rails5アクションケーブルチャネルにサブスクライブする文字列を返します。



 private isThisChannel( data:Object ):boolean { if( data[ 'identifier' ] ){ let identifier = ChannelWebsocketService.encodeIdentifier( data[ 'identifier' ] ); if ( JSON.stringify( identifier ) === JSON.stringify( this.identifier ) ){ return true; } } return false; }
      
      





メッセージがこのチャネルのソケットに属しているかどうかを判別します。



 private subscribe(){ this.websocketService.sendMessage( this.getSubscribeString() ); }
      
      





チャンネルを購読します。



 public send( data: Object ){ this.websocketService.sendMessage( JSON.stringify( { command:'message', identifier: this.identifierStr, data: ChannelWebsocketService.getDataString( data ) } ) ); }
      
      





Rails5アクションケーブルチャネルにデータを送信します。



 public unsubscribe(){ this.websocketService.sendMessage( JSON.stringify( { command: 'unsubscribe', identifier: this.identifierStr } ) ); this.subscribed.next( false ); }
      
      





チャンネルから退会します。



ChatChannelService



ChatChannelにサブスクライブするためにChannelWebsocketServiceから継承されたサービス:



 import { Injectable } from "@angular/core"; import { ChannelWebsocketService } from "./channel.websocket.service"; import { WebSocketService } from "./websocket.service"; @Injectable() export class ChatChannelService extends ChannelWebsocketService { constructor( websocketService: WebSocketService ){ super( websocketService ); this.identifier = { channel: 'ChatChannel' }; } }
      
      





このサービスのコンストラクターは、チャネルを識別するために識別子変数をオーバーライドします。



ChatComponent



ChatChannelServiceを使用して、チャネルとの間でデータを送受信するコンポーネント。

コードの例を挙げません。GitHubにあります。リンクは以下にあります





ここからサンプルをダウンロードできます。



クライアントアプリケーションを起動するには、「クライアント」フォルダーに移動して呼び出します。



 npm install gulp
      
      





この記事がこの問題の理解に役立つことを願っています。



All Articles