イベントマシンがライフサイクルを保護

免責事項:この記事では、非自明な問題に対する非自明な解決策について説明します。 急ぐ前に たまご それを実践するために、私は記事を最後まで読み、二度考えることを勧めます。

but_why







みなさんこんにちは! コードを使用する場合、多くの場合stateを処理する必要があります 。 そのようなケースの1つは、オブジェクトのライフサイクルです。 いくつかの可能な状態でオブジェクトを管理することは、非常に重要なタスクです。 ここに非同期実行を追加すると、タスクは桁違いに複雑になります。 効果的で自然な解決策があります。 この記事では、イベントマシンとGoでのイベントマシンの実装方法について説明します。







なぜ州を管理するのですか?



まず、コンセプト自体を定義しましょう。 最も単純な状態の例:ファイルとさまざまな接続。 ファイルを受け取って読むことはできません。 最初に開かなければならず、最後に できれば 必ず閉じてください。 現在のアクションは前のアクションの結果に依存していることがわかります。読み取りは開始に依存します。 保存された結果は状態です。







状態の主な問題は複雑さです。 どの状態でもコードは自動的に複雑になります。 アクションの結果をメモリに保存し、ロジックにさまざまなチェックを追加する必要があります。 これが、ステートレスアーキテクチャがプログラマにとって非常に魅力的である理由です-誰も望んでいません トラブル 困難。 アクションの結果が実行ロジックに影響しない場合、状態は必要ありません。







ただし、困難を考慮する1つのプロパティがあります。 州では、特定のアクションの順序に従う必要があります。 一般に、このような状況は回避する必要がありますが、これは常に可能とは限りません。 例は、プログラムオブジェクトのライフサイクルです。 良好な状態管理のおかげで、複雑なライフサイクルを持つオブジェクトの予測可能な動作を取得できます。







それでは、 クールな方法を考えてみましょう。







問題を解決する方法としての自動



AK74







人々が状態について話すとき、有限状態マシンはすぐに思い浮かびます。 オートマトンは状態を管理する最も自然な方法であるため、論理的です。







オートマトン理論については詳しく説明しませんが、インターネットには十分な情報があります。

Goの有限状態マシンの例を探すと、Rob Pikeのレクサーに必ず出会うでしょう。 入力アルファベットが処理中のデータであるマシンの素晴らしい例。 つまり、状態遷移はレクサーが処理するテキストによって引き起こされます。 特定の問題に対するエレガントなソリューション。







理解すべき主なことは、オートマトンが厳密に特定の問題の解決策であることです。 したがって、すべての問題の解決策として考慮する前に、タスクを完全に理解する必要があります。 具体的には、制御するエンティティ:









字句解析器は美しいですが、処理するデータが原因で状態が変化するだけです。 しかし、ユーザーが遷移を呼び出す状況はどうでしょうか? これは、イベントマシンが役立つ場所です。







実際の例



わかりやすくするために、 phono



ライブラリの例を分析します。







コンテキストに完全に没頭するには、 入門記事を読むことができます。 これはこのトピックには必要ありませんが、私たちが管理しているものをよりよく理解するのに役立ちます。


そして、私たちは何を管理していますか?



phono



はDSPパイプラインに基づいています。 処理の3つの段階で構成されます。 各ステージには、1つからいくつかのコンポーネントが含まれます。







pipe_diagram







  1. pipe.Pump



    (英語のポンプ)は、常に1つのコンポーネントのみの、音を受信する必須の段階です。
  2. pipe.Processor



    (英語ハンドラー)-サウンド処理のオプションのステージ pipe.Processor



    コンポーネント。
  3. pipe.Sink



    (英語シンク)-音声伝送の必須段階pipe.Sink



    個のコンポーネント。


実際には、コンベヤのライフサイクルを管理します。







ライフサイクル



これがpipe.Pipe



状態図の外観です。







pipe_lifecycle







斜体は、内部実行ロジックによって引き起こされる遷移を示します。 太字 -イベントによる遷移。 この図は、状態が2つのタイプに分けられていることを示しています。









コードの詳細な分析の前に、すべての状態の使用の明確な例:







 // PlayWav  .wav    portaudio  -. func PlayWav(wavFile string) error { bufferSize := phono.BufferSize(512) //      w, err := wav.NewPump(wavFile, bufferSize) //  wav pump if err != nil { return err } pa := portaudio.NewSink( //  portaudio sink bufferSize, w.WavSampleRate(), w.WavNumChannels(), ) p := pipe.New( //  pipe.Pipe    ready w.WavSampleRate(), pipe.WithPump(w), pipe.WithSinks(pa), ) p.Run() //    running   p.Run() errc := p.Pause() //    pausing   p.Pause() err = pipe.Wait(errc) //     paused if err != nil { return err } errc = p.Resume() //    running   p.Resume() err = pipe.Wait(errc) //     ready if err != nil { return err } return pipe.Wait(p.Close()) //      }
      
      





さあ、まず最初に。







すべてのソースコードはリポジトリで入手できます


状態とイベント



最も重要なことから始めましょう。







 // state      . type state interface { listen(*Pipe, target) (state, target) //    transition(*Pipe, eventMessage) (state, error) //   } // idleState  .        . type idleState interface { state } // activeState  .         //   . type activeState interface { state sendMessage(*Pipe) state //    } //  . type ( idleReady struct{} activeRunning struct{} activePausing struct{} idlePaused struct{} ) //  . var ( ready idleReady running activeRunning paused idlePaused pausing activePausing )
      
      





異なるタイプのおかげで、状態ごとに遷移も個別に宣言されます。 これにより、巨大な ソーセージ 入れ子になったswitch



遷移関数。 状態自体には、データやロジックは含まれていません。 それらの場合、毎回これを行わないように、パッケージレベルで変数を宣言できます。 ポリモーフィズムにはstate



インターフェースが必要です。 activeState



idleState



についてはactiveState



idleState



ます。







マシンの2番目に重要な部分はイベントです。







 // event  . type event int //  . const ( run event = iota pause resume push measure cancel ) // target      . type target struct { state idleState //   errc chan error //   ,     } // eventMessage   ,    . type eventMessage struct { event //   params params //   components []string // id  target //      }
      
      





target



タイプが必要な理由を理解するために、簡単な例を考えてみましょう。 新しいコンベヤを作成しました。 ready



ready



ます。 p.Run()



実行します。 run



イベントがマシンに送信され、パイプラインがrunning



状態になります。 コンベアがいつ完成したかを知る方法は? これは、 target



タイプが役立つ場所です。 これは、イベント後に予想される安静状態を示します。 この例では、作業が完了すると、パイプラインは再びready



状態になります。 ダイアグラム内の同じもの:













状態の種類について詳しく説明します。 より正確には、 idleState



およびactiveState



。 さまざまなタイプのステージのlisten(*Pipe, target) (state, target)



関数を見てみましょう:







 // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) }
      
      





pipe.Pipe



は、遷移を待つためのさまざまな機能があります! 何がありますか?







 // idle     .    . func (p *Pipe) idle(s idleState, t target) (state, target) { if s == t.state || s == ready { t = t.dismiss() //  ,  target } for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { e.target.handle(err) } else if e.hasTarget() { t.dismiss() t = e.target } } if s != newState { return newState, t // ,    } } } // active     .     , //   . func (p *Pipe) active(s activeState, t target) (state, target) { for { var newState state var err error select { case e := <-p.events: //   newState, err = s.transition(p, e) //    if err != nil { //  ? e.target.handle(err) // ,    } else if e.hasTarget() { // ,  target t.dismiss() //   t = e.target //   } case <-p.provide: //     newState = s.sendMessage(p) //    case err, ok := <-p.errc: //   if ok { //   ,  interrupt(p.cancel) //   t.handle(err) //    } //    ,  return ready, t //    ready } if s != newState { return newState, t // ,    } } }
      
      





したがって、異なる状態の異なるチャネルを聞くことができます。 たとえば、これにより、一時停止中にメッセージを送信しないようにすることができます。対応するチャンネルを聞かないだけです。







コンストラクターとマシンの起動









 // New      . //      ready. func New(sampleRate phono.SampleRate, options ...Option) *Pipe { p := &Pipe{ UID: phono.NewUID(), sampleRate: sampleRate, log: log.GetLogger(), processors: make([]*processRunner, 0), sinks: make([]*sinkRunner, 0), metrics: make(map[string]measurable), params: make(map[string][]phono.ParamFunc), feedback: make(map[string][]phono.ParamFunc), events: make(chan eventMessage, 1), //    cancel: make(chan struct{}), //     provide: make(chan struct{}), consume: make(chan message), } for _, option := range options { //   option(p)() } go p.loop() //    return p }
      
      





初期化オプション機能オプションに加えて、メインサイクルで個別のゴルーチンが開始されます。 まあ、彼を見てください:







 // loop ,     nil . func (p *Pipe) loop() { var s state = ready //   t := target{} for s != nil { s, t = s.listen(p, t) //      p.log.Debug(fmt.Sprintf("%v is %T", p, s)) } t.dismiss() close(p.events) //    } // listen     ready. func (s idleReady) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idleReady) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) return nil, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case run: if err := p.start(); err != nil { return s, err } return running, nil } return s, ErrInvalidState }
      
      





コンベアが作成され、イベントを待ってフリーズしました。







働く時間



p.Run()



呼び出します!













 // Run   run  . //     pipe.Close  . func (p *Pipe) Run() chan error { runEvent := eventMessage{ event: run, target: target{ state: ready, //    errc: make(chan error, 1), }, } p.events <- runEvent return runEvent.target.errc } // listen     running. func (s activeRunning) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activeRunning) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case pause: return pausing, nil } return s, ErrInvalidState } // sendMessage   . func (s activeRunning) sendMessage(p *Pipe) state { p.consume <- p.newMessage() return s }
      
      





running



はメッセージを生成し、パイプラインが完了するまでrunning



されます。







一時停止



コンベアの実行中、一時停止できます。 この状態では、パイプラインは新しいメッセージを生成しません。 これを行うには、 p.Pause()



メソッドを呼び出します。













 // Pause   pause  . //     pipe.Close  . func (p *Pipe) Pause() chan error { pauseEvent := eventMessage{ event: pause, target: target{ state: paused, //    errc: make(chan error, 1), }, } p.events <- pauseEvent return pauseEvent.target.errc } // listen     pausing. func (s activePausing) listen(p *Pipe, t target) (state, target) { return p.active(s, t) } // transition       . func (s activePausing) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case measure: e.params.applyTo(p.ID()) p.feedback = p.feedback.merge(e.params) return s, nil case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil } return s, ErrInvalidState } // sendMessage   .   -, //      .    //    ,      .  , // ,   , : // 1.     // 2.      func (s activePausing) sendMessage(p *Pipe) state { m := p.newMessage() if len(m.feedback) == 0 { m.feedback = make(map[string][]phono.ParamFunc) } var wg sync.WaitGroup //     wg.Add(len(p.sinks)) //   Sink for _, sink := range p.sinks { param := phono.ReceivedBy(&wg, sink.ID()) // - m.feedback = m.feedback.add(param) } p.consume <- m //   wg.Wait() // ,     return paused }
      
      





すべての受信者がメッセージを受信するとすぐに、パイプラインはpaused



状態になります。 メッセージが最後の場合、 ready



状態への移行が発生します。







仕事に戻る!



paused



状態を終了するには、 p.Resume()



呼び出します。













 // Resume   resume  . //     pipe.Close  . func (p *Pipe) Resume() chan error { resumeEvent := eventMessage{ event: resume, target: target{ state: ready, errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc } // listen     paused. func (s idlePaused) listen(p *Pipe, t target) (state, target) { return p.idle(s, t) } // transition       . func (s idlePaused) transition(p *Pipe, e eventMessage) (state, error) { switch e.event { case cancel: interrupt(p.cancel) err := Wait(p.errc) return nil, err case push: e.params.applyTo(p.ID()) p.params = p.params.merge(e.params) return s, nil case measure: for _, id := range e.components { e.params.applyTo(id) } return s, nil case resume: return running, nil } return s, ErrInvalidState }
      
      





ここではすべてが簡単で、パイプラインは再びrunning



状態になります。







カールアップ



コンベアはどの状態からでも停止できます。 これにはp.Close()



があります。













 // Close   cancel  . //      . //    ,   . func (p *Pipe) Close() chan error { resumeEvent := eventMessage{ event: cancel, target: target{ state: nil, //   errc: make(chan error, 1), }, } p.events <- resumeEvent return resumeEvent.target.errc }
      
      





誰がこれを必要としますか?



皆のためではありません。 状態の管理方法を正確に理解するには、タスクを理解する必要があります。 イベント非同期マシンを使用できる状況は正確に2つあります







  1. 複雑なライフサイクル-非線形遷移を伴う3つ以上の状態があります。
  2. 非同期実行が使用されます。


イベントマシンは問題を解決しますが、かなり複雑なパターンです。 したがって、すべての長所と短所を完全に理解してから、慎重に使用する必要があります。







参照資料






All Articles