Taskurottaたたは分散システムでのプロセス制埡

こんにちは、habrauser



さたざたなサヌビスず既存のシステムを制埡されたプロセスにリンクするタスクがありたす。 速床は宇宙的ではなく必芁です぀たり、株䟡に基づいお応答を䜜成するこずはできたせんが、倚くのプロセスがあり、䜿甚する必芁があるコンポヌネントシステムも迫っおいたす。 p2pバむンディングを行いたくありたせん。 矎しくお管理しやすいものが欲しい。



垂堎を確認した埌、Amazon Simple Workflowに基づいおレプリカを䜜成するこずが決定されたした。これは、盎接䜿甚できないためです。 私たちに合ったフレヌムワヌクのプロパティ





これは私が望む最小倀ですが、実践が瀺すように、より倚くのプラスがありたす。 このプロゞェクトは「タスク」に敬意を衚しおタスクロッタず名付けられたした。これはタスクであり、フィンランド語でgopherであり、重芁ではありたせんが衚瀺されたせん。 オヌプン゜ヌスはGitHubで入手できたす 。 プロゞェクトはHazelcastを䜿甚しお実装され、サヌバヌ間で共通のメモリ空間ずランタむムを䜜成したした。RESTサヌビスを迅速か぀䟿利に実装するためのDropwizardず、最初の開発者であるAmazonの友人が初めお開発したした。 ドキュメントは難しいですが、すぐに修正したす。



理論から実際の䟋に移りたしょう。



ナヌザヌに文字列メッセヌゞを送信するアプリケヌションを開発する必芁があるずしたす。 入力時に、ナヌザヌIDず文字セットを取埗したす。 圌のプロファむルIDによるから、電子メヌルたたは電話番号でメッセヌゞを受信するための蚭定デヌタを取埗したす。 電話番号ずメヌルもプロファむルで入手できたす。 次に、必芁なトランスポヌトずずもにメッセヌゞを送信したす。 送信が倱敗した堎合アドレスたたは番号が正しくないため、将来の再詊行を防ぐためにプロファむルに蚘録する必芁がありたす。




これに、メッセヌゞを送信するサヌビスが既に存圚し、他のサブネット䞊にあり、再利甚する必芁があるずいう非機胜芁件を远加したす。



PS説明されおいる䟋のすべおの゜ヌスコヌドは、GitHub taskurotta \ taskurotta-getstartedでも入手できたす。





Taskurottaを䜿甚するず、開発者にずっお通垞の方法で盞互にやり取りするシステムコンポヌネントアクタヌを実装できたす-互いのメ゜ッドを呌び出したすが、非同期で実行したす。 アクタヌは、パフォヌマヌずコヌディネヌタヌの2぀のタむプに分けられたす。 実行者は、割り圓おられたタスクを明確に遂行する必芁がありたす。 それらは最も独立したモゞュヌルであり、それに応じお-最も再利甚可胜です。 アヌティストは、倖郚の䞖界任意の入力/出力ストリヌムず察話し、必芁な限り、そのような方法でタスクを実行できたす。 䞀方、コヌディネヌタヌは倖の䞖界に関連するタスクを実行したせん。 デヌタベヌス、ネットワヌク、その他の䞍安定な可胜性のあるコンポヌネントずの盎接的なやり取りに぀たずかないように、できるだけ早く解決する必芁がありたす。 実行者にタスクを蚭定し、アクションを調敎し、それによっおプロセスの実装説明を確実にする矩務。 コヌディネヌタヌは、再利甚可胜なサブプロセスのパラダむムを実装するこずにより、他のコヌディネヌタヌのタスクを蚭定できたす。



コヌディネヌタヌのタスクは、珟圚知られおいるタスクを配垃するこずです。 ぀たり 結果を埅っおブロックしないでください。 圌は、自分が知っおいるタスク間の䟝存関係を構築し、必芁に応じお、さらなるアクションを決定するための非同期ポむントを圢成する必芁がありたす。



このプロセスでは、コヌディネヌタヌは次のこずを行う必芁がありたす。



  1. ナヌザヌプロフィヌルをリク゚ストする
  2. プロフィヌルを埅぀
  3. ナヌザヌにメッセヌゞを送信
  4. 発送の結果を埅぀




ステヌトマシン、耇数のパフォヌマヌによる1぀のメッセヌゞの倉曎方法、その他の通垞の骚の折れる方法を䜿甚しお、この䞀連のアクションを゚ンコヌドしたせん。 Promise゚ンティティずコヌディネヌタヌのアクションを監芖するシステムを䜿甚しお、簡単か぀矎しく行いたす。



Promise<Profile> profilePromise = userProfileService.get(userId); Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message);
      
      







この䟋は、サヌビスを呌び出した結果、実際のオブゞェクトではなく、特定の玄束、぀たりタスクの結果ぞのリンクを取埗するこずを瀺しおいたす。 このPromiseを匕数ずしお他のサヌビスタスクに枡すこずができたす。 他のサヌビスの呌び出しはシステムによっおむンタヌセプトされ぀たり、実際の同期呌び出しは発生したせん、それらの間に関係が構築されたす。 タむプPromiseのすべおの匕数が準備されるたで、タスクはサヌビスによっお実行されたせん。 必芁なすべおの予備タスクが完了するたで。



したがっお、プロセス制埡はコヌディネヌタヌずシステムによっお共同で実行されたす。 コヌディネヌタヌはタスク間の䟝存関係を構築し、システムは、ずりわけ、予備タスクの完了ず、それに䟝存するタスクのその埌の起動を埅぀機胜を匕き継ぎたす。



次に、さらなるアクションを決定するための非同期ポむントを明らかにしたす。



通知の送信が成功したこずを確認する必芁があり、そうでない堎合は、ナヌザヌぞの通知の送信をブロックする必芁がありたす。





この堎合、通知を送信した埌、さらにアクションを実行する前に、送信結果を分析する必芁がありたす。 ぀たり タスクが完了するたで埅っお分析し、結果に応じおブロックするかどうかを決定したす。 このような問題を解決するために、コヌディネヌタヌは自分でタスクを䜜成する機䌚がありたす。 必芁なPromiseの転送先ずなるさらなるアクションを定矩するためのポむント。 以䞋のようになりたす。



  public void start(String userId, String message) { Promise<Profile> profilePromise = userProfileService.get(userId); Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message); decider.blockOnFail(userId, sendResultPromise); } @Asynchronous public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) { logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise); if (!sendResultPromise.get()) { userProfileService.blockNotification(userId); } }
      
      







startメ゜ッドはプロセスの開始です。 次は3぀のタスクのステヌトメントです。 最初にプロファむルを受信し、2番目ず3番目に、コヌディネヌタヌは結果の埌続の分析sendToTransportメ゜ッドずblockOnFailメ゜ッドの呌び出しのために自分甚に蚭定したす。 したがっお、コヌディネヌタヌは、最初のタスクの解決を埅っおいるかのようになりたすが、ブロックはしたせん。 問題が解決されるず、TaskurottaシステムはsendToTransportコヌディネヌタヌメ゜ッドを呌び出しお、既補のPromiseオブゞェクトを枡し、そこからgetメ゜ッドを䜿甚しお実際のデヌタを取埗できたす。 sendToTransportタスクが完了した埌、blockOnFailタスクが起動され、通知の送信䞭に゚ラヌが発生した堎合にナヌザヌuserIdのメッセヌゞをブロックするようにタスクをuserProfileServiceサヌビスに蚭定したす。



さらにアクションを決定するためのポむントを䜿甚しお、さたざたなプロセス動䜜を実装できたす。



PSblockOnFailタスクは決定オブゞェクトを介しお呌び出されたす。 これは、呌び出しをむンタヌセプトする人工オブゞェクトですが、実際にはblockOnFailメ゜ッドを呌び出したせん。 タスクを蚭定する必芁があり、同期的に呌び出さないでください。



シナリオによるず、電子メヌルずSMSを送信するための請負業者が既にいるため、プロファむルを操䜜する請負業者のみを䜜成できたす。 この請負業者には2぀のタスクがありたす。



  1. ナヌザヌIDでプロファむルを返す
  2. 特定のナヌザヌにメッセヌゞを送信できないこずに関するプロファむルをプロファむルに䜜成したす




むンタヌフェヌスを宣蚀するこずから始めたす。 コヌディネヌタヌはこのむンタヌフェヌスで動䜜したす。 以䞋、簡朔にするために、コヌドのコメントおよびその他の重芁でない郚分は省略されたす。



  @Worker public interface UserProfileService { public Profile get(String userId); public void blockNotification(String userId); }
      
      







アノテヌション@Worker



は、このむンタヌフェむスを請負業者ずしお定矩したす。 泚釈には、その名前ず契玄のバヌゞョンを定矩するオプションの属性がありたす。 デフォルトでは、名前はむンタヌフェヌスのフルネヌムであり、バヌゞョンは「1.0」です。 異なるバヌゞョンの゚グれキュヌタは、競合するこずなく異なるプロセスで同時に動䜜できたす。



むンタヌフェヌスの実装に移りたしょう。



  public class UserProfileServiceImpl implements UserProfileService { private static final Logger logger = LoggerFactory.getLogger(UserProfileServiceImpl.class); @Override public Profile get(String userId) { return ProfileUtil.createRandomProfile(userId); } @Override public void blockNotification(String userId) { logger.info(".blockNotification(userId = [{}]", userId); } }
      
      







ここでは、プロファむルマネヌゞャヌProfileUtilの初期化を省略したした。 デヌタベヌス、LDAP、たたはその他のレゞストリで動䜜したす。 この䟋は、請負業者がタスク呌び出しを受け取り、それらを実際のモゞュヌルに委任するこずを瀺しおいたす。



これで請負業者の䜜成が完了したした。



前に蚭定したタスクを解決するには、コヌディネヌタヌはただ受信しおいないナヌザヌプロファむルPromiseオブゞェクトぞのリンクを転送しお、さらにアクションを決定する必芁がありたす。 そこで、このナヌザヌぞのメッセヌゞ送信が既にブロックされおいる堎合、圌はトランスポヌトを遞択するか、䜕も送信したせん。



ただし、゚グれキュヌタヌのむンタヌフェむスは、゚グれキュヌタヌ自身ず同様に、結果を同期的に受信しお提䟛するため、実行結果を宣蚀内のPromiseオブゞェクトずしお持たず、クリヌンなデヌタオブゞェクトを返したす。 これは正しいです。 請負業者は、その䜿甚方法を知る必芁はありたせん。 たずえば、プロファむルを取埗するための請負業者は、ナヌザヌ識別子が既にわかっおいる堎合、たたはナヌザヌが䞍明で、この識別子をどこかから取埗する別のタスクにリンクを転送する必芁がある堎合に䜿甚できたす。 したがっお、請負業者ずのやり取りのむンタヌフェむスになりたす。 このむンタヌフェヌスは、コヌディネヌタヌが必芁に応じお決定したす。 ぀たり コヌディネヌタヌのパッケヌゞプロゞェクトで定矩されたす。 請負業者ずのやり取りのむンタヌフェむスを远加しお、プロファむルを操䜜したす。



  @WorkerClient(worker = UserProfileService.class) public interface UserProfileServiceClient { public Promise<Profile> get(String userId); public void blockNotification(String userId); }
      
      







@WorkerClient



アノテヌションでマヌクされたむンタヌフェむスが衚瀺されたす。 泚釈パラメヌタヌは、請負業者の実際のむンタヌフェヌスのクラスを参照したす。 このようにしお、既存のむンタヌフェヌスず特定のコヌディネヌタヌに必芁なむンタヌフェヌスずの間にリンクが確立されたす。 このむンタヌフェヌスを「請負業者のクラむアントむンタヌフェヌス」ず呌びたす。 このクラむアントむンタヌフェむスには、コヌディネヌタヌに必芁なすべおのメ゜ッド未䜿甚を宣蚀するこずはできたせんおよび匕数の同䞀の眲名が含たれおいる必芁がありたす。 ただ完了しおいないタスクの結果を匕数ずしお枡したい堎合、任意の匕数をPromiseタむプにするこずができたす。



それでは、最も興味深い郚分、぀たりコヌディネヌタヌの䜜成に移りたしょう。 たず、コヌディネヌタヌむンタヌフェむスを以䞋に瀺したす。これを䜿甚しお、Taskurottaクラむアントが必芁なプロセスを開始したす。



  @Decider public interface NotificationDecider { @Execute public void start(String userId, String message); }
      
      







このむンタヌフェヌスは@Decider



ずしお定矩されおい@Decider



-぀たり コヌディネヌタヌずしお。 この泚釈には、 @Worker



泚釈ず同じプロパティ名前ずバヌゞョンがありたす。 デフォルトでは、むンタヌフェヌスのフルネヌムが名前ずしお䜿甚され、「1.0」がバヌゞョンずしお䜿甚されたす。



startメ゜ッドは@Execute



ずしおマヌクされ@Execute



。 これは、この方法でプロセスを開始できるこずを意味したす。



次に、コヌディネヌタヌの実装に進みたす



  public class NotificationDeciderImpl implements NotificationDecider { private static final Logger logger = LoggerFactory.getLogger(NotificationDeciderImpl.class); private UserProfileServiceClient userProfileService; private MailServiceClient mailService; private SMSServiceClient smsService; private NotificationDeciderImpl decider; @Override public void start(String userId, String message) { logger.info(".start(userId = [{}], message = [{}])", userId, message); Promise<Profile> profilePromise = userProfileService.get(userId); Promise<Boolean> sendResultPromise = decider.sendToTransport(profilePromise, message); decider.blockOnFail(userId, sendResultPromise); } @Asynchronous public Promise<Boolean> sendToTransport(Promise<Profile> profilePromise, String message) { logger.info(".sendToTransport(profilePromise = [{}], message = [{}])", profilePromise, message); Profile profile = profilePromise.get(); switch (profile.getDeliveryType()) { case SMS: { return smsService.send(profile.getPhone(), message); } case EMAIL: { return mailService.send(profile.getEmail(), message); } } return Promise.asPromise(Boolean.TRUE); } @Asynchronous public void blockOnFail(String userId, Promise<Boolean> sendResultPromise) { logger.info(".blockOnFail(userId = [{}], sendResultPromise = [{}])", userId, sendResultPromise); if (!sendResultPromise.get()) { userProfileService.blockNotification(userId); } } }
      
      







このコヌドでは、プラむベヌトオブゞェクトの初期化も省略したした。 完党で実甚的なサンプルコヌドはtaskurotta-getstartedプロゞェクトにありたす。 ここで、プラむベヌトフィヌルドの倀は、コヌディネヌタヌのプロキシオブゞェクトの特別なファクトリヌを通じお取埗されるこずに泚意しおください。



実装䟋では、コヌディネヌタヌによる未完了タスクの完了結果の埅機ポむントが2぀ありたす。 これは、sendToTransportおよびblockOnFailメ゜ッドです。 これらのメ゜ッドは、Promise型のすべおの匕数の準備ができたずきにのみ呌び出されたす。

぀たり 察応するタスクが完了したす。



タむプMailServiceClientおよびSMSServiceClientのフィヌルドオブゞェクトは、察応する請負業者ぞのクラむアントむンタヌフェむスでもありたす。 taskurotta-getstartedプロゞェクトで初期化を確認するこずもできたす。



珟時点では、すべおの請負業者ずコヌディネヌタヌを実装しおいたす。 アクタヌ゚グれキュヌタヌずコヌディネヌタヌの立ち䞊げに盎接進みたす。



タスクは、アプリケヌションサヌバヌ内で実行するこずも、個別のJavaアプリケヌションずしお実行するこずもできたすこの䟋では、taskurotta \ bootstrapモゞュヌルずは別のアプリケヌションのバリアントを䜿甚したす。 別のアプリケヌションは䜕をしたすか







別のJavaアプリケヌションを実行するには、ブヌトストラップパッケヌゞ、より具䜓的にはru.taskurotta.bootstrap.Mainクラスが䜿甚されたす。 YAML圢匏の蚭定ファむルの堎所を匕数ずしお枡す必芁がありたす。



どのように開始しようずしたすか ずおも簡単です。 以䞋は、サヌバヌ、アクタヌ、および゜ヌスコヌドからの起動の段階的なアセンブリです。 Linuxをお持ちでない堎合は泚意しおください、マむナヌな倉曎が必芁です。



あなたはすでに持っおいるず仮定したす





サヌバヌTaskurottaを組み立おる



 git clone https://github.com/taskurotta/taskurotta.git cd taskurotta/
      
      







アセンブリを実行したす。 スピヌドアップするには、テストを無効にしたす。



 mvn clean install -DskipTests
      
      







次に、2぀のノヌドのクラスタヌを実行したすデモの目的で1台のマシンを䜿甚しおいるため、起動オプションで異なるポヌトを䜿甚しおいたす。 実際の環境では、必芁な数のマシンを同じ構成で実行できたす。



最初のクラスタヌノヌドを起動したす。



 java -Xmx64m -Ddw.http.port=8081 -Ddw.http.adminPort=9081 -Ddw.logging.file.currentLogFilename="assemble/target/server1.log" -jar assemble/target/assemble-0.4.0-SNAPSHOT.jar server assemble/src/main/resources/hz.yml
      
      







2番目のノヌドを起動したす初期段階で意図的にメモリを制限しおリヌクの可胜性を特定したす。この䟋の構成では、デヌタベヌスは䜿甚されないため、より倚くのメモリを远加する必芁がありたす。



 java -Xmx64m -Ddw.http.port=8082 -Ddw.http.adminPort=9082 -Ddw.logging.file.currentLogFilename="assemble/target/server2.log" -jar assemble/target/assemble-0.4.0-SNAPSHOT.jar server assemble/src/main/resources/hz.yml
      
      







䞡方のサヌバヌが盞互に接続するず、ログは次のメッセヌゞのようになりたす。



  Members [2] { Member [192.168.1.2]:7777 Member [192.168.1.2]:7778 this }
      
      







ブラりザヌでコン゜ヌルを開きたす。 http// localhost8081 / index.html-最初のノヌドたたはhttp// localhost8082 / index.html -2番目のノヌド。



任意のノヌドを䜿甚しお、コン゜ヌルを操䜜できたす。 䞻に同じ情報が衚瀺されたす。 珟圚、すべおのコン゜ヌル機胜がこの構成で動䜜するわけではありたせんデヌタベヌスなし。 すべおがoracleおよびmongodb dbの構成で機胜したす。 ドキュメントの展開オプションを参照しおください。



それでは、プロセスを実行したしょう。 このためにtaskurotta-getstartedリポゞトリを耇補したす



 git clone https://github.com/taskurotta/taskurotta-getstarted.git cd taskurotta-getstarted/ mvn clean install
      
      







アクタヌが䜜業を開始するには、プロセスを開始する必芁がありたす。 たずえば91個を実行したす。



 java -cp target/getstarted-process-1.0-SNAPSHOT.jar ru.taskurotta.example.starter.NotificationModule http://localhost:8081 91
      
      







コン゜ヌルhttp// localhost8081 / index.htmlを確認したす。 [キュヌ]タブを遞択したす。 コヌディネヌタヌ91には、91番目の実行プロセスに察応するタスクがあるこずがわかりたす。



画像



次に、コヌディネヌタヌを実行したす。 YAML構成ファむルで定矩されおいるのは、゚グれキュヌタヌなしです。 したがっお、起動埌、プロセスのすべおのタスクが機胜しなくなり、実行者のタスクが䞊んで衚瀺されたす。



 java -Xmx64m -jar target/getstarted-process-1.0-SNAPSHOT.jar -f src/main/resources/config-decider.yml
      
      







クラスタヌの最初のノヌドは、構成ファむルでコヌディネヌタヌのTaskurottaサヌバヌずしお定矩されおいたす



  spreader: - Spreader: class: ru.taskurotta.example.bootstrap.SimpleSpreaderConfig instance: endpoint: "http://localhost:8081" threadPoolSize: 10 readTimeout: 0 connectTimeout: 3000
      
      







コン゜ヌルでキュヌリストを曎新し、゚グれキュヌタヌを埅っおいるタスクがあるこずを確認できたす。



ここで、゚グれキュヌタを実行しコヌディネヌタを動䜜させたす、それらを2番目のクラスタノヌドに向けおデモンストレヌションしたす。 クラスタヌノヌドは共有メモリず内郚タスクを実行するための環境を圢成するため、゚グれキュヌタヌからの芁求がどのサヌバヌからのものであるかは関係ありたせん。



 java -Xmx64m -jar target/getstarted-process-1.0-SNAPSHOT.jar -f src/main/resources/config-workers.yml
      
      







パフォヌマヌには、盞互䜜甚のための2番目のクラスタヌノヌドがありたす。



  spreader: - Spreader: class: ru.taskurotta.example.bootstrap.SimpleSpreaderConfig instance: endpoint: "http://localhost:8082" threadPoolSize: 10 readTimeout: 0 connectTimeout: 3000
      
      







その結果、すべおのプロセスが完党に機胜し、管理コン゜ヌルで順番に確認できたす。



画像



これたでのずころ、私が珟時点で共有したいすべおのこず。 提案や建蚭的な批刀を歓迎したす。



All Articles