Choosing the right tool for data analysis is very important. On the Kaggle.com forums, where international data science competitions are held, people often ask which tool is best. R and Python sit at the top of the popularity list. In this article we describe an alternative data analysis stack built on the Scala programming language and the Spark distributed computing platform.
How did we get here?
At Retail Rocket we do a lot of machine learning on very large datasets. We used to develop prototypes with the IPython + Pyhs2 (a Hive driver for Python) + Pandas + Sklearn bundle. At the end of summer 2014 we made a fundamental decision to switch to Spark, because experiments showed a 3 to 4 times performance gain on the same server fleet.
Another advantage is that we can use a single programming language both for modeling and for the code that runs on the servers. This was a big win for us. Previously we used four languages at once (Hive, Pig, Java and Python), which is a serious problem for a small team.
Spark supports Python, Scala and Java through its APIs. We chose Scala because Spark itself is written in it, so we can read Spark's source code and fix bugs when necessary. Scala also runs on the JVM, which is where all of Hadoop runs. Our analysis of forum discussions about programming languages for Spark boiled down to the following:
Scala:
+ functional;
+ native to Spark;
+ runs on the JVM, which makes it native to Hadoop;
+ strict static typing;
- a fairly steep learning curve, but the code is readable.
Python:
+ popular;
+ simple;
- dynamic typing;
- performance is worse than Scala's.
Java:
+ popular;
+ native to Hadoop;
- too much code.
You can read more about choosing a programming language for Spark here.
I have to admit that the choice was not easy, because at that point nobody on the team knew Scala.
It is a well-known fact that to learn to communicate well in a language, you have to immerse yourself in it and use it as often as possible. So we abandoned the Python stack in favor of Scala, for modeling as well as for quick data analysis.
First, we had to find a replacement for IPython. The options were:
1) Zeppelin, an IPython-like notebook for Spark;
2) ISpark;
3) Spark Notebook;
4) Spark IPython Notebook from IBM.
We chose ISpark without much hesitation. It is IPython for Scala/Spark, HighCharts and R charts were relatively easy to hook into it, and it connected to our Yarn cluster without problems.
The story of our Scala data mining environment has three parts:
1) A simple task in Scala in ISpark, run locally on Spark.
2) Configuring and installing the components needed to work with ISpark.
3) Writing a machine learning task in Scala using R libraries.
If this article proves popular, I will write the other two. ;)
The task
Let's try to answer this question: does the average order value (AOV) in an online store depend on static client parameters such as region, browser type (mobile or desktop), operating system and browser version? We can answer it with mutual information.
At Retail Rocket we use entropy in many places in our recommendation algorithms and analytics: the classic Shannon formula, Kullback-Leibler divergence, mutual information. We have even submitted a talk on this topic to the RecSys conference. These measures have their own section, albeit a small one, in Murphy's well-known machine learning textbook.
Let's analyze some real Retail Rocket data. Beforehand, I copied a sample from the cluster to my computer as a CSV file.
Loading the data
Here we use ISpark together with Spark running in local mode. All computations run on the local machine and the work is distributed across the CPU cores. The comments in the code explain each step. The key point is that the result is an RDD (Spark's core data structure), which holds a collection of instances of the Row case class defined in the code. This lets us access fields with dot notation, for example _.categoryId.
Input:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.tribbloid.ispark.display.dsl._
import scala.util.Try

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Declare a case class; it is needed later to build a DataFrame
case class Row(categoryId: Long, orderId: String, cityId: String, osName: String,
               osFamily: String, uaType: String, uaName: String, aov: Double)

// Read the file; sc (the Spark Context) is already defined in the notebook
val aov = sc.textFile("file:///Users/rzykov/Downloads/AOVC.csv")

// Parse every line into a Row; lines that fail to parse become None and are dropped
val dataAov = aov.flatMap { line =>
  Try {
    line.split(",") match {
      case Array(categoryId, orderId, cityId, osName, osFamily, uaType, uaName, aov) =>
        Row(categoryId.toLong + 100, orderId, cityId, osName, osFamily, uaType, uaName, aov.toDouble)
    }
  }.toOption
}
Output:
MapPartitionsRDD[4] at map at <console>:28
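You can sanity-check the parsing logic without a cluster by running the same Try/flatMap pattern on plain Scala collections. This is a minimal sketch. OrderRow is a hypothetical stand-in for the Row case class above, renamed so it cannot clash with org.apache.spark.sql.Row. The +100 offset applied to categoryId in the Spark code is left out.

```scala
import scala.util.Try

object ParseSketch {
  // Same fields as the Row case class above (hypothetical name OrderRow).
  case class OrderRow(categoryId: Long, orderId: String, cityId: String, osName: String,
                      osFamily: String, uaType: String, uaName: String, aov: Double)

  // Parse one CSV line. A wrong field count (MatchError) or a non-numeric
  // categoryId/aov (NumberFormatException) is caught by Try and becomes None.
  def parseLine(line: String): Option[OrderRow] = Try {
    line.split(",") match {
      case Array(categoryId, orderId, cityId, osName, osFamily, uaType, uaName, aov) =>
        OrderRow(categoryId.toLong, orderId, cityId, osName, osFamily, uaType, uaName, aov.toDouble)
    }
  }.toOption

  // flatMap over the Options keeps only the rows that parsed successfully,
  // just as aov.flatMap { ... } does on the RDD.
  def parseAll(lines: Seq[String]): Seq[OrderRow] = lines.flatMap(parseLine)
}
```

A header line such as "categoryId,orderId,..." is silently skipped by this pattern, because "categoryId".toLong throws inside the Try.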
Now let's look at the data itself.
This relies on the new DataFrame data type, which was added to Spark in version 1.3.0 and closely resembles the analogous structure in Python's pandas library. toDF takes the Row case class and derives the field names and types from it.
For further analysis we need to pick a single category, preferably one with plenty of data. To do that, we need a list of the most popular categories.
Input:
dataAov
  .map { x => x.categoryId } // keep only categoryId
  .countByValue()            // count how often each categoryId occurs
  .toSeq
  .sortBy(-_._2)             // sort by frequency, descending
  .take(10)                  // the 10 most popular categories
The output is an array of tuples of the form (categoryId, frequency):
ArrayBuffer((314,3068), (132,2229), (128,1770), (270,1483), (139,1379), (107,1366), (177,1311), (226,1268), (103,1259), (127,1204))
For the rest of the analysis I decided to take category 128.
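The top-10 query can be mirrored on a local Seq, which makes the logic easy to test. countByValue corresponds to grouping identical ids and counting each group. This is a sketch under that assumption. topCategories is a hypothetical helper, and ties are broken by id so that the result is deterministic.

```scala
object TopCategoriesSketch {
  // Local equivalent of map(_.categoryId).countByValue().toSeq.sortBy(-_._2).take(n):
  // count each categoryId, sort by frequency (descending), keep the first n.
  def topCategories(categoryIds: Seq[Long], n: Int): Seq[(Long, Int)] =
    categoryIds
      .groupBy(identity)
      .map { case (id, occurrences) => (id, occurrences.size) }
      .toSeq
      .sortBy { case (id, count) => (-count, id) } // ties broken by id
      .take(n)
}
```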
Let's prepare the data. To keep the charts free of clutter, we keep only the operating system families we are interested in.
Input:
val interestedBrowsers = List("Android", "OS X", "iOS", "Linux", "Windows")
val osAov = dataAov.filter(x => interestedBrowsers.contains(x.osFamily)) // keep only the OS families of interest
  .filter(_.categoryId == 128)                                           // keep only category 128
  .map(x => (x.osFamily, (x.aov, 1.0)))                                   // (osFamily, (order value, one order))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))                      // total revenue and order count per OS
  .map { case (osFamily, (revenue, orders)) => (osFamily, revenue / orders) } // average order value per OS
  .collect()
The output is an array of tuples of the form (OS, average order value):
Array((OS X,4859.827586206897), (Linux,3730.4347826086955), (iOS,3964.6153846153848), (Android,3670.8474576271187), (Windows,3261.030993042378))
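The reduceByKey step carries (revenue, orders) pairs instead of averages because the reducing function must be associative. Averaging the partial averages of partitions that have different sizes would give the wrong answer. Here is a sketch of the same computation on a local collection. averageByKey is a hypothetical helper name, and its input is a sequence of (osFamily, order value) pairs.

```scala
object AverageOrderValueSketch {
  // Accumulate (revenue, orders) per OS family in one pass, then divide.
  def averageByKey(orders: Seq[(String, Double)]): Map[String, Double] =
    orders
      .foldLeft(Map.empty[String, (Double, Double)]) { case (acc, (os, value)) =>
        val (revenue, count) = acc.getOrElse(os, (0.0, 0.0))
        acc.updated(os, (revenue + value, count + 1.0))
      }
      .map { case (os, (revenue, count)) => (os, revenue / count) }
}
```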
We want a visualization, so let's build one in HighCharts.
In principle, you can use any HighCharts chart type that Wisp supports. All the charts are interactive.
Now let's try the same thing with R.
Start the R client:
import org.ddahl.rscala._
import ru.retailrocket.ispark._

// Start an R interpreter and connect to it from Scala
def connect() = RClient("R", false)
@transient val r = connect()
Build the chart itself.
This way you can build any R chart directly in an IPython notebook.
Mutual information
The chart suggests that a dependence exists, but do the metrics confirm this conclusion? There are many ways to check. Here we use the mutual information between the values in the table. It measures the mutual dependence between the distributions of two random (discrete) variables.
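For discrete distributions it is computed by the formula

$$I(X;Y) = \sum_{y \in Y} \sum_{x \in X} p(x, y) \log \frac{p(x, y)}{p(x)\, p(y)}$$

where p(x, y) is the joint probability of x and y, and p(x) and p(y) are the marginal probabilities.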
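The formula can be applied directly to samples by replacing each probability with its empirical frequency (the so-called plug-in estimate). This is a minimal sketch. MutualInformationSketch is a hypothetical name, and the natural logarithm is assumed, so the result is in nats. Another log base only changes the units.

```scala
object MutualInformationSketch {
  // Plug-in estimate of I(X;Y) for two discrete samples of equal length:
  // empirical frequencies stand in for p(x, y), p(x) and p(y).
  def mutualInformation[A, B](xs: Seq[A], ys: Seq[B]): Double = {
    require(xs.nonEmpty && xs.length == ys.length, "samples must be non-empty and of equal length")
    val n = xs.length.toDouble
    val pxy = xs.zip(ys).groupBy(identity).map { case (k, v) => (k, v.size / n) }
    val px = xs.groupBy(identity).map { case (k, v) => (k, v.size / n) }
    val py = ys.groupBy(identity).map { case (k, v) => (k, v.size / n) }
    // Pairs that never occur have p(x, y) = 0 and contribute nothing,
    // so summing over the observed pairs is enough.
    pxy.map { case ((x, y), p) => p * math.log(p / (px(x) * py(y))) }.sum
  }
}
```

Two independent variables give 0. When one variable fully determines the other, the result equals the entropy of the first (log 2 for two equally likely values).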
However, we are interested in a more practical metric, the Maximal Information Coefficient (MIC). Computing it for continuous variables requires some tricks. Here is how the metric is defined.
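Let D = {(x, y)} be a set of n ordered pairs of values of the random variables X and Y. This two-dimensional space is partitioned by an X-by-Y grid, which groups the x and y values into X and Y partitions respectively (think of histograms). MIC is then defined as

$$\mathrm{MIC}(D) = \max_{X \cdot Y < B(n)} \frac{I^*(D, X, Y)}{\log \min(X, Y)}$$

where B(n) bounds the grid size (the number of cells X·Y must stay below it; the original paper uses B(n) = n^0.6), and I*(D, X, Y) is the highest mutual information any X-by-Y grid achieves on D. The logarithm in the denominator normalizes MIC to the interval [0, 1]. MIC takes continuous values in [0, 1]: it approaches 1 for a noiseless functional dependence and 0 for independent variables. Further reading on this topic is listed at the end of the article.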
In the book, MIC (a measure built on mutual information) is called the correlation of the 21st century, and here is why. The chart below shows six dependencies (charts C to H). For each of them the Pearson correlation and MIC were computed, and the results are marked with the corresponding letters on the chart on the left. As you can see, the Pearson correlation is almost zero, while MIC reveals a dependence (charts F, G and E).
Source: people.cs.ubc.ca
The table below shows a number of metrics computed on different kinds of dependence: random, linear, cubic and so on. It shows that MIC behaves very well and detects nonlinear dependencies.
Another interesting chart shows the effect of noise on MIC.
In our case we need to compute MIC where the Aov variable is continuous and all the others are discrete with unordered values (such as the browser type). To compute MIC correctly, Aov has to be discretized. We will use the ready-made solution from exploredata.net. This solution has one problem: it assumes that both variables are continuous and represented as Float values. So we have to trick the code by encoding the discrete values as Floats and randomly changing the order between them. To do this, we repeat the computation over random orderings of the values (100 runs) and take the maximum MIC value as the result.
import data.VarPairData
import mine.core.MineParameters
import analysis.Analysis
import analysis.results.BriefResult
import scala.util.Random

// Encode categorical values as the numbers 1..k in a random order, e.g. "Windows" -> 3.0
def encode(col: Array[String]): Array[Double] = {
  val ns = scala.util.Random.shuffle(1 to col.toSet.size)
  val encMap = col.toSet.zip(ns).toMap
  col.map { encMap(_).toDouble }
}

// MIC of two numeric arrays, computed with the MINE library from exploredata.net
def mic(x: Array[Double], y: Array[Double]) = {
  val data = new VarPairData(x.map(_.toFloat), y.map(_.toFloat))
  val params = new MineParameters(0.6.toFloat, 15, 0, null)
  val res = Analysis.getResult(classOf[BriefResult], data, params)
  res.getMIC
}

// Maximum MIC over n runs. x is passed by name, so an argument such as
// encode(col) is re-evaluated, and therefore re-shuffled, on every run.
def micMax(x: => Array[Double], y: Array[Double], n: Int = 100) =
  (for { i <- 1 to n } yield mic(x, y)).max
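The trick only works if every run sees a new random order. In Scala a by-name parameter arranges this. An expression passed as `score: => Double` is re-evaluated each time it is used. An array computed once and passed by value would be scored 100 times in the same order. The following sketch uses hypothetical names (RandomEncodingSketch, maxOverRuns) and takes an explicit Random so that runs are reproducible.

```scala
import scala.util.Random

object RandomEncodingSketch {
  // Map each distinct category to a distinct number in 1..k (k = number of
  // distinct categories), with the assignment chosen at random.
  def encode(col: Array[String], rnd: Random): Array[Double] = {
    val distinct = col.distinct
    val codes = rnd.shuffle((1 to distinct.length).toList)
    val encMap = distinct.zip(codes).toMap
    col.map(c => encMap(c).toDouble)
  }

  // score is by-name: the expression is evaluated anew on each of the runs,
  // so an expression that calls encode gets a fresh random order every time.
  def maxOverRuns(runs: Int)(score: => Double): Double =
    (1 to runs).map(_ => score).max
}
```

A call such as `maxOverRuns(100) { someMic(encode(col, rnd), y) }` would then take the maximum over 100 independent encodings. Here someMic stands for any MIC implementation.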
We are now close to the final stage: the computation itself.
val aov = dataAov.filter(x => interestedBrowsers.contains(x.osFamily)) // keep only the OS families of interest
  .filter(_.categoryId == 128)                                         // keep only category 128

// osFamily
var aovMic = aov.map(x => (x.osFamily, x.aov)).collect()
println("osFamily MIC =" + micMax(encode(aovMic.map(_._1)), aovMic.map(_._2)))

// orderId
aovMic = aov.map(x => (x.orderId, x.aov)).collect()
println("orderId MIC =" + micMax(encode(aovMic.map(_._1)), aovMic.map(_._2)))

// cityId
aovMic = aov.map(x => (x.cityId, x.aov)).collect()
println("cityId MIC =" + micMax(encode(aovMic.map(_._1)), aovMic.map(_._2)))

// uaName (a single random encoding: mic rather than micMax)
aovMic = aov.map(x => (x.uaName, x.aov)).collect()
println("uaName MIC =" + mic(encode(aovMic.map(_._1)), aovMic.map(_._2)))

// aov against itself
println("aov MIC =" + micMax(aovMic.map(_._2), aovMic.map(_._2)))

// baseline: a uniformly distributed random variable in [0, 100)
println("random MIC =" + mic(aovMic.map(_ => math.random * 100.0), aovMic.map(_._2)))
Output:
osFamily MIC =0.06658
orderId MIC =0.10074
cityId MIC =0.07281
aov MIC =0.99999
uaName MIC =0.05297
random MIC =0.10599
For the experiment, I added a uniformly distributed random variable and AOV itself.
As you can see, every MIC except AOV's own turned out to be below the random value (random MIC), which can serve as a rough "no dependence" threshold. The AOV MIC is naturally almost equal to one, since a variable's correlation with itself is 1.
An interesting question arises: why does the chart show a dependence while MIC is close to zero? Many hypotheses come to mind, but for the OS family the explanation is quite simple: there are far more Windows machines than all the others.
Conclusion
I hope Scala will become popular among data scientists. It is very convenient, because you can work in a standard IPython notebook and get all of Spark's capabilities. This code can just as easily work with terabytes of data: you only need to change the configuration line in ISpark to point at your cluster's URI.
By the way, we have open positions in this area.
Useful links:
The scientific paper that introduced MIC.
A KDnuggets note on mutual information (includes a video).
A C library for computing MIC, with wrappers for Python and MATLAB/Octave.
The website of the authors of the paper that introduced MIC (it offers an R package and a Java library).