This first article focuses on the basic elements of Flume, their configuration, and how to start Flume. There is already an article on Habr about working with Flume, so some of the basic sections here will look very similar to it.
Later articles in the series will describe each Flume component in detail, explain how to set up monitoring, show how to write your own implementation of one of the elements, and more.
1. What is Flume?
Flume is a tool for managing data flows and ultimately delivering them to a destination (for example, a file system or HDFS).
In general, setting up data transport through Flume resembles building a kind of "conveyor" or "water pipeline". This "pipeline" consists of sections (nodes) where the data flow is controlled: filtering, stream splitting, and so on.
Flume is a reliable and convenient tool for transporting data. Reliability comes mainly from transactional data delivery: if the chain of Flume nodes is configured correctly, data cannot be lost or only partially delivered. Convenience comes from flexible configuration: most tasks are solved by adding a few parameters to the configuration, and the more complex ones can be solved by writing your own Flume elements.
We first go over the basic terminology and then look at the structure of a single Flume node.
2. Key terms
- Event: a unit of data with additional meta-information. Its structure resembles a POST request.
  - Headers: the meta-information, a set of key-value pairs.
  - Body (content): the actual data for which all of this was started. It is passed as a byte[].
- Client: a service external to the Flume node that supplies it with data.
- Source: responsible for receiving data. Flume provides two kinds of sources, EventDrivenSource and PollableSource. In the first case the source itself decides when to add events to the channel (for example, HTTPSource adds events as HTTP requests arrive). A PollableSource, in contrast, is passive: Flume periodically polls it for new events.
- Sink: the component responsible for passing data to the next processing stage. That can be another Flume node, a file system, HDFS, and so on.
- Channel: the component that acts as a buffer while data is transported. A channel is passive and initiates no actions itself. Sources add events to the channel and sinks drain it.
- Agent: the process in which the Flume components (sources, channels, sinks) run. Typically this is a JVM instance. One node can host several agents.
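The relationship between these terms can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: `SimpleEvent`, `SimpleChannel`, `receive`, and `drainOne` are hypothetical names invented here, not Flume classes, and the `origin` header is made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are not Flume's classes.
final class TermsSketch {

    /** An event: a header map plus an opaque byte[] body. */
    static final class SimpleEvent {
        final Map<String, String> headers;
        final byte[] body;

        SimpleEvent(Map<String, String> headers, byte[] body) {
            this.headers = new HashMap<>(headers);
            this.body = body.clone();
        }
    }

    /** A channel is a passive buffer: it only reacts to put() and take(). */
    static final class SimpleChannel {
        private final Deque<SimpleEvent> buffer = new ArrayDeque<>();

        void put(SimpleEvent event) {
            buffer.addLast(event);
        }

        /** Returns the oldest event, or null if the channel is empty. */
        SimpleEvent take() {
            return buffer.pollFirst();
        }

        int size() {
            return buffer.size();
        }
    }

    /** The source side: wraps a text line into an event and adds it to the channel. */
    static void receive(String line, SimpleChannel channel) {
        Map<String, String> headers = new HashMap<>();
        headers.put("origin", "example"); // illustrative header, not a Flume convention
        channel.put(new SimpleEvent(headers, line.getBytes(StandardCharsets.UTF_8)));
    }

    /** The sink side: drains one event and returns its body as text, or null if empty. */
    static String drainOne(SimpleChannel channel) {
        SimpleEvent event = channel.take();
        return event == null ? null : new String(event.body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        SimpleChannel channel = new SimpleChannel();
        receive("Message 1", channel);
        receive("Message 2", channel);
        System.out.println(drainOne(channel)); // Message 1
        System.out.println(channel.size());    // 1
    }
}
```

Note that the channel never calls anything itself: all activity comes from the source side (`receive`) and the sink side (`drainOne`).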
3. Structure of a Flume node
It would be more accurate to call this subsection "Structure of a Flume agent", since a Flume node can consist of several agents. Within this article, however, every example is "one node, one agent", so I will take the liberty of not distinguishing between the two concepts for now.
Let's look at several configurations for different real-life cases.
Simple node
By a simple node I mean the minimal possible Flume configuration: source → channel → sink.
This configuration can be used for simple purposes. For example, the node is the last one in the chain of our "pipeline" and plays a single role: it receives data and writes it to a file (the sink does the writing directly). Or the node is an intermediate one and simply passes data on. This is sometimes useful for fault tolerance: for example, deliberately deploying such a node on the same machine as the Flume client protects against data loss when the network has problems.
Splitter
A more complex example that can be used to split data. The situation differs slightly from the single-sink case: here two sinks drain one channel. As a result, incoming events are divided between the two sinks (divided, not duplicated). This configuration can be used to share load across several machines. Moreover, if one of the end machines fails and the sink attached to it cannot send events, the other sinks keep working normally. Naturally, the machine that is still working will then have to carry the load for two.
Note: Flume has finer-grained tools for balancing load between sinks, the Flume Sink Processors. They will be covered in later parts of the series.
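The "divided, not duplicated" behaviour follows from the fact that taking an event removes it from the channel. A minimal model of that, assuming the two sinks simply take turns (in a real agent the split depends on timing, so the strict alternation here is a simplification, and `SplitterSketch` is a hypothetical name):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the splitter node; not Flume code.
final class SplitterSketch {

    /** Lets two sinks take turns draining one channel; returns what each sink received. */
    static List<List<String>> drainWithTwoSinks(Deque<String> channel) {
        List<String> sink1 = new ArrayList<>();
        List<String> sink2 = new ArrayList<>();
        boolean firstSinkTurn = true;
        while (!channel.isEmpty()) {
            // Taking an event removes it from the channel, so the other sink never sees it.
            String event = channel.pollFirst();
            (firstSinkTurn ? sink1 : sink2).add(event);
            firstSinkTurn = !firstSinkTurn;
        }
        return List.of(sink1, sink2);
    }

    public static void main(String[] args) {
        Deque<String> channel = new ArrayDeque<>(List.of("e1", "e2", "e3", "e4"));
        System.out.println(drainWithTwoSinks(channel)); // [[e1, e3], [e2, e4]]
    }
}
```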
Duplicator
A Flume node like this lets you send the same events to several sinks. A natural question: why two channels, can't one channel duplicate events to two sinks at once? The answer is no, because the channel does not distribute events; the sinks drain it. Even if such a mechanism existed, the failure of one sink would make the others inoperable as well, since the channel would have to work on an "all or nothing" basis. The reason is that when a failure occurs at the sink level, the batch of events being sent does not vanish but stays in the channel. That is what transactionality means here.
Note: this example uses unconditional duplication, i.e. every event is copied to both channels. Flume can also route events by condition instead of duplicating them, using a Flume Channel Selector. It too will be covered in later articles of the series.
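The reasoning above can be modelled in plain Java: the source copies each event into every channel, and each sink drains only its own channel, returning the batch if delivery fails. The names below (`DuplicatorSketch`, `replicate`, `drainBatch`) are hypothetical, and the "rollback" is a simplified imitation of a transaction, not Flume's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the duplicator node; not Flume code.
final class DuplicatorSketch {

    /** Unconditional replication: the source puts the same event into every channel. */
    static void replicate(String event, List<Deque<String>> channels) {
        for (Deque<String> channel : channels) {
            channel.addLast(event);
        }
    }

    /**
     * Takes up to batchSize events and hands them to the delivery function.
     * If delivery fails, the batch goes back to the head of the channel in its
     * original order, which mimics a rolled-back transaction.
     */
    static boolean drainBatch(Deque<String> channel, int batchSize,
                              Predicate<List<String>> deliver) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !channel.isEmpty()) {
            batch.add(channel.pollFirst());
        }
        if (deliver.test(batch)) {
            return true; // "commit": the events are gone from the channel
        }
        for (int i = batch.size() - 1; i >= 0; i--) {
            channel.addFirst(batch.get(i)); // "rollback": restore the batch
        }
        return false;
    }

    public static void main(String[] args) {
        Deque<String> channel1 = new ArrayDeque<>();
        Deque<String> channel2 = new ArrayDeque<>();
        for (String event : List.of("e1", "e2")) {
            replicate(event, List.of(channel1, channel2));
        }
        drainBatch(channel1, 10, batch -> true);  // sink 1 delivers successfully
        drainBatch(channel2, 10, batch -> false); // sink 2 fails
        System.out.println(channel1); // []
        System.out.println(channel2); // [e1, e2]
    }
}
```

Because each sink has its own channel, the failure of sink 2 leaves its events waiting in channel 2 without holding up sink 1.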
Universal receiver
Another useful configuration option is to use several sources. It is very handy when you need to merge data of the same type that arrives in different ways.
Summary:
- A node can contain many sources, channels, and sinks.
- One source can add events to several channels (duplicating them or distributing them by some rule).
- Several sources can put events into one channel.
- One sink works with exactly one channel.
- Several sinks can take events from one channel (evenly or according to some balancing rule).
4. Configuring and starting a Flume node
I think it is time for a practical example. The standard Flume package contains many source, channel, and sink implementations for different situations; how to configure them is covered in the official Flume documentation. In this article I will limit myself to the simplest component implementations:
- Memory channel (a channel that keeps events in RAM);
- NetCat source;
- Logger sink (a sink that prints events to the console).
So, here is the simplest configuration of a Flume node:
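### ==================== Components ==================== ###
# List the sources, channels, and sinks that belong to the agent
# <agent>.sources: source names (here: my-source)
my-agent.sources = my-source
# <agent>.channels: channel names
my-agent.channels = my-channel
# <agent>.sinks: sink names
my-agent.sinks = my-sink

### ==================== my-source ==================== ###
# Source type: netcat (listens on a port and turns each received line into an event)
my-agent.sources.my-source.type = netcat
# Address and port to listen on
my-agent.sources.my-source.bind = 0.0.0.0
my-agent.sources.my-source.port = 11111
# Channels the source adds events to (space-separated if there are several)
my-agent.sources.my-source.channels = my-channel

### ==================== my-channel ==================== ###
# Channel type: memory (events are kept in RAM)
my-agent.channels.my-channel.type = memory
# Maximum number of events stored in the channel
my-agent.channels.my-channel.capacity = 10000
# Maximum number of events per transaction
my-agent.channels.my-channel.transactionCapacity = 100

### ==================== my-sink ==================== ###
# Sink type: logger (writes events to the log)
my-agent.sinks.my-sink.type = logger
# Channel the sink takes events from
my-agent.sinks.my-sink.channel = my-channel
# logger-specific: maximum number of body bytes to log
my-agent.sinks.my-sink.maxBytesToLog = 256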
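Since the file follows the Java properties format, it can be sanity-checked with `java.util.Properties` before the agent is started. The sketch below is an optional helper under assumptions: `ConfigCheckSketch` and its methods are hypothetical names, and it only checks that a sink's channel is among the channels declared for the agent, nothing more.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check; not part of Flume.
final class ConfigCheckSketch {

    /** Parses configuration text in the Java properties format. */
    static Properties parse(String text) {
        Properties config = new Properties();
        try {
            config.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return config;
    }

    /** Returns the names listed under <agent>.<kind>, for example kind = "sinks". */
    static List<String> declared(Properties config, String agent, String kind) {
        String value = config.getProperty(agent + "." + kind, "").trim();
        return value.isEmpty() ? List.of() : Arrays.asList(value.split("\\s+"));
    }

    /** True if the sink's channel is one of the channels declared for the agent. */
    static boolean sinkChannelDeclared(Properties config, String agent, String sink) {
        String channel = config.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
        return declared(config, agent, "channels").contains(channel);
    }

    public static void main(String[] args) {
        Properties config = parse(String.join("\n",
                "# comment lines are skipped by Properties.load",
                "my-agent.sources = my-source",
                "my-agent.channels = my-channel",
                "my-agent.sinks = my-sink",
                "my-agent.sinks.my-sink.channel = my-channel"));
        System.out.println(declared(config, "my-agent", "sinks"));              // [my-sink]
        System.out.println(sinkChannelDeclared(config, "my-agent", "my-sink")); // true
    }
}
```

A check like this catches a misspelled channel name early, before the agent is launched.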
All that remains is to start a node with this configuration. There are two ways to do that:
- On a Hadoop cluster, using Cloudera Manager (a separate article describes in detail how to do this).
- As a Java service, using the Flume libraries.
Since starting Flume through Cloudera Manager is already described in detail elsewhere, let's look at the second option, with Java.
First, the Flume dependencies have to be added to the project. To do that, add the Cloudera repository and two Flume artifacts, flume-ng-sdk and flume-ng-node, to pom.xml:
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>1.5.0-cdh5.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-node</artifactId>
        <version>1.5.0-cdh5.3.0</version>
    </dependency>
</dependencies>
After that, create a class with an entry point:
package ru.flume.samples;

import org.apache.flume.node.Application;

public class FlumeLauncher {

    public static void main(String[] args) {
        // Point Log4j at its configuration file
        System.setProperty("log4j.configuration", "file:/flume/config/log4j.properties");

        // Start Flume with the same arguments the command line would take:
        Application.main(new String[]{
            "-f", "/flume/config/sample.conf", // path to the node configuration file
            "-n", "my-agent"                   // agent name used in that file
        });
    }
}
Readers who know Java well will notice that this class is not strictly necessary: it is enough to copy the dependencies Flume needs into a separate folder and run Java with the required command-line arguments. That is a matter of taste, though. I prefer to let Maven pull in all the required dependencies, including the Flume components I develop myself, and to wrap everything neatly into a deb package.
If all paths are correct and the configuration contains no errors, Flume prints a log like the following to the console.
Flume output when everything went well:
INFO main conf.FlumeConfiguration - Processing:my-sink
INFO main conf.FlumeConfiguration - Added sinks: my-sink Agent: my-agent
INFO main conf.FlumeConfiguration - Processing:my-sink
INFO main conf.FlumeConfiguration - Processing:my-sink
INFO main conf.FlumeConfiguration - Post-validation flume configuration contains configuration for agents: [my-agent]
INFO main node.AbstractConfigurationProvider - Creating channels
INFO main channel.DefaultChannelFactory - Creating instance of channel my-channel type memory
INFO main node.AbstractConfigurationProvider - Created channel my-channel
INFO main source.DefaultSourceFactory - Creating instance of source my-source, type netcat
INFO main sink.DefaultSinkFactory - Creating instance of sink: my-sink, type: logger
INFO main node.AbstractConfigurationProvider - Channel my-channel connected to [my-source, my-sink]
INFO main node.Application - Starting new configuration:{
    sourceRunners:{
        my-source=EventDrivenSourceRunner: {
            source:org.apache.flume.source.NetcatSource{name:my-source,state:IDLE}
        }
    }
    sinkRunners:{
        my-sink=SinkRunner: {
            policy:org.apache.flume.sink.DefaultSinkProcessor@77f03bb1
            counterGroup:{ name:null counters:{} }
        }
    }
    channels:{
        my-channel=org.apache.flume.channel.MemoryChannel{name: my-channel}
    }
}
INFO main node.Application - Starting Channel my-channel
INFO main node.Application - Waiting for channel: my-channel to start. Sleeping for 500 ms
INFO lifecycleSupervisor-1-0 instrumentation.MonitoredCounterGroup - Monitored counter group for type: CHANNEL, name: my-channel: Successfully registered new MBean.
INFO lifecycleSupervisor-1-0 instrumentation.MonitoredCounterGroup - Component type: CHANNEL, name: my-channel started
INFO main node.Application - Starting Sink my-sink
INFO main node.Application - Starting Source my-source
INFO lifecycleSupervisor-1-1 source.NetcatSource - Source starting
INFO lifecycleSupervisor-1-1 source.NetcatSource - Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:11111]
To check that everything works, let's send our NetCat source a small test file, test.txt, containing four lines (three messages and an empty last line):
Message 1
Message 2
Message 3

It is important that the file ends with a line break. For the NetCat source, the line break is the event separator. If the file does not end with one, the source assumes the last event has not fully arrived and keeps waiting for the separator. So, run the following command:
nc 127.0.0.1 11111 < test.txt
As a result, NetCat should print three "OK" messages, confirming that every line of the file was received as an event by the Flume source. At the same time, the sink should print the following messages to the console:
sink.LoggerSink - Event: { headers:{} body: 4D 65 73 73 61 67 65 20 31 0D Message 1. }
sink.LoggerSink - Event: { headers:{} body: 4D 65 73 73 61 67 65 20 32 0D Message 2. }
sink.LoggerSink - Event: { headers:{} body: 4D 65 73 73 61 67 65 20 33 0D Message 3. }
Note: Flume registers a shutdown hook at startup, so there is no need to release resources (close connections, open files, and so on) by hand. All components of the node finish their work on their own when the JVM shuts down.
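The same check can be done from Java instead of `nc`. The sketch below assumes the agent from this section is listening on 127.0.0.1:11111 and replies with one line per event, as described above; `NetcatClientSketch` is a hypothetical helper, not part of Flume. The `payload` method makes the trailing line break explicit, which is the detail the NetCat source depends on.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical test client; not part of Flume.
final class NetcatClientSketch {

    /** Joins the messages so that every one, including the last, ends with '\n'. */
    static String payload(List<String> messages) {
        StringBuilder text = new StringBuilder();
        for (String message : messages) {
            text.append(message).append('\n');
        }
        return text.toString();
    }

    /** Sends the messages and collects at most one reply line per message. */
    static List<String> send(String host, int port, List<String> messages) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            out.write(payload(messages).getBytes(StandardCharsets.UTF_8));
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            List<String> replies = new ArrayList<>();
            for (int i = 0; i < messages.size(); i++) {
                String reply = in.readLine();
                if (reply == null) {
                    break; // the connection was closed early
                }
                replies.add(reply);
            }
            return replies;
        }
    }

    public static void main(String[] args) {
        try {
            // Assumes the agent configured above is running on this machine.
            System.out.println(send("127.0.0.1", 11111,
                    List.of("Message 1", "Message 2", "Message 3")));
        } catch (IOException e) {
            System.out.println("Could not reach the agent: " + e.getMessage());
        }
    }
}
```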
5. A chain of Flume nodes
So, we have worked out how to configure and run a single Flume node. A single node is clearly not enough for managing data flows, though. Let's build a small chain of three nodes that performs a splitting task (essentially load balancing): the first Flume node receives data from the client and sends events to the other two nodes. The events are not duplicated on the second and third nodes but distributed evenly between them.
Such a scheme therefore needs several configurations, one per node.
Configuration for node 1 (node1.conf)
node1.sources = my-source
node1.channels = my-channel
# Two sinks drain the same channel:
node1.sinks = my-sink1 my-sink2

node1.sources.my-source.type = netcat
node1.sources.my-source.bind = 0.0.0.0
node1.sources.my-source.port = 11111
node1.sources.my-source.channels = my-channel

node1.channels.my-channel.type = memory
node1.channels.my-channel.capacity = 10000
node1.channels.my-channel.transactionCapacity = 100

# Both sinks are of type avro: they send events over the network
# to the host and port where the next node is listening
node1.sinks.my-sink1.type = avro
node1.sinks.my-sink1.channel = my-channel
node1.sinks.my-sink1.hostname = 127.0.0.1
node1.sinks.my-sink1.port = 11112
node1.sinks.my-sink1.batch-size = 100

node1.sinks.my-sink2.type = avro
node1.sinks.my-sink2.channel = my-channel
node1.sinks.my-sink2.hostname = 127.0.0.1
node1.sinks.my-sink2.port = 11113
node1.sinks.my-sink2.batch-size = 100
Configuration for node 2 (node2.conf)
node2.sources = my-source
node2.channels = my-channel
node2.sinks = my-sink

# Node 1 sends events through an avro sink, so the source here is avro as well
node2.sources.my-source.type = avro
node2.sources.my-source.bind = 0.0.0.0
node2.sources.my-source.port = 11112
node2.sources.my-source.channels = my-channel

node2.channels.my-channel.type = memory
node2.channels.my-channel.capacity = 10000
node2.channels.my-channel.transactionCapacity = 100

node2.sinks.my-sink.type = logger
node2.sinks.my-sink.channel = my-channel
node2.sinks.my-sink.maxBytesToLog = 256
Configuration for node 3 (node3.conf)
node3.sources = my-source
node3.channels = my-channel
node3.sinks = my-sink

# Node 1 sends events through an avro sink, so the source here is avro as well
node3.sources.my-source.type = avro
node3.sources.my-source.bind = 0.0.0.0
node3.sources.my-source.port = 11113
node3.sources.my-source.channels = my-channel

node3.channels.my-channel.type = memory
node3.channels.my-channel.capacity = 10000
node3.channels.my-channel.transactionCapacity = 100

node3.sinks.my-sink.type = logger
node3.sinks.my-sink.channel = my-channel
node3.sinks.my-sink.maxBytesToLog = 256
In this example the configurations of nodes 2 and 3 are identical except for the port number. Communication between the nodes uses the standard Flume components Avro Source and Avro Sink. They will be described in more detail in later articles; for now it is enough to know that an Avro Sink sends events over the network and an Avro Source can receive them.
Each node must be started in its own process, with the following launch parameters:
Application.main(new String[]{"-f", "/flume/config/node1.conf", "-n", "node1"});
// In separate processes:
//Application.main(new String[]{"-f", "/flume/config/node2.conf", "-n", "node2"});
//Application.main(new String[]{"-f", "/flume/config/node3.conf", "-n", "node3"});
You can verify that this configuration works by sending a text file of 100 lines to the first node. A small amount of data would be sent to one of the nodes as a single batch, and the desired splitting effect would not be visible.
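One way to run the three nodes as separate processes is to build a JVM command line per node. This is a sketch under assumptions: `ChainLauncherSketch` is a hypothetical helper, the classpath `/flume/lib/*` is a placeholder for wherever the Flume jars live, and only the `-f` and `-n` arguments and the `org.apache.flume.node.Application` class come from the examples above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical launcher helper; not part of Flume.
final class ChainLauncherSketch {

    /** Builds the JVM command line for one node; the classpath is supplied by the caller. */
    static List<String> command(String classpath, String node) {
        List<String> cmd = new ArrayList<>();
        cmd.add("java");
        cmd.add("-cp");
        cmd.add(classpath);
        cmd.add("org.apache.flume.node.Application");
        cmd.add("-f");
        cmd.add("/flume/config/" + node + ".conf");
        cmd.add("-n");
        cmd.add(node);
        return cmd;
    }

    public static void main(String[] args) {
        String classpath = "/flume/lib/*"; // hypothetical location of the Flume jars
        // The receiving nodes are listed first so that node1's Avro sinks
        // can connect as soon as node1 starts.
        for (String node : List.of("node2", "node3", "node1")) {
            System.out.println(String.join(" ", command(classpath, node)));
            // To actually start the process instead of printing the command:
            // new ProcessBuilder(command(classpath, node)).inheritIO().start();
        }
    }
}
```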
Conclusion
The Flume node configurations shown here are only useful for debugging or for getting to know the tool. In real projects, Flume topologies go far beyond one or two nodes, and the component configuration is much more complex.
In the next articles:
- Working with headers and channel selectors.
- Production-grade Flume components:
  - Avro Source;
  - File Channel;
  - Avro Sink;
  - HDFS Sink;
  - File Roll Sink.
- Monitoring the state of a Flume node.
Sources and useful links
- Apache Flume official page
- Official guide to configuring Flume components
- Hadoop, part 2: collecting data with Flume | Selectel blog: an article about setting up Flume in Cloudera.
- Hari Shreedharan, Using Flume: a good book describing what Flume can do.