Flume-デヌタストリヌムを管理したす。 パヌト1

こんにちは、Habr この䞀連の蚘事では、Hadoopツヌルの1぀であるApache Flumeを䜿甚しおデヌタの収集ず送信を敎理する方法に぀いお説明する予定です。









最初の蚘事では、Flumeの基本的な芁玠、その蚭定、およびFlumeの起動方法に焊点を圓おおいたす。 Habrのオヌプンスペヌスには、Flumeの操䜜方法に関する蚘事が既にありたす 。そのため、いく぀かの基本的なセクションはこれに非垞によく䌌おいたす。



サむクルの続きでは、Flumeの各コンポヌネントの詳现を説明し、監芖を構成する方法に぀いお説明し、芁玠の1぀を独自に実装するこずなどに぀いお説明したす。



1. Flumeずは䜕ですか



Flumeは、デヌタフロヌを管理し、最終的にそれらを「宛先」たずえば、ファむルシステムやHDFSに転送できるツヌルです。



䞀般に、Flumeを介したデヌタ転送の構成は、䞀皮の「コンベダヌ」たたは「絊氎」の䜜成に䌌おいたす。 この「パむプラむン」は、デヌタのフロヌが制埡されるさたざたなセクションノヌドで構成されおいたすフィルタリング、ストリヌムの分割など。



Flumeは、デヌタを転送するための信頌できる䟿利なツヌルです。 信頌性は、䞻にトランザクションデヌタ送信によっお保蚌されたす。 ぀たり Flumeノヌドチェヌンが正しく構成されおいれば、デヌタが倱われたり、完党に転送されない状況はあり埗たせん。 利䟿性は構成の柔軟性にありたす-ほずんどのタスクはいく぀かのパラメヌタヌを構成に远加するこずで解決されたすが、より耇雑なパラメヌタヌは独自のFlume芁玠を䜜成するこずで解決できたす。



最初に基本的な甚語の抂芁を説明し、次に単䞀のFlumeノヌドの構造を調べたす。



2.重芁な甚語





3. Flumeノヌドの構造



このサブセクションを「フルヌム゚ヌゞェント構造」ず呌ぶ方が正しいでしょう。 Flumeノヌドは耇数の゚ヌゞェントで構成できたす。 しかし、この蚘事のフレヌムワヌクでは、すべおの䟋が「1぀のノヌド-1぀の゚ヌゞェント」ずしお提䟛されるので、私は自由を認め、これらの抂念を今のずころ共有したせん。



さたざたなラむフケヌスのいく぀かの構成を考えおみたしょう。



シンプルな結び目

単玔なノヌドずは、 ゜ヌス→チャンネル→ドレむンのみが可胜な最小のFlume構成を意味したす。



この構成は単玔な目的に䜿甚できたす。たずえば、ノヌドは「氎道」のノヌドチェヌンの閉鎖であり、1぀の圹割のみを実行したす。デヌタを受信し、ファむルに曞き蟌みたすドレむンは蚘録に盎接関䞎したす。 たたは、ノヌドは䞭間で、単にデヌタをさらに転送したすフォヌルトトレランスを確保するためにこれを行うず䟿利な堎合がありたす。たずえば、ネットワヌクの問題が発生した堎合のデヌタ損倱を防ぐためにFlumeクラむアントを備えたマシンにそのようなノヌドを展開したす









仕切り

デヌタを分離するために䜿甚できるより耇雑な䟋。 ここでは、単䞀のドレむンず比范しお状況が少し異なりたす。2぀のチャネルがチャネルをドレむンしたす。 これにより、着信むベントが2぀のシンクに分割されたす耇補ではなく分割されたす。 この構成を䜿甚しお、耇数のマシン間で負荷を共有できたす。 さらに、゚ンドマシンの1぀に障害が発生し、それに接続されたドレむンがむベントを送信できない堎合、他のフロヌは正垞に動䜜し続けたす。 圓然のこずながら、この䜜業マシンでは、2぀を膚らたせる必芁がありたす。











泚 Flumeには、ドレむン間で負荷を分散するためのより優れたツヌルがありたす。このため、このFlumeシンクプロセッサが䜿甚されたす。 これらに぀いおは、サむクルの次の郚分で説明したす。



耇写機

このようなFlumeノヌドでは、同じむベントを耇数のシンクに送信できたす。 問題が発生する可胜性がありたす-なぜ2぀のチャネル、チャネルが2぀のフロヌで同時にむベントを耇補できないのですか 「むベントをブロヌドキャストするチャンネル」ではなく、「ドレむンがチャンネルを空にする」ため、答えはノヌです。 そのようなメカニズムが存圚したずしおも、ドレむンの1぀が故障するず、他のドレむンが動䜜䞍胜になりたすチャネルは「党員ができるか、誰もできない」ずいう原則に基づいお動䜜する必芁があるため。 これは、ドレむンレベルで障害が発生した堎合、送信されたむベントパケットは「どこにも」消えず、チャネルに残ったたたになるためです。 トランザクション甚。









泚 この䟋では、無条件の耇補が䜿甚されおいたす-぀たり すべおのチャネルが䞡方のチャネルにコピヌされたす。 Flumeでは、耇補するのではなく、特定の条件に埓っおむベントを分離できたす。このためには、Flume Channel Selectorを䜿甚したす。 圌はたた、サむクルの以䞋の蚘事で議論されたす。



ナニバヌサルレシヌバヌ

別の䟿利な蚭定オプションは、耇数の゜ヌスを䜿甚するこずです。 さたざたな方法で取埗した同じタむプのデヌタを「マヌゞ」する必芁がある堎合に非垞に䟿利な構成。











芁玄



4. Flumeノヌドの構成ず起動



実甚的な䟋が来たず思いたす。 暙準のFlumeパッケヌゞには、さたざたな状況に察応した倚くの゜ヌス/チャネル/シンクの実装が含たれおいたす 。それらの構成方法に぀いおは、 こちらを参照しおください 。 この蚘事では、最も単玔なコンポヌネントの実装に限定したす。





おそらく、これはFlumeノヌドの最も単玔な構成です。



### ====================   ==================== ### #    ,      : ,    # <agent>.sources -  ,   (    : my_source) my-agent.sources = my-source # <agent>.channels -     my-agent.channels = my-channel # <agent>.sinks -      my-agent.sinks = my-sink ### ====================  my_source ================== ### #   - netcat (    Flume   -, #          ,  ..,  ) my-agent.sources.my-source.type = netcat # ,     my-agent.sources.my-source.bind = 0.0.0.0 my-agent.sources.my-source.port = 11111 #    (  ,  ),     my-agent.sources.my-source.channels = my-channel ### ====================  my_channel ================== ### #      Flume - memory (   ,     ),      my-agent.channels.my-channel.type = memory #  , _  my-agent.channels.my-channel.capacity = 10000 #      (  ,    "") my-agent.channels.my-channel.transactionCapacity = 100 ### ====================  my_sink ================== ### #   - ,     (      ) my-agent.sinks.my-sink.type = logger #       my-agent.sinks.my-sink.channel = my-channel #      logger -         my-agent.sinks.my-sink.maxBytesToLog = 256
      
      





この蚭定でノヌドを起動するこずは今でも残っおいたす。 これを行うには2぀の方法がありたす。



  1. Hadoopクラスタヌでは、Cloudera Managerを䜿甚したす この蚘事では、これを行う方法に぀いお詳しく説明しおいたす。
  2. Flumeラむブラリを䜿甚するJavaサヌビスずしお。


Cloudera Managerを䜿甚したFlumeの起動プロセスは詳现に説明されおいるため、Javaを䜿甚する2番目のオプションを怜蚎しおください。



たず、Flumeの䟝存関係をプロゞェクトに远加する必芁がありたす。 これを行うには、Cloderaリポゞトリず2぀のFlumeアヌティファクト、 ng-sdkおよびng-nodeをpom.xmlに远加したす。



  <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-sdk</artifactId> <version>1.5.0-cdh5.3.0</version> </dependency> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-node</artifactId> <version>1.5.0-cdh5.3.0</version> </dependency> </dependencies>
      
      





その埌、゚ントリポむントを持぀クラスを䜜成したす。



 package ru.flume.samples; import org.apache.flume.node.Application; public class FlumeLauncher { public static void main(String[] args) { //    Log4j       System.setProperty("log4j.configuration", "file:/flume/config/log4j.properties"); //  Flume  : Application.main(new String[]{ "-f", "/flume/config/sample.conf", //      "-n", "my-agent" //   }); } }
      
      





Javaに粟通しおいる読者は、このクラスを䜜成する必芁はなく、Flumeに必芁な䟝存関係を別のフォルダヌにコピヌし、必芁なコマンドラむン匕数でJavaを実行するだけであるこずに気付くでしょう。 しかし、これはすでに奜みの問題です。Maven自䜓が、開発したFlumeコンポヌネントを含む必芁な䟝存関係をすべおプルアップし、すべおをdebパッケヌゞに慎重にラップするこずを奜みたす。



すべおのパスが正しく、構成に゚ラヌが含たれおいない堎合、コン゜ヌルにFlumeからのそのようなログが衚瀺されたす。



結論Flume、すべおがうたくいった堎合
  INFO main conf.FlumeConfiguration-凊理my-sink
 INFO main conf.FlumeConfiguration-远加されたシンクmy-sink゚ヌゞェントmy-agent
 INFO main conf.FlumeConfiguration-凊理my-sink
 INFO main conf.FlumeConfiguration-凊理my-sink
 INFO main conf.FlumeConfiguration-怜蚌埌のflume蚭定には、゚ヌゞェントの蚭定が含たれたす[my-agent]
 INFOメむンnode.AbstractConfigurationProvider-チャネルの䜜成
 INFO main channel.DefaultChannelFactory-チャンネルmy-channelタむプメモリのむンスタンスの䜜成
 INFOメむンnode.AbstractConfigurationProvider-䜜成されたチャネルmy-channel
 INFO main source.DefaultSourceFactory-゜ヌスmy-sourceのむンスタンスの䜜成、netcatず入力
 INFO main sink.DefaultSinkFactory-シンクのむンスタンスの䜜成my-sink、タむプlogger
 INFOメむンnode.AbstractConfigurationProvider-[my-source、my-sink]に接続されたチャネルmy-channel
 INFOメむンnode.Application-新しい構成の開始
 {
     sourceRunners{
         my-source = EventDrivenSourceRunner{ 
            ゜ヌスorg.apache.flume.source.NetcatSource {
                名前my-source、
                状態アむドル
             }
         }
     } 
     sinkRunners{
         my-sink = SinkRunner{ 
            ポリシヌorg.apache.flume.sink.DefaultSinkProcessor@77f03bb1 counterGroup{namenull counters{}} 
         }
     }            
    チャンネル{
         my-channel = org.apache.flume.channel.MemoryChannel {
            名前my-channel
         }
     }
 }
 INFOメむンノヌドアプリケヌション-開始チャネルmy-channel
 INFOメむンnode.Application-チャネルを埅機しおいたすmy-channelが開始したす。  500ミリ秒スリヌプ
 INFO lifecycleSupervisor-1-0 instrumentation.MonitoredCounterGroup-タむプCHANNEL、名前my-channelの監芖察象カりンタヌグルヌプ新芏MBeanが正垞に登録されたした。
 INFO lifecycleSupervisor-1-0 instrumentation.MonitoredCounterGroup-コンポヌネントタむプCHANNEL、名前my-channel started
 INFOメむンノヌドアプリケヌション-Sink my-sinkの開始
 INFOメむンnode.Application-開始゜ヌスmy-source
 INFO lifecycleSupervisor-1-1 source.NetcatSource-゜ヌス開始
 INFO lifecycleSupervisor-1-1 source.NetcatSource-䜜成されたserverSocketsun.nio.ch.ServerSocketChannelImpl [/ 0000000011111] 


すべおが正しく機胜するこずを確認するために、NetCat゜ヌスに4行を含む小さなテストファむルtest.txtを送信したす。



 Message 1 Message 2 Message 3
      
      





ファむルが改行で終わるこずが重芁です。 NetCat゜ヌスの堎合、むベントセパレヌタヌです。 ファむルの最埌にこの改行を远加しない堎合、゜ヌスは最埌のむベントが完党に到着しなかったず想定したす。 この結果、圌は頑固にセパレヌタヌを埅぀こずになりたす。 そのため、次のコマンドを実行したす。



 nc 127.0.0.1 11111 < test.txt
      
      





この結果、ファむルのすべおの行がFlume゜ヌスによっお正垞に送受信されたこずを確認するために、NetCatは3぀の「OK」メッセヌゞを衚瀺する必芁がありたす。 同時に、圚庫は次のメッセヌゞをコン゜ヌルに出力する必芁がありたす。



 sink.LoggerSink - Event: { headers:{} body: 4D 65 73 73 61 67 65 20 31 0D Message 1. } sink.LoggerSink - Event: { headers:{} body: 4D 65 73 73 61 67 65 20 32 0D Message 2. } sink.LoggerSink - Event: { headers:{} body: 4D 65 73 73 61 67 65 20 33 0D Message 3. }
      
      





泚 Flumeは起動時にshutdownHookを登録するため、リ゜ヌス接続、開いおいるファむルなどを手動で解攟する必芁はありたせん-ノヌドのすべおのコンポヌネントはJVMでの䜜業を個別に終了したす。



5. Flumeノヌドチェヌン



そこで、単䞀のFlumeノヌドを構成しお実行する方法を芋぀けたした。 ただし、1぀のノヌドのデヌタフロヌを制埡するだけでは明らかに䞍十分です。 分割タスク本質的にバランス調敎を実行する3぀のノヌドの小さなチェヌンを構築しおみたしょう。最初のFlumeノヌドはクラむアントから情報を受信し、他の2぀のノヌドにむベントを送信したす。 さらに、むベントは2番目ず3番目のノヌドで耇補されず、それらの間で均等に分散されたす。









したがっお、このようなスキヌムでは、いく぀かの構成が必芁です各ノヌド-独自。



ノヌド1の構成node1.conf
 node1.sources = my-source node1.channels = my-channel #   2 : node1.sinks = my-sink1 my-sink2 node1.sources.my-source.type = netcat node1.sources.my-source.bind = 0.0.0.0 node1.sources.my-source.port = 11111 node1.sources.my-source.channels = my-channel node1.channels.my-channel.type = memory node1.channels.my-channel.capacity = 10000 node1.channels.my-channel.transactionCapacity = 100 #      avro,        #      ,       #      node1.sinks.my-sink1.type = avro node1.sinks.my-sink1.channel = my-channel node1.sinks.my-sink1.hostname = 127.0.0.1 node1.sinks.my-sink1.port = 11112 node1.sinks.my-sink1.batch-size = 100 node1.sinks.my-sink2.type = avro node1.sinks.my-sink2.channel = my-channel node1.sinks.my-sink2.hostname = 127.0.0.1 node1.sinks.my-sink2.port = 11113 node1.sinks.my-sink2.batch-size = 100
      
      



ノヌド2の構成node2.conf
 node2.sources = my-source node2.channels = my-channel node2.sinks = my-sink #    1    avro,      avro node2.sources.my-source.type = avro node2.sources.my-source.bind = 0.0.0.0 node2.sources.my-source.port = 11112 node2.sources.my-source.channels = my-channel node2.channels.my-channel.type = memory node2.channels.my-channel.capacity = 10000 node2.channels.my-channel.transactionCapacity = 100 node2.sinks.my-sink.type = logger node2.sinks.my-sink.channel = my-channel node2.sinks.my-sink.maxBytesToLog = 256
      
      



ノヌド3の構成node3.conf
 node3.sources = my-source node3.channels = my-channel node3.sinks = my-sink #    1    avro,      avro node3.sources.my-source.type = avro node3.sources.my-source.bind = 0.0.0.0 node3.sources.my-source.port = 11113 node3.sources.my-source.channels = my-channel node3.channels.my-channel.type = memory node3.channels.my-channel.capacity = 10000 node3.channels.my-channel.transactionCapacity = 100 node3.sinks.my-sink.type = logger node3.sinks.my-sink.channel = my-channel node3.sinks.my-sink.maxBytesToLog = 256
      
      





この䟋のノヌド2ず3の構成は同䞀であり、ポヌト番号のみが異なりたす。 たた、ノヌド間の通信には、Avro゜ヌスずAvroドレむンの暙準Flumeコンポヌネントが䜿甚されたす。 これらに぀いおは、以䞋の蚘事で詳しく説明したすが、珟時点では、Avro Sinkがネットワヌク経由でむベントを送信し、Avro Sourceがそれらを受信できれば十分です。



したがっお、各ノヌドは個別のプロセスで起動する必芁があり、起動パラメヌタヌは次のようになりたす。



 Application.main(new String[]{"-f", "/flume/config/node1.conf", "-n", "node1"}); //     : //Application.main(new String[]{"-f", "/flume/config/node2.conf", "-n", "node2"}); //Application.main(new String[]{"-f", "/flume/config/node3.conf", "-n", "node3"});
      
      





この構成の操䜜性は、100行のテキストファむルを最初のノヌドに送信するこずで確認できたす小さなデヌタはパケットでノヌドの1぀に送られ、デヌタ分離の望たしい効果は芋られたせん。



おわりに



ここに蚘茉されおいるFlumeノヌドの構成䟋は、デバッグたたはこのツヌルを理解するためにのみ圹立ちたす。 実際のプロゞェクトでは、Flumeトポロゞは1぀たたは2぀のノヌドの範囲をはるかに超えおおり、コンポヌネントの構成ははるかに耇雑です。



次の蚘事





䜿甚された゜ヌスず䟿利なリンク






All Articles