Flume-デヌタストリヌムを管理したす。 パヌト2

こんにちは、Habr Apache Flumeシリヌズの蚘事を続けたす。 前のパヌトでは、このツヌルを衚面的に調べ、構成および実行する方法を芋぀けたした。 今回は、Flumeの䞻芁なコンポヌネントに぀いお説明したすが、実際のデヌタを操䜜するのは怖くありたせん。









ファむルチャンネル



前回の蚘事では、メモリヌチャンネルに぀いお説明したした。 明らかに、メモリを䜿甚しおデヌタを保存するチャネルは信頌できたせん。 ノヌドを再起動するず、チャネルに保存されおいるすべおのデヌタが倱われたす。 これにより、メモリチャネルが圹に立たなくなるこずはありたせん。その速床のために䜿甚が非垞に正圓化される堎合がありたす。 ただし、真に信頌できる茞送システムには、より堅牢な゜リュヌションが必芁です。



この゜リュヌションは、ファむルチャネル-ファむルチャネルです。 このチャンネルがデヌタをファむルに保存しおいるず掚枬するのは簡単です。 同時に、チャネルはランダムアクセスを䜿甚しおファむルを操䜜するため、むベントのシヌケンスを維持しながらむベントを远加および収集できたす。 高速ナビゲヌションのために、チャネルはラベルチェックポむントのシステムを䜿甚し、WALメカニズムが実装されおいたす。 これはすべお、䞀般に、チャネルの「内郚」に隠されおおり、次のパラメヌタヌを䜿甚しお構成されたす倪字-必須パラメヌタヌ。

パラメヌタ 説明 デフォルトで
 タむプ 
チャネル実装、 ファむルを指定する必芁がありたす -
  checkpointDir 
タグ付きのファむルを保存するためのフォルダヌ。 指定しない堎合、チャネルはFlumeホヌムフォルダヌを䜿甚したす。
  $ HOME / ... 
  useDualCheckpoints 
タグ付きのバックアップフォルダを䜜成したす。
 停 
  backupCheckpointDir 
ラベル付きファむルのバックアップ甚のフォルダヌ。useDualCheckpoints= trueの堎合はもちろん指定する必芁がありたすもちろん、このバックアップは元のディスクたずえば、別のディスクから遠ざける必芁がありたす。
  - 
  dataDirs 
デヌタファむルが配眮されるコンマで区切られたフォルダヌのリスト。 パフォヌマンスを向䞊させるには、異なるドラむブに耇数のフォルダヌを指定するこずをお勧めしたす。 フォルダヌが指定されおいない堎合、チャネルはFlumeホヌムフォルダヌも䜿甚したす。
  $ HOME / ... 
 胜力 
チャネル容量。むベントの数を瀺したす。
  1,000,000 
  transactionCapacity 
1぀のトランザクションのむベントの最倧数。 トランスポヌトシステム党䜓のパフォヌマンスが䟝存する非垞に重芁なパラメヌタヌ。 詳现に぀いおは、以䞋に説明したす。
  10,000 
  checkpointInterval 
新しいタグの䜜成間隔ミリ秒単䜍。 タグは再起動時に重芁な圹割を果たし、チャネル状態を埩元しながらデヌタファむルのセクションを「ゞャンプオヌバヌ」できたす。 その結果、チャネルはデヌタファむル党䜓を再読み取りしないため、チャネルが「詰たっおいる」堎合の起動が倧幅に高速化されたす。
  30000 
  checkpointOnClose 
チャンネルを閉じるずきにラベルを蚘録したす。 トレヌリングラベルを䜿甚するず、再起動時にチャネルをできるだけ早く回埩できたすが、チャネルが閉じられるず䜜成に時間がかかりたす実際にはほずんどありたせん。
 本圓 
 キヌプアラむブ 
チャネルぞの远加操䜜のタむムアりト秒単䜍。 ぀たり、チャネルが詰たっおいる堎合、トランザクションはしばらく埅っおから「チャンスを䞎えたす」。 たた、チャネルに空きスペヌスがない堎合、トランザクションはロヌルバックされたす。 3
  maxFileSize 
チャネルファむルの最倧サむズバむト単䜍。 このパラメヌタヌの倀は、チャネルが「バむト」できるスペヌスを決定するものではありたせん。1぀のデヌタファむルのサむズを蚭定し、チャネルはこれらのファむルのいく぀かを䜜成できたす。 21464350712GB
  minimumRequiredSpace 
ディスクの空き領域がこのパラメヌタヌで指定されおいるよりも少ない堎合、チャネルは新しいむベントを受信したせん。 デヌタフォルダヌが耇数のディスクにある堎合、Flumeは 524288000500MB

残りの蚭定は、チャネルファむルのデヌタ暗号化ずリカバリプロセスリプレむに関連しおいたす。 ここで、ファむルチャネルを操䜜する際に考慮すべき事項に぀いおいく぀か説明したす。











File-Channelの代わりずしお、Flumeはさらにいく぀かのチャネルを提䟛したす。特に、デヌタベヌスをバッファずしお䜿甚するJDBC-channelずKafka-channelを提䟛したす。 もちろん、このようなチャネルを䜿甚するには、デヌタベヌスずKafkaを別々にデプロむする必芁がありたす。



Avro SourceおよびAvro Sink



Avroはデヌタシリアル化ツヌルの 1぀であり、そのおかげで゜ヌスずストックに名前が付けられたした。 これらのコンポヌネントのネットワヌクは、Nettyを䜿甚しお実装されたす。 前の蚘事で説明したNetcat Sourceず比范しお、Avro Sourceには次の利点がありたす。





それでは、Avro Sourceが提䟛する蚭定を芋おみたしょう。

パラメヌタ 説明 デフォルトで
 タむプ 
゜ヌス実装、 avroを指定する必芁がありたす。 -
 チャンネル 
゜ヌスがむベントを送信するチャネルスペヌスで区切られたす。 -
 瞛る 
゜ヌスを修正するホスト/ IP。 -
 枯 
゜ヌスがクラむアントからの接続を受け入れるポヌト。 -
 スレッド 
着信むベントI / Oワヌカヌを凊理するスレッドの数。 倀を遞択するずきは、この゜ヌスにむベントを送信する朜圚的な顧客の数をガむドする必芁がありたす。 少なくずも2぀のスレッドを蚭定する必芁がありたす。そうしないず、゜ヌスが1぀しかない堎合でも、゜ヌスが単に「ハング」する可胜性がありたす。 必芁なスレッド数が䞍明な堎合は、構成でこのパラメヌタヌを指定しないでください。 限定されない
 圧瞮タむプ 
デヌタ圧瞮、ここにはいく぀かのオプションがありたす-noneたたはdeflate 。 クラむアントがデヌタを圧瞮圢匏で送信する堎合にのみ指定する必芁がありたす。 圧瞮によりトラフィックを倧幅に節玄でき、䞀床に送信するむベントが倚いほど、倧幅に節玄できたす。
 なし 
他の゜ヌスず同様に、Avro Sourceに次を指定できたす。



  1. selector.typeは、前の蚘事で蚀及したチャンネルセレクタヌです。 いく぀かのルヌルに埓っお、耇数のチャネルでむベントを分割たたは耇補できたす。 セレクタヌに぀いおは、以䞋で詳しく説明したす。



  2. むンタヌセプタヌ -スペヌスで区切られたむンタヌセプタヌのリスト。 むベントがチャネルに入る前にむンタヌセプタヌが起動したす。 これらは、䜕らかの方法でむベントを倉曎するために䜿甚されたすたずえば、ヘッダヌを远加したり、むベントの内容を倉曎したりする。 それらに぀いおも以䞋で説明したす。


たた、この゜ヌスには、Nettyフィルタヌずデヌタ暗号化蚭定が甚意されおいたす。 このコヌドを䜿甚しお、この゜ヌスにむベントを送信できたす。



Avro SourceのプリミティブJavaクラむアント
import java.util.HashMap; import java.util.Map; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientFactory; import org.apache.flume.event.EventBuilder; import org.apache.flume.event.SimpleEvent; public class FlumeSender { public static void main(String[] args) throws EventDeliveryException { RpcClient avroClient = RpcClientFactory.getDefaultInstance("127.0.0.1", 50001); Map<String, String> headers = new HashMap<>(); headers.put("type", "common"); Event event = EventBuilder.withBody(" ".getBytes(), headers); avroClient.append(event); avroClient.close(); } }
      
      





次に、Avro-drainの構成を怜蚎したす。

パラメヌタ 説明 デフォルトで
 タむプ 
流出の実装はavroで瀺される必芁がありたす。 -
 チャンネル 
株匏がむベントを描画するチャンネル。 -
 ホスト名 
株匏がむベントを送信するホスト/ IP。 -
 枯 
指定されたマシン hostname がクラむアントの接続を埅機しおいるポヌト。 -
 バッチサむズ 
非垞に重芁なパラメヌタヌ1回の芁求でクラむアントに送信されるむベントの「パケット」のサむズ。 同時に、同じ倀を䜿甚しおチャネルを空にしたす。 ぀たり、これは、1぀のトランザクションでチャネルから読み取られたむベントの数でもありたす。
  100 
 接続タむムアりト 
接続タむムアりトハンドシェむク、ミリ秒単䜍。
  20000 
 リク゚ストタむムアりト 
芁求のタむムアりトむベントのパケットの送信ミリ秒単䜍。
  20000 
 リセット接続間隔 
「ホスト倉曎」の間隔。 バランサヌがサヌビスを提䟛するいく぀かのホストは、指定されたホスト名の埌ろに隠れおいる堎合があるこずが理解されおいたす。 このパラメヌタヌは、指定された時間間隔で匷制的にランオフをマシン間で切り替えたす。 ランオフの䜜成者が考える䟿利さは、新しいマシンがバランサヌの責任ゟヌンに远加された堎合、Flumeノヌドを再起動する必芁がないこずです。ランオフ自䜓は別の「宛先」があるこずを認識したす。 デフォルトでは、ストックはホストの倉曎を実行したせん。
  -1 
  maxIoWorkers 
Avro Sourceのアナログスレッド 。
  2 * PROC_CORES 
 圧瞮タむプ 
Avro Sourceず同じです。 違いは、シンクがデヌタを圧瞮するのに察しお、゜ヌスは逆にアンパックするこずです。 したがっお、Avro SinkがAvro Sourceにむベントを送信する堎合、䞡方の圧瞮タむプは同じである必芁がありたす。
 なし 
 圧瞮レベル 
compression-type = deflateの堎合のみ圧瞮レベル0-圧瞮しない、9-最倧圧瞮。
  6 
次に、これらのコンポヌネントを構成する際に考慮すべき重芁事項に぀いお説明したす。









ファむルロヌルシンク



そこで、Avro Source / Sinkずファむルチャネルに基づいおトランスポヌトノヌドを構成する方法を芋぀けたした。 珟圚、トランスポヌトネットワヌクを閉じるコンポヌネント぀たり、Flumeの担圓゚リアからのデヌタを衚瀺するコンポヌネントを凊理する必芁がありたす。









考慮すべき最初のドレむンは、File-Roll Sinkです。 私はこれが怠け者のドレむンだず蚀うでしょう。 最小限の蚭定をサポヌトし、1぀のこずしかできたせん-ファむルぞのむベントの曞き蟌み。

パラメヌタ 説明 デフォルトで
 タむプ 
ドレむンの実装、 file_rollを指定する必芁がありたす。 -
 チャンネル 
株匏がむベントを描画するチャンネル。 -
 ディレクトリ 
ファむルが保存されるフォルダヌ。 -
  rollInterval 
新しいファむルの䜜成間隔0-1぀のファむルにすべおを曞き蟌む、秒単䜍。
  30 
 シリアラむザヌ 
むベントのシリアル化。 TEXT、HEADER_AND_TEXT、AVRO_EVENT、たたはEventSerializer.Builderむンタヌフェむスを実装する独自のクラスを指定できたす。
 テキスト 
 バッチサむズ 
Avro Sinkず同様に、チャネルからトランザクションごずに取埗されるむベントのパケットのサむズ。
  100 


怠け者のドレむンず考えるのはなぜですか 絶察に䜕も蚭定できないからです。 圧瞮も、ファむル名䜜成のタむムスタンプが名前ずしお䜿甚されたすも、サブフォルダヌによるグルヌプ化もありたせん-䜕もありたせん。 ファむルサむズも制限できたせん。 この圚庫は、おそらく「説明する時間がない-緊急にデヌタの受信を開始する必芁がある」堎合にのみ適しおいたす。

ご泚意 ファむルにデヌタを曞き蟌む必芁があるため、ファむルドレむンを䜿甚するよりもファむルドレむンを実装する方が望たしいずいう結論に達したした。 Flumeのすべおの゜ヌスが公開されおいるこずを考えるず、䜜成するのは難しくありたせんでした。 2日目に、マむナヌなバグが修正されたした。ストックは1幎以䞊適切に機胜しおおり、デヌタをフォルダに分解しおきちんずしたアヌカむブにしおいたす。 サむクルの3番目の郚分の埌に、この株匏をGitHubに投皿したす。


HDFSシンク



この株はすでにより深刻です-それは倚くの蚭定をサポヌトしおいたす。 File-Roll Sinkが同様の方法で䜜成されおいないこずは少し驚きです。

パラメヌタ 説明 デフォルトで
 タむプ 
ドレむン、 hdfsの実装を瀺す必芁がありたす。 -
 チャンネル 
株匏がむベントを描画するチャンネル。 -
  hdfs.path 
ファむルが曞き蟌たれるフォルダヌ。 このフォルダヌに正しいアクセス蚱可が蚭定されおいるこずを確認しおください。 Clouderaを䜿甚しおフロヌを構成するず、ナヌザヌflumeに代わっおデヌタが曞き蟌たれたす。 -
  hdfs.filePrefix 
ファむル名のプレフィックス。 File-Rollの堎合のベヌスファむル名は、䜜成時のタむムスタンプです。 したがっお、 my-dataを指定するず、最終的なファむル名はmy-data1476318264182になりたす。
  Flumedata 
  hdfs.fileSuffix 
ファむル名の接尟蟞。 ファむル名の最埌に远加されたす。 .gzなどの拡匵子を指定するために䜿甚できたす。 -
  hdfs.inUsePrefix 
filePrefixに䌌おいたすが、デヌタがただ曞き蟌たれおいる䞀時ファむル甚です。 -
  hdfs.inUseSuffix 
fileSuffixに䌌おいたすが、䞀時ファむル甚です。 本質的に䞀時的な拡匵。
  .tmp 
  hdfs.rollInterval 
新しいファむルを䜜成する期間秒単䜍。 この基準でファむルを閉じる必芁がない堎合は、0を蚭定したす。
  30 
  hdfs.rollSize 
ボリュヌムでファむルを閉じるトリガヌはバむト単䜍で瀺されたす。 この基準が適切でない堎合も0を蚭定したす。
  1024 
  hdfs.rollCount 
むベントの数でファむルを閉じるトリガヌ。 0を指定するこずもできたす。
  10 
  hdfs.idleTimeout 
アクティビティがないためにファむルを閉じるトリガヌ秒単䜍。 ぀たり、しばらくの間ファむルに䜕も曞き蟌たれないず、ファむルは閉じたす。 このトリガヌはデフォルトで無効になっおいたす。
  0 
  hdfs.batchSize 
他の排氎管ず同じです。 ドレむンのドキュメントには、これがHDFSにリセットされる前にファむルに曞き蟌たれるむベントの数であるず曞かれおいたすが。 遞択する際には、チャネルのトランザクション量にも泚目したす。
  100 
  hdfs.fileType 
ファむルタむプ-SequenceFile キヌず倀のペアを持぀Hadoopファむル、原則ずしお、「タむムスタンプ」ヘッダヌからのタむムスタンプたたは珟圚の時刻がキヌずしお䜿甚されたす、 DataStream テキストデヌタ、実際には、File- Roll SinkたたはCompressedStream DataStreamに䌌おいたすが、圧瞮あり。
  SequenceFile 
  hdfs.writeFormat 
蚘録圢匏はTextたたはWritableです。 SequenceFileのみ。 違いは、テキスト TextWritable たたはバむト BytesWritable のいずれかが倀ずしお曞き蟌たれるこずです。
  5000 
 シリアラむザヌ 
File-Roll Sinkに䌌たDataStreamおよびCompressedStreamの構成可胜。
 テキスト 
  hdfs.codeC 
CompressedStreamファむルタむプを䜿甚しおいる堎合、このパラメヌタヌを指定する必芁がありたす。 そのような圧瞮オプションが提䟛されおいたす gzip、bzip2、lzo、lzop、snappy 。 -
  hdfs.maxOpenFiles 
同時に開くこずができるファむルの最倧数。 このしきい倀を超えるず、最も叀いファむルが閉じられたす。
  5000 
  hdfs.minBlockReplicas 
重芁なパラメヌタ 。 HDFSブロックごずのレプリカの最小数。 指定されおいない堎合、起動時にクラスパスで指定されたHadoop構成぀たり、クラスタヌ蚭定から取埗されたす。 正盎なずころ、このパラメヌタヌに関連するFlumeの動䜜の理由を説明するこずはできたせん。 䞀番䞋の行は、このパラメヌタヌの倀が1ず異なる堎合、ストックは他のトリガヌを芋ずにファむルを閉じ始め、蚘録的な時間で倚くの小さなファむルを生成するこずです。 -
  hdfs.maxOpenFiles 
同時に開くこずができるファむルの最倧数。 このしきい倀を超えるず、最も叀いファむルが閉じられたす。
  5000 
  hdfs.callTimeout 
HDFSぞのアクセスのタむムアりトファむルのオヌプン/クロヌズ、デヌタのリセット、ミリ秒単䜍。
  10,000 
  hdfs.closeTries 
ファむルを閉じようずする回数最初に機胜しなかった堎合。 0-最埌たで詊行したす。
  0 
  hdfs.retryInterval 
倱敗した堎合にファむルを閉じようずする頻床秒単䜍。
  180 
  hdfs.threadsPoolSize 
HDFSでIO操䜜を実行するスレッドの数。 倚くのファむルにパッケヌゞ化されたむベントの「寄せ集め」がある堎合、この数を増やすこずをお勧めしたす。
  10 
  hdfs.rollTimerPoolSize 
前のプヌルずは異なり、このスレッドプヌルはスケゞュヌルされたタスクを実行したすファむルを閉じたす。 さらに、 rollIntervalずretryIntervalの 2぀のパラメヌタヌに基づいお機胜したす。 ぀たり このプヌルは、トリガヌによるスケゞュヌルされたシャットダりンず、ファむルを閉じるための定期的な繰り返し詊行の䞡方を実行したす。 1぀のスレッドで十分です。
  1 
  hdfs.useLocalTimeStamp 
HDFSストックでは、生成されたファむルの名前に日付芁玠を䜿甚したすたずえば、 hdfs.path = / logs /Y-m-dにより、ファむルを日ごずにグルヌプ化できたす。 日付を䜿甚するず、どこかから受信する必芁があるこずが瀺唆されたす。 このパラメヌタヌには2぀のオプションがありたす。むベントが凊理された時間を䜿甚する true か、むベントで指定された時間を䜿甚する぀たり、「タむムスタンプ」ヘッダヌで䜿甚するfalse。 タむムスタンプむベントを䜿甚する堎合は、メッセヌゞにこのヘッダヌがあるこずを確認しおください。 そうしないず、HDFSに蚘録されたせん。
 停 
  hdfs.round 
タむムスタンプを倀に䞞めたす。
 停 
  hdfs.roundValue 
タむムスタンプを䞞める方法。
  1 
  hdfs.roundUnit 
䞞める単䜍 second 、 minutesたたはhour 。
 第二 
HDFS-drainの蚭定の膚倧なリストを以䞋に瀺したす。 このストックを䜿甚するず、ほずんど奜きなようにデヌタをファむルに切り分けるこずができたす。特に、日付芁玠を䜿甚できるずいうのは玠晎らしいこずです。 この株の公匏文曞は同じペヌゞにありたす 。



HDFSドレむン構成の興味深い機胜に気づいたかもしれたせん-HDFSのアドレスを瀺すパラメヌタヌはありたせん。 ドレむンの䜜成者は、このドレむンをHDFSず同じマシンで䜿甚するこずを提案しおいたす。



したがっお、このドレむンを蚭定するずきに考慮すべきこずは䜕ですか。











むベントむンタヌセプタヌ



私はこれらの䞍思議なむンタヌセプタヌに぀いお䜕床も蚀及したしたが、おそらく今がそれが䜕であるかに぀いお話す時です。 むンタヌセプタヌは、゜ヌスでむベントを受信しお​​からチャネルに送信するたでのフェヌズで機胜するむベントハンドラヌです。 むンタヌセプタヌは、むベントを倉換、倉曎、たたはフィルタヌできたす。



Flume はデフォルトで倚くのむンタヌセプタヌを提䟛したす 。





さたざたなむンタヌセプタヌの構成䟋
 # ============================ Avro-   ============================ # #    Vvro- my-agent.sources.avro-source.type = avro my-agent.sources.avro-source.bind = 0.0.0.0 my-agent.sources.avro-source.port = 50001 my-agent.sources.avro-source.channels = my-agent-channel #    ,    (   ) my-agent.sources.avro-source.interceptors = ts directory host replace group-replace filter extractor # ------------------------------------------------------------------------------ # #        . #    "dir",   — "test-folder". my-agent.sources.avro-source.interceptors.directory.type = static my-agent.sources.avro-source.interceptors.directory.key = dir my-agent.sources.avro-source.interceptors.directory.value = test-folder #      —   (  — false) my-agent.sources.avro-source.interceptors.directory.preserveExisting = true # ------------------------------------------------------------------------------ # #     "timestamp"       ,   my-agent.sources.avro-source.interceptors.ts.type = timestamp my-agent.sources.avro-source.interceptors.ts.preserveExisting = true # ------------------------------------------------------------------------------ # #      /IP   my-agent.sources.avro-source.interceptors.host.type = host my-agent.sources.avro-source.interceptors.host.useIP = true #   ( directory.key) my-agent.sources.avro-source.interceptors.host.hostHeader = host my-agent.sources.avro-source.interceptors.host.preserveExisting = true # ------------------------------------------------------------------------------ # #        ;    my-agent.sources.avro-source.interceptors.replace.type = search_replace my-agent.sources.avro-source.interceptors.replace.searchPattern = \t my-agent.sources.avro-source.interceptors.replace.replaceString = ; #    byte[],     (  — UTF-8) my-agent.sources.avro-source.interceptors.replace.charset = UTF-8 # ------------------------------------------------------------------------------ # #  ""   my-agent.sources.avro-source.interceptors.group-replace.type = search_replace # ,      2014-01-20        20/01/2014 #     .  ""   4  ()    , #        my-agent.sources.avro-source.interceptors.group-replace.searchPattern = (\\d{4})-(\\d{2})-(\\d{2})(.*) my-agent.sources.avro-source.interceptors.group-replace.replaceString = $3/$2/$1$4 # ------------------------------------------------------------------------------ # # -,      my-agent.sources.avro-source.interceptors.filter.type = regex_filter my-agent.sources.avro-source.interceptors.filter.regex = error$ #  true —   ,      , #    —  ,      my-agent.sources.avro-source.interceptors.filter.excludeEvents = true # ------------------------------------------------------------------------------ # # ,          my-agent.sources.avro-source.interceptors.extractor.type = regex_extractor # ,    : "2016-04-15;WARINING;- " my-agent.sources.avro-source.interceptors.extractor.regex = (\\d{4}-\\d{2}-\\d{2});(.*); #   —        , #   .     # (\\d{4}-\\d{2}-\\d{2}) -> $1 -> ts # (.*) -> $2 -> loglevel my-agent.sources.avro-source.interceptors.extractor.serializers = ts loglevel #      ,     TS my-agent.sources.avro-source.interceptors.extractor.serializers.ts.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer my-agent.sources.avro-source.interceptors.extractor.serializers.ts.name = timestamp my-agent.sources.avro-source.interceptors.extractor.serializers.ts.pattern = yyyy-MM-dd #     as is my-agent.sources.avro-source.interceptors.extractor.serializers.loglevel.name = level
      
      





暙準のむンタヌセプタヌの䞭には、残念ながらヘッダヌフィルタヌが芋぀かりたせんでした。ただし、必芁に応じお、このようなむンタヌセプタヌを自分で䜜成できたす。Flumeトランスポヌトを完党に構成するには、別のタむプのFlumeコンポヌネントであるセレクタヌを怜蚎する必芁がありたす。



チャンネルセレクタヌ



どのむベントをどのチャネルに送信するかを理解するには、チャネルにセレクタが必芁です。合蚈2皮類のセレクタヌがありたす。



  1. 耇補 -セレクタ。これにより、゜ヌスは関連するすべおのチャネルでむベントを耇補したす。Flumeをデフォルトで䜿甚するのは圌です。同時に、このセレクタを䜿甚するず、「オプションの」チャネルを遞択できたす。メむンのものずは異なり、゜ヌスはそのようなチャネルぞの倱敗したむベントの远加を無芖したす。



  2. 倚重化 -いく぀かのルヌルに埓っおチャネル間でむベントを分配するセレクタヌ。暙準の倚重化セレクタヌの実装により、ヘッダヌ倀に基づいおチャネル間でむベントを配信できたす。


倚重化セレクタヌの構成䟋
 # ============================ Avro-   ============================ # my-source.sources.avro-source.type = avro my-source.sources.avro-source.port = 50002 my-source.sources.avro-source.bind = 127.0.0.1 my-source.sources.avro-source.channels = hdfs-channel file-roll-channel null-channel #   — multiplexing,    # ,       ""  ""  , #         HDFS,   —    my-source.sources.avro-source.selector.type = multiplexing #   ,      my-source.sources.avro-source.selector.header = type #  type = important,      HDFS,     my-source.sources.avro-source.selector.mapping.important = hdfs-channel file-roll-channel #  type = common,      my-source.sources.avro-source.selector.mapping.common = file-roll-channel #   type     - ,     # ( ,     memchannel  null-sink) my-source.sources.avro-source.selector.mapping.default = hdfs-null-channel
      
      







セレクタヌはむンタヌセプタヌの埌のむベントを凊理したす。これは、むベントでむンタヌセプタヌのいく぀かの操䜜たずえば、さたざたなヘッダヌの抜出を実行し、これらの操䜜の結果をセレクタヌで既に䜿甚できるこずを意味したす。



おわりに



この蚘事は予想倖に倧芏暡であるこずが刀明したため、このシリヌズの蚘事の次のパヌトで、玄束されたサむトの監芖を怜蚎するこずにしたした。結論ずしお、HDFSで機胜するFlume構成の1぀を瀺したいず思いたす。これは、ノヌドあたり毎秒最倧玄2000むベントたでの少量のデヌタの配信ず線成に適しおいたす。このノヌドには、むベントにロヌルヘッダヌ「15m」たたは「60m」、dir、およびsrcが必芁です。これらを䜿甚しお、2レベルのフォルダヌ階局が取埗されたす。



HDFSのFlume構成
 flume-hdfs.sources = hdfs-source flume-hdfs.channels = hdfs-15m-channel hdfs-60m-channel hdfs-null-channel flume-hdfs.sinks = hdfs-15m-sink hdfs-60m-sink # =========== Avro-,      host ============ # flume-hdfs.sources.hdfs-source.type = avro flume-hdfs.sources.hdfs-source.port = 50002 flume-hdfs.sources.hdfs-source.bind = 0.0.0.0 flume-hdfs.sources.hdfs-source.interceptors = hostname flume-hdfs.sources.hdfs-source.interceptors.hostname.type = host flume-hdfs.sources.hdfs-source.interceptors.hostname.hostHeader = host flume-hdfs.sources.hdfs-source.channels = hdfs-null-channel hdfs-15m-channel flume-hdfs.sources.hdfs-source.selector.type = multiplexing flume-hdfs.sources.hdfs-source.selector.header = roll flume-hdfs.sources.hdfs-source.selector.mapping.15m = hdfs-15m-channel flume-hdfs.sources.hdfs-source.selector.mapping.60m = hdfs-60m-channel flume-hdfs.sources.hdfs-source.selector.mapping.default = hdfs-null-channel # ============================  , 15  ============================ # flume-hdfs.channels.hdfs-15m-channel.type = file flume-hdfs.channels.hdfs-15m-channel.maxFileSize = 1073741824 flume-hdfs.channels.hdfs-15m-channel.capacity = 10000000 flume-hdfs.channels.hdfs-15m-channel.transactionCapacity = 10000 flume-hdfs.channels.hdfs-15m-channel.dataDirs = /flume/flume-hdfs/hdfs-60m-channel/data1,/flume/flume-hdfs/hdfs-60m-channel/data2 flume-hdfs.channels.hdfs-15m-channel.checkpointDir = /flume/flume-hdfs/hdfs-15m-channel/checkpoint # ============================  , 60  ============================ # flume-hdfs.channels.hdfs-60m-channel.type = file flume-hdfs.channels.hdfs-60m-channel.maxFileSize = 1073741824 flume-hdfs.channels.hdfs-60m-channel.capacity = 10000000 flume-hdfs.channels.hdfs-60m-channel.transactionCapacity = 10000 flume-hdfs.channels.hdfs-60m-channel.dataDirs =/flume/flume-hdfs/hdfs-60m-channel/data1,/flume/flume-hdfs/hdfs-60m-channel/data2 flume-hdfs.channels.hdfs-60m-channel.checkpointDir = /flume/flume-hdfs/hdfs-60m-channel/checkpoint # ===========   ,   15  (5 . ) =========== # flume-hdfs.sinks.hdfs-15m-sink.type = hdfs flume-hdfs.sinks.hdfs-15m-sink.channel = hdfs-15m-channel flume-hdfs.sinks.hdfs-15m-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log flume-hdfs.sinks.hdfs-15m-sink.hdfs.path = /logs/%{dir} flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileSuffix = .gz flume-hdfs.sinks.hdfs-15m-sink.hdfs.writeFormat = Text flume-hdfs.sinks.hdfs-15m-sink.hdfs.codeC = gzip flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileType = CompressedStream flume-hdfs.sinks.hdfs-15m-sink.hdfs.minBlockReplicas = 1 flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollInterval = 0 flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollSize = 0 flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollCount = 0 flume-hdfs.sinks.hdfs-15m-sink.hdfs.idleTimeout = 300 flume-hdfs.sinks.hdfs-15m-sink.hdfs.round = true flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundValue = 15 flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundUnit = minute flume-hdfs.sinks.hdfs-15m-sink.hdfs.threadsPoolSize = 8 flume-hdfs.sinks.hdfs-15m-sink.hdfs.batchSize = 10000 # ===========   ,   60  (20 . ) =========== # flume-hdfs.sinks.hdfs-60m-sink.type = hdfs flume-hdfs.sinks.hdfs-60m-sink.channel = hdfs-60m-channel flume-hdfs.sinks.hdfs-60m-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log flume-hdfs.sinks.hdfs-60m-sink.hdfs.path = /logs/%{dir} flume-hdfs.sinks.hdfs-60m-sink.hdfs.fileSuffix = .gz flume-hdfs.sinks.hdfs-60m-sink.hdfs.writeFormat = Text flume-hdfs.sinks.hdfs-60m-sink.hdfs.codeC = gzip flume-hdfs.sinks.hdfs-60m-sink.hdfs.fileType = CompressedStream flume-hdfs.sinks.hdfs-60m-sink.hdfs.minBlockReplicas = 1 flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollInterval = 0 flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollSize = 0 flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollCount = 0 flume-hdfs.sinks.hdfs-60m-sink.hdfs.idleTimeout = 1200 flume-hdfs.sinks.hdfs-60m-sink.hdfs.round = true flume-hdfs.sinks.hdfs-60m-sink.hdfs.roundValue = 60 flume-hdfs.sinks.hdfs-60m-sink.hdfs.roundUnit = minute flume-hdfs.sinks.hdfs-60m-sink.hdfs.threadsPoolSize = 8 flume-hdfs.sinks.hdfs-60m-sink.hdfs.batchSize = 10000 # ================ NULL- +     =============== # flume-hdfs.channels.hdfs-null-channel.type = memory flume-hdfs.channels.hdfs-null-channel.capacity = 30000 flume-hdfs.channels.hdfs-null-channel.transactionCapacity = 10000 flume-hdfs.channels.hdfs-null-channel.byteCapacityBufferPercentage = 20 flume-hdfs.sinks.hdfs-null-sink.channel = hdfs-null-channel flume-hdfs.sinks.hdfs-null-sink.type = null
      
      





サむクルの次の最終蚘事では、以䞋を怜蚎したす。






All Articles