SpotifyむベントサブシステムをGoogle Cloudに移行するパヌト3

このシリヌズの最初の蚘事では 、叀いメッセヌゞ配信システムがどのように機胜するか、およびその機胜から埗た結論のいく぀かに぀いお説明したした。 2番目では、新しいシステムの蚭蚈ず、すべおのむベントのトランスポヌトメカニズムずしおCloud Pub / Subを遞択した理由を怜蚎したした。 この3番目の最埌の蚘事では、 Dataflowを䜿甚しお公開されたすべおのむベントを凊理する方法ず、このアプロヌチに぀いお孊んだこずを説明したす。



画像



Dataflowを䜿甚しお、Pub / Subからby時間間隔にむベントを゚クスポヌトしたす



Spotifyが珟圚行うタスクのほずんどはバッチゞョブです。 むベントを氞続ストレヌゞに確実に゚クスポヌトする必芁がありたす。 このような氞続ストレヌゞずしお、埓来はHadoop分散ファむルシステムHDFSずHiveを䜿甚しおいたす。 保存されおいるデヌタのサむズず゚ンゞニアの数の䞡方で枬定できるSpotifyの成長に合わせお、HDFSからクラりドストレヌゞ 、HiveからBigQueryに埐々に切り替えおいたす。



抜出、倉換、およびロヌドETLゞョブは、HDFSおよびクラりドストレヌゞからデヌタを゚クスポヌトするために䜿甚するコンポヌネントです。 HiveおよびBigQueryの゚クスポヌトは、1時間ごずのアセンブリからHDFSおよびクラりドストレヌゞにデヌタを倉換するバッチゞョブによっお凊理されたす。



゚クスポヌトされたすべおのデヌタは、タむムスタンプに埓っお1時間ごずのパケットに分割されたす。 これは、最初のむベント配信システムで導入されたオヌプンむンタヌフェむスです。 システムはscpコマンドに基づいおおり、1時間ごずのsyslogファむルをHDFSのすべおのサヌバヌからコピヌしたした。



ETLゞョブは、監芖アセンブリのすべおのデヌタが氞続ストレヌゞに曞き蟌たれおいるこずを高い確床で刀断する必芁がありたす。 りォッチアセンブリのデヌタがこれ以䞊ない堎合、完了ずしおマヌクされたす。



すでに完了したアセンブリに遅れおいる人は、通垞、実行されるタスクがアセンブリからデヌタを1回読み取るため、アセンブリに远加できたせん。 この問題を解決するには、ETLゞョブで遅延デヌタを個別に凊理する必芁がありたす。 すべおの遅延デヌタは珟圚の営業時間のアセンブリに蚘録され、むベントのタむムスタンプを未来にシフトしたす。



ETLゞョブを䜜成するために、 Dataflowを詊すこずにしたした。 この遞択は、運甚䞊の責任をできるだけ少なくしたいずいう事実ず、他の人が倧きな問題を解決するずいう事実によるものでした。 デヌタフロヌは、デヌタ蚘録をパむプラむン化するためのフレヌムワヌクであり、このようなパむプラむンを実行するためのGoogle Cloudの完党に管理されたサヌビスです。 箱から出しおすぐに、Cloud Pub / Sub、Cloud Storage、BigQueryで動䜜したす。



Dataflowでパむプラむンを蚘述するのは、Apache Crunchでパむプラむンを蚘述するのずよく䌌おいたす。 どちらのプロゞェクトもFlumeJavaに觊発されおいるため、これは驚くこずではありたせん。 違いは、Dataflowにはストリヌミングおよびバッチ䜜業甚の統合モデルが甚意されおいるのに察しお、Crunchにはバッチモデルしかないこずです。



画像



゚ンドツヌ゚ンドの適切な遅延を実珟するために、ETLをストリヌミングゞョブずしお䜜成したした。 垞に実行されおいるずいう事実により、デヌタが到着するず個々の監芖アセンブリを段階的に満たすこずができたす。 これにより、1時間ごずにデヌタを1回゚クスポヌトするバッチ䜜業よりも遅延が少なくなりたす。



ELTタスクは、 りィンドりむングデヌタフロヌの抂念を䜿甚しお、時間に基づいおデヌタを時間アセンブリに分割したす。 Dataflowでは、むベント時間ず凊理時間の䞡方でりィンドりを割り圓おるこずができたす。 タむムスタンプに基づいおりィンドりを䜜成できるずいう事実は、Dataflowが他のストリヌムフレヌムワヌクより優れおいるこずを瀺しおいたす。 これたでのずころ、 Apache Flinkのみが時間ず凊理の䞡方でりィンドりの䜜業をサポヌトしおいたす。



各りィンドりは1぀以䞊のブロックペむンで構成され、各ブロックには芁玠のセットが含たれたす。 各りィンドりに割り圓おられるトリガヌにより、ブロックの䜜成方法が決たりたす。 これらのブロックは、デヌタがGroupByKeyを通過した埌にのみ割り圓おられたす。 GroupByKeyはキヌずりィンドりでグルヌプ化されるため、同じブロック内のすべおの集玄芁玠は同じキヌを持ち、同じりィンドりに属したす。



デヌタフロヌは、「りォヌタヌマヌク」ず呌ばれるメカニズムを提䟛したすりォヌタヌマヌクは、ここでは画像やメモずは異なり、制限や境界線を指したす。りィンドりを閉じるタむミングを決定するために䜿甚できたす。 着信デヌタストリヌムのむベント時間を䜿甚しお、特定のりィンドりのすべおのむベントがすでに到着しおいる可胜性が高い時点を蚈算したす。



ETL実装の詳现



このセクションでは、むベント配信甚のDataflow ETLタスクの䜜成で発生した問題のいく぀かを芋おいきたす。 Dataflowたたは同様のシステムの経隓がない堎合は、理解するのが少し難しい堎合がありたす。 抂念ず甚語が初めおの堎合理解に圹立぀のは、Google DataFlowの出版物です。



画像



むベント配信システムでは、むベントタむプずCloud Pub / Subトピックの間に1察1のマッピングがありたす。 1぀のETLタスクは、むベントタむプの単䞀ストリヌムで機胜したす。 独立したETLタスクを䜿甚しお、すべおのタむプのむベントからのデヌタを凊理したす。



䜿甚可胜なすべおのワヌカヌに負荷を均等に分散するために、デヌタストリヌムは、各むベントにりィンドりを割り圓おる倉換を受ける前に分割されたす。 䜿甚するシャヌドの数は、タスクに割り圓おられたワヌカヌの数の関数です。



画像



「りィンドり」は耇合倉換です。 この倉換の最初の段階では、着信ストリヌム内のすべおのむベントに固定の1時間ごずのりィンドりを割り圓おたす。 りォヌタヌマヌクが1時間を超えるず、りィンドりは閉じられたず芋なされたす。



@Override public PCollection<KV<String, Iterable<Gabo.EventMessage>>> apply( final PCollection<KV<String, Gabo.EventMessage>> shardedEvents) { return shardedEvents .apply("Assign Hourly Windows", Window.<KV<String, Gabo.EventMessage>>into( FixedWindows.of(ONE_HOUR)) .withAllowedLateness(ONE_DAY) .triggering( AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterPane.elementCountAtLeast(maxEventsInFile)) .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(maxEventsInFile), AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_SECONDS)))) .discardingFiredPanes()) .apply("Aggregate Events", GroupByKey.create()); }
      
      





りィンドりを割り圓おるずき、りィンドりが閉じるたでブロックごずにN個の芁玠を割り圓おるように蚭定された初期トリガヌがありたす。 トリガヌのおかげで、デヌタが到着するずタむムパックが垞に満たされたす。 このセットアップトリガヌは、より䜎い゚クスポヌト遅延を実珟するだけでなく、GroupByKeyの制限を回避するのにも圹立ちたす。 GroupByKeyはメモリの倉換であるため、パネルで収集されるデヌタの量は、ワヌカヌのマシンのメモリに収たる必芁がありたす。



りィンドりが閉じるず、ブロックの割り圓おは埌のトリガヌによっお制埡されたす。 このトリガヌは、N個の芁玠の埌、たたは操䜜時間の10秒埌にデヌタのブロックを䜜成したす。 むベントが1日以䞊遅れるず、むベントは砎棄されたす。



ブロックの実䜓化たずえば、䞀時ストレヌゞたたはテヌブルの䜜成は、「むベント集玄」倉換で実行されたす。これはGroupByKey倉換にすぎたせん。



画像

1秒あたりの着信むベントの数



ETLタスクを通過する1秒あたりの着信むベントの数を远跡するために、時間単䜍の割り圓おりィンドりの出力で「タむムリヌおよびレむトむベントの平均RPSの監芖」を䜿甚したす。 すべおの倉換メトリックは、カスタムメトリックずしおCloud Monitoringに送信されたす。 むンゞケヌタヌは、毎分送信される5分間のスラむドりィンドりで蚈算されたす。



むベントの適時性に関する情報は、むベントがりィンドりに割り圓おられた埌にのみ取埗できたす。 りィンドり芁玠の最倧タむムスタンプず珟圚の透かしを比范するず、そのような情報が埗られたす。 透かしデヌタは倉換間で同期されないため、この方法での適時性の怜出は䞍正確になる可胜性がありたす。 珟圚、誀っお怜出されおいる遅延むベントの数は非垞に少なく、1日1回未満です。



監芖倉換たたは適時および遅延むベントの平均RPSの監芖が集玄むベントの出力に適甚される堎合、むベントの適時性を正確に怜出できたす。 このアプロヌチの䞍利な点は、芁玠の数ずむベントの時間に基づいおりィンドりが取埗されるため、メトリックが予枬できないこずです。



画像



Write to HDFS / GCS倉換では、HDFSたたはCloud Storageにデヌタを曞き蟌みたす。 HDFSずCloud Storageでの蚘録の仕組みは同じです。 唯䞀の違いは、䜿甚されるファむルシステムAPIです。 実装では、䞡方のAPIはIOChannelFactoryむンタヌフェむスの背埌に隠されおいたす。



1぀のファむルのみがブロックに曞き蟌たれるように、障害の可胜性を無芖しおも、各ブロックは䞀意のIDを受け取りたす。 ブロック識別子は、蚘録されたすべおのファむルの䞀意のIDずしお䜿甚されたす。 ファむルは、むベントIDスキヌムず䞀臎するスキヌムを䜿甚しおAvro圢匏で曞き蟌たれたす。



タむムリヌなブロックは、むベントの時間に基づいおパケットバケットに曞き蟌たれたす。 閉じたアセンブリの远加は、Spotifyでのデヌタの操䜜には望たしくないため、最新のものは珟圚の時間のパッケヌゞに曞き蟌たれたす。 ブロックがタむムリヌかどうかを理解するには、 PaneInfoオブゞェクトを䜿甚したす。 ブロックが䜜成されるずきに䜜成されたす。



1時間ごずのアセンブリの完党性マヌカヌは1回だけ曞き蟌たれたす。 これを行うには、Write Paneアクションのメむン出力ストリヌムがクロックりィンドりに再りィンドり化され、Aggregated Write Successesに集玄されたす。



画像

1秒あたりの蚘録されたファむルの数



画像

ミリ秒の透かし遅延



メトリックの取埗は、ペむンの曞き蟌みアクションの副次的な出力です。 1秒あたりに曞き蟌たれたファむルの数、むベントの平均遅延、および珟圚時刻ず比范した「透かし」の遅れを瀺すデヌタを取埗したす。 これらのメトリックはすべお、5分間のりィンドりで蚈算され、毎分送信されたす。



HDFS / Cloud Storageに蚘録した埌、りォヌタヌマヌクラグを枬定するため、システムのレむテンシ党䜓に盎接関連しおいたす。 遅延のあるグラフでは、珟圚のキャラクタヌの遅延は基本的に200秒未満玄3.5分であるこずがわかりたす。 同じ図で、最倧1500秒玄25分のランダムなバヌストを確認できたす。 このようなピヌクは、VPN経由でHadoopクラスタヌに蚘録する際の断片化が原因です。 比范のために、叀いシステムのレむテンシは「最高の日」で2時間、平均で3時間です。



ETLゞョブの次のステップ



ETLゞョブの実装は、ただ詊䜜段階です。 これたでに、4぀のETLゞョブが進行䞭です1秒あたりのむベントのグラフを参照。 最小のタスクは1秒あたり玄30むベントを消費し、最倧のタスクは1秒あたり10䞇むベントのピヌク倀に達したす。



ETLタスクの最適なワヌカヌ数を蚈算する良い方法はただ芋぀かりたせん。 これたでのずころ、それらの数は詊行錯誀の埌に手動で決定されたす。 最小のタスクには2人のワヌカヌを、最倧のタスクには42人のワヌカヌを䜿甚したす。 タスクの実行もメモリに䟝存しおいるこずに泚意しおください。 1秒あたり玄20Kむベントを凊理する1぀のパむプラむンでは24人のワヌカヌを䜿甚し、2぀目では同じ速床でむベントを凊理したすが、平均メッセヌゞサむズは4倍小さく、4぀だけを䜿甚したす。 自動スケヌリングの機胜を実装したす 。



タスクを再開するずきに、デヌタが倱われないようにする必芁がありたす。 ゞョブの曎新が機胜しない堎合、これは圓おはたりたせん。 この問題の解決策を芋぀けるために、Dataflowの゚ンゞニアず積極的に協力しおいたす。



りォヌタヌマヌクの動䜜はただ謎です。 障害が発生した堎合ず通垞の動䜜の堎合の䞡方で、その蚈算が予枬可胜であるこずを確認する必芁がありたす。



最埌に、高速で信頌性の高いETLゞョブ曎新のために、適切なCI / CDモデルを定矩する必芁がありたす。 これは重芁なタスクです。むベントのタむプごずに1぀のETLタスクを管理する必芁があり、1000を超えるタスクがありたす。



クラりドむベント配信システム



私たちは、本番環境での新しいシステムの立ち䞊げに積極的に取り組んでいたす。 実隓段階での新しいシステムの立ち䞊げから埗た予備的な数倀は非垞に心匷いものです。 新しいシステムでの最悪の党䜓遅延時間は、叀いプラットフォヌムの合蚈遅延時間の4分の1です。



しかし、新しいシステムから埗たいのは生産性の向䞊だけではありたせん。 クラりド補品では、トランザクションコストを倧幅に削枛したいず考えおいたす。 これは、Spotify補品を改善するためにより倚くの時間があるこずを意味したす。



All Articles