ããã§ããšãã²ããŒã»ãšãã²ããŒã»ããªãœãã»ããªãœãã¯ãã¹ããŒã¯ã«ã€ããŠãç¥è©±ããããŠãã³ã¯ã»ããã€ãã®ããã¹ããã±ã€ãã£ã»ããªãŒãããé©åãã©ããã«ã€ããŠå°ã説æããŸãã
ããã¯ãSparkã«é¢ããçããã¬ããŒãã§ãã
éåžžã圌ãã¯Sparkã«ã€ããŠå€ãã®ããšã話ãããããããã«ã¯ãŒã«ããScalaã§ã³ãŒãã瀺ããŸãã ããããç§ã¯ãããã«ç°ãªãç®æšãæã£ãŠããŸãã ãŸããSparkã®æŠèŠãšãSparkãå¿ èŠãªçç±ã«ã€ããŠèª¬æããŸãã ããããäž»ãªç®æšã¯ãJavaéçºè ãšããŠå®å šã«äœ¿çšã§ããããšã瀺ãããšã§ãã ãã®ã¬ããŒãã§ã¯ãSparkã«é¢ããããã€ãã®ç¥è©±ãææããŸãã
èªåã«ã€ããŠç°¡åã«
ç§ã¯2001幎ããJavaããã°ã©ããŒã§ãã
2003幎ãŸã§ã«ã圌ã¯åæã«æãå§ããŸããã
2008幎ãã圌ã¯åè°ã«åŸäºãå§ããŸããã
2009幎以æ¥ã圌ã¯ããŸããŸãªãããžã§ã¯ãã®ã¢ãŒããã¯ãã£ã«æºãã£ãŠããŸããã
ã¹ã¿ãŒãã¢ããã¯2014幎ã«ç¬èªã«ãªãŒãã³ããŸããã
2015幎以æ¥ãç§ã¯Naya Technologiesã®ããã°ããŒã¿ã®æè¡ãªãŒããŒã§ãããå¯èœãªéãããã°ããŒã¿ãå®è£ ããŠããŸãã ç§ãã¡ã«ã¯åœŒããå©ããŠã»ãããšé¡ãèšå€§ãªæ°ã®é¡§å®¢ãããŸãã ç§ãã¡ã¯å£æ» çã«æ°ãããã¯ãããžãŒã«ç²ŸéããŠãã人ãäžè¶³ããŠããã®ã§ãç§ãã¡ã¯åžžã«åŽåè ãæ¢ããŠããŸãã
ã¹ããŒã¯ç¥è©±
Sparkã«ã¯å€ãã®ç¥ââ話ããããŸãã
ãŸãã詳现ã«èª¬æããæŠå¿µçãªç¥è©±ãããã€ããããŸãã
- Sparkã¯Hadoopã®ããŒã·ã§ã³ã®äžçš®ã§ãã å€ãã®äººããSparkãšHadoopãäžç·ã«ããããã ãšèããŠããŸãã ãããããã§ãããã©ããã«ã€ããŠè©±ããŸãããã
- ãã®Sparkã¯Scalaã§äœæããå¿
èŠããããŸãã Sparkã¯Scalaã§äœæã§ããã ãã§ãªããScalaã§äœæããã®ãæ£ãããšèããããšãããã§ãããããã€ãã£ãAPIãªã©ã®ããã§ãã ãããæ£ãããã©ããã«ã€ããŠè©±ããŸãã
- ç§ã¯Springã倧奜ãã§ãå¯èœãªéããã¹ãããã»ããµã䜿çšããŠããŸãã ãã¹ãããã»ããµã¯æ¬åœã«ããã§äœãå©çããããããŸããïŒ
- Sparkãã¹ãã§äœãèµ·ãããã«ã€ããŠè©±ããŸãããã Sparkã¯ããã°ããŒã¿ã§ããããããã¹ãæ¹æ³ã¯ããŸãæ確ã§ã¯ãããŸããã ããã«ã¯ãã¹ãããŸã£ããæžããªããšããç¥è©±ããããæžãã°ããã¹ãŠãç§ãã¡ãæ
£ããŠãããã®ãšã¯å®å
šã«ç°ãªã£ãŠèŠããã§ãããã
ããã€ãã®æè¡çãªç¥è©±ããããŸãïŒããã¯Sparkã§äœæ¥ããããå€å°ãªããšããããç¥ã£ãŠãã人åãã§ãïŒã
- ãããŒããã£ã¹ãã«ã€ããŠ-å Žåã«ãã£ãŠã¯äœ¿çšããå¿
èŠããããŸãã䜿çšããªããšããã¹ãŠãã¯ã©ãã·ã¥ããŸãã ãããããã§ãããã©ããã«ã€ããŠè©±ããŠãã ããã
- ããŒã¿ãã¬ãŒã ã«ã€ããŠ-ããŒã¿ãã¬ãŒã ã¯ã¹ããŒã ãæã€ãã¡ã€ã«ã«ã®ã¿äœ¿çšã§ãããšèšãããä»ã®å Žåã«äœ¿çšã§ãããã©ããã«ã€ããŠèª¬æããŸãã
ãããŠãäž»ãªç¥è©±ã¯ãã³ã¯ããã€ãã°ã«ãŒãã«ã€ããŠã§ãã ãã³ã¯ã»ããã€ããããªãããŒã»ã¹ãã¢ãŒãºããã£ããã£ã»ããªãŒã®ããã«è³¢ãããã¹ããæžãïŒæžãïŒãšããã®ã¯ç¥è©±ã§ãã ãããŠä»æ¥ãSparkã§æç« ãæžããŸããããã¯ãããããã¹ãŠã®ãã¥ãŒãžã·ã£ã³ã®æè©ãåæãããããã®é¡äŒŒããåèªãç¹å®ããã®ã«åœ¹ç«ã¡ãŸãã ãã³ã¯ããã€ãããããã¢ãŒãã£ã¹ããšåããã³ã»ã³ã¹ãæžããŠããããšã蚌æãããã
ãããã®ç¥è©±ã®ã©ããåè«ã§ãããèŠãŠã¿ãŸãããã
ç¥è©±1. SparkãšHadoop
æŠããŠãHadoopã¯åãªãæ å ±ã®ãªããžããªã§ãã ããã¯åæ£ãã¡ã€ã«ã·ã¹ãã ã§ãã ããã«ããã®æ å ±ãåŠçã§ããç¹å®ã®ããŒã«ãšAPIã®ã»ãããæäŸããŸãã
Sparkã®ã³ã³ããã¹ãã§ã¯ãHadoopãå¿ èŠãšããã®ã¯Sparkã§ã¯ãªããããããHadoopã«ä¿åããŠãã®ããŒã«ã䜿çšããŠåŠçã§ããïŒããã©ãŒãã³ã¹ã®åé¡ã«çŽé¢ããŠããïŒæ å ±ãSparkã䜿çšããŠããé«éã«åŠçã§ããããã§ãã
åé¡ã¯ãSparkãåäœããããã«Hadoopãå¿ èŠãã©ããã§ãã
Sparkã®å®çŸ©ã¯æ¬¡ã®ãšããã§ãã

ããã«Hadoopãšããèšèã¯ãããŸããïŒ Sparkã¢ãžã¥ãŒã«ããããŸãïŒ
- Spark Coreã¯ãããŒã¿ãåŠçã§ããç¹å®ã®APIã§ãã
- Spark SQLãããã¯ãSQLã«ç²ŸéããŠãã人åãã«SQLã®ãããªæ§æãäœæããããšãå¯èœã«ããŸãïŒããã«ã€ããŠã¯ãè¯ããæªããã«ã€ããŠå¥ã«èª¬æããŸãïŒã
- æ©æ¢°åŠç¿ã¢ãžã¥ãŒã«ã
- ã¹ããªãŒãã³ã°ãSparkã䜿çšããŠæ
å ±ãä¿æããããäœããèãããã§ããŸãã
ããããHadoopãšããèšèã¯ã©ãã«ããããŸããã
Sparkã«ã€ããŠè©±ããŸãããã
ãã®ã¢ã€ãã¢ã¯ã2009幎é ã«ããŒã¯ã¬ãŒå€§åŠã§çãŸããŸããã æåã®ãªãªãŒã¹ã¯ããã»ã©åã§ã¯ãªãã2012幎ã«ãªãªãŒã¹ãããŸãããä»æ¥ã¯ããŒãžã§ã³2.1.0ã§ãïŒ2016幎ã®çµããã«ãªãªãŒã¹ãããŸããïŒã ãã®ã¬ããŒããæ¡ç¹ããæç¹ã§ã¯ãããŒãžã§ã³1.6.1ãé¢é£ããŠããŸããããAPIãã¯ãªãŒã³ã«ããå€ãã®æ°ãã䟿å©ãªãã®ãè¿œå ããSpark 2.0ã®å·®ãè¿«ã£ããªãªãŒã¹ãçŽæããŸããïŒããã§ã¯Spark 2.0ã®ã€ãããŒã·ã§ã³ã¯èæ ®ãããŸããïŒã
Sparkèªäœã¯Scalaã§èšè¿°ãããŠããŸããããã¯ããã€ãã£ãAPIãååŸãããããSparkã§Sparkã䜿çšããæ¹ãè¯ããšããç¥è©±ã説æããŠããŸãã ãã ããScala APIã®ã»ãã«æ¬¡ã®ãã®ããããŸãã
- Python
- Java
- Java 8ïŒåå¥ïŒ
- ããã³RïŒçµ±èšããŒã«ïŒã
InteliJã§Sparkãäœæã§ããŸããããã¯ãã¬ããŒãããã»ã¹ã§æ¬æ¥è¡ããŸãã Eclipseã䜿çšã§ããŸãããSparkã«ã¯ãŸã ç¹å¥ãªãã®ããããŸã-ããã¯Sparkã·ã§ã«ã§ãçŸåšã¯ç¹å®ã®ããŒãžã§ã³ã®Hadoopã«ä»å±ããŠãããSparkã³ãã³ããã©ã€ãã§èšè¿°ããŠå³åº§ã«çµæãåŸãããšãã§ããŸããåå©çšã®ããã
SparkãSparkã·ã§ã«ãšããŒãããã¯ã§å®è¡ã§ããŸã-çµã¿èŸŒã¿ã§ãã Spark-submitã³ãã³ãã䜿çšããŠã¯ã©ã¹ã¿ãŒã§Sparkã¢ããªã±ãŒã·ã§ã³ãèµ·åããéåžžã®Javaããã»ã¹ãšããŠå®è¡ã§ããŸãïŒjava -jaarãšmainãåŒã³åºãããã³ãŒããèšè¿°ãããŠããå ŽæãèšãïŒã æ¬æ¥ãã¬ããŒãã®éçšã§Sparkãèµ·åããŸãã 解決ãããã¿ã¹ã¯ã«ã¯ãããŒã«ã«ãã·ã³ã§ååã§ãã ãã ããã¯ã©ã¹ã¿ãŒã§å®è¡ããå Žåã¯ãã¯ã©ã¹ã¿ãŒãããŒãžã£ãŒãå¿ èŠã§ãã ãããSparkãå¿ èŠãšããå¯äžã®ãã®ã§ãã ãããã£ãŠãHadoopãªãã«ã¯æ¹æ³ããªããšããé¯èŠããã°ãã°çºçããŸãã Hadoopã«ã¯ãã¯ã©ã¹ã¿ãŒå šäœã«Sparkã¿ã¹ã¯ãåæ£ãããããã«äœ¿çšã§ããã¯ã©ã¹ã¿ãŒãããŒãžã£ãŒã§ããYarnããããŸãã ãããã代æ¿æ段ããããŸã-Mesos-Hadoopãšã¯é¢ä¿ã®ãªãã¯ã©ã¹ã¿ãŒãããŒãžã£ãŒã§ãã ããã¯é·ãéååšããŠãããçŽ1幎åã«åœŒãã¯7000äžãã«ãåãåããŸããã ååãšããŠãHadoopãå«ã人ã¯èª°ã§ããYarnãšHadoopããŸã£ãããªãã¯ã©ã¹ã¿ãŒã§å®éã«Sparkã¿ã¹ã¯ãå®è¡ã§ããŸãã
ããŒã¿ã®å±ææ§ã«ã€ããŠ2ã€ã®èšèã ãã§èª¬æããŸãã 1å°ã®ãã·ã³äžã§ã¯ãªããå€æ°ã®ãã·ã³äžã«ããããã°ããŒã¿ãåŠçãããšããèãã¯äœã§ããïŒ
ããšãã°jdbcãORMã§åäœããã³ãŒããäœæãããšãå®éã«ã¯ã©ããªããŸããïŒ Javaããã»ã¹ãéå§ãããã·ã³ããããããŒã¿ããŒã¹ã«ã¢ã¯ã»ã¹ããã³ãŒãããã®ããã»ã¹ã§å®è¡ããããšããã¹ãŠã®ããŒã¿ãããŒã¿ããŒã¹ããèªã¿åããããã®Javaããã»ã¹ãæ©èœããå Žæã«ãªãã€ã¬ã¯ããããŸãã ããã°ããŒã¿ã«ã€ããŠè©±ããšããããŒã¿ãå€ãããããããããè¡ãããšã¯äžå¯èœã§ã-ããã¯éå¹ççã§ãããããã«ã®ããã¯ããããŸãã ããã«ãããŒã¿ã¯æ¢ã«é åžãããŠãããæåã¯å€æ°ã®ãã·ã³ã«é 眮ãããŠããããããã®ããã»ã¹ã«ããŒã¿ããã«ããã®ã§ã¯ãªãããã®ãæ¥ä»ããåŠçãããã·ã³ã«ã³ãŒããé åžããããšããå§ãããŸãã ãããã£ãŠãããã¯å€ãã®ãã·ã³ã§äžŠè¡ããŠè¡ãããç¡å¶éã®æ°ã®ãªãœãŒã¹ã䜿çšããŸããããã§ã¯ããããã®ããã»ã¹ã調æŽããã¯ã©ã¹ã¿ãŒãããŒãžã£ãŒãå¿ èŠã§ãã
ãã®åçã§ã¯ãããããã¹ãŠãSparkã®äžçã§ã©ã®ããã«æ©èœããããããããŸãã

ãã©ã€ããŒããããŸã-ã¡ã€ã³ã¯å¥ã®ãã·ã³ã§å®è¡ãããŸãïŒã¯ã©ã¹ã¿ãŒã«é¢é£ããŠããŸããïŒã Sparkã¢ããªã±ãŒã·ã§ã³ãéä¿¡ãããšãããªãœãŒã¹ãããŒãžã£ãŒã§ããYarnã«é ŒããŸãã Javaããã»ã¹ã«äœ¿çšããã¯ãŒã«ãŒã®æ°ïŒ3ãªã©ïŒã圌ã«äŒããŸãã ã¯ã©ã¹ã¿ãã·ã³ããã圌ã¯ã¢ããªã±ãŒã·ã§ã³ãã¹ã¿ãŒãšåŒã°ãã1å°ã®ãã·ã³ãéžæããŸãã ãã®ã¿ã¹ã¯ã¯ãã³ãŒããååŸãããããå®è¡ããã¯ã©ã¹ã¿ãŒå ã®3å°ã®ãã·ã³ãèŠã€ããããšã§ãã 3ã€ã®ãã·ã³ãããã3ã€ã®å¥åã®Javaããã»ã¹ïŒ3ã€ã®ãšã°ãŒãã¥ãŒã¿ãŒïŒãçºçããããã§ã³ãŒããèµ·åãããŸãã ãã®åŸããã¹ãŠãã¢ããªã±ãŒã·ã§ã³ãã¹ã¿ãŒã«æ»ããæçµçã«ã¯ãããã°ããŒã¿æäœã®çµæãã³ãŒãã®å ã®å Žæã«æ»ãããã«ããã©ã€ããŒã«çŽæ¥è¿ããŸãã
ããã¯ãä»æ¥ã話ãããããšãšã¯çŽæ¥é¢ä¿ãããŸããã SparkãCluster ManagerïŒãã®äŸã§ã¯YarnïŒã§ã©ã®ããã«æ©èœãããããããŠãªãœãŒã¹ãéãããŠããçç±ïŒãéãé€ã-ãã·ã³ãã¡ã¢ãªãªã©ã®äœè£ãããå ŽåïŒã«ã€ããŠç°¡åã«èª¬æããŸãã ããã¯å€å žçãªMapReduceã«å°ã䌌ãŠããŸã-Hadoopã«ãã£ãå€ãAPIïŒååçã«ã¯çŸåšïŒã§ããå¯äžã®éãã¯ããã®APIãäœæããããšãããã·ã³ã®åŒ·åºŠãäžååã§ãããäžéããŒã¿ã®çµæã¯ãã£ã¹ã¯ã«ããä¿åã§ããªãããšã§ãã RAMã«ååãªã¹ããŒã¹ããªãã£ãããã§ãã ãããã£ãŠãããã¯ãã¹ãŠãã£ãããšåããŸããã äŸãšããŠãå€ãMapReduceã§èšè¿°ãããã³ãŒããæè¿æžãæãããšãããçŽ2.5æéå®è¡ããããšèšããŸãã Sparkã¯ãã¹ãŠãRAMã«ä¿åãããããSparkã§1.5ååäœããŸãã
ã³ãŒãã®èšè¿°æã«ã¯ããã®äžéšãã¯ã©ã¹ã¿ãŒã§å®è¡ãããä»ã®éšåããã©ã€ããŒã§å®è¡ãããããšãç解ããããšãéåžžã«éèŠã§ãã ãããããç解ããŠããªã人ã¯ãããããçš®é¡ã®OutOfMemoryãªã©ãæã£ãŠããŸãã ïŒããã«ã€ããŠã話ããŸã-ãããã®ãšã©ãŒã®äŸã瀺ããŸãïŒã
ã ããã¹ããŒã¯...è¡ããŸããã
RDD (Resilient Distributed Dataset) is the main component everything in Spark runs on.

ããŒã¿ã»ãããšããçšèªããå§ããŸããããããã¯åãªãæ å ±ã®éãŸãã§ãã ãã®APIã¯Streamã«éåžžã«äŒŒãŠããŸãã å®éãStreamãšåæ§ã«ãããã¯ããŒã¿ãŠã§ã¢ããŠã¹ã§ã¯ãªããããŒã¿ã®äžçš®ã®æœè±¡åïŒãã®å Žåã¯åæ£ïŒã§ããããã®ããŒã¿ã«å¯ŸããŠããããçš®é¡ã®æ©èœãå®è¡ã§ããŸãã Streamãšã¯ç°ãªããRDDã¯æåã¯åæ£åã§ããRDDã¯1ã€ã®RDDãã·ã³ã§ã¯ãªããSparkã®èµ·åæã«äœ¿çšãèš±å¯ããããã·ã³ã®æ°ã«é 眮ãããŸãã
ã¬ãžãªãšã³ãã¯ãããªãã圌ã殺ãããšã¯ãªããšèšããŸãããªããªããããŒã¿åŠçäžã«ãã·ã³ããªãã«ãªã£ãå ŽåïŒäŸãã°ãå ãéåãããªã©ïŒå埩ããŸãã æãããããŸãã
RDDã¯ã©ãããå ¥æã§ããŸããïŒ
- æãäžè¬çãªãªãã·ã§ã³ã¯ãç¹å®ã®ã¿ã€ãã®ãã¡ã€ã«ããããã¡ã€ã«ãŸãã¯ãã£ã¬ã¯ããªããã®ãã®ã§ãã ãã¡ã€ã«ããRDDãäœæã§ããŸãïŒStreamãããŒã¿ãœãŒã¹ãå¿
èŠãšããããã«ïŒã
- ã¡ã¢ãªãã-äœããã®ã³ã¬ã¯ã·ã§ã³ãŸãã¯ãªã¹ãããã ããã¯ããã¹ãã«æããã䜿çšãããŸãã ããšãã°ãåæããŒã¿ãå«ãRDDãåãåããåŠçãããããŒã¿ãRDDã«åºåãããµãŒãã¹ãäœæããŸããã ããããã¹ããããšãããã£ã¹ã¯ããããŒã¿ãèªã¿ãããããŸããã ãã¹ãã§äœããã®ã³ã¬ã¯ã·ã§ã³ãäœæããããšæããŸãã ãã®ã³ã¬ã¯ã·ã§ã³ãRDDã«å€æãããµãŒãã¹ããã¹ãããæ©äŒããããŸãã
- ã¹ããªãŒã ã®ããã«å¥ã®RDDããã ã»ãšãã©ã®ã¹ããªãŒã ã¡ãœããã¯ã¹ããªãŒã ãè¿ããŸã-ãã¹ãŠãéåžžã«äŒŒãŠããŸãã
RDDã®äœææ¹æ³ã®äŸã次ã«ç€ºããŸãã
// from local file system
JavaRDD<String> rdd = sc.textFile("file:/home/data/data.txt");
// from Hadoop using relative path of user, who run spark application
rdd = sc.textFile("/data/data.txt");
// from hadoop
rdd = sc.textFile("hdfs://data/data.txt");
// all files from directory
rdd = sc.textFile("s3://data/*");
// all txt files from directory
rdd = sc.textFile("s3://data/*.txt");
scãšã¯åŸã§èª¬æããŸãïŒããã¯ãSparkã®éå§ãªããžã§ã¯ãã§ãïŒã ããã§ãRDDãäœæããŸãã
- ããŒã«ã«ãã£ã¬ã¯ããªã«ããããã¹ããã¡ã€ã«ããïŒããã§ã¯Hadoopãšã¯é¢ä¿ãããŸããïŒã
- ãªã¬ãŒã·ã§ãã«ãã¹äžã®ãã¡ã€ã«ããã
- Hadoopã§-ããã§ãHadoopã«ãããã¡ã€ã«ãååŸããŸãã å®éã«ã¯çŽ°ããåå²ãããŠããŸããã1ã€ã®RDDã«ãŸãšããããŸãã ã»ãšãã©ã®å ŽåãRDDã¯ãã®ããŒã¿ãé
眮ãããŠãããã·ã³ã«é
眮ãããŸãã
- s3ã¹ãã¬ãŒãžããèªã¿åãããšãã§ããŸããããããçš®é¡ã®ã¯ã€ã«ãã«ãŒãã䜿çšããããããŒã¿ãã£ã¬ã¯ããªããããã¹ããã¡ã€ã«ã®ã¿ãååŸã§ããŸãã
ãã®RDDã«ã¯äœãå«ãŸããŸããïŒ ããã§ã¯ãããã¯RDDïŒããã¹ããã¡ã€ã«ã«æååãããïŒã§ãããšè¿°ã¹ãŠããŸãã ããã«ããã¡ã€ã«ïŒãããã¯ãã®ãã¡ã€ã«ã®è¡ïŒããRDDãäœæãããããã£ã¬ã¯ããªïŒãã®ãã£ã¬ã¯ããªå ã®ãã¹ãŠã®ãã¡ã€ã«ã®è¡ïŒããRDDãäœæãããã¯é¢ä¿ãããŸããã
ããã«ãããã¡ã¢ãªããRDDãäœæãããŸãã

ãªã¹ããååŸããŠRDDã«å€æããparallelizeã¡ãœããããããŸãã
ããã§ãscãäœã§ããããšããåé¡ã«åãçµã¿ãŸããscã¯ãRDDãååŸããããã«åžžã«äœ¿çšãããŠããŸãã Scalaã§äœæ¥ããå Žåããã®ãªããžã§ã¯ãã¯SparkContextãšåŒã°ããŸãã Java APIã®äžçã§ã¯ãJavaSparkContextãšåŒã°ããŸãã ããã¯ãããããRDDãååŸãããããSparkã«é¢é£ããã³ãŒãã®èšè¿°ãéå§ããäž»èŠãªãã€ã³ãã§ãã
Javaã§Sparkã³ã³ããã¹ããªããžã§ã¯ããæ§æããæ¹æ³ã®äŸã次ã«ç€ºããŸãã
SparkConf conf = new SparkConf();
conf.setAppName("my spark application");
conf.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
æåã«ãSparkæ§æãªããžã§ã¯ããäœæãããæ§æããïŒã¢ããªã±ãŒã·ã§ã³ã®ååãèšãïŒãããŒã«ã«ã§åäœãããã©ããã瀺ããŸãïŒã¢ã¹ã¿ãªã¹ã¯ã¯ãæ€çŽ¢ããã¹ã¬ããã®æ°ã瀺ãã䜿çšã§ããã¹ã¬ããã®æ°ã¯1ã2ãªã©ãæå®ã§ããŸãïŒ dãïŒã ãããŠãJavaSparkContextãäœæããããã§æ§æãæž¡ããŸãã
ããã¯æåã®è³ªåãæèµ·ããŸãïŒã©ãããã°ãã¹ãŠãåå²ã§ããŸããïŒ ãã®æ¹æ³ã§SparkContextãäœæããããã«æ§æãæž¡ããšãã¯ã©ã¹ã¿ãŒã§ã¯æ©èœããŸããã ã¯ã©ã¹ã¿ãŒã«äœãæžã蟌ãŸããªãããã«åå²ããå¿ èŠããããŸãïŒSparkããã»ã¹ã®éå§æã«ã䜿çšãããã·ã³ã®æ°ããã¹ã¿ãŒã®ææè ãã¯ã©ã¹ã¿ãŒãããŒãžã£ãŒã®ææè ãªã©ãèšãå¿ èŠããããŸãïŒã ãã®æ§æãããã«ããããšã¯æãŸãããããŸããã ã¢ããªã±ãŒã·ã§ã³åã®ã¿ãæ®ããŸãã
ãããŠãSpringãå©ãã«ãªããŸãã2ã€ã®BeanãäœæããŸãã 1ã€ã¯ãããã¯ã·ã§ã³ãããã¡ã€ã«ã®äžã«ããïŒéåžžãææè ããã·ã³ã®æ°ãªã©ã«é¢ããæ å ±ã¯éä¿¡ãããŸããïŒããã1ã€ã¯ããŒã«ã«ãããã¡ã€ã«ã®äžã«ãããŸãïŒããã§ã¯ãã®æ å ±ãéä¿¡ããŸããããã«åå²ã§ããŸãïŒã ãã¹ãã§ã¯ã1ã€ã®BeanãSparkContextããæ©èœããæ¬çª-å¥ã®Beanãæ©èœããŸãã
@Bean
@Profile("LOCAL")
public JavaSparkContext sc() {
    SparkConf conf = new SparkConf();
    conf.setAppName("music analyst");
    conf.setMaster("local[1]");
    return new JavaSparkContext(conf);
}

@Bean
@Profile("PROD")
public JavaSparkContext sc() {
    SparkConf conf = new SparkConf();
    conf.setAppName("music analyst");
    return new JavaSparkContext(conf);
}
RDDã®æ©èœã®ãªã¹ãã¯æ¬¡ã®ãšããã§ãã
map, flatMap, filter
mapPartitions, mapPartitionsWithIndex
sample
union, intersection, join, cogroup, cartesian (otherDataset)
distinct
reduceByKey, aggregateByKey, sortByKey
pipe
coalesce, repartition, repartitionAndSortWithinPartitions
ãããã¯ã¹ããªãŒã é¢æ°ã«éåžžã«äŒŒãŠããŸãïŒãã¹ãŠäžå€ã§ãããRDDãè¿ããŸãïŒã¹ããªãŒã ã®äžçã§ã¯ãäžéæäœãšåŒã°ããããã§ã¯-å€æïŒã 詳现ã¯èª¬æããŸããã
ã¢ã¯ã·ã§ã³ããããŸãïŒStreamsã®äžçã§ã¯ãã¿ãŒããã«æäœãšåŒã°ããŠããŸããïŒã
reduce
collect
count, countByKey, countByValue
first
take, takeSample, takeOrdered
saveAsTextFile, saveAsSequenceFile, saveAsObjectFile
foreach
ã¢ã¯ã·ã§ã³ãšã¯äœãããã©ã³ã¹ãã©ãŒã¡ãŒã·ã§ã³ãšã¯äœããå€æããæ¹æ³ã¯ïŒ ã¹ããªãŒã ãšåæ§ã«ãRDDã¡ãœãããRDDãè¿ãå Žåãããã¯å€æã§ãã ããã§ãªãå Žåãããã¯ã¢ã¯ã·ã§ã³ã§ãã
ã¢ã¯ã·ã§ã³ã«ã¯2ã€ã®ã¿ã€ãããããŸãã
- ãã©ã€ããŒã«äœããè¿ããã®ïŒããã¯ã¯ã©ã¹ã¿ãŒäžã«ãªãããšã匷調ããããšãéèŠã§ããçãã¯ãã©ã€ããŒã«æ»ããŸãïŒã ããšãã°ãreduceã¯ãã¹ãŠã®ããŒã¿ãåéããæ¹æ³ã«é¢ããé¢æ°ãåããæçµçã«1ã€ã®åçãè¿ãããŸãïŒäžè¬çãªå Žåã1ã€ã§ããå¿
èŠã¯ãããŸããïŒã
- ãã©ã€ããŒã«å¿çãè¿ããªããã®ã ããšãã°ãåãHadoopãŸãã¯ä»ã®ã¹ãã¬ãŒãžã§åŠçããåŸã«ããŒã¿ãä¿åã§ããŸãïŒãã®ããã®saveAsTextFileã¡ãœããããããŸãïŒã
ã©ã®ããã«æ©èœããŸããïŒ

ãã®ã¹ããŒã ã¯ã¹ããªãŒã ã«äŒŒãŠããŸãããå°ããªæ³šæç¹ã1ã€ãããŸãã äœããã®çš®é¡ã®ããŒã¿ããããããšãã°s3ã¹ãã¬ãŒãžã«ãããŸãã SparkContextã䜿çšããŠãæåã®RDD1ãäœæããŸããã ãã®åŸãããããçš®é¡ã®ç°ãªãå€æãè¡ãããããããRDDãè¿ããŸãã æåŸã«ãã¢ã¯ã·ã§ã³ãå®è¡ããäœããã®å©çïŒä¿åãå°å·ããŸãã¯åãåã£ããã®ã転éïŒãååŸããŸãã ãã¡ããããã®éšåã¯ã¯ã©ã¹ã¿ãŒã§å®è¡ãããŸãïŒãã¹ãŠã®RDDã¡ãœããã¯ã¯ã©ã¹ã¿ãŒã§å®è¡ãããŸãïŒã çµæããªãããã®çãã§ããå ŽåãæåŸã«å°ããªæçããã©ã€ããŒã§èµ·åãããŸãã Dataã®å·ŠåŽïŒã€ãŸããSparkã³ãŒãã®äœ¿çšãéå§ããåïŒã¯ãã¹ãŠãã¯ã©ã¹ã¿ãŒã§ã¯ãªãããã©ã€ããŒã§å®è¡ãããŸãã
ãã®Lazyã¯ãã¹ãŠãã¹ããªãŒã ãšãŸã£ããåãã§ãã å€æã§ããåRDDã¡ãœããã¯äœãè¡ããŸããããã¢ã¯ã·ã§ã³ãåŸ ã¡ãŸãã ã¢ã¯ã·ã§ã³ããããšããã§ãŒã³å šäœãéå§ãããŸãã ãããŠãããã§å€å žçãªçåãçããŸãããã®å Žåãããã§äœãããŠããã®ã§ããããïŒ

ç§ã®ããŒã¿ã¯ãããéè¡ã§ã®éå»5幎éã®ãã¹ãŠã®ééååŒã§ãããšæ³åããŠãã ããã ãããŠãç§ã¯ããªãé·ãæ²»çãå®è¡ããå¿ èŠããããããã¯åå²ãããŸããç§ã¯ãã¹ãŠã®ç·æ§ã®ããã«1ã€ã®ã¢ã¯ã·ã§ã³ãè¡ãããã¹ãŠã®å¥³æ§ã®ããã«-å¥ã®ã¢ã¯ã·ã§ã³ãè¡ããŸãã ããã»ã¹ã®æåã®éšåã«10åããããšããŸãããã ããã»ã¹ã®2çªç®ã®éšåã«ã¯1åããããŸãã åèš12åãååŸããå¿ èŠãããããã«æããŸããïŒ
ããããé 延ãçºçããã®ã¯22åã§ããã¢ã¯ã·ã§ã³ãèµ·åããããã³ã«ããã§ãŒã³å šäœãæåããæåŸãŸã§å®è¡ãããŸãã ç§ãã¡ã®å Žåãå ±éã®ãã£ã³ã¯ã¯2åããéå§ãããŸãããã15åã®ãã©ã³ããããå Žåã¯ã©ãã§ããããïŒ
åœç¶ãããã©ãŒãã³ã¹ã«å€§ããªææãäžããŸãã Sparkã®äžçã§ã¯ãç¹ã«é¢æ°åããã°ã©ãã³ã°ã«ç²ŸéããŠãã人ã«ãšã£ãŠã¯ãã³ãŒããæžãã®ã¯éåžžã«ç°¡åã§ãã ããããã§ãããã¯å€æããŸããã å¹ççãªã³ãŒããèšè¿°ãããå Žåãå€ãã®æ©èœãç¥ãå¿ èŠããããŸãã
åé¡ã解決ããŠã¿ãŸãããã ã¹ããªãŒã ã§äœãããŸããïŒ åœŒãã¯ããã€ãã®åéãè¡ããããããã¹ãŠãŸãšããŠåéããããããã¹ããªãŒã ãååŸããŸãã

Let's try doing the same thing with collect. Here is what happened:

ããã«ãã¯ã©ã¹ã¿ãŒäžã§ããã«å€ãã®ãã·ã³ãè³Œå ¥ããäºå®ã§ãã£ãããã40å°ãããããããã«20ã®ã¬ãã€ãã®RAMããããŸããã
ç解ããå¿ èŠããããŸããããã°ããŒã¿ã«ã€ããŠè©±ããŠããå Žåãåéããæç¹ã§ããã¹ãŠã®RDDããã®ãã¹ãŠã®æ å ±ãDriverã§è¿ãããŸãã ãã®ãããã®ã¬ãã€ããšãã·ã³ã¯æ±ºããŠå©ãã«ã¯ãªããŸãããåéãããšããã¹ãŠã®æ å ±ãã¢ããªã±ãŒã·ã§ã³ãéå§ãããå Žæãã1ã€ã®å Žæã«ããŒãžãããŸãã åœç¶ãã¡ã¢ãªäžè¶³ã«ãªããŸãã
ãã®åé¡ãã©ã®ããã«è§£æ±ºããŸããïŒãã§ãŒã³ã2åå®è¡ããããããã«15åå®è¡ããããåéãå®è¡ããããªãïŒã ãããè¡ãããã«ãSparkã«ã¯persistã¡ãœããããããŸãã

Persistã§ã¯ãç¶æ RDDãä¿åã§ããä¿åå ãéžæã§ããŸãã å€ãã®ä¿è·ãªãã·ã§ã³ããããŸãã æãæé©ãªãã®ã¯ã¡ã¢ãªå ã«ãããŸãïŒã¡ã¢ãªã®ã¿ããããŸãããã¡ã¢ãªã¯2ã€ãããããŸãã-ããã¯ã¢ããã2ã€ãããŸãïŒã ã«ã¹ã¿ã ã¹ãã¬ãŒãžãäœæããŠãä¿åæ¹æ³ãæå®ããããšãã§ããŸãã ã¡ã¢ãªãšãã£ã¹ã¯ãä¿åã§ããŸã-ã¡ã¢ãªã«ä¿åããããšããŸããããã®ã¯ãŒã«ãŒïŒãã®RDDãå®è¡ãããã·ã³ïŒã«ååãªRAMããªãå Žåãäžéšã¯ã¡ã¢ãªã«æžã蟌ãŸããæ®ãã¯ãã£ã¹ã¯ã«ãã©ãã·ã¥ãããŸãã ããŒã¿ããªããžã§ã¯ããšããŠä¿åããããã·ãªã¢ã«åãå®è¡ã§ããŸãã åãªãã·ã§ã³ã«ã¯é·æãšçæããããŸããããã®ãããªæ©äŒããããããã¯çŽ æŽãããããšã§ãã
ãã®åé¡ã解決ããŸããã æ°žç¶åã¯ã¢ã¯ã·ã§ã³ã§ã¯ãããŸããã ã¢ã¯ã·ã§ã³ããªãå Žåãpersistã¯äœãããŸããã æåã®ã¢ã¯ã·ã§ã³ãéå§ããããšããã§ãŒã³å šäœãå®è¡ãããRDDãã§ãŒã³ã®æåã®éšåã®çµããã§ãããŒã¿ãé 眮ãããŠãããã¹ãŠã®ãã·ã³ã§ä¿æãããŸãã ã¢ã¯ã·ã§ã³RDD6ãå®è¡ãããšãã¯ãæ¢ã«æ°žç¶åããéå§ããŸãïŒä»ã®ãã©ã³ããããå Žåã¯ããèšæ¶ããŸãã¯ãããŒã¯ããããæ°žç¶åããç¶è¡ããŸãïŒã
ç¥è©±2. Sparkã¯Scalaã§ã®ã¿æžãããŠããŸãã
Sparkã¯åªããŠãããäžéšã®ããŒã«ã«ããŒãºã«ã䜿çšã§ããŸãããå¿ ãããããã°ããŒã¿ã«ã¯äœ¿çšã§ããŸããã APIãããŒã¿åŠçã«äœ¿çšããã ãã§æžã¿ãŸãïŒæ¬åœã«äŸ¿å©ã§ãïŒã 質åãçºçããŸãïŒäœã«æžãã¹ãã§ããïŒ PythonãšRç§ã¯ããã«åŽäžããŸããã 調ã¹ãŠã¿ãŸãããïŒScalaãšJavaã®ã©ã¡ãã§ããïŒ
éåžžã®Javaéçºè ã¯Scalaã«ã€ããŠã©ãæããŸããïŒ

äžçŽã®Javaéçºè ã¯ããå°ãèŠãŠããŸãã 圌ã¯ãããçš®ã®éã³ãããã€ãã®ã¯ãŒã«ãªãã¬ãŒã ã¯ãŒã¯ãã©ã ããããã³å€ãã®äžåœèªãããããšãç¥ã£ãŠããŸãã
ãå°»ãèŠããŠããŸããïŒ åœŒå¥³ãããã ãããScalaã³ãŒãã®å€èŠ³ã§ãã
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(_.length)
val totalLength = lineLengths.reduce(_+_)
ç§ã®ç©¶æ¥µã®ç®æšã¯ãJavaã§æžãããšã¯æªããªãããšãçŽåŸãããããšãªã®ã§ãScala APIã«ã¯å ¥ããŸãããããã®ã³ãŒãã¯åè¡ã®é·ããæ°ããå šäœãèŠçŽããŸãã
Javaã«å¯Ÿããéåžžã«åŒ·åãªè°è«ã¯ãåãJavaã³ãŒãã次ã®ããã«èŠããããšã§ãã
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
    @Override
    public Integer call(String lines) throws Exception {
        return lines.length();
    }
});
Integer totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
    }
});
æåã®ãããžã§ã¯ããå§ãããšããåœå±ã¯ç§ã«ç¢ºä¿¡ããããã©ããå°ããŸããã çµå±ãç§ãã¡ãæžããšããããå€ãã®ã³ãŒããããã§ãããã ããããããã¯ãã¹ãŠåã§ãã ä»æ¥ã®ã³ãŒãã¯æ¬¡ã®ããã«ãªããŸãã
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(_.length)
val totalLength = lineLengths.reduce(_+_)
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(String::length);
int totalLength = lineLengths.reduce((a, b) -> a + b);
ScalaãšJava 8ã«ã¯å€§ããªéãããããŸããïŒ ããã¯ãJavaããã°ã©ããŒã«ãšã£ãŠèªã¿ããããšæãããŸãã ããããJava 8ã«ãããããããSparkã¯Scalaã§äœæããå¿ èŠããããšããç¥è©±ã«ãªããŸãã Java 8ã§ã¯ãã¹ãŠãããã»ã©æªããªãããšãç¥ã£ãŠãã人ããããScalaã§æžãå¿ èŠããããšäž»åŒµããŸããïŒ
Scalaã®å ŽåïŒ
- Scalaã¯ã¯ãŒã«ã§ãæµè¡ã«ææã§ããã¡ãã·ã§ããã«ã§ããŸãã«å
ãžé²ãå¿
èŠããããŸãã ãã®GroovyãNahigãScalaã§ã¯ãã¹ãŠãééããªãé¢çœãã§ãã
- Scalaã¯ç°¡æœã§äŸ¿å©ãªæ§æã§ãã åžç¥ãããŸãã
- Spark APIã¯Scalaã§äœæãããŠãããããäž»ã«Scalaçšã«èšèšãããŠããŸãã ããã¯æ·±å»ãªãã©ã¹ã§ãã
- Java APIã¯ãåŸã§æåºããªããã°ãªããŸãããåœé ããå¿
èŠãããããã§ãã åžžã«ãã¹ãŠãããã«ããããã§ã¯ãããŸããã
Javaã®å ŽåïŒ
- ã»ãšãã©ã®Javaããã°ã©ããŒã¯Javaãç¥ã£ãŠããŸãã ãããã®äººã
ã¯Scalaãç¥ããªãã 倧äŒæ¥ã§ã¯ãå€ããå°ãªããJavaãæ±ãJavaããã°ã©ããŒã®éãŸãã Sparkãæžãããã«ScalaãæäŸããŸããïŒ ãããã
- ããªãã¿ã®äžç-Springãããªãã¿ã®ãã¶ã€ã³ãã¿ãŒã³ãMavenãŸãã¯ãã以äž-Gradleãã·ã³ã°ã«ãã³ãªã© ç§ãã¡ã¯ããã§åãããšã«æ
£ããŠããŸãã ãŸããScalaã¯ç°ãªãæ§æã§ããã ãã§ãªããä»ã®å€ãã®æŠå¿µã§ããããŸãã Scalaã¯å¶åŸ¡ã®å転ãå¿
èŠãšããŸããããªããªãã ããã§ã¯ãã¹ãŠãç°ãªããŸãã
ãªãJavaã¯ãŸã åªããŠããã®ã§ããïŒ ãã¡ãããç§ãã¡ã¯ScalaãæããŠããŸããããéã¯Javaã«ãããŸãã

äœãèµ·ãã£ãããè°è«ããããããã£ã¹ã-Issue 104-ãèããŠãã ããã
ç°¡åã«èª¬æããŸãã
1幎åã2010幎ã«TypesafeãéããMartin OderskyãéããŸããã ScalaããµããŒãããTypesafeã¯ãªããªããŸããã
ããã¯ãTypesafeã®ä»£ããã«Lightbendãšããå¥ã®äŒç€Ÿãèšç«ãããããScalaãæ»ãã ãšããæå³ã§ã¯ãããŸãããããŸã£ããç°ãªãããžãã¹ã¢ãã«ãæã£ãŠããŸãã PlayãAkkaãSparkã®ãããªScalaã§æžãããçŽ æŽããããã®ã®ãããã§ããããããŠäžèšã®æçã®ãããã§ããã倧è¡ãScalaã«åãæ¿ããããšã¯äžå¯èœã§ãããšããçµè«ã«éããŸããã 1幎åãScalaã¯äººæ°ã®ããŒã¯ã§ããããããã«ãããããããã©ã³ãã³ã°ã®æåã®40ã®å Žæã«ãããããŸããã§ããã æ¯èŒã®ããã«ãGroovyã¯Javaã®20çªç®ãæåã¯Groovyã§ããã
人æ°ã®ããŒã¯æã§ããã人ã ã倧è¡ã®éã§Scalaã䜿çšããããšã匷å¶ããŠããªãããšã«æ°ä»ãããšãã圌ãã¯èªåã®ããžãã¹ã¢ãã«ã誀ã£ãŠãããšèªèããŸããã ä»æ¥Scalaãå°å ¥ããäŒæ¥ã¯ãç°ãªãããžãã¹ã¢ãã«ãæ¡çšããŠããŸãã 圌ãã¯ãSparkã®ãããªå€§è¡åãã«äœããããã¹ãŠã®è£œåã«ã¯åªããJava APIããããšèšã£ãŠããŸãã ãããŠãããŒã¿ãã¬ãŒã ã«å°éãããšãScalaã§æžããJavaã§æžããã«éãã¯ãªãããšãããããŸãã
ç¥è©±3. SparkãšSpringã¯äºææ§ããããŸãã
æåã«ãBeanãšããŠç»é²ãããŠããSparkContextãããããšãæ¢ã«ç€ºããŸããã 次ã«ãPostanããã»ããµBeanã䜿çšããŠãSparkã®æ©èœããµããŒãããæ¹æ³ã確èªããŸãã
ãã§ã«ã³ãŒããæžããŸãããã
RDDæååãšäžäœã¯ãŒãã®æ°ãåãå ¥ãããµãŒãã¹ïŒãã«ããŒïŒãäœæããŸãã 圌ã®ä»äºã¯ãããã¯ãŒããè¿ãããšã§ãã ã³ãŒãã§äœãããã®ãèŠãŠã¿ãŸãããã
@Service
public class PopularWordsServiceImpl implements PopularWordsService {
    @Override
    public List<String> topX(JavaRDD<String> lines, int x) {
        return lines.map(String::toLowerCase)
                .flatMap(WordsUtil::getWords)
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((a, b) -> a + b)
                .mapToPair(Tuple2::swap)
                .sortByKey()
                .map(Tuple2::_2)
                .take(x);
    }
}
ãŸããæè©ãå°æåã§ããã倧æåã§ããããããããªãããã倧æåãšå°æåã§åèªãäºéã«ã«ãŠã³ãããªãããã«ãã¹ãŠãå°æåã«å€æããå¿ èŠããããŸãã ãããã£ãŠããããé¢æ°ã䜿çšããŸãã ãã®åŸãé¢æ°flatmapã䜿çšããŠè¡ãåèªã«å€æããå¿ èŠããããŸãã
ããã§ãåèªãååšããRDDãã§ããŸããã æ°éã«å¯ŸããŠãããã³ã°ããŸãã ããããæåã«ãååèªã«åäžæ§ãå²ãåœãŠãå¿ èŠããããŸãã ããã¯å€å žçãªãã¿ãŒã³ã«ãªããŸãïŒåèª-1ãåèª-1ããããåãåèªã«å¯Ÿãããã¹ãŠã®ãã®ãåèšããŠãœãŒãããå¿ èŠããããŸãïŒãã¹ãŠãã¡ã¢ãªå ã§æ©èœããååãªã¡ã¢ãªãããå Žåã¯äžéçµæã¯ãã£ã¹ã¯ã«ä¿åãããŸããïŒã
mapToPairé¢æ°ããããŸã-ãã¢ãäœæããŸãã åé¡ã¯ãJavaã«ã¯Pairã¯ã©ã¹ããªãããšã§ãã å®éã«ã¯ãããã¯å€§ããªçç¥ã§ããç¹å®ã®ã³ã³ããã¹ãã§çµåãããäœããã®æ å ±ãããããšãéåžžã«å€ãã®ã§ããããã®ããã®ã¯ã©ã¹ãæžãã®ã¯æãã§ãã
Scalaã«ã¯æ¢è£œã®ã¯ã©ã¹ããããŸãïŒãããããããŸãïŒ-Tupleã Tuple2ã3ã4ãªã©ããããŸãã 22ãžããªã22ãžã 誰ãç¥ããªãã 2ãããããããããTuple2ãå¿ èŠã§ãã
ããããã¹ãŠæžããå¿ èŠããããŸãã ãã¹ãŠã®åãåèªãããŒãšããŠæ®ãreduceByKeyã¡ãœãããããããã¹ãŠã®å€ã§ç§ãæ±ããããšãè¡ããŸãã æããããå¿ èŠããããŸãã ãã¢ãååŸããŸãããéã¯éã§ãã
次ã«ããœãŒãããå¿ èŠããããŸãã ããã§ããJavaã«å°ããªåé¡ããããŸãããªããªãã å¯äžã®ãœãŒãã¯sorkByKeyã§ãã Scala APIã«ã¯sortbyããããããã§ãã®TupleãååŸããŠãããããå¿ èŠãªãã®ãåŒãåºããŸãã ãããŠãããã¯SortByKeyã®ã¿ã§ãã
ç§ãèšã£ãããã«ããããŸã§ã®ãšããããã€ãã®å Žæã§ã¯ãJava APIãååã«è±å¯ã§ã¯ãªããšæããŠããŸãã ããããããªãã¯æãåºãããšãã§ããŸãã ããšãã°ããã¢ãå転ã§ããŸãã ãããè¡ãããã«ãåã³mapToPairãäœæããŸããTupleã«ã¯çµã¿èŸŒã¿ã®ã¹ã¯ããé¢æ°ããããŸãïŒæ°åã®åèªãå€æããŸããïŒã ããã§ãsortByKeyãå®è¡ã§ããŸãã
ãã®åŸãæåã®éšåã§ã¯ãªãã2çªç®ã®éšåãåŒãåºãå¿ èŠããããŸãã ãããã£ãŠãå°å³ãäœæããŸãã 2çªç®ã®éšåãåŒãåºãããã«ãTupleã«ã¯æ¢è£œã®é¢æ°ã_2ãããããŸãã ããã§TakeïŒxïŒãå®è¡ããŸãïŒxèªã®ã¿ãå¿ èŠã§ã-ã¡ãœããã¯TopXãšåŒã°ããŸãïŒãããããã¹ãŠã¯returnã§å®è¡ã§ããŸãã
ãã¹ãã®å®è¡æ¹æ³ã瀺ããŸãã ããããã®åã«ãSpringã§ã®ç§ã®Javaæ§æã®å 容ãèŠãŠãã ããïŒSpringã§äœæ¥ããŠããŸããããã¯åãªãã¯ã©ã¹ã§ã¯ãªãããµãŒãã¹ã§ãïŒã
@Configuration
@ComponentScan(basePackages = "ru.jug.jpoint.core")
@PropertySource("classpath:user.properties")
public class AppConfig {
    @Bean
    public JavaSparkContext sc() {
        SparkConf conf = new SparkConf().setAppName("music analytst").setMaster("local[*]");
        return new JavaSparkContext(conf);
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer configurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
In the Java config I read a user.properties file (why, I'll explain later; we don't use it yet). I also scan all the classes and declare two beans: a PropertySourcesPlaceholderConfigurer, so that values can be injected from property files (not relevant yet), and the only bean we care about right now, a regular JavaSparkContext.
We create a SparkConf, set it up (the application is called music analyst, the master is local since we work locally), and create the JavaSparkContext. All wonderful.
ãã¹ããèŠãŠãã ããã
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = AppConfig.class)
public class PopularWordsServiceImplTest {
    @Autowired
    JavaSparkContext sc;
    @Autowired
    PopularWordsService popularWordsService;

    @Test
    public void textTopX() throws Exception {
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("java java java scala grovy grovy"));
        List<String> top1 = popularWordsService.topX(rdd, 1);
        Assert.assertEquals("java", top1.get(0));
    }
}
Springã§äœæ¥ããŠãããããã©ã³ããŒã¯èªç¶ã«è·³ãäžãããŸãã æ§æã¯AppConfigã§ãïŒãã¹ãçšãšéçšçšã«ç°ãªãæ§æãäœæããã®ãæ£ããã§ãããïŒã 次ã«ãããã«JavaSparkContextãšç¢ºèªããããµãŒãã¹ãæ³šå ¥ããŸãã SparkContextã䜿çšããŠãparallelizeã¡ãœããã䜿çšãããjava java java scala grovy grovyããšããæååãæž¡ããŸãã 次ã«ãã¡ãœãããå®è¡ããJavaãæãäžè¬çãªåèªã§ããããšã確èªããŸãã
ãã¹ãã¯èœã¡ãŸããã æã人æ°ãããã®ã¯ã¹ã«ã©ã ããã§ãã

äœãããã®ãå¿ããŸãããïŒ äžŠã¹æ¿ããè¡ã£ããšããä»ã®æ¹æ³ã§äžŠã¹æ¿ããå¿ èŠããããŸããã
ãµãŒãã¹ã§ä¿®æ£ããŸãã
@Service
public class PopularWordsServiceImpl implements PopularWordsService {
    @Override
    public List<String> topX(JavaRDD<String> lines, int x) {
        return lines.map(String::toLowerCase)
                .flatMap(WordsUtil::getWords)
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((a, b) -> a + b)
                .mapToPair(Tuple2::swap)
                .sortByKey(false)
                .map(Tuple2::_2)
                .take(x);
    }
}
ãã¹ãã«åæ ŒããŸããã
ããã§mainãå®è¡ããŠãå®éã®æ²ã§çµæã確èªããŠãã ããã ããŒã¿ãã£ã¬ã¯ããªããããŸããããŒãã«ãºãã©ã«ããŒããããããã«ã¯æšæ¥ã®1æ²ã®ããã¹ãããããŸãã æšæ¥ã®æã人æ°ã®ããèšèã¯äœã ãšæããŸããïŒ

ããã«ãArtistsJudgeãµãŒãã¹ããããŸãã TopXã¡ãœãããå®è£ ããŸãããããã¯ã¢ãŒãã£ã¹ãã®ååãååŸãããã®ã¢ãŒãã£ã¹ãã®æ²ã眮ãããŠãããã£ã¬ã¯ããªãè¿œå ããŠããããã§ã«äœæããããµãŒãã¹ã®topXã¡ãœããã䜿çšããŸãã
@Service
public class ArtistJudgeImpl implements ArtistJudge {
    @Autowired
    private PopularDFWordsService popularDFWordsService;
    @Autowired
    private WordDataFrameCreator wordDataFrameCreator;
    @Value("${path2Dir}")
    private String path;

    @Override
    public List<String> topX(String artist, int x) {
        DataFrame dataFrame = wordDataFrameCreator.create(path + "data/songs/" + artist + "/*");
        System.out.println(artist);
        return popularDFWordsService.topX(dataFrame, x);
    }

    @Override
    public int compare(String artist1, String artist2, int x) {
        List<String> artist1Words = topX(artist1, x);
        List<String> artist2Words = topX(artist2, x);
        int size = artist1Words.size();
        artist1Words.removeAll(artist2Words);
        return size - artist1Words.size();
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("", null, "");
        Comparator<String> cmp = Comparator.nullsLast(Comparator.naturalOrder());
        System.out.println(Collections.max(list, cmp));
        /*
        System.out.println(list.stream().collect(Collectors.maxBy(cmp)).get());
        System.out.println(list.stream().max(cmp).get());
        */
    }
}
ã¡ã€ã³ã¯ãã®ããã«èŠããŸãïŒ
package ru.jug.jpoint;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import ru.jug.jpoint.core.ArtistJudge;

import java.util.List;

/**
 * Created by Evegeny on 20/04/2016.
 */
public class Main {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
        ArtistJudge judge = context.getBean(ArtistJudge.class);
        List<String> topX = judge.topX("beatles", 3);
        System.out.println(topX);
    }
}
ãããã£ãŠãæã人æ°ã®ããåèªã¯æšæ¥ã§ã¯ãªãããiãã§ãã
[i, yesterday, to]
åæããŸããããã¯ããŸãè¯ããããŸããã ã»ãã³ãã£ãã¯ã®è² è·ã䌎ããªãäžèŠãªåèªããããŸãïŒæçµçã«ã¯ãPink Floydã®æã®æ·±ããåæããããšæããŸãããããã®åèªã¯ç§ãã¡ã倧ãã劚害ããŸãïŒã
ãããã£ãŠãäžèŠãªåèªãå®çŸ©ããuserPropertiesãã¡ã€ã«ããããŸããã
garbage = the,you,and,a,get,got,m,chorus,to,i,in,of,on,me,is,all,your,my,that,it,for
ãã®ã¬ããŒãžãããã«ãµãŒãã¹ã«æ³šå ¥ããããšãã§ããŸãããç§ã¯ããã奜ãŸãªãã®ã§ãã ããŸããŸãªãµãŒãã¹ã«éä¿¡ãããUserConfigããããŸãã 誰ãã圌ããå¿ èŠãªãã®ãåŒãåºããŸãã
@Component
public class UserConfig implements Serializable {
    public List<String> garbage;

    @Value("${garbage}")
    private void setGarbage(String[] garbage) {
        this.garbage = Arrays.asList(garbage);
    }
}
ã»ãã¿ãŒã«ã¯privateã䜿çšããããããã£èªäœã«ã¯publicã䜿çšããŠããããšã«æ³šæããŠãã ããã ããããããã«ãã ããã®ã¯ãããŸãããã
Now we go to PopularWordsServiceImpl, autowire this UserConfig into it, and filter out all the garbage words.
@Service
public class PopularWordsServiceImpl implements PopularWordsService {
    @Autowired
    private UserConfig userConfig;

    @Override
    public List<String> topX(JavaRDD<String> lines, int x) {
        return lines.map(String::toLowerCase)
                .flatMap(WordsUtil::getWords)
                .filter(w -> !userConfig.garbage.contains(w))
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((a, b) -> a + b)
                .mapToPair(Tuple2::swap)
                .sortByKey(false)
                .map(Tuple2::_2)
                .take(x);
    }
}
We run main again.
And instead of a result we get an exception:

Task not serializable. The UserConfig we dragged into the lambda has to be serializable:
@Component
public class UserConfig implements Serializable {
    public List<String> garbage;

    @Value("${garbage}")
    private void setGarbage(String[] garbage) {
        this.garbage = Arrays.asList(garbage);
    }
}
But that alone is not enough: the service whose lambda references the config, PopularWordsServiceImpl, also has to be serializable:
@Service public class PopularWordsServiceImpl implements PopularWordsService {
Rather than adding Serializable to the class itself, it's nicer to make the interface extend Serializable:
public interface PopularWordsService extends Serializable {
    List<String> topX(JavaRDD<String> lines, int x);
}
Everything that goes into the functions passed to map and the other transformations, including any state those functions reference, is serialized and shipped to the executors. In other words, UserConfig has to be serializable; and because the lambda refers to a field of the service, the whole service gets captured along with it and has to be serializable too.
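To make the capture concrete, a small illustration (not from the talk) of why the whole service ends up inside the serialized task:

// Inside PopularWordsServiceImpl: 'userConfig' is a field, so the lambda really reads this.userConfig.
// The whole service instance is captured and must be Serializable, along with everything it references.
JavaRDD<String> cleaned = lines.filter(w -> !userConfig.garbage.contains(w));

// Copying the field into a local variable narrows the capture to just the list itself:
List<String> garbage = userConfig.garbage;
JavaRDD<String> cleanedLocal = lines.filter(w -> !garbage.contains(w));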
We run it again and now everything works: the most popular word is yesterday, followed by oh and believe. True, oh is arguably garbage as well, but let it be.
But think about how that UserConfig got to the workers. How many times was it serialized and sent? Once per task. If the job is split into a thousand tasks, Spark ships the very same object a thousand times, together with the serialized functions.
To avoid that, there is broadcast.
Myth 4. When is broadcast really needed?
So: the data referenced from inside the functions (our UserConfig, for example) travels to the workers with every single task. If it is wrapped in a broadcast instead, it is delivered to each worker only once, and every task on that worker reuses it.
There are two widespread opinions about when broadcast is needed:
- never: Spark will sort everything out by itself;
- only when what you ship is really big, otherwise it isn't worth the trouble.
Let's look at an example.
Here is the data:
Israel, +9725423632 Israel, +9725454232 Israel, +9721454232 Israel, +9721454232 Spain, +34441323432 Spain, +34441323432 Israel, +9725423232 Israel, +9725423232 Spain, +34441323432 Russia, +78123343434 Russia, +78123343434
The task is this.
We have phone numbers (with the country each belongs to), and for every number we need to determine the operator. The mapping from number prefixes to operator names is configuration that lives with the application and somehow has to reach the workers, like a property. As a first step we want to get:
Israel, Orange Israel, Orange Israel, Pelephone Israel, Pelephone Israel, Hot Mobile Israel, Orange Russia, Megaphone Russia, MTC
The prefix-to-operator mapping comes from some Excel file the business gave us: 054 is Orange, and so on. The whole thing is a few megabytes at most, nothing remotely like big data.
And in the end we only want the operator names:
Orange Orange Pelephone Pelephone Hot Mobile Orange Megaphone MTC
How do we write this?
public interface CommonConfig {
    Operator getOperator(String phone);
    List<String> countries();
}
There is a CommonConfig: it can return the operator for a phone number and it knows the list of countries we care about.
Here is the service that uses it:
@Service
public class HrenoviyService {
    @Autowired
    private CommonConfig config;

    public JavaRDD<String> resolveOperatorNames(JavaRDD<Tuple2<String, String>> pairs) {
        return pairs.filter(pair -> config.countries().contains(pair._1))
                .map(pair -> config.getOperator(pair._2).getName());
    }
}
Everything here is the usual Spring way: the config is injected with @Autowired and then used inside the functions that run over the data.
And it works, without any broadcast at all. So do we even need it?
What actually happens? The driver sends the workers not only the code, but also everything the code refers to, our config included, and it does so with every task, not once. Suppose the job is split into 1000 Spark tasks running on 10 workers.
Each worker then receives the very same config not once but roughly a hundred times, once per task it executes. To ship it to every worker exactly once, we wrap it in a broadcast.
Here is the picture:

We take the context, call its broadcast method with our object, and get back a Broadcast wrapper. It is delivered to each worker once, and all the tasks running there read the value from it.
åé¡ã¯äœã§ããïŒ :
@Service
public class HrenoviyService {
    @Autowired
    private JavaSparkContext sc;
    @Autowired
    private CommonConfig commonConfig;

    private Broadcast<CommonConfig> configBroadcast;

    @PostConstruct
    public void wrapWithBroadCast() {
        configBroadcast = sc.broadcast(commonConfig);
    }

    public JavaRDD<String> resolveOperatorNames(JavaRDD<Tuple2<String, String>> pairs) {
        return pairs.filter(pair -> configBroadcast.value().countries().contains(pair._1))
                .map(pair -> configBroadcast.value().getOperator(pair._2).getName());
    }
}
We inject the SparkContext (we have Spring, after all), and in a @PostConstruct method, wrapWithBroadCast, we wrap the config in a broadcast. The SparkContext is needed precisely for that, which is why the @PostConstruct appears.
And inside the functions we no longer touch the config directly; everything is read from the broadcast:
return pairs.filter(pair -> configBroadcast.value().countries().contains(pair._1))
        .map(pair -> configBroadcast.value().getOperator(pair._2).getName());
But look what the overall picture turns into:

Every service now needs its own SparkContext, its own @PostConstruct and its own Broadcast field. It's the same copy-paste over and over: inject the context, wrap the config into a broadcast, use the broadcast.

And copy-paste is evil.
Besides, Spark plumbing gets mixed into the business logic (a broadcast is infrastructure, not business), and every service suddenly depends on the SparkContext.
And one more thing:

A SparkContext field inside the service is a problem in itself: the service has to be serializable, and the SparkContext is not.
What we would like is for the services to know nothing about this plumbing at all.

So who creates the broadcast then? A bean post-processor can:

it takes the bean that has to travel to the workers, wraps it in a broadcast and injects the result wherever it is wanted.

In other words, instead of injecting the SparkContext and writing a @PostConstruct in every service, the service simply declares that it wants a broadcast of the config.
Like this:
@Service
public class PopularWordsServiceImpl implements PopularWordsService {
    @AutowiredBroadcast
    private Broadcast<UserConfig> userConfig;
We inject a Broadcast of UserConfig, annotated with our own @AutowiredBroadcast. So how does this work?
The service code now looks like this:
    @Override
    public List<String> topX(JavaRDD<String> lines, int x) {
        return lines.map(String::toLowerCase)
                .flatMap(WordsUtil::getWords)
                .filter(w -> !userConfig.value().garbage.contains(w))
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((a, b) -> a + b)
                .mapToPair(Tuple2::swap)
                .sortByKey(false)
                .map(Tuple2::_2)
                .take(x);
    }
}
Inside the functions we call userConfig.value() and filter the garbage out of what it holds.
The annotation itself is handled by a bean post-processor: it finds fields marked with @AutowiredBroadcast, wraps the corresponding bean in a broadcast and injects it, so the Spark plumbing is written once and reused everywhere.
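The post-processor itself is not shown in this transcript, so here is only a rough sketch of how @AutowiredBroadcast could be implemented (the annotation and field types follow the code above; everything else is an assumption):

import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.annotation.*;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;

// in its own file: the marker annotation
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface AutowiredBroadcast {
}

// in its own file: the post-processor that fills such fields
public class AutowiredBroadcastBeanPostProcessor implements BeanPostProcessor {
    @Autowired
    private JavaSparkContext sc;
    @Autowired
    private ApplicationContext context;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(AutowiredBroadcast.class)) {
                // the field is declared as Broadcast<T>: find T and the bean of that type
                ParameterizedType broadcastType = (ParameterizedType) field.getGenericType();
                Class<?> payloadType = (Class<?>) broadcastType.getActualTypeArguments()[0];
                Object payload = context.getBean(payloadType);
                field.setAccessible(true);
                try {
                    field.set(bean, sc.broadcast(payload));   // wrap once, inject the Broadcast
                } catch (IllegalAccessException e) {
                    throw new BeanCreationException("Failed to inject broadcast into " + beanName, e);
                }
            }
        }
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        return bean;
    }
}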
Now let's compare the final code in Java and in Scala. The Java version:
lines.map(String::toLowerCase)
        .flatMap(WordsUtil::getWords)
        .filter(word -> !Arrays.asList(garbage).contains(word))
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((x, y) -> x + y)
        .mapToPair(Tuple2::swap)
        .sortByKey(false)
        .map(Tuple2::_2)
        .take(amount);
And the same in Scala:
lines.map(_.toLowerCase())
  .flatMap("\\w+".r.findAllIn(_))
  .filter(!garbage.contains(_))
  .map((_, 1)).reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .take(amount)
The first block is Java, the second is Scala. With Java 8 the difference comes down to a couple of lines. And here is the side-by-side comparison:

In Java I need a helper (getWords) to split a line into words, while in Scala a regex does it inline. Scala also has sortBy, which can pull whatever it needs out of the Tuple, so there's no pair-swapping; in Java we swap and call sortByKey (passing plain false where Scala spells out ascending = false).
What's left? DataFrames.
DataFrames are an API that appeared in Spark 1.3. A DataFrame is, roughly, data organized into named columns (instead of anonymous Tuples). Underneath it is still an RDD, so everything said above about RDDs applies; the difference is that a DataFrame also carries a schema, which lets Spark plan the tasks better.
Where can you get a DataFrame from?
- from Hive tables;
- from JSON files;
- from an RDD;
- from external databases;
- from other structured data sources.
You can work with a DataFrame through a DSL or through SQL, using the SQLContext (more on it below).
Here are the operations the DSL gives you:
agg, columns, count, distinct, drop, dropDuplicates, filter
groupBy, orderBy, registerTable, schema, show, select, where, withColumn
And you can write plain SQL queries:
dataFrame.registerTempTable("finalMap");
DataFrame frame = sqlContext.sql("select cl_id, cl_grp_id, dk_org_snw, dk_org_hnw, dk_org_cnp, dk_dir, dk_dat, DK_TIM_HR as dk_tim_hr, dk_spe, dk_sgt, dk_pet, dk_sgs, dk_sbp,\n" +
        "SUM(slu_atpt) slu_atpt, SUM(slu_succ) slu_succ, SUM(slu_fail) slu_fail, SUM(slu_dly) slu_dly\n" +
        "FROM finalMap f join tdtim t on f.dk_tim = t.DK_TIM\n" +
        "WHERE dk_pet IN (1, 4)\n" +
        "group by cl_id, cl_grp_id, dk_org_snw, dk_org_hnw, dk_org_cnp, dk_dir, dk_dat, DK_TIM_HR, dk_spe, dk_sgt, dk_pet, dk_sgs, dk_sbp").toDF();
Note that the SQL itself is run through the sqlContext, after registering the DataFrame as a temporary table.

On top of that, there is a large set of built-in functions for working with columns:
abs, cos, asin, isnull, not, rand, sqrt, when, expr, bin, atan, ceil, floor, factorial,
greatest, least, log, log10, pow, round, sin, toDegrees, toRadians, md5, ascii, base64,
concat, length, lower, ltrim, unbase64, repeat, reverse, split, substring, trim, upper,
datediff, year, month, hour, last_day, next_day, dayofmonth, explode, udf
Let's see how this works on an example. Here is the data:

These are LinkedIn-style profiles; each record has a name, an age and a list of keywords (skills).
Let's write the code.

First we read the data: sqlContext.read().json(path) loads a JSON file into a DataFrame (note that this is not one big JSON document; each line of the file is a separate JSON record), and show() prints it as a table.
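A sketch of that first step (the file path is illustrative; sqlContext is the SQLContext bean set up later in the talk):

DataFrame linkedIn = sqlContext.read().json("data/linkedin.json");  // one JSON record per line
linkedIn.printSchema();   // e.g. age: long, keywords: array<string>, name: string
linkedIn.show();          // prints the DataFrame as a table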

A reminder of the task: among people aged 30 or younger, find the most popular keyword, and then print the names of everyone of that age who has it in their profile.
Let's start from the keywords. We select the keywords column from linkedIn and, since each profile contains a whole array of them, we use explode so that every keyword gets its own row:
DataFrame keywords = linkedIn.select(functions.explode(functions.column("keywords")).as("keyword"));
Now we have one row per keyword:

It isn't aggregated or sorted in any way yet, of course.
So we group by keyword, count, and order by the count:
DataFrame orderedBy = keywords.groupBy("keyword")
        .agg(functions.count("keyword").as("amount"))
        .orderBy(functions.column("amount").desc());
orderedBy.show();
We group by keyword, count each group into a column called amount, and sort by amount in descending order (desc, not the default ascending). Here is the result:

Now we take the most popular keyword:
String mostPopularWord = orderedBy.first().getString(0);
System.out.println("mostPopularWord = " + mostPopularWord);
first() returns the first Row, and getString(0) pulls the value of the first column out of it, much like a ResultSet. Now the final query: people aged 30 or younger whose keywords contain the most popular word:
linkedIn.where(functions.column("age").leq(30)
        .and(functions.array_contains(functions.column("keywords"), mostPopularWord)))
        .select("name")
        .show();
We take everyone aged 30 or less whose keywords array contains the most popular word (that is what functions.array_contains does), select their names and show them. Here is the output:

Great. But here the myth surfaces: all of this works for data that already has a schema, such as XML, JSON or Hive tables. What do we do with plain files, like our song lyrics? And is any of it different between Java and Scala? Let's check.
For that we write a WordDataFrameCreator.
Here it is:
@Component
public class WordDataFrameCreator {
    @Autowired
    private SQLContext sqlContext;
    @Autowired
    private JavaSparkContext sc;

    public DataFrame create(String pathToDir) {
        JavaRDD<Row> rdd = sc.textFile(pathToDir)
                .flatMap(WordsUtil::getWords)
                .map(RowFactory::create);
        return sqlContext.createDataFrame(rdd, DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("words", DataTypes.StringType, true)
        }));
    }
}
We start from a plain RDD: read the files, split the lines into words, and map each word to a Row using RowFactory, so we get an RDD of Rows. An RDD of Rows is not yet a DataFrame, though: we also have to describe the schema, that is, say what the columns are. The SQLContext then turns the RDD plus the schema into a DataFrame.
By the way, the SQLContext is built on top of the JavaSparkContext (in AppConfig we add one more bean for it):
@Bean
public SQLContext sqlContext() {
    return new SQLContext(sc());
}
So we build the DataFrame by handing the SQLContext the RDD and a schema; here it is a single column called words, of type string, nullable.
The DataFrame API is the same wherever the data came from: once you have a DataFrame, it makes no difference whether it was born from JSON, Hive or a plain text file.
Here is the service that computes the top words, now on DataFrames:
@Service
public class PopularDFWordsServiceImpl implements PopularDFWordsService {
    @AutowiredBroadcast
    private Broadcast<UserConfig> userConfig;

    @Override
    public List<String> topX(DataFrame lines, int x) {
        DataFrame sorted = lines.withColumn("words", lower(column("words")))
                .filter(not(column("words").isin(userConfig.value().garbage.toArray())))
                .groupBy(column("words")).agg(count("words").as("count"))
                .sort(column("count").desc());
        sorted.show();
        Row[] rows = sorted.take(x);
        List<String> topX = new ArrayList<>();
        for (Row row : rows) {
            topX.add(row.getString(0));
        }
        return topX;
    }
}
The functionality is the same as in the RDD version, but the code is shorter and reads better.
We lower-case the words with withColumn (it replaces the words column with its lower-cased version), filter out the garbage, group by word, count each group into a count column and sort it in descending order. Works great.
One question remains: what if the built-in functions are not enough? Then you can write your own, custom functions (UDFs) and use them in the same pipeline.

A custom function (a udf) is an ordinary class with a single method. Ours is called notGarbage: it implements UDF1, takes one argument of type String (the word) and returns a Boolean (whether the word should be kept).
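The GarbageFilter class itself is not shown here, so this is only a plausible sketch of it under the UDF1 contract (the garbage list injection mirrors UserConfig; the details are assumptions):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.api.java.UDF1;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class GarbageFilter implements UDF1<String, Boolean> {

    private List<String> garbage;

    @Value("${garbage}")
    private void setGarbage(String[] garbage) {        // same trick as in UserConfig
        this.garbage = Arrays.asList(garbage);
    }

    public String udfName() {
        return "notGarbage";                            // the name used when registering the UDF
    }

    @Override
    public Boolean call(String word) throws Exception {
        return !garbage.contains(word);                 // true means the word is kept
    }
}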
And here is how it gets registered and used:
@Service
public class PopularWordsResolverWithUDF {
    @Autowired
    private GarbageFilter garbageFilter;
    @Autowired
    private SQLContext sqlContext;

    @PostConstruct
    public void registerUdf() {
        sqlContext.udf().register(garbageFilter.udfName(), garbageFilter, DataTypes.BooleanType);
    }

    public List<String> mostUsedWords(DataFrame dataFrame, int amount) {
        DataFrame sorted = dataFrame.withColumn("words", lower(column("words")))
                .filter(callUDF(garbageFilter.udfName(), column("words")))...
The UDF is registered in the SQLContext in a @PostConstruct method, before any query uses it.
callUDF is how a registered UDF is invoked from the DataFrame DSL: you pass the name under which it was registered and the columns it should receive. Here it filters out the garbage words.
Registering every UDF by hand in every service is boilerplate again, so the same trick applies: a custom @RegisterUDF annotation plus a bean post-processor can register all such beans automatically.
Now let's run the whole thing (just from main, no Tomcat or application server needed) and compare artists by their most popular words.
The top 10 words:

And, for comparison, another artist:

Some of the top words coincide: 6 in one case, 4 in the other.

The next pair:

And one more:

For Pink Floyd the overlap turns out to be 0. Judge the myth for yourself:

çµè«
- Hadoop Spark, Spark Hadoop. Yarn, , â ;
- Scala, . ;
- : , Spring, , , ..;
- . - , bean, , ;
- ( , ).
Links:

A couple of announcements:
- There is an upcoming hands-on Spark training where everything this talk only touched on is worked through in practice.
- On April 7-8 the JPoint 2017 conference takes place; it will include two more talks about Spring (one of them devoted to Spring Test).
JPoint is a conference for experienced Java developers.