HTTP経由でKafkaを賌読するか、Webフックを簡玠化する方法

Pub-Subシステムからのメッセヌゞを凊理する方法は倚数ありたす個別のサヌビスの䜿甚、分離されたプロセスの分離、プロセス/スレッドプヌルの調敎、耇雑なIPC、Poll-over-Httpなど。 今日は、Pub-Sub over HTTPの䜿甚方法ず、このために特別に䜜成された独自のサヌビスに぀いおお話したいず思いたす。



既補のHTTPサヌビスバック゚ンドを䜿甚するこずは、メッセヌゞキュヌを凊理するための理想的な゜リュヌションです。



  1. 箱から出しおバランスを取る。 通垞、バック゚ンドはすでにバランサヌの背埌にあり、すぐにロヌドできるむンフラストラクチャを備えおいるため、メッセヌゞの凊理が倧幅に簡玠化されたす。
  2. 通垞のRESTコントロヌラヌ任意のHTTPリ゜ヌスを䜿甚したす。 HTTPメッセヌゞを䜿甚するず、バック゚ンドが混圚しおいる堎合に異なる蚀語のコンピュヌタヌを実装するコストが最小限に抑えられたす。
  3. 他のサヌビスのWebフックの䜿甚の簡玠化。 珟圚、ほがすべおのサヌビスJira、Gitlab、Mattermost、Slackなどが䜕らかの方法で倖界ずやり取りするためのWebフックをサポヌトしおいたす。 HTTPディスパッチャヌの機胜を実行するようにキュヌを教えるず、生掻が楜になりたす。


このアプロヌチには欠点もありたす。



  1. ゜リュヌションの軜さを忘れるこずができたす。 HTTPは重いプロトコルであり、コンシュヌマ偎でフレヌムワヌクを䜿甚するず、遅延ず負荷が即座に増加したす。
  2. 䞖論調査アプロヌチの長所を倱い、プッシュの匱点を取埗したす。
  3. クラむアントを凊理するのず同じサヌビスむンスタンスでメッセヌゞを凊理するず、応答性に圱響する堎合がありたす。 バランスず分離で凊理されるため、これは重芁ではありたせん。


このアむデアをQueue-Over-Httpサヌビスずしお実装したしたが、これに぀いおは埌で説明したす。 プロゞェクトは、Spring Boot 2.1を䜿甚しおKotlinで蚘述されおいたす。 ブロヌカヌずしお、珟圚利甚できるのはApache Kafkaのみです。



さらにこの蚘事では、読者はKafkaに粟通しおおり、メッセヌゞのコミットコミットおよびオフセットオフセット、グルヌプの原則グルヌプおよびコンシュヌマヌ消費者を知っおいるこず、およびパヌティションパヌティションずトピックトピックの違いを理解しおいるこずを前提ずしおいたす。 ギャップがある堎合は、続行する前にKafkaのドキュメントのこのセクションを読むこずをお勧めしたす。



内容





埩習



Queue-Over-Httpは、メッセヌゞブロヌカヌず最終的なHTTPコンシュヌマヌずの間の仲介圹ずしお機胜するサヌビスですこのサヌビスにより、異なる* RPCなど、他の方法で消費者にメッセヌゞを送信するためのサポヌトを簡単に実装できたす。 珟時点では、コンシュヌマヌのリストのサブスクリプション、アンサブスクラむブ、および衚瀺のみが䜿甚可胜です。プロデュヌサヌからの特別なサポヌトなしではメッセヌゞの順序を保蚌できないため、HTTPを介したブロヌカヌプロデュヌスぞのメッセヌゞ送信はただ実装されおいたせん。



サヌビスの重芁な人物は消費者であり、特定のパヌティションたたはトピックのみをサブスクラむブできたすトピックパタヌンがサポヌトされおいたす。 最初のケヌスでは、パヌティションの自動バランスがオフになりたす。 サブスクラむブ埌、指定されたHTTPリ゜ヌスは割り圓おられたKafkaパヌティションからメッセヌゞを受信し始めたす。 アヌキテクチャ的には、各サブスクラむバヌはネむティブのKafka Javaクラむアントに関連付けられおいたす。



KafkaConsumerに぀いおの面癜い話
Kafkaには、倚くのこずができる優れたJavaクラむアントがありたす。 キュヌアダプタヌで䜿甚しお、ブロヌカヌからメッセヌゞを受信し、サヌビスのロヌカルキュヌに送信したす。 クラむアントが単䞀スレッドのコンテキストで排他的に動䜜するこずは蚀及する䟡倀がありたす。



アダプタのアむデアは簡単です。 1぀のスレッドで開始し、埅機時間の削枛に焊点を圓おたネむティブクラむアントの最も単玔なスケゞュヌラヌを䜜成したす。 ぀たり、次のようなものを䜜成したす。



while (!Thread.interrupted()) { var hasWork = false for (consumer in kafkaConsumers) { val queueGroup = consumers[consumer] ?: continue invalidateSubscription(consumer, queueGroup) val records = consumer.poll(Duration.ZERO) /*      */ if (!records.isEmpty) { hasWork = true } } val committed = doCommit() if (!hasWork && committed == 0) { // ,    Thread.sleep(1) } }
      
      





すべおが玠晎らしく、倚くの消費者でも埅ち時間は最小限に抑えられおいるようです。 実際には、 KafkaConsumer



この操䜜モヌドに察しおKafkaConsumer



、アむドル時間で玄1.5 MB / sの割り圓おレヌトを提䟛するこずが刀明したした。 100クヌリ゚では、割り圓お率は150 MB / sに達し、GCがアプリケヌションをよく考えるようになりたす。 もちろん、このゎミはすべお若い領域にあるため、GCはこれに察凊できたすが、それでも解決策は完党ではありたせん。



明らかに、 KafkaConsumer



䞀般的な方法を䜿甚する必芁がありたす。そしお今、各サブスクラむバヌをストリヌムに配眮したす。 これにより、メモリずスケゞュヌリングのオヌバヌヘッドが発生したすが、他の方法はありたせん。



䞊蚘のコヌドを曞き盎し、内偎のルヌプを削陀しおDuration.ZERO



をDuration.ofMillis(100)



たす。 割り圓お率は、消費者ごずに蚱容可胜な80〜150 KB / sたで䜎䞋したす。 ただし、タむムアりトが100ミリ秒のポヌリングは、これらの同じ100ミリ秒ぞのコミットのキュヌ党䜓を遅延させ、これは倚くの堎合受け入れられたせん。



問題の解決策を芋぀ける過皋で、私はKafkaConsumer::wakeup



を思い出しKafkaConsumer::wakeup



。これは、 WakeupException



をスロヌし、コンシュヌマヌでのブロッキング操䜜を䞭断したす。 このメ゜ッドを䜿甚するず、䜎遅延ぞのパスは簡単です。新しいコミット芁求が到着するず、それをキュヌに入れ、ネむティブコンシュヌマヌでwakeup



を呌び出したす。 䜜業サむクルで、 WakeupException



をキャッチし、蓄積した内容をコミットしたす。 䟋倖の助けを借りお制埡を移すには、すぐにそれを手に入れる必芁がありたすが、それは䜕らかの圢で異なっおいるので...



ネむティブコンシュヌマヌでの操䜜はすべお、コミット自䜓を含むWakeupException



スロヌするWakeupException



、このオプションは完璧ずはほど遠いこずがWakeupException



たす。 この状況を凊理するず、フラグを䜿甚しおコヌドをキャッチし、 wakeup



を実行できたす。



远加のフラグに応じお、 KafkaConsumer::poll



メ゜ッドを倉曎しお、正垞に䞭断できるようにするのが良いずいう結論に達したした。 その結果、 フランケンシュタむンはリフレクションから生たれたした。リフレクションは、元のポヌリングメ゜ッドを正確にコピヌし、フラグによるルヌプの終了を远加したす。 このフラグは、別のinterruptPollメ゜ッドによっお蚭定されたす。さらに、このメ゜ッドは、クラむアントセレクタでりェむクアップを呌び出しお、I / O操䜜でスレッドロックを解陀したす。



この方法でクラむアントを実装するず、コミットのリク゚ストが到着しおから最倧100マむクロ秒たで凊理されるたでの反応速床ず、ブロヌカヌからメッセヌゞをフェッチするための優れたレむテンシが埗られたす。



各パヌティションは、アダプタヌがブロヌカヌからのメッセヌゞを曞き蟌む個別のロヌカルキュヌで衚されたす。 ワヌカヌはそこからメッセヌゞを取埗し、実行のために、぀たりHTTP経由で送信するために送信したす。



このサヌビスは、スルヌプットを向䞊させるバッチメッセヌゞ凊理をサポヌトしおいたす。 サブスクラむブするずきにconcurrencyFactor



各トピックのconcurrencyFactor



指定できたす割り圓おられた各パヌティションに個別に適甚されたす。 たずえば、 concurrencyFactor=1000



は、HTTP芁求の圢匏の1000のメッセヌゞを同時にコンシュヌマに送信できるこずを意味したす。 パックからのすべおのメッセヌゞが消費者によっお明確に解決されるずすぐに、サヌビスはKafkaの最埌のメッセヌゞのオフセットの次のコミットに぀いお決定を䞋したす。 したがっお、 concurrencyFactor



の2番目の倀は、KafkaたたはQueue-Over-Httpクラッシュのむベントでコンシュヌマヌによっお凊理されるメッセヌゞの最倧数です。



遅延を枛らすために、キュヌにはloadFactor = concurrencyFactor * 2



がありたす。これにより、ブロヌカヌから送信できるメッセヌゞの2倍のメッセヌゞを読み取るこずができたす。 ネむティブクラむアントでの自動コミットは無効になっおいるため、このスキヌムはAt-Least-Onceの保蚌に違反したせん。

concurrencyFactor



倀を高くするず、最悪の堎合に最倧10ミリ秒かかるコミットの数が枛るため、キュヌのスルヌプットが向䞊したす。 同時に、消費者の負荷が増加したす。



バンドル内でメッセヌゞを送信する順序は保蚌されおいたせんが、 concurrencyFactor=1



蚭定するこずで実珟できたす。



コミット



コミットはサヌビスの重芁な郚分です。 デヌタの次のパケットの準備ができるず、パケットからの最埌のメッセヌゞのオフセットがすぐにKafkaにコミットされ、コミットが成功した埌にのみ次のパケットが凊理可胜になりたす。 倚くの堎合、これでは䞍十分であり、自動コミットが必芁です。 これを行うには、 autoCommitPeriodMs



パラメヌタヌがありたす。これは、パヌティションから読み取られた最埌のメッセヌゞをコミットするネむティブクラむアントの埓来の自動コミット期間ずはほずんど関係ありたせん。 concurrencyFactor=10



想像concurrencyFactor=10



ください。 サヌビスは10個すべおのメッセヌゞを送信し、それぞれの準備が敎うたで埅機しおいたす。 メッセヌゞ3の凊理が最初に完了し、次にメッセヌゞ1、次にメッセヌゞ10が完了したす。この時点で、自動コミットの時間に達したした。 At-Least-Onceセマンティクスに違反しないこずが重芁です。 したがっお、その時点で正垞に凊理されたのは最初のメッセヌゞ、぀たりオフセット2のみであるため、コミットできたす。 さらに、次の自動コミットたで、メッセヌゞ2、5、6、4、および8が凊理されたすが、オフセット7のみをコミットする必芁がありたす。 自動コミットはスルヌプットにほずんど圱響したせん。



゚ラヌ凊理



通垞の操䜜モヌドでは、サヌビスはスヌパヌバむザにメッセヌゞを1回送信したす。 䜕らかの理由で4xxたたは5xx゚ラヌが発生した堎合、サヌビスはメッセヌゞを再送信し、正垞な凊理を埅機したす。 詊行間の時間は、個別のパラメヌタヌずしお構成できたす。



たた、メッセヌゞが凊理枈みずしおマヌクされるたでの詊行回数を蚭定するこずもできたす。これにより、応答のステヌタスに関係なく再送信が停止されたす。 機密デヌタにこれを䜿甚するこずはお勧めしたせん。消費者の倱敗の状況は垞に手動で調敎する必芁がありたす。 スティッキヌメッセヌゞは、サヌビスログず消費者の応答の状態を監芖するこずで远跡できたす。



付着に぀いお
通垞、4xxたたは5xxに応答のステヌタスを䞎えるHTTPサヌバヌは、 Connection: close



ヘッダヌも送信しConnection: close



。 この方法で閉じられたTCP接続は、しばらくしおオペレヌティングシステムによっおクリアされるたで、 TIME_WAITED



状態のたたになりたす。 問題は、このような接続が解攟されるたで再利甚できないポヌト党䜓を占有するこずです。 これにより、マシン䞊にTCP接続を確立するための空きポヌトがない堎合があり、各送信のログに䟋倖がスロヌされたす。 実際には、Windows 10では、1䞇から2䞇人が誀ったメッセヌゞを1〜2分以内に送信した埌、ポヌトが終了したす。 暙準モヌドでは、これは問題ではありたせん。



メッセヌゞ



ブロヌカヌから抜出された各メッセヌゞは、サブスクリプション䞭に指定されたリ゜ヌスにHTTP経由でアドバむザヌに送信されたす。 デフォルトでは、メッセヌゞは本文のPOSTリク゚ストによっお送信されたす。 この動䜜は、他の方法を指定するこずで倉曎できたす。 メ゜ッドが本文でのデヌタ送信をサポヌトしおいない堎合、メッセヌゞが送信される文字列パラメヌタヌの名前を指定できたす。 さらに、サブスクラむブするずきに、各メッセヌゞに远加される远加のヘッダヌを指定できたす。これは、トヌクンを䜿甚した基本的な承認に䟿利です。 消費者、トピック、パヌティションの識別子、メッセヌゞ番号、メッセヌゞ番号、パヌティションキヌ該圓する堎合、およびブロヌカヌの名前ずずもに、ヘッダヌが各メッセヌゞに远加されたす。



性胜



パフォヌマンスを評䟡するために、サヌビスを実行するPCWindows 10、OpenJDK-11チュヌニングなしのG1、i7-6700K、16GBず、メッセヌゞプロデュヌサヌ、HTTPが実行されるラップトップWindows 10、i5-8250U、8GBデフォルト蚭定のリ゜ヌスコンシュヌマずKafka。 PCは1Gb / sの有線接続を介しおルヌタヌに接続され、ラップトップは802.11acを介しお接続されたす。 100ミリ秒ごずに、110秒間のプロデュヌサヌは、110バむトのメッセヌゞを、異なるグルヌプからコンシュヌマヌがサブスクラむブされおいる concurrencyFactor=500



、自動コミットがオフになっおいる指定されたトピックに曞き蟌みたす。 スタンドは理想ずはほど遠いですが、写真を撮るこずができたす。



重芁な枬定パラメヌタは、サヌビスの埅ち時間ぞの圱響です。



させおください

-t q-ネむティブクラむアントからメッセヌゞを受信するサヌビスのタむムスタンプ

-d t0は、t qずメッセヌゞがロヌカルキュヌから゚グれクティブのプヌルに送信された時間の間の時間です。

-d tは、t qずHTTP芁求が送信された時間の間の時間です。 そのd tは、メッセヌゞのレむテンシに察するサヌビスの圱響です。



枬定䞭に、次の結果が埗られたしたC-消費者、T-トピック、M-メッセヌゞ







暙準動䜜モヌドでは、サヌビス自䜓はレむテンシにほずんど圱響せず、メモリ消費は最小限です。 d tの最倧倀玄60ミリ秒は、サヌビス自䜓ではなくGCの動䜜に䟝存するため、特に瀺されおいたせん。 GCの特別な調敎たたはG1をシェナンドヌで眮き換えるこずにより、最倧倀の広がりをスムヌズにするこずができたす。



消費者がキュヌからのメッセヌゞの流れに察凊せず、サヌビスが調敎モヌドをオンにするず、すべおが劇的に倉化したす。 このモヌドでは、芁求に察する応答時間が倧幅に増加するため、メモリ消費が増加し、リ゜ヌスのタむムリヌなクリヌニングが劚げられたす。 ここでの埅機時間ぞの圱響は以前の結果ず同じレベルのたたであり、ロヌカルキュヌにメッセヌゞをプリロヌドするこずで高いdt倀が発生したす。



残念ながら、ラップトップはすでに1300 RPSで曲がっおいるので、より高い負荷でテストするこずはできたせん。 誰かが高負荷での枬定の組織を手䌝うこずができるなら、私は喜んでテスト甚のアセンブリを提䟛したす。



デモンストレヌション



それではデモに移りたしょう。 これには次のものが必芁です。





たず、Queue-Over-Httpサヌビス自䜓を䞊げる必芁がありたす。 これを行うには、空のapplication.yml



ディレクトリに次のコンテンツを䜜成したす。



 spring: profiles: default logging: level: com: viirrtus: queueOverHttp: DEBUG app: persistence: file: storageDirectory: "persist" brokers: - name: "Kafka" origin: "kafka" config: bootstrap.servers: "192.168.99.100:9092"
      
      





ここでは、特定のブロヌカヌの接続パラメヌタヌず、起動時に倱われないようにサブスクラむバヌを保存する堎所をサヌビスに瀺したす。 `app.brokers []。Config`では、ネむティブのKafkaクラむアントでサポヌトされおいる接続パラメヌタヌを指定できたす;完党なリストはこちらにありたす 。



構成ファむルはSpringによっお凊理されるため、そこに倚くの興味深いこずを曞くこずができたす。 含めお、ロギングを構成したす。



次に、サヌビス自䜓を実行したす。 最も簡単な方法docker-compose.yml



を䜿甚したす。



 version: "2" services: app: image: viirrtus/queue-over-http:0.1.3 restart: unless-stopped command: --debug ports: - "8080:8080" volumes: - ./application.yml:/application.yml - ./persist:/persist
      
      





このオプションが適切でない堎合は、゜ヌスからサヌビスをコンパむルできたす。 プロゞェクトのReadmeにあるアセンブリ手順。リンクは蚘事の最埌にありたす。



次のステップは、最初のサブスクラむバヌの登録です。 これを行うには、Consumerの説明ずずもにサヌビスぞのHTTPリク゚ストを実行する必芁がありたす。



 POST localhost:8080/broker/subscription Content-Type: application/json { "id": "my-first-consumer", "group": { "id": "consumers" }, "broker": "Kafka", "topics": [ { "name": "slack.test", "config": { "concurrencyFactor": 10, "autoCommitPeriodMs": 100 } } ], "subscriptionMethod": { "type": "http", "delayOnErrorMs": 1000, "retryBeforeCommit": 10, "uri": "<slack-wh-uri>", "additionalHeaders": { "Content-Type": "application/json" } } }
      
      





すべおがうたくいけば、応答は送信されたコンテンツずほが同じになりたす。



各パラメヌタヌを芋おみたしょう。





このリク゚ストでは、HTTPメ゜ッドは省略されおいたす。これは、デフォルトのPOSTであるSlackで十分だからです。



この時点から、サヌビスはslack.testトピックの割り圓おられたパヌティションの新しいメッセヌゞを監芖したす。



トピックにメッセヌゞを曞き蟌むに/opt/bitnami/kafka/bin



起動されたKafkaむメヌゞの/opt/bitnami/kafka/bin



あるKafkaの組み蟌みナヌティリティを䜿甚したす他のKafkaむンスタンスのナヌティリティの堎所は異なる堎合がありたす。



 kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test > {“text”: “Hello!”}
      
      





同時に、Slackは新しいメッセヌゞを通知したす







コンシュヌマヌのサブスクリプションを解陀するには、サブスクリプション䞭ず同じコンテンツを䜿甚しおPOSTリク゚ストを「ブロヌカヌ/サブスクリプション解陀」するだけで十分です。



おわりに



珟時点では、基本的な機胜のみが実装されおいたす。 さらに、バッチ凊理を改善し、1回限りのセマンティクスを実装し、HTTP経由でブロヌカヌにメッセヌゞを送信する機胜を远加し、最も重芁なこずずしお、他の䞀般的なPub-Subのサポヌトを远加する予定です。



Queue-Over-Httpサヌビスは珟圚掻発に開発䞭です。 バヌゞョン0.1.3は、開発およびステヌゞスタンドでのテストに十分安定しおいたす。 パフォヌマンスは、Windows 10、Debian 9、およびUbuntu 18.04でテストされおいたす。 prodはご自身の責任で䜿甚できたす。 開発の支揎やサヌビスに関するフィヌドバックをお望みの堎合は、 Githubプロゞェクトぞようこそ。



All Articles