We have finally begun translating a book on the Spark framework! Today we would like to share a translation of an overview article on Spark's capabilities.
I first heard about Spark in late 2013, when I became interested in Scala, the language Spark is written in. Somewhat later, out of curiosity, I built a project from the field of data science devoted to predicting the survival of Titanic passengers. It turned out to be an excellent way to get acquainted with Spark programming and its concepts, and I strongly recommend it to every beginning Spark developer.
Today, Spark is used by many major players such as Amazon, eBay, and Yahoo!. Many organizations run Spark on clusters of thousands of nodes; according to the Spark FAQ, the largest of these clusters has more than 8,000 nodes. Spark is truly a technology worth noting and exploring.
This article offers an overview of Spark, along with use cases and code samples.
What Is Apache Spark? An Introduction
Spark is an Apache project billed as a tool for "lightning-fast cluster computing". Developed by a thriving open-source community, it is currently the most active of all Apache projects.
Spark provides a fast and general-purpose data processing platform. Compared to Hadoop, Spark runs programs up to 100 times faster in memory and up to 10 times faster on disk.
Moreover, Spark code can be written more quickly, since more than 80 high-level operators are at your disposal. To appreciate this, let's look at the "Hello World!" of the Big Data world: the word count example. A program written in Java for MapReduce contains around 50 lines of code, whereas in Spark (Scala) all you need is the following:
sparkContext.textFile("hdfs://...")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .saveAsTextFile("hdfs://...")
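For readers unfamiliar with Scala, the same pipeline can be traced step by step in plain Python. This is only an illustration of what each operator does, not Spark code; the input lines are invented for the example:

```python
# A plain-Python walkthrough of the Spark word-count pipeline above
# (illustration only, not the Spark API).
from collections import Counter

lines = ["to be or not to be", "to spark or not to spark"]

# flatMap: split every line into words, flattening into one sequence
words = [word for line in lines for word in line.split(" ")]

# map: pair each word with the count 1
pairs = [(word, 1) for word in words]

# reduceByKey(_ + _): sum the counts for every distinct word
counts = Counter()
for word, n in pairs:
    counts[word] += n

print(counts["to"])   # -> 4
```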
Another important aspect of learning Apache Spark deserves attention: it provides an interactive shell (REPL) out of the box. With the REPL, you can test the result of each line of code without first having to program and run the entire job. The path to working code is therefore much shorter, and ad hoc data analysis becomes possible.
In addition, Spark has the following key features:
- Currently provides APIs for Scala, Java, and Python, with support for other languages (such as R) on the way
- Integrates well with the Hadoop ecosystem and its data sources (HDFS, Amazon S3, Hive, HBase, Cassandra, etc.)
- Can run on clusters managed by Hadoop YARN or Apache Mesos, and can also run standalone
The Spark core is complemented by a set of powerful higher-level libraries that can be used seamlessly within the same application. Currently these libraries include SparkSQL, Spark Streaming, MLlib (for machine learning), and GraphX, each of which is covered in more detail in this article. Additional Spark libraries and extensions are also under development.
Spark Core
Spark Core is the base engine for large-scale parallel and distributed data processing. The core is responsible for:
- memory management and fault recovery
- scheduling, distributing, and monitoring jobs on a cluster
- interacting with storage systems
Spark introduces the concept of the RDD (Resilient Distributed Dataset): an immutable, fault-tolerant, distributed collection of objects that can be processed in parallel. An RDD can contain objects of any type, and is created by loading an external dataset or by distributing a collection from the driver program. RDDs support two types of operations:
- Transformations are operations performed on an RDD (such as map, filter, and join). The result of a transformation is a new RDD containing its result.
- Actions are operations (such as reduce and count) that return a value obtained as the result of some computation on an RDD.
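The distinction can be illustrated with ordinary Python collections (an analogy, not the Spark API; the data is made up):

```python
# "Transformations" yield a new dataset; "actions" return a value to
# the driver. Plain-Python analogy, not Spark itself.
from functools import reduce

dataset = [1, 2, 3, 4]

# transformation-style step: produces a new collection
squared = list(map(lambda x: x * x, dataset))

# action-style step: collapses the collection down to one value
total = reduce(lambda a, b: a + b, squared)

print(squared)   # -> [1, 4, 9, 16]
print(total)     # -> 30
```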
Transformations in Spark are "lazy", meaning the result is not computed immediately after the transformation. Instead, Spark simply "remembers" the operation to be performed and the dataset (e.g., a file) it applies to. Transformations are only computed when an action is called, and their result is returned to the driver program. Thanks to this design, Spark's performance improves. For example, if a big file is transformed in various ways and passed to a first action, Spark will process and return the result for only the first line, rather than doing the work for the entire file.
By default, each transformed RDD may be recomputed every time a new action is run on it. However, an RDD can also be kept in memory using the persist or cache methods, in which case Spark keeps the needed elements on the cluster so that they can be retrieved much faster.
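Laziness itself can be mimicked with a Python generator: nothing runs until an "action" pulls data through. This is a toy model of the idea, not how Spark is implemented:

```python
# A toy model of lazy transformations: the transformation just wraps a
# generator, so no work happens until an "action" (next/first) pulls
# values through it.
processed = []  # record which raw records actually get processed

def parse(record):
    processed.append(record)      # side effect so we can observe laziness
    return record.upper()

raw = ["a", "b", "c", "d"]
transformed = (parse(r) for r in raw)   # nothing has run yet

first = next(transformed)               # the "action": pulls one element
print(first)        # -> A
print(processed)    # -> ['a']  only the first record was parsed
```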
SparkSQL
SparkSQL is a Spark component that supports querying data via SQL or the Hive Query Language. The library originated as a port of Apache Hive to run on top of Spark (in place of MapReduce) and is now integrated with the Spark stack. In addition to supporting a variety of data sources, it makes it possible to weave SQL queries together with code transformations, which yields a very powerful tool. Below is an example of a Hive-compatible query:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
Spark Streaming
Spark Streaming supports real-time processing of streaming data. Such data might be log files of a production web server (via Apache Flume, HDFS/S3, etc.), information from social networks (such as Twitter), or various message queues like Kafka. Under the hood, Spark Streaming receives the input data streams and divides the data into batches. These are then processed by the Spark engine, after which the final stream of results is produced, likewise in batch form.
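The micro-batching just described can be modeled in a few lines of plain Python (not the Spark API; the batch size and events are invented for illustration):

```python
# A minimal sketch of the micro-batch idea: the incoming stream is
# chopped into fixed-size batches, and each batch is handed to the
# same batch-processing logic.
def batches(stream, batch_size):
    batch = []
    for event in stream:
        batch.append(event)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:                 # flush the final partial batch
        yield batch

events = ["e1", "e2", "e3", "e4", "e5"]
batch_sizes = [len(b) for b in batches(events, 2)]
print(batch_sizes)   # -> [2, 2, 1]
```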
The Spark Streaming API closely matches the Spark Core API, making it easy for programmers to work with both batch and streaming data.
MLlib
MLlib is a machine learning library providing various algorithms designed to scale out horizontally on a cluster, for classification, regression, clustering, collaborative filtering, and so on. Some of these algorithms also work with streaming data, for example linear regression using ordinary least squares, or k-means clustering (and the list will soon grow). Apache Mahout (a machine learning library for Hadoop) has already moved away from MapReduce and is now developed in conjunction with Spark MLlib.
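To make the simplest of these concrete, here is the closed-form ordinary-least-squares fit for a single feature in plain Python (made-up data, not the MLlib API):

```python
# Ordinary least squares for one feature: the textbook formula behind
# the linear regression mentioned above. Data is invented and roughly
# follows y = 2x.
xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.1, 4.1, 5.9, 8.1]

n = len(xs)
mean_x = sum(xs) / n
mean_y = sum(ys) / n

# slope = covariance(x, y) / variance(x)
slope = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) \
        / sum((x - mean_x) ** 2 for x in xs)
intercept = mean_y - slope * mean_x

print(round(slope, 2))   # -> 1.98 (close to the true slope of 2)
```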
GraphX
GraphX is a library for manipulating graphs and performing graph-parallel operations. It provides a uniform tool for ETL, exploratory analysis, and iterative graph-based computations. Besides built-in operations for graph manipulation, it provides a library of classic graph algorithms, such as PageRank.
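To make the PageRank mention concrete, here is a tiny power-iteration sketch in plain Python over a hard-coded three-node graph (an illustration of the algorithm only, not the GraphX API):

```python
# Compact power-iteration PageRank over a toy graph. Node "c" receives
# links from both "a" and "b", so it should end up ranked highest.
links = {            # node -> list of outgoing links
    "a": ["b", "c"],
    "b": ["c"],
    "c": ["a"],
}
damping = 0.85
ranks = {node: 1.0 / len(links) for node in links}

for _ in range(50):                      # iterate until the ranks settle
    contribs = {node: 0.0 for node in links}
    for node, outgoing in links.items():
        share = ranks[node] / len(outgoing)
        for dest in outgoing:
            contribs[dest] += share
    ranks = {node: (1 - damping) / len(links) + damping * c
             for node, c in contribs.items()}

print(max(ranks, key=ranks.get))   # -> c
```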
How to Use Apache Spark: An Event Detection Example
Now that we know what Apache Spark is, let's think about which tasks and problems it solves most effectively.
I recently came across an article about an experiment in detecting earthquakes by analyzing a Twitter stream. Incidentally, the article showed that this method lets you learn about an earthquake faster than reports from the meteorological agency. Although the technology described in that article differs from Spark, the example strikes me as interesting precisely in the context of Spark: it shows how to work with simplified code snippets, without the glue code.
First, we would need to filter out tweets that seem relevant to us, for example those mentioning "earthquake" or "shaking". This is easy to do with Spark Streaming, as follows:
TwitterUtils.createStream(...)
  .filter(status => status.getText.contains("earthquake") || status.getText.contains("shaking"))
Then we would need to run semantic analysis on the tweets to determine whether the shaking they talk about appears relevant. Tweets like "Earthquake!" or "Now it is shaking" would be treated as positive matches, while tweets like "I'm at an earthquake conference" or "It was shaking badly yesterday" would be treated as negative. The authors of the paper used a support vector machine (SVM) for this purpose. We will do the same, but we could also implement a streaming version. The resulting code example based on MLlib would look like the following:
// Read sample data about earthquake tweets, in LIBSVM format
val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt")

// Split the data into training (60%) and test (40%) sets.
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run the training algorithm to build the model
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold so that raw scores are returned
model.clearThreshold()

// Compute raw scores on the test set
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Area under ROC = " + auROC)
If the model's rate of correct predictions suits us, we can move on to the next stage: reacting to detected earthquakes. To detect one, we need a certain number (i.e., density) of positive tweets received within a defined time window (as described in the article). Note that for tweets with geolocation information enabled, we could also determine the coordinates of the earthquake. Armed with this knowledge, we could use SparkSQL to query an existing Hive table (which stores data about users who want to receive earthquake notifications), extract their email addresses, and send them a personalized warning email:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// sendEmail is a custom function
sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email")
  .collect().foreach(sendEmail)
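The "density within a time window" check mentioned above can be sketched in plain Python. The threshold, window length, and timestamps here are all invented for illustration; a real deployment would run this logic over the Spark Streaming batches:

```python
# Raise an alert when enough positively classified tweets land inside
# one sliding time window (all parameters are made up for the example).
WINDOW_SECONDS = 60
THRESHOLD = 3          # minimum positive tweets inside one window

def should_alert(timestamps, window=WINDOW_SECONDS, threshold=THRESHOLD):
    """timestamps: sorted arrival times (seconds) of positive tweets."""
    for i in range(len(timestamps)):
        # count tweets falling in [t_i, t_i + window)
        in_window = sum(1 for t in timestamps
                        if timestamps[i] <= t < timestamps[i] + window)
        if in_window >= threshold:
            return True
    return False

print(should_alert([0, 10, 20]))     # -> True  (3 tweets within 60 s)
print(should_alert([0, 100, 200]))   # -> False (too sparse)
```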
Other Apache Spark Use Cases
Of course, Spark's potential scope is by no means limited to seismology.
Below is a sampling (by no means exhaustive!) of other practical situations that require the fast, varied, and high-volume processing of big data, for which Spark is so well suited:
In the game industry: processing and detecting patterns in game events that arrive in a dense real-time stream. As a result, you can respond to them immediately and earn good money through player retention, targeted advertising, automatic adjustment of difficulty level, and so on.
In e-commerce, real-time transaction information can be passed to a streaming clustering algorithm, or to collaborative filtering, as in the case of ALS. The results can then be combined with information from other unstructured data sources (for example, customer comments or product reviews). Gradually, this information can be used to refine recommendations as new trends emerge.
In the financial sector, for security purposes, the Spark stack can be used to detect fraud and intrusions or to perform authentication based on risk analysis. The best results are achieved by collecting huge volumes of archived logs and combining them with external data sources, such as information about data breaches and hacked accounts (see https://haveibeenpwned.com/), and with information about the connection/request, such as IP geolocation or time data.
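The streaming clustering idea from the e-commerce example boils down to something like k-means. Here is a bare-bones plain-Python pass over one-dimensional data (points, k, and initial centers are all made up; a real deployment would use MLlib's clustering instead):

```python
# A minimal k-means loop: assign each point to its nearest center,
# then move each center to the mean of its cluster, and repeat.
points = [1.0, 1.2, 0.8, 8.0, 8.2, 7.8]
centers = [0.0, 10.0]   # initial guesses for k = 2 centers

for _ in range(10):
    clusters = [[], []]
    for p in points:
        nearest = min(range(2), key=lambda i: abs(p - centers[i]))
        clusters[nearest].append(p)
    centers = [sum(c) / len(c) for c in clusters]

print([round(c, 1) for c in centers])   # -> [1.0, 8.0]
```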
Conclusion
To sum up, Spark helps simplify the demanding, computationally intensive task of processing large volumes of data, both real-time and archived, both structured and unstructured. Spark offers seamless integration of complex capabilities such as machine learning and graph algorithms. Spark brings Big Data processing to the masses. Try it out — you won't regret it!