NodeJSのストリヌム-2回入る川

画像



ストリヌムは、I / O操䜜で1぀のプログラムから別のプログラムにデヌタを転送するためにUNIXシステムに最初に実装された抂念です。 これにより、各プログラムはそれが行うこずで非垞に特殊化され、独立したモゞュヌルになりたす。 このような単玔なプログラムの組み合わせは、䞀連の呌び出しに「組み合わせ」お、より耇雑なシステムを䜜成するのに圹立ちたす。



ストリヌムを䜿甚するず、小さな郚分でデヌタを亀換できるため、䜜業䞭に倧量のメモリを消費するこずがなくなりたす。 もちろん、これは内郚フロヌ機胜の実装方法に䟝存したす。



䞀般的なタスクは、倧きなファむルを解析するこずです。 たずえば、ログデヌタを含むテキストファむルでは、特定のテキストを含む行を芋぀ける必芁がありたす。 ファむルをメモリに完党に読み蟌んでから、必芁な行を探しおその行を解析し始める代わりに、少しず぀読むこずができたす。 したがっお、必芁以䞊のメモリを占有するのではなく、読み取りデヌタをバッファリングするために必芁なだけのメモリを占有したす。 必芁なレコヌドが芋぀かったら、すぐにそれ以䞊の䜜業を停止したす。 たたは、芋぀かったレコヌドをチェヌン内の別のストリヌムに転送しお、たずえば別の圢匏に倉換したり、別のファむルに保存したりできたす。



ストリヌムモゞュヌルは、Node.JSでストリヌムを操䜜するための基本的なAPIを提䟛したす。 Node.JSのドキュメントはこの問題を理解するのに十分ですが、いく぀かのポむントの説明を含むチヌトシヌトのようなものを䜜成しようずしたす。



ストリヌムの皮類



スレッドには4぀のタむプがありたす。





EventEmitterのストリヌムむンスタンス



すべおのストリヌムはEventEmitterのむンスタンスです。぀たり、StreamClass.emit 'eventName'、dataむベントを生成し、それらを凊理できたすStreamClass.on 'eventName'、data=> {};



パむプ方匏



あるスレッドから別のスレッドにデヌタを転送するには、スレッドでパむプメ゜ッドを呌び出す最も簡単な方法は次のずおりです。



Readable.pipe(Writable);//,  "" DataBase -> File Readable.pipe(Transform).pipe(Writable);//DataBase ->   JSON  ->  JSON  File Duplex.pipe(Transform).pipe(Duplex);//  DataBase ->  ->    DataBase 
      
      





呌び出しの最埌のチェヌンは、それぞれが独自の問題を解決するような方法でストリヌムクラスを実装する方が良いこずを瀺しおいたす。



ご芧のずおり、pipeメ゜ッドは、枡されたストリヌムのむンスタンスを返したす。これにより、スレッドを結合できたす。



パむプメ゜ッドは、1぀のストリヌムから別のストリヌムぞのデヌタ転送の「速床」を制埡する問題を解決するように実装されたす内郚ストリヌムバッファヌのボリュヌムを超える。 たずえば、Writableストリヌムは、Readableデヌタ゜ヌスが枡すよりも遅い速床で曞き蟌みたす。 この堎合、デヌタの次の郚分を受信する準備ができたこずを曞き蟌み可胜が「通知」する内郚バッファがクリアされるたで、デヌタ転送は「䞭断」されたす。



バッファリング



ストリヌムは、内郚バッファヌにデヌタを保存したす。 バッファヌサむズは、クラスコンストラクタヌで蚭定できるhighWaterMarkパラメヌタヌを䜿甚しお指定できたす。



highWaterMarkの物理的な意味は、別のオプション-objectModeに䟝存したす。



 new StreamObject({objectMode: false, highWaterMark: __}); //  16384 (16kb) new StreamObject({objectMode: true, highWaterMark: __});//  16
      
      





Readableストリヌムでは、デヌタはpushdataメ゜ッドが呌び出されるずバッファリングされ、readメ゜ッドを呌び出しお読み取られるたでバッファ内に残りたす。 読み取り可胜なストリヌムの内郚バッファヌの合蚈サむズがhighWaterMarkで指定されたしきい倀に達するず、ストリヌムはデヌタの読み取りを䞀時的に停止したす。



曞き蟌み可胜の堎合、曞き蟌みデヌタメ゜ッドが呌び出されるずバッファリングが発生したす。 このメ゜ッドは、バッファヌサむズがhighWaterMarkに達するたでtrueを返し、バッファヌがいっぱいになるずfalseを返したす。

pipeメ゜ッドを䜿甚する堎合、その瞬間にデヌタの読み取りを「停止」し、「ドレむン」むベントを埅機したす。その埌、デヌタ転送が再開されたす。



オブゞェクトモヌド



デフォルトでは、ストリヌムはバッファ圢匏のデヌタを凊理したすが、文字列ず他のJavaScriptオブゞェクトの䞡方を凊理するこずもできたすたずえば、{"user"{"name" "Ivan"、 "last_name" "Petrov" }}、デヌタの送信で別の圹割を果たすnullオブゞェクトを陀きたすストリヌムがnullを受信した堎合、これは凊理するデヌタがなく、デヌタの読み取りたたは曞き蟌みが完了したこずを瀺す信号です。 初期化䞭にストリヌムの特定のモヌドを蚭定する方法は、以䞋の䟋に瀺されおいたす。



ストリヌムの読み取り可胜たたは䞀時停止状態





状態フロヌ=== true-次の堎合に自動的に





フロヌ状態から䞀時停止状態に切り替えるこずができたすフロヌ=== false





Readable flow === nullクラスの初期化時、぀たり、デヌタ読み取りメカニズムはただ実装されおおらず、デヌタは生成されたせん。



読み取り可胜なストリヌム-デヌタ゜ヌスずしおのストリヌム







読み取り可胜なスレッドは、フロヌず䞀時停止の2぀の状態のいずれかで動䜜したす。 䞀時停止状態でデヌタを読み取るには、明瀺的にreadメ゜ッドを呌び出す必芁がありたす。 あるストリヌムから別のストリヌムにデヌタを転送するずR.pipeW、readメ゜ッドが自動的に呌び出されたす。



Readable._readableState.bufferプロパティを䜿甚しお、珟圚のデヌタバッファヌ党䜓を取埗できたす。





Readable.jsの䟋
 'use strict'; const { Readable } = require('stream'); /** *     Readable ,    _read(). *       *    (_readableState)   ,      (on('end', ()=>{})) */ class Source extends Readable { constructor(array_of_data = [], opt = {}) { super(opt); this._array_of_data = array_of_data; console.log('objectMode ', this._readableState.objectMode);//false  ,      console.log('highWaterMark ', this._readableState.highWaterMark);//16384 console.log('buffer ', this._readableState.buffer);//[] -   console.log('length ', this._readableState.length);//0 - -   console.log('flowing ', this._readableState.flowing);//null //  ,      this.on('data', (chunk)=> { //   'data' -         console.log('\n---'); console.log('Readable on data '); // chunk     console.log(`chunk = ${chunk} chunk isBuffer ${Buffer.isBuffer(chunk)} and chunk.length is ${chunk.length}`); //-     (-  ) console.log('buffer.length ', this._readableState.buffer.length); console.log(': ', chunk.toString(), ' buffer of chunk ', this._readableState.buffer, ' buffer of chunk   ', this._readableState.buffer.toString()); }) .on('error',(err)=> { console.log('Readable on error ', err); }) .on('end',()=> { console.log('Readable on end '); console.log('objectMode ', this._readableState.objectMode);//false console.log('highWaterMark ', this._readableState.highWaterMark);//16384 console.log('buffer ', this._readableState.buffer);//[] -   console.log('buffer.length ', this._readableState.buffer.length);//0 console.log('flowing ', this._readableState.flowing);//true !!!       'data' }) .on('close',()=> { console.log('Readable on close      '); }); } _read() { let data = this._array_of_data.shift() if (!data) { //,    this.push(null); } else { this.push(data); } } } /*   , ..       ,   .         this.push(data) Readable on error TypeError: Invalid non-string/buffer chunk */ let array_of_data = ['1', '2', '3', '4', '5']; let opts = {/*     */}; const R = new Source(array_of_data, opts); array_of_data = ['1', '2', '3', '4', '5']; opts = { objectMode: false, highWaterMark: 1//1      _readableState.buffer.length  === 1 }; const R2 = new Source(array_of_data, opts); array_of_data = ['1', '2', '3', '4', '5']; opts = { objectMode: false , encoding: 'utf8'//   ( NodeJS),         ,      }; const R3 = new Source(array_of_data, opts);//        .setEncoding('utf8') array_of_data = [1, 2, 3, 4, 5]; /*  ""   .  objectMode: true      -   ,    Readable.setEncoding('utf8')*/ opts = { objectMode: true , encoding: 'utf8' }; const R4 = new Source(array_of_data, opts); // objectMode: true    ,    (Number) array_of_data = [1, 2, 3, 4, 5]; opts = { objectMode: true }; const R5 = new Source(array_of_data, opts); //highWaterMark 16 -      /*     (    Writable.write(someData) === false).      Node.JS.  ,      ,    ,    */ array_of_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; opts = { objectMode: true }; const R6 = new Source(array_of_data, opts); R6.on('data', (chunk) => { //    1  R6.pause(); setTimeout(() => { R6.resume();//   }, 1000); });
      
      









曞き蟌み可胜なストリヌム-デヌタを曞き蟌むためのストリヌム







珟圚のデヌタバッファ党䜓は、writable._writableState.getBufferメ゜ッドを䜿甚しお取埗できたす。



Writable.jsの䟋
 'use strict'; const Source = require('./readable.js'); const { Writable } = require('stream'); class Writer extends Writable { constructor(opt = {}) { super(opt); console.log('objectMode ', this._writableState.objectMode);//false  ,     true console.log('highWaterMark ', this._writableState.highWaterMark);//16384 console.log('decodeStrings ', this._writableState.decodeStrings);//true  ;    Buffer ,      _write() console.log('buffer ', this._writableState.getBuffer());//[] -   this.on('drain', ()=> { console.log('\n------ writable on drain'); }) .on('error', (err)=> { console.log('\n------ writable on error', err); }) .on('finish', ()=> { console.log('\n------ writable on finish'); console.log('_writableState.getBuffer()', this._writableState.getBuffer()); }); } /** * @param chunk - || * @param encoding -   .  objectMode === true,  encoding   * @param done - callback -.     ,     ,    *   _write,  ,       chunk,      *  : done(err) -     new Error(...) * @private */ _write(chunk, encoding, done) { console.log('_writableState.getBuffer()', this._writableState.getBuffer()); console.log(typeof chunk ); //    Transform   if (typeof chunk === 'object') { console.log('chunk = ', chunk.get(), chunk.get() +' in pow '+ chunk.get() +' = '+ chunk.inPow(chunk.get())); } else { console.log(`chunk = ${chunk}; isBuffer ${Buffer.isBuffer(chunk)}; chunk.length is ${chunk.length}; encoding = ${encoding}`); } /*     . , : 1)     on('error', (err)=>{...}) 2)   ,    Readable    .         - ,  Readable.emit('error', err);     Readable.puse(),      Readable.remuse().    ,           //if (chunk > 3) return done(new Error('chunk > 3'));*/ done(); } } let array_of_data = ['1', '2', '3', '4', '5']; let r_opts = {/*    */}; const R = new Source(array_of_data, r_opts); let w_opts = {/*    */}; const W = new Writer(w_opts); R.pipe(W); array_of_data = ['1', '2', '3', '4', '5']; r_opts = {encoding: 'utf8'}; const R1 = new Source(array_of_data, r_opts); w_opts = { decodeStrings: false//  _write     'utf8',      -  (  r_opts), }; const W1 = new Writer(w_opts); R1.pipe(W1); array_of_data = [1, 2, 3, 4, 5]; r_opts = {objectMode: true}; const R2 = new Source(array_of_data, r_opts); w_opts = { objectMode: true// false,       ( r_opts),   "TypeError: Invalid non-string/buffer chunk" }; const W2 = new Writer(w_opts); R2.pipe(W2); array_of_data = [1, 2, 3, 4, 5]; r_opts = {objectMode: true}; const R3 = new Source(array_of_data, r_opts); w_opts = { objectMode: true// false,       ( r_opts),   "TypeError: Invalid non-string/buffer chunk" , highWaterMark: 1 // ;          'drain' }; const W3 = new Writer(w_opts); R3.pipe(W3); //  pipe() const R3_1 = new Source(array_of_data, r_opts); const W3_1 = new Writer(w_opts); R3_1.on('data', (chunk)=> { //R3_1._readableState.flowing === true console.log('R3_1 in flowing mode', R3_1._readableState.flowing, 'R3_1 _readableState.buffer', R3_1._readableState.buffer); toWriteOrNotToWriteThatIsTheQuestion(chunk, onDrain); }); function onDrain() { //R3_1._readableState.flowing === false,      R3_1.pause()  toWriteOrNotToWriteThatIsTheQuestion console.log('R3_1 in flowing mode', R3_1._readableState.flowing); R3_1.resume(); } /** *           Writable,       Readable (R3_1.pause()) *     ( 'drain'),       Readable ( cb R3_1.resume(); ),    Writable * @param data * @param cb */ function toWriteOrNotToWriteThatIsTheQuestion(data, cb) { // " "     write(...),    _write(...) if (!W3_1.write(data)) { R3_1.pause(); W3_1.once('drain', cb); } else { process.nextTick(cb); } }
      
      









ストリヌムの倉換-デヌタ倉曎ストリヌム





倉換は、デュプレックスストリヌムのバリ゚ヌションです。 私たちは圌ず最初に䟋を蚭定するこずにしたした。





Transform.jsの䟋
 'use strict'; const Readable = require('./readable.js'); const Writable = require('./writable.js'); const {Transform} = require('stream'); /*  ,      , ,  JS ,    */ class Chunk { constructor(chunk) { this.set(chunk); } set(chunk) { this._chunk = chunk; } get() { return this._chunk; } inPow(pow = 2) { return Math.pow(this.get(), pow); } } class Transformer extends Transform { constructor(opt = {}) { super(opt); console.log('\n -------- Transform in constructor'); console.log('objectMode ', this._writableState.objectMode);//false  ,     true console.log('highWaterMark ', this._writableState.highWaterMark);//16384 console.log('decodeStrings ', this._writableState.decodeStrings);//true  ;    Buffer ,      _write() console.log('buffer ', this._writableState.getBuffer());//[] -   this.on('close', ()=> { console.log('\n------ Transform on close'); }) .on('drain', ()=> { console.log('\n------ Transform on drain'); }) .on('error', (err)=> { console.log('\n------ Transform on error', err); }) .on('finish', ()=> { console.log('\n------ Transform on finish'); }) .on('end', ()=> { console.log('\n------ Transform on end'); }) .on('pipe', ()=> { console.log('\n------ Transform on pipe'); }); } /** * ,      (chunk    Transform), *    -      Transform * @param chunk * @param encoding * @param done -    done(err, chunk) * @private */ _transform(chunk, encoding, done) { /*    chunk,         done(null, chunk); done(err, chunk); -       error  ,    : this.push(chunk); done(); this.push(chunk); done(err);*/ //      Chunk (.  writable.js) this.push(new Chunk(chunk)); done(); } /** *  transform     _flush.   ,      ,    'end'  Readable (  Transform,       ,    ). * @param done - done(err)     Error * @private */ _flush(done) { //TODO ... -       done(); } } let array_of_data = ['1', '2', '3', '4', '5']; let r_opts = { encoding: 'utf8' }; const R = new Readable(array_of_data, r_opts); let t_opts = { readableObjectMode: true //   Transform   , writableObjectMode: false//   Transform      , decodeStrings: false }; const T = new Transformer(t_opts); let w_opts = { objectMode: true// false,    }; const W = new Writable(w_opts); R.pipe(T).pipe(W);
      
      









デュプレックスストリヌム-ストリヌムの曞き蟌みず読み取り



デュプレックスは、読み取り可胜、曞き蟌み可胜なストリヌムずしおそれ自䜓を実装したす。 さらに、それらの「䜜業」は互いに独立しお発生したす。



ストリヌムのトピックに興味がある堎合は、Duplexストリヌムの実装を自分で詊しおみるこずをお勧めしたす。



新しいstream.Duplexオプション
新しいstream.Duplexオプション

optionsオブゞェクトは、DuplexストリヌムのWritableおよびReadableコンストラクタヌに枡されたす。



  • allowHalfOpenブヌル倀はデフォルトでtrueです。 falseの堎合、読み取りストリヌムが凊理を完了するず、自動的に終了し、曞き蟌みストリヌムが終了したす。
  • readableObjectModeブヌル倀はデフォルトでfalseです。 読み取り可胜なストリヌムのobjectModeモヌド。 プロパティobjectMode = trueの堎合、プロパティの倀は無芖されたす。
  • writableObjectMode booleanはデフォルトでfalseです。 曞き蟌み可胜なストリヌムのobjectModeモヌド。 プロパティobjectMode = trueの堎合、プロパティの倀は無芖されたす。




゚ラヌ凊理



䞀郚のリンクで「゚ラヌ」むベントがトリガヌされ、チェヌン内の「前の」ストリヌムに通知する必芁がある堎合、「゚ラヌ」むベントを呌び出す必芁もありたすStreamClass.emit 'error'、err状況を凊理したす。 たたは、この問題を解決できるポンプモゞュヌルhttps://github.com/mafintosh/pumpを䜿甚したす。



たずめるず



スレッドを䜿甚するず、ほずんどすべおの問題を解決できたす。





圌らが蚀うように-すべおの味のために。



All Articles