Node.jsストリームとリアクティブプログラミング

この記事では、Node.js ストリームと少しのリアクティブプログラミングの実際の問題を解決しようとします。 後者についてはわかりません-RPはある程度「バグを抱えている」(流行語の翻訳方法?)誰もが話しているが、誰も「やっていない」。







この記事は実用的な例を考慮しており、プラットフォームに精通した読者を対象としています。したがって、基本的な概念を意図的に説明するものではありません。







問題の説明から始めましょう。「エイリアン」REST APIからすべてのデータを取得し、何らかの方法でそれを処理して「私たちの」データベースに書き込む「スパイダー」を構築する必要があります。 モデリングの便宜上、特定のAPIおよびデータベースに関する詳細は省略します(実際には、ホテルおよびPostgresデータベースに関連する有名なスタートアップのAPIでした)。







2つの関数があると想像してください(記事のコード全体と同様に、関数コードはここにあります )。







getAPI(n, count) //  -  API.   promise     count    n- insertDB(entries) //  -   .  promise         //,     : getAPI(0, 2).then(console.log) // [{ id: 0}, {id: 1}] getAPI(LAST_ITEM_ID, 1000).then(console.log) // [{id: LAST_ITEM_ID}] –             API. //    count  1000:    1001,       1000  insertDB([{id: 0}]).then(console.log) // { count: 1 }
      
      





簡単にするために、APIとデータベースを使用する場合、エラー処理を意図的に無視します。 関心が生じた場合は、別の記事で検討します。







退屈しないように、私たちの顧客は変態であると言い、彼は次のタスクを提起しました:私たちはデータベースに番号3を含むすべてのidエンティティを見たくないです。 {id: 9} -> {id: 9, timestamp: 1490571732068}



少し先取りされていますが、このような「スパイダー」で対処する必要がある処理およびフィルタリングタスクに似ています。







さあ、始めましょう。 この問題を「正面から」解決しようとしましょう。 ほとんどの場合、このコードに似たものになります。







 function grab(offset = 0, rows = 1000) { offset = offset return getAPI(offset, 1000).then((items) => { if(_.isEmpty(items)) { return } else { return insertDB(items).then(() => grab(offset + rows)) } }) } console.time('transition') grab().then(() => { console.timeEnd('transition') })
      
      





このコードの何が問題になっていますか?







  1. コードをすばやく読むと、コードの実行内容を理解するのが難しくなります。 これはコメントを追加することで修正できますが、それでもコードレベルでは、「リーダー」に、どこから読んでいてどこかに書いていることを理解してもらいたいと思います。
  2. それはあまりにも具体的です-値を処理するためのコードを追加することを想像してください。 どこに追加しますか?
  3. これは再帰的です。つまり、APIに十分な数のエンティティがあると、エラーが発生します。 ... whileに書き換えることで処理されますが、これによりコードが読みやすくなることはほとんどありません。
  4. 彼は非生産的です。 データソースがコンシューマよりもはるかに高速であると想像してください。この状況では、データをバッファに集約し、可能であれば一度に書き込みたいと考えています。


ご想像のとおり、この問題はStreamsで簡単に解決できます。 まず、このタスクを読み取りと書き込みの2つのサブタスクに分割します。







読むことから始めましょう、ReadableStreamを試してみましょう:







 const {Writable, Readable} = require('stream') const {getAPI, insertDB} = require('./io-simulators') const ROWS = 1000 class APIReadable extends Readable { constructor(options) { super({objectMode: true}) this.offset = 0 } _read(size) { getAPI(this.offset, ROWS).then(data => { if(_.isEmpty(data)) { this.push(null) } else { this.push(data) } }) this.offset = this.offset + ROWS } }
      
      





少し大きく見えます。 objectMode: true



に注意する価値がありobjectMode: true



-オブジェクトを操作したいので、このフラグをコンストラクタに渡す価値があります。







では、録音に移りましょう。 Writableストリームを実装します。 このようなもの:







 class DBWritable extends Writable { constructor(options) { super({highWaterMark: 5, objectMode: true}); } _write(chunk, encoding, callback) { insertDB(chunk).asCallback(callback) } _writev(chunks, callback) { const entries = _.map(chunks, 'chunk') insertDB(_.flatten(entries)).asCallback(callback) //   Bluebird-promises,    } }
      
      





次のことに注意してください。







  1. objectMode-Readableストリームと同様に、バイナリデータではなくオブジェクトを操作したい
  2. highWaterMark-バッファーのサイズ。 ここで注意する価値があります。オブジェクトのバッファのサイズを設定しますが、これは実際の次元(ビット-バイト)とは関係ありません。 たとえば、この場合、リストを操作します。
  3. _writev-一度に複数の「断片」データをバッファから処理する方法を説明します


さて、次のようにコードを使用します。







 const dbWritable = new DBWritable() const apiReadable= new APIReadable() apiReadable.pipe(dbWritable)
      
      





これは非常にクールであるように思えますが、コードから、ある場所から読み取り、別の場所に書き込みをしていることが非常に明確になりました。 さらに、読者はコードが非常に効率的に機能し、バッファーを使用していることを確認できます。 さて、イベントループをブロックしないという事実のような、あらゆる種類の小さなパン。







うーん、注意深い読者は尋ねますが、データ処理はどうですか? これを行うには、別のTransformストリームを記述できますが、それはなんとなく「フラットで退屈」です。そのため、 Highland.jsライブラリを使用して、エンティティ「ストリーム」の要素にすべてのお気に入りのフィルターとマップを適用できます。 一般に、Highlandはこの単純なユースケース以上のものですが、小さな記事ではなく、別の記事のトピックです。 このようなもの:







 H(apiReadable) .flatten() .reject(x => _.includes(String(x.id), 3)) .map(function(x) { if(_.includes(String(x.id), 9)) { return _.extend(x, {timestamp: Date.now()}) } else { return x } }) .batchWithTimeOrCount(100, 1000) .pipe(dbWritable)
      
      





私にとっては、リスト操作に非常に似ており、読みやすいです。 .flatten()



および.batchWithTimeOrCount(100, 1000)



.flatten()



、ストリームが個別のオブジェクトではなく配列で動作するためです。







それだけです。 目標を達成し、StreamとHighland.jsの学習に読者が興味を持ったことを願っています。

この記事の英語訳








All Articles