File Channel
In the previous article we looked at the memory channel. Obviously, a channel that keeps its data in memory is not reliable: restarting the node loses every event stored in the channel. That does not make memory channels useless; there are cases where their speed fully justifies using them. A truly reliable transport system, however, needs a more robust solution.
That solution is the File Channel. As the name suggests, this channel stores its data in files. It works with those files through random access, so events can be added and taken while their order is preserved. For fast navigation the channel uses a system of checkpoints, on top of which a write-ahead log (WAL) mechanism is implemented. All of this is hidden "under the hood" of the channel, which is configured with the following parameters (required parameters are in bold).
Parameter | Description | Default |
---|---|---|
**type** | Channel implementation. Must be set to file. | - |
checkpointDir | Folder for the checkpoint files. If not set, the channel uses the Flume home folder. | $HOME/... |
useDualCheckpoints | Whether to keep a backup copy of the checkpoint folder. | false |
backupCheckpointDir | Folder for the backup checkpoint files. Must be set when useDualCheckpoints = true (naturally, the backup is best kept away from the original, for example on another disk). | - |
dataDirs | Comma-separated list of folders where the data files are placed. For better performance, specify several folders on different disks. If not set, the channel uses the Flume home folder. | $HOME/... |
capacity | Channel capacity, in events. | 1000000 |
transactionCapacity | Maximum number of events in one transaction. A very important parameter: the performance of the whole transport system can depend on it. Discussed in more detail below. | 10000 |
checkpointInterval | Interval between checkpoints, in milliseconds. Checkpoints matter at restart: they let the channel "jump over" sections of the data files while restoring its state. The channel then does not re-read the data files in full, which makes startup much faster when the channel is "clogged". | 30000 |
checkpointOnClose | Write a checkpoint when the channel is closed. A closing checkpoint lets the channel recover as quickly as possible at restart, but writing it takes some time when the channel is closed (very little, in practice). | true |
keep-alive | Timeout, in seconds, for adding to the channel. If the channel is full, the transaction gives it a "chance" by waiting for a while. If no space has been freed by then, the transaction is rolled back. | 3 |
maxFileSize | Maximum size of a channel file, in bytes. This value does not limit how much space the channel can "bite off": it sets the size of a single data file, and the channel can create several such files. | 2146435071 (2 GB) |
minimumRequiredSpace | If free disk space falls below this value, the channel stops accepting new events. If the data folders are on several disks, Flume ... | 524288000 (500 MB) |
- Make sure Flume has write access to the data folders.
More precisely, the user that Flume runs as must have write permission on the checkpoint and data folders.
- An SSD speeds the channel up considerably.
The chart shows the time it took to send a batch of 500 events to Flume nodes that use a file channel. One node keeps the channel data on an SSD, the other on a SATA disk. The difference is significant.
At roughly 0.025 s per batch, simple division shows that a Flume node with a file channel on an SSD can digest up to 500 / 0.025 = 20,000 events per second (for reference, the messages in this example are about 1 KB each, and the channel uses a single disk for storage).
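The estimate above is plain arithmetic, and a minimal sketch makes it easy to redo with your own measurements. The class and method names are hypothetical; the only inputs taken from the text are the batch of 500 events, the 0.025 s (25 ms) delivery time, and the event size of about 1 KB.

```java
// Back-of-the-envelope throughput estimate from one measured batch.
// Hypothetical helper for illustration; not part of Flume.
final class ThroughputEstimate {

    // Events per second for a batch of `events` delivered in `millis` milliseconds.
    static long eventsPerSecond(long events, long millis) {
        return events * 1000 / millis;
    }

    // Payload in bytes per second, given an average event size in bytes.
    static long bytesPerSecond(long eventsPerSecond, long eventSizeBytes) {
        return eventsPerSecond * eventSizeBytes;
    }

    public static void main(String[] args) {
        long eps = eventsPerSecond(500, 25);  // 500 events in 0.025 s, the SSD figure from the text
        long bps = bytesPerSecond(eps, 1024); // about 1 KB per event, as stated in the text
        System.out.println(eps + " events/s, " + bps + " bytes/s");
    }
}
```

Integer milliseconds are used instead of fractional seconds so that the division is exact.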
- Channel capacity is very sensitive to changes.
If you suddenly decide to change the channel capacity, an unpleasant surprise awaits: the channel starts a replay to restore its data. Instead of using the checkpoint files for navigation, the channel runs through all of its data files in full. With a lot of data in the channel, this can take quite a while.
- An abnormal channel shutdown can lead to data loss.
This can happen if you kill the Flume process (or hard-reset the machine). Or it may not. As far as I remember, it happened to us only once: the data files got "corrupted", and we had to delete all the channel data files by hand (fortunately, the channels were not full and losses were avoided). So the channel still does not give 100% reliability: there is always a chance that someone "pulls the switch" and something irreparable happens. If it does happen and the channel refuses to start, you can do the following:
- Try deleting the checkpoints; the channel will then try to recover from the data files alone.
- If that does not help and the channel logs something like "Unable to read data from channel, channel will be closed", the data files are corrupted. Only a full cleanup of all data folders helps here. Alas.
As alternatives to the File Channel, Flume offers several other channels, in particular the JDBC channel, which uses a database as a buffer, and the Kafka channel. Naturally, to use them you have to deploy the database or Kafka separately.
Avro Source and Avro Sink
Avro is a data serialization tool, and it gave the source and the sink their names. The networking of these components is implemented with Netty. Compared with the Netcat Source covered in the previous article, Avro Source has the following advantages:
- It can use headers in events (that is, pass auxiliary information along with the data).
- It can receive data from several clients at the same time. Netcat works on a plain socket and accepts incoming connections one after another, so it can serve only one client at a time.
Now let's look at the settings that Avro Source offers.
Parameter | Description | Default |
---|---|---|
**type** | Source implementation. Must be set to avro. | - |
**channels** | Channels the source sends events to (space-separated). | - |
**bind** | Host/IP the source binds to. | - |
**port** | Port on which the source accepts client connections. | - |
threads | Number of threads that process incoming events (I/O workers). When choosing a value, take into account the number of potential clients that will send events to this source. Set at least 2 threads, otherwise the source may simply "hang" even with a single client. If you are not sure how many threads you need, leave this parameter out of the configuration. | unlimited |
compression-type | Data compression. There are only two options: none or deflate. Specify it only if the client sends compressed data. Compression can save a lot of traffic, and the more events you send at once, the bigger the saving. | none |
- selector.type is the channel selector mentioned in the previous article. It lets you split or duplicate events across several channels according to some rules. Selectors are covered in more detail below.
- interceptors is a space-separated list of interceptors. Interceptors run before an event enters the channel. They are used to modify events in some way (for example, to add headers or change the event body). They are also covered below.
This source also has settings for Netty filters and data encryption. You can send events to this source with the following code.
A primitive Java client for Avro Source

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.api.RpcClient;
    import org.apache.flume.api.RpcClientFactory;
    import org.apache.flume.event.EventBuilder;

    public class FlumeSender {
        public static void main(String[] args) throws EventDeliveryException {
            // Connect to the Avro Source listening on 127.0.0.1:50001
            RpcClient avroClient = RpcClientFactory.getDefaultInstance("127.0.0.1", 50001);
            try {
                // Headers travel with the event; selectors and sinks can use them
                Map<String, String> headers = new HashMap<>();
                headers.put("type", "common");
                Event event = EventBuilder.withBody(" ".getBytes(StandardCharsets.UTF_8), headers);
                avroClient.append(event);
            } finally {
                // Release the connection even if sending fails
                avroClient.close();
            }
        }
    }

Next, let's look at the configuration of Avro Sink.
Parameter | Description | Default |
---|---|---|
**type** | Sink implementation. Must be set to avro. | - |
**channel** | Channel from which the sink pulls events. | - |
**hostname** | Host/IP the sink sends events to. | - |
**port** | Port on which the specified machine (hostname) listens for client connections. | - |
batch-size | A very important parameter: the size of the "packet" of events sent in one request. The same value is used when draining the channel, so it is also the number of events read from the channel in one transaction. | 100 |
connect-timeout | Connection (handshake) timeout, in milliseconds. | 20000 |
request-timeout | Request timeout (sending one packet of events), in milliseconds. | 20000 |
reset-connection-interval | Interval for "changing the host". The idea is that several machines served by a load balancer may be hidden behind the specified hostname. This parameter forces the sink to switch between the machines behind the balancer at the given interval. The convenience, as the sink's authors see it, is that when a new machine is added to the balancer's pool there is no need to restart the Flume node: the sink finds out about the new "destination" on its own. By default the sink does not change hosts. | -1 |
maxIoWorkers | Counterpart of threads in Avro Source. | 2 * PROC_CORES |
compression-type | Same as for Avro Source. The difference is that the sink compresses the data while the source unpacks it. So if an Avro Sink sends events to an Avro Source, the compression type must be the same on both. | none |
compression-level | Compression level, used only when compression-type = deflate (0: no compression, 9: maximum compression). | 6 |
- Choose the batch size carefully.
As I said, this is a very important parameter, and a poor choice can spoil your life considerably. First of all, batch-size must be less than or equal to the transaction capacity of the channel (transactionCapacity). This applies explicitly to Avro Sink and implicitly to Avro Source. Consider an example: an Avro Sink on the sending node drains channel 1, and the Avro Source on the receiving node fills channel 2.
Here TC is transactionCapacity and BS is batch-size. The condition for normal operation is BS <= TC1 and BS <= TC2. In other words, you have to take into account not only the transaction capacity of the channel the sink works with, but also that of the channel the receiving Avro Source works with. Otherwise the sink cannot drain its channel and the source cannot add events to its own. In such cases Flume starts flooding the log with error messages.
A case from practice. On one of the sinks we set batch-size = 10000, while the channel on the receiving node had TC = 5000. While the data volume was small everything worked, because the sink never pulled 10,000 events from the channel at once: that many simply did not accumulate. After a while the data volume grew and the problems started: the receiving node began rejecting the large packets. The error was noticed in time, the parameters were changed, and the data that had accumulated in the channel reached its destination.
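The condition BS <= TC1 and BS <= TC2 is easy to check before a configuration is rolled out. The sketch below is a hypothetical helper, not part of Flume; it only encodes the rule stated above, and the numbers in main are the ones from the practical case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-deployment check for the rule BS <= TC1 and BS <= TC2.
final class BatchSizeCheck {

    // Returns a description of every violated condition; an empty list means the settings fit.
    static List<String> problems(int batchSize, int sinkChannelTc, int receivingChannelTc) {
        List<String> problems = new ArrayList<>();
        if (batchSize > sinkChannelTc) {
            problems.add("batch-size " + batchSize
                    + " exceeds transactionCapacity " + sinkChannelTc + " of the sink's channel");
        }
        if (batchSize > receivingChannelTc) {
            problems.add("batch-size " + batchSize
                    + " exceeds transactionCapacity " + receivingChannelTc + " of the receiving channel");
        }
        return problems;
    }

    public static void main(String[] args) {
        // batch-size = 10000 on the sink, TC = 5000 on the receiving node, as in the case above
        problems(10000, 10000, 5000).forEach(System.out::println);
    }
}
```

A check like this only catches the mismatch if both node configurations are compared together, which is why the mistake in the case above went unnoticed until the load grew.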
- Send events in large bundles.
A transaction is a fairly expensive operation in terms of resources. Fewer transactions mean better performance. Compression, again, works much more efficiently when many events are sent at once. So along with batch-size you should also increase the transactionCapacity of the channels.
- Override the netty dependency on your nodes.
We use netty 3.10.5.Final, whereas Flume pulls in the old netty 3.6.2.Final. The problem with the old version is a small bug because of which Avro Sink and Avro Source periodically fail to connect to each other. This causes periodic downtime of several minutes in data transfer (after which everything returns to normal). If the data has to arrive as quickly as possible, such "traffic jams" can become a problem.
If you start Flume from Java, you can override the dependency with Maven. If you configure Flume through Cloudera or run it as a service, the Netty dependency has to be replaced by hand. The libraries are in the following folders:
- Cloudera: /opt/cloudera/parcels/CDH-${VERSION}/lib/flume-ng/lib;
- Service (standalone): $FLUME_HOME/lib.
File Roll Sink
So we have worked out how to configure transport nodes based on Avro Source/Sink and the file channel. Now we need to deal with the components that close the transport network, that is, the ones that take the data out of Flume's area of responsibility.
The first sink to consider is File-Roll Sink. I would call it a sink for the lazy. It supports a minimum of settings and can do only one thing: write events to files.
Parameter | Description | Default |
---|---|---|
**type** | Sink implementation. Must be set to file_roll. | - |
**channel** | Channel from which the sink pulls events. | - |
**sink.directory** | Folder where the files are stored. | - |
sink.rollInterval | Interval for creating a new file, in seconds (0: write everything to a single file). | 30 |
sink.serializer | Event serialization. You can specify TEXT, HEADER_AND_TEXT, AVRO_EVENT, or your own class that implements the EventSerializer.Builder interface. | TEXT |
batchSize | As with Avro Sink, the size of the packet of events taken from the channel per transaction. | 100 |
Why do I consider it a sink for the lazy? Because there is absolutely nothing to configure. No compression, no file name (the creation timestamp is used as the name), no grouping into subfolders, nothing. You cannot even limit the file size. This sink is suitable, perhaps, only for cases of "no time to explain, we need to start receiving data right now!".
Note. Since we do need to write data to files, we concluded that implementing our own file sink was better than using this one. Given that all of Flume's source code is open, it was not hard: we wrote it in a day. Minor bugs were fixed on the second day, and the sink has been working properly for over a year, sorting the data into folders in neat archives. I will publish this sink on GitHub after the third part of the series.
HDFS Sink
This sink is a more serious one: it supports many settings. It is a little surprising that File-Roll Sink was not built the same way.
Parameter | Description | Default |
---|---|---|
**type** | Sink implementation. Must be set to hdfs. | - |
**channel** | Channel from which the sink pulls events. | - |
**hdfs.path** | Folder the files are written to. Make sure the folder has the right access permissions. If you configure the node through Cloudera, the data is written as the user flume. | - |
hdfs.filePrefix | File name prefix. As with File-Roll, the base file name is the creation timestamp. So if you specify my-data, the resulting file name is my-data1476318264182. | FlumeData |
hdfs.fileSuffix | File name suffix, appended to the end of the name. Can be used to specify an extension such as .gz. | - |
hdfs.inUsePrefix | Like filePrefix, but for temporary files that are still being written. | - |
hdfs.inUseSuffix | Like fileSuffix, but for temporary files. Essentially a temporary extension. | .tmp |
hdfs.rollInterval | Interval for creating a new file, in seconds. Set 0 if files should not be closed by this criterion. | 30 |
hdfs.rollSize | Trigger for closing files by size, in bytes. Set 0 if this criterion does not suit you. | 1024 |
hdfs.rollCount | Trigger for closing files by number of events. Can also be set to 0. | 10 |
hdfs.idleTimeout | Trigger for closing files by inactivity, in seconds. If nothing is written to a file for this long, the file is closed. The trigger is off by default. | 0 |
hdfs.batchSize | Same as for the other sinks, although the sink's documentation says it is the number of events written to the file before they are flushed to HDFS. When choosing a value, also look at the transaction capacity of the channel. | 100 |
hdfs.fileType | File type: SequenceFile (a Hadoop file of key-value pairs; as a rule the key is the timestamp from the "timestamp" header or the current time), DataStream (text data, essentially the same as File-Roll Sink) or CompressedStream (like DataStream, but compressed). | SequenceFile |
hdfs.writeFormat | Record format, Text or Writable. Only for SequenceFile. The difference is whether the value is written as text (Text) or as bytes (BytesWritable). | Writable |
serializer | Configurable for DataStream and CompressedStream, as in File-Roll Sink. | TEXT |
hdfs.codeC | Must be set if you use the CompressedStream file type. The available compression options are gzip, bzip2, lzo, lzop, snappy. | - |
hdfs.maxOpenFiles | Maximum number of files open at the same time. When this value is exceeded, the oldest file is closed. | 5000 |
hdfs.minBlockReplicas | An important parameter. Minimum number of replicas per HDFS block. If not set, it is taken from the Hadoop configuration found on the classpath at startup (that is, from your cluster settings). To be honest, I cannot explain why Flume behaves the way it does with this parameter. The bottom line: if the value is anything other than 1, the sink starts closing files regardless of the other triggers and produces a mass of small files in record time. | - |
hdfs.callTimeout | Timeout for HDFS operations (opening/closing a file, flushing data), in milliseconds. | 10000 |
hdfs.closeTries | Number of attempts to close a file (if the first one failed). 0: keep trying until it succeeds. | 0 |
hdfs.retryInterval | How often to retry closing a file after a failure, in seconds. | 180 |
hdfs.threadsPoolSize | Number of threads performing IO operations on HDFS. If you have a "hodgepodge" of events that go to many files, it is better to increase this number. | 10 |
hdfs.rollTimerPoolSize | Unlike the previous pool, this thread pool runs scheduled tasks (closing files). It works off two parameters, rollInterval and retryInterval: the pool performs both the scheduled closing by trigger and the periodic retries to close files. One thread is more than enough. | 1 |
hdfs.useLocalTimeStamp | The HDFS sink allows date elements in the names of generated files (for example, hdfs.path = /logs/%Y-%m-%d groups files by day). A date has to come from somewhere, and this parameter offers two options: use the time when the event is processed (true), or use the time given in the event, that is, the "timestamp" header (false). If you rely on the event timestamp, make sure your events have this header. Otherwise they will not be written to HDFS. | false |
hdfs.round | Round the timestamp down to a multiple of a given value. | false |
hdfs.roundValue | The multiple the timestamp is rounded down to. | 1 |
hdfs.roundUnit | Rounding unit (second, minute or hour). | second |
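
To make the three rounding parameters more concrete, here is a minimal sketch of rounding a timestamp down to a multiple of N minutes. It illustrates the idea only: the class is hypothetical, it works in UTC as a simplifying assumption, and Flume's own implementation may differ in details such as time zone handling.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration of hdfs.round = true with hdfs.roundUnit = minute.
final class RoundDownSketch {

    // Rounds a timestamp down to a multiple of roundValue minutes within its hour (UTC assumed).
    static long roundDownMinutes(long epochMillis, int roundValue) {
        ZonedDateTime time = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        int minute = (time.getMinute() / roundValue) * roundValue;
        return time.withMinute(minute)
                .truncatedTo(ChronoUnit.MINUTES) // drop seconds and fractions
                .toInstant()
                .toEpochMilli();
    }

    // Convenience factory for a UTC timestamp in milliseconds.
    static long utcMillis(int year, int month, int day, int hour, int minute, int second) {
        return ZonedDateTime.of(year, month, day, hour, minute, second, 0, ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        long event = utcMillis(2016, 4, 15, 12, 37, 42);
        System.out.println(Instant.ofEpochMilli(roundDownMinutes(event, 15))); // 2016-04-15T12:30:00Z
        System.out.println(Instant.ofEpochMilli(roundDownMinutes(event, 60))); // 2016-04-15T12:00:00Z
    }
}
```

With roundValue = 15, every event from 12:30:00 to 12:44:59 gets the same rounded time, so a file name built from %H-%M-%S stays the same for the whole quarter of an hour.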
You may notice an interesting feature of the HDFS sink configuration: there is no parameter for the HDFS address. The authors of the sink assume that it will be used on the same machines as HDFS.
So what should you keep in mind when configuring this sink?
- Use a large batch size and transactionCapacity.
In general everything here is the same as for the other sinks: a transaction is quite expensive in terms of resources, so it is better to pour the data in large portions.
- Do not abuse macros in file names.
Date elements and header placeholders in file and folder names are certainly a convenient tool. But not a very fast one. It seems to me the authors could have made date substitution more efficient: if you look at the source code, you will be surprised by the number of operations performed to format these strings. Suppose we decided to set up the following folder structure:

    hdfs.path = /logs/%{dir}
    hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{host}.%{src}

Here dir and src are the values of the event headers with the corresponding keys. The resulting file looks like /logs/web/my-source/2016-04-15/2016-04-15-12-00-00.my-host.my-source.gz. On my computer, formatting this name for 1 million events takes almost 20 seconds! That is about 200 ms per 10,000 events. The conclusion: if you aim for a write rate of 10,000 events per second, be prepared to spend 20% of the time on generating file names. That is terrible. The cure is to take responsibility for generating the file name on the client side. Yes, it takes some code, but the sink settings can then be changed to this:

    hdfs.path = /logs
    hdfs.filePrefix = %{file-name}

Passing the ready-made file name in the file-name header saves resources and time. Building the path from such a header takes not 20 seconds but 500 to 600 ms per 1 million events. That is almost 40 times faster.
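A minimal sketch of what the client-side part might look like. The header name file-name and the dir/src/date layout come from the text; the class and method names are hypothetical, and UTC is used as a simplifying assumption.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side helper that prepares the value of the file-name header.
final class FileNameHeader {
    // Formatters are created once and reused for every event (UTC is an assumption).
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss").withZone(ZoneOffset.UTC);

    // Builds dir/src/yyyy-MM-dd/yyyy-MM-dd-HH-mm-ss.host.src, relative to hdfs.path.
    static String fileName(String dir, String src, String host, Instant time) {
        return dir + "/" + src + "/" + DAY.format(time) + "/"
                + STAMP.format(time) + "." + host + "." + src;
    }

    // Headers to attach to the event before it is sent to Flume.
    static Map<String, String> headers(String dir, String src, String host, Instant time) {
        Map<String, String> headers = new HashMap<>();
        headers.put("file-name", fileName(dir, src, host, time));
        return headers;
    }

    public static void main(String[] args) {
        Instant time = Instant.parse("2016-04-15T12:00:00Z");
        System.out.println(headers("web", "my-source", "my-host", time));
    }
}
```

The resulting map can be passed to EventBuilder.withBody in the client shown earlier. In a real client the timestamp would normally be rounded first, so that many events share one file name.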
- Combine events.
One more small hack that noticeably increases sink performance. If you write events to a file line by line, you can combine them on the client side. For example, your service generates logs that have to go to the same file. So why not join several lines into one, using \n as the separator? Writing the data to HDFS or to the file system takes far less time than all the "digital bureaucracy" that surrounds it.
Combining events at a ratio of at least 5 to 1 already gives a noticeable performance gain. Naturally, care is needed here: if events on the client are generated one at a time, filling the buffer for a combined event can take a while. All that time the events sit in memory, waiting for the group to form, so the chance of losing data grows. To sum up:
- For small data volumes it is better for the client to send events to Flume one at a time: the chance of losing them is lower.
- For large data volumes it is better to aggregate events. If events are generated intensively, a buffer of 5 to 10 events fills up quickly enough, and the performance of the sink improves considerably.
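The combining itself can be sketched in a few lines. This is a hypothetical client-side buffer, not a Flume class; the group size and the \n separator follow the text, and the explicit flush is there to limit the risk of losing lines that are still waiting in memory.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side buffer that joins several log lines into one event body.
final class LineCombiner {
    private final int groupSize;
    private final List<String> buffer = new ArrayList<>();

    LineCombiner(int groupSize) {
        this.groupSize = groupSize;
    }

    // Adds a line; returns the combined body once groupSize lines are collected, otherwise null.
    String add(String line) {
        buffer.add(line);
        return buffer.size() >= groupSize ? flush() : null;
    }

    // Returns whatever is buffered, joined with "\n", or null if the buffer is empty.
    // Call it on a timer or at shutdown so that buffered lines do not wait indefinitely.
    String flush() {
        if (buffer.isEmpty()) {
            return null;
        }
        String body = String.join("\n", buffer);
        buffer.clear();
        return body;
    }

    public static void main(String[] args) {
        LineCombiner combiner = new LineCombiner(5);
        for (int i = 1; i <= 12; i++) {
            String body = combiner.add("log line " + i);
            if (body != null) {
                System.out.println("send event with " + body.split("\n").length + " lines");
            }
        }
        String rest = combiner.flush();
        if (rest != null) {
            System.out.println("send remaining " + rest.split("\n").length + " lines");
        }
    }
}
```

The class is not thread-safe; a client that logs from several threads would need synchronization around add and flush.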
- Deploy sinks on several machines of the HDFS cluster.
When Flume is configured through Cloudera, a separate Flume node can be run on every node of the cluster. It is worth using this option, because the load is then spread across all the machines of the cluster. However, if you use a shared configuration (the same configuration file on all machines), make sure there are no file name conflicts. You can do that with an event interceptor that adds the host name to the headers; then you only need to include this header in the file name template (see below).
Note. In fact, such a decision deserves some thought: each sink will write the same kind of data to its own file, so you may end up with a lot of small files on HDFS. The decision has to be balanced. If the data volume is small, you can limit yourself to a single Flume node for writing to HDFS. This is so-called data consolidation, when data from several sources ends up in one sink. But if the data "flows like a river", one node may not be enough. The design of the transport network as a whole is covered in more detail in the next article of this series.
Event interceptors
I have mentioned these mysterious interceptors many times, and now is probably the time to explain what they are. Interceptors are event handlers that work at the stage between receiving an event at the source and sending it to the channel. Interceptors can transform, modify or filter events.
Flume provides many interceptors out of the box. They can:
- Add static headers (a constant, a timestamp, the host name).
- Generate a random UUID in a header.
- Extract values from the event body (with regular expressions) and use them as headers.
- Change the event body (again with regular expressions).
- Filter events based on their content.
Configuration examples for various interceptors

    # ============================ Avro source with interceptors ============================ #
    # Avro source settings
    my-agent.sources.avro-source.type = avro
    my-agent.sources.avro-source.bind = 0.0.0.0
    my-agent.sources.avro-source.port = 50001
    my-agent.sources.avro-source.channels = my-agent-channel
    # Interceptors, applied in the order listed (the names are arbitrary)
    my-agent.sources.avro-source.interceptors = ts directory host replace group-replace filter extractor
    # ------------------------------------------------------------------------------ #
    # Static interceptor: adds a constant header.
    # Here the key is "dir" and the value is "test-folder".
    my-agent.sources.avro-source.interceptors.directory.type = static
    my-agent.sources.avro-source.interceptors.directory.key = dir
    my-agent.sources.avro-source.interceptors.directory.value = test-folder
    # Keep the header if the event already has it
    my-agent.sources.avro-source.interceptors.directory.preserveExisting = true
    # ------------------------------------------------------------------------------ #
    # Adds a "timestamp" header with the current time
    my-agent.sources.avro-source.interceptors.ts.type = timestamp
    my-agent.sources.avro-source.interceptors.ts.preserveExisting = true
    # ------------------------------------------------------------------------------ #
    # Adds a header with the host name or IP
    my-agent.sources.avro-source.interceptors.host.type = host
    my-agent.sources.avro-source.interceptors.host.useIP = true
    # Header name (the counterpart of directory.key)
    my-agent.sources.avro-source.interceptors.host.hostHeader = host
    my-agent.sources.avro-source.interceptors.host.preserveExisting = true
    # ------------------------------------------------------------------------------ #
    # Replaces every tab in the event body with ;
    my-agent.sources.avro-source.interceptors.replace.type = search_replace
    my-agent.sources.avro-source.interceptors.replace.searchPattern = \t
    my-agent.sources.avro-source.interceptors.replace.replaceString = ;
    # The body is a byte[], so the charset matters (UTF-8 is the default)
    my-agent.sources.avro-source.interceptors.replace.charset = UTF-8
    # ------------------------------------------------------------------------------ #
    # A "smarter" replacement that uses regex groups
    my-agent.sources.avro-source.interceptors.group-replace.type = search_replace
    # Turns a date such as 2014-01-20 into 20/01/2014.
    # The pattern has 4 groups; the last one keeps the rest of the line.
    my-agent.sources.avro-source.interceptors.group-replace.searchPattern = (\\d{4})-(\\d{2})-(\\d{2})(.*)
    my-agent.sources.avro-source.interceptors.group-replace.replaceString = $3/$2/$1$4
    # ------------------------------------------------------------------------------ #
    # Filter: checks the event body against a regular expression
    my-agent.sources.avro-source.interceptors.filter.type = regex_filter
    my-agent.sources.avro-source.interceptors.filter.regex = error$
    # true: drop the events that match; false: keep only the events that match
    my-agent.sources.avro-source.interceptors.filter.excludeEvents = true
    # ------------------------------------------------------------------------------ #
    # Extractor: pulls header values out of the event body
    my-agent.sources.avro-source.interceptors.extractor.type = regex_extractor
    # Expects bodies such as "2016-04-15;WARNING;- text"
    my-agent.sources.avro-source.interceptors.extractor.regex = (\\d{4}-\\d{2}-\\d{2});(.*);
    # One serializer per regex group, in order:
    # (\\d{4}-\\d{2}-\\d{2}) -> $1 -> ts
    # (.*) -> $2 -> loglevel
    my-agent.sources.avro-source.interceptors.extractor.serializers = ts loglevel
    # The date is converted to milliseconds and stored in the timestamp header
    my-agent.sources.avro-source.interceptors.extractor.serializers.ts.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
    my-agent.sources.avro-source.interceptors.extractor.serializers.ts.name = timestamp
    my-agent.sources.avro-source.interceptors.extractor.serializers.ts.pattern = yyyy-MM-dd
    # The log level is stored as is
    my-agent.sources.avro-source.interceptors.extractor.serializers.loglevel.name = level

Unfortunately, there is no header filter among the standard interceptors. If you need one, though, you can write such an interceptor yourself. To configure a Flume transport completely, we need to look at one more type of Flume component: selectors.
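To see what the two regular expressions from the interceptor examples do to an event body, here is a plain-Java sketch that uses only java.util.regex. It is not Flume's interceptor code: the class is hypothetical, and for simplicity the first group is stored as a raw string under a made-up "date" key instead of being converted to milliseconds.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java illustration of the regex_extractor and search_replace patterns.
final class InterceptorRegexSketch {
    private static final Pattern EXTRACT = Pattern.compile("(\\d{4}-\\d{2}-\\d{2});(.*);");
    private static final Pattern DATE = Pattern.compile("(\\d{4})-(\\d{2})-(\\d{2})(.*)");

    // Returns the headers that would be extracted from the body; empty if the pattern is not found.
    static Map<String, String> extract(String body) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher matcher = EXTRACT.matcher(body);
        if (matcher.find()) {
            headers.put("date", matcher.group(1));  // hypothetical key, raw string kept for clarity
            headers.put("level", matcher.group(2)); // same header name as in the configuration
        }
        return headers;
    }

    // Rewrites yyyy-MM-dd as dd/MM/yyyy and keeps the rest of the line (group 4) untouched.
    static String swapDate(String body) {
        return DATE.matcher(body).replaceAll("$3/$2/$1$4");
    }

    public static void main(String[] args) {
        System.out.println(extract("2016-04-15;WARNING;disk is almost full"));
        System.out.println(swapDate("2014-01-20 some text"));
    }
}
```

Note that (.*) is greedy: if the message part itself contains a semicolon, the second group will swallow everything up to the last one, so a stricter pattern such as ([^;]*) may be a safer choice for the level.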
Channel selectors
A source needs a selector to decide which events go to which channel. There are two types of selectors:
- replicating: with this selector the source duplicates events to all the channels associated with it. This is the selector Flume uses by default. It also lets you mark channels as "optional". Unlike the main channels, failed attempts to add events to such channels are ignored by the source.
- multiplexing: a selector that distributes events between channels according to some rules. The standard implementation of the multiplexing selector distributes events between channels based on header values.
Configuration example for a multiplexing selector

    # ============================ Avro source with a selector ============================ #
    my-source.sources.avro-source.type = avro
    my-source.sources.avro-source.port = 50002
    my-source.sources.avro-source.bind = 127.0.0.1
    my-source.sources.avro-source.channels = hdfs-channel file-roll-channel null-channel
    # The selector type is multiplexing:
    # "important" events go to HDFS and to a file, "common" events go to a file only
    my-source.sources.avro-source.selector.type = multiplexing
    # The header whose value drives the routing
    my-source.sources.avro-source.selector.header = type
    # type = important: send to HDFS and to the file
    my-source.sources.avro-source.selector.mapping.important = hdfs-channel file-roll-channel
    # type = common: send to the file only
    my-source.sources.avro-source.selector.mapping.common = file-roll-channel
    # Any other type, or no header at all: send to the null channel
    # (for example, a memory channel drained by a null sink)
    my-source.sources.avro-source.selector.default = null-channel

Selectors process events after the interceptors. This means that you can manipulate an event with interceptors (for example, extract various headers) and then use the results of those manipulations in the selector.
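The routing rule of the multiplexing selector can be modeled in a few lines of Java. This is an illustration of the logic only, not Flume's selector class; the class is hypothetical, and the channel names and the type header repeat the configuration example above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: mimics header-based routing of the multiplexing selector.
final class MultiplexingSketch {
    private final String header;
    private final Map<String, List<String>> mapping = new HashMap<>();
    private final List<String> defaultChannels;

    MultiplexingSketch(String header, List<String> defaultChannels) {
        this.header = header;
        this.defaultChannels = defaultChannels;
    }

    // Registers the channels for one header value and returns this for chaining.
    MultiplexingSketch map(String headerValue, String... channels) {
        mapping.put(headerValue, Arrays.asList(channels));
        return this;
    }

    // Channels for an event: the mapping for its header value, or the default channels.
    List<String> channelsFor(Map<String, String> eventHeaders) {
        List<String> channels = mapping.get(eventHeaders.get(header));
        return channels != null ? channels : defaultChannels;
    }

    // The same routing table as in the configuration example.
    static MultiplexingSketch example() {
        return new MultiplexingSketch("type", Collections.singletonList("null-channel"))
                .map("important", "hdfs-channel", "file-roll-channel")
                .map("common", "file-roll-channel");
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("type", "important");
        System.out.println(example().channelsFor(headers));
        System.out.println(example().channelsFor(Collections.emptyMap()));
    }
}
```

Because interceptors run first, the header used for routing does not have to come from the client: an extractor interceptor can set it from the event body.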
Conclusion
This article turned out to be unexpectedly large, so we decided to cover the promised node monitoring in the next part of the series. To wrap up, I would like to show one of our Flume configurations that works with HDFS. It is suitable for delivering and organizing small volumes of data, up to about 2,000 events per second per node. This node requires the events to carry the headers roll ("15m" or "60m"), dir and src, which are used to build a two-level folder hierarchy.
Flume configuration for HDFS

    flume-hdfs.sources = hdfs-source
    flume-hdfs.channels = hdfs-15m-channel hdfs-60m-channel hdfs-null-channel
    flume-hdfs.sinks = hdfs-15m-sink hdfs-60m-sink hdfs-null-sink

    # =========== Avro source, adds the host header ============ #
    flume-hdfs.sources.hdfs-source.type = avro
    flume-hdfs.sources.hdfs-source.port = 50002
    flume-hdfs.sources.hdfs-source.bind = 0.0.0.0
    flume-hdfs.sources.hdfs-source.interceptors = hostname
    flume-hdfs.sources.hdfs-source.interceptors.hostname.type = host
    flume-hdfs.sources.hdfs-source.interceptors.hostname.hostHeader = host
    flume-hdfs.sources.hdfs-source.channels = hdfs-null-channel hdfs-15m-channel hdfs-60m-channel
    flume-hdfs.sources.hdfs-source.selector.type = multiplexing
    flume-hdfs.sources.hdfs-source.selector.header = roll
    flume-hdfs.sources.hdfs-source.selector.mapping.15m = hdfs-15m-channel
    flume-hdfs.sources.hdfs-source.selector.mapping.60m = hdfs-60m-channel
    flume-hdfs.sources.hdfs-source.selector.default = hdfs-null-channel

    # ============================ File channel, 15 minutes ============================ #
    flume-hdfs.channels.hdfs-15m-channel.type = file
    flume-hdfs.channels.hdfs-15m-channel.maxFileSize = 1073741824
    flume-hdfs.channels.hdfs-15m-channel.capacity = 10000000
    flume-hdfs.channels.hdfs-15m-channel.transactionCapacity = 10000
    flume-hdfs.channels.hdfs-15m-channel.dataDirs = /flume/flume-hdfs/hdfs-15m-channel/data1,/flume/flume-hdfs/hdfs-15m-channel/data2
    flume-hdfs.channels.hdfs-15m-channel.checkpointDir = /flume/flume-hdfs/hdfs-15m-channel/checkpoint

    # ============================ File channel, 60 minutes ============================ #
    flume-hdfs.channels.hdfs-60m-channel.type = file
    flume-hdfs.channels.hdfs-60m-channel.maxFileSize = 1073741824
    flume-hdfs.channels.hdfs-60m-channel.capacity = 10000000
    flume-hdfs.channels.hdfs-60m-channel.transactionCapacity = 10000
    flume-hdfs.channels.hdfs-60m-channel.dataDirs = /flume/flume-hdfs/hdfs-60m-channel/data1,/flume/flume-hdfs/hdfs-60m-channel/data2
    flume-hdfs.channels.hdfs-60m-channel.checkpointDir = /flume/flume-hdfs/hdfs-60m-channel/checkpoint

    # =========== Sink, 15 minutes (files are closed after 5 minutes of inactivity) =========== #
    flume-hdfs.sinks.hdfs-15m-sink.type = hdfs
    flume-hdfs.sinks.hdfs-15m-sink.channel = hdfs-15m-channel
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.path = /logs/%{dir}
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileSuffix = .gz
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.writeFormat = Text
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.codeC = gzip
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.fileType = CompressedStream
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.minBlockReplicas = 1
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollInterval = 0
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollSize = 0
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.rollCount = 0
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.idleTimeout = 300
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.round = true
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundValue = 15
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.roundUnit = minute
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.threadsPoolSize = 8
    flume-hdfs.sinks.hdfs-15m-sink.hdfs.batchSize = 10000

    # =========== Sink, 60 minutes (files are closed after 20 minutes of inactivity) =========== #
    flume-hdfs.sinks.hdfs-60m-sink.type = hdfs
    flume-hdfs.sinks.hdfs-60m-sink.channel = hdfs-60m-channel
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.filePrefix = %{src}/%Y-%m-%d/%Y-%m-%d-%H-%M-%S.%{src}.%{host}.log
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.path = /logs/%{dir}
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.fileSuffix = .gz
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.writeFormat = Text
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.codeC = gzip
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.fileType = CompressedStream
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.minBlockReplicas = 1
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollInterval = 0
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollSize = 0
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.rollCount = 0
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.idleTimeout = 1200
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.round = true
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.roundValue = 60
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.roundUnit = minute
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.threadsPoolSize = 8
    flume-hdfs.sinks.hdfs-60m-sink.hdfs.batchSize = 10000

    # ================ Null sink + memory channel =============== #
    flume-hdfs.channels.hdfs-null-channel.type = memory
    flume-hdfs.channels.hdfs-null-channel.capacity = 30000
    flume-hdfs.channels.hdfs-null-channel.transactionCapacity = 10000
    flume-hdfs.channels.hdfs-null-channel.byteCapacityBufferPercentage = 20
    flume-hdfs.sinks.hdfs-null-sink.channel = hdfs-null-channel
    flume-hdfs.sinks.hdfs-null-sink.type = null

In the next and final article of the series we will look at:
- The process of building a full-fledged data transport based on Flume.
- Examples of developing your own components.
- The promised node monitoring, which did not make it into this article.