This article is about Twitter Scalding, a framework for describing data processing workflows on Apache Hadoop. I'll start from afar, with the history of the frameworks built on top of Hadoop. Then I'll give an overview of Scalding's capabilities. To finish, I'll show a code example that should be understandable to anyone who knows Java but has barely touched Scala.
Interested? Let's go!
Simpler than MapReduce
When the MapReduce paradigm first appeared, it was a breakthrough that simplified the development of distributed computations. It soon became clear, though, that writing mappers and reducers by hand is very tedious. To speed up development, higher-level layers on top of MapReduce appeared: Pig, Hive, Cascading, and others. Let's talk about the last of these.
Cascading is a Java framework for describing data processing workflows. Once a workflow is described, Cascading analyzes it much as a DBMS query optimizer would, builds an execution plan as a series of map/reduce jobs, submits those jobs to the Hadoop cluster, and manages job launches and intermediate data on its own. Unfortunately, Cascading works at a fairly low level of abstraction, so it has long lagged behind other data processing tools in popularity.
A good way out of this situation was found. Twitter adapted Cascading to its own needs and wrapped its abstractions in conventional Scala tooling. That is how Scalding was born: a Scala framework on top of Cascading. This is a good place for a digression about Scala.
In praise of Scala
Scala is complicated. Whether it is suitable for industrial development is the subject of fierce holy wars. On this delicate question I tend to side with the conservative Java camp. Still, it has to be admitted that there are things Scala does better than other languages, namely processing data streams and wiring up the interactions between them. For Java developers who are not used to Scala, it is worth noting that working with collections, streams, and functions is simple and natural in Scala. The familiar Java Stream API and java.util.function are a pale copy of Scala's standard tools.
So the attempt to apply Scala's standard approaches to describing data processing workflows succeeded, and Scalding got a chance to catch up in popularity with Hive, Pig, and their many more recent counterparts. It therefore makes sense to learn it, and that is what we'll do now.
Scalding overview
Here I deliberately skip everything related to the internals of Scalding and Cascading. We'll treat them as a black box with a nice new interface for working with data. If all goes well, there will be a separate article about the internals of these frameworks.
For those unfamiliar with Scala
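A type declaration comes after the name of the variable or function, separated by a colon.
A tuple is a sequence of objects held together. Typical examples of tuples are pairs, triples, and so on. In Scala they are part of the language. A tuple is written in parentheses, with its elements separated by commas.
Generics are written in square brackets, not angle brackets.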
val longConstant: Long = 0L  // final long longConstant = 0L;
var list: List[Int]          // List<Integer> list;
(String, Int)                // Pair<String, Integer>
Flat operations
The main concept in Scalding is the Pipe. A Pipe is a conveyor along which data flows toward the programmer. In essence it resembles a Java 8 Stream. The original Pipe implementation was untyped, but that did not last long. Fans of strict typing came up with TypedPipe: in Java terms, a pipeline of objects of a strictly defined type, that is, a pipeline with generics.
TypedPipe defines a number of standard stream operations such as map, flatMap, filter, and limit. All of these are flat operations on the stream: in theory they can run efficiently with unlimited parallelism and on any amount of data.
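As a rough illustration of what these flat operations do, here is a sketch that uses plain Scala collections as an in-memory stand-in for a TypedPipe. It does not use Scalding at all; the object name and the sample data are made up for the example, and `take` is used as the closest collections counterpart of `limit`.

```scala
// Plain Scala collections only: an in-memory analogy, not Scalding code.
object FlatOpsSketch {
  // Hypothetical input: each element plays the role of one record in the stream.
  val lines: List[String] = List("to be or", "not to be")

  // flatMap: one record in, zero or more records out.
  val words: List[String] = lines.flatMap(line => line.split(" ").toList)

  // filter: keep only the records that satisfy the predicate.
  val longWords: List[String] = words.filter(word => word.length > 2)

  // map: exactly one record out for every record in.
  val lengths: List[Int] = longWords.map(word => word.length)

  // take: keep at most n records, which is the role limit plays on a pipe.
  val firstTwo: List[String] = words.take(2)
}
```

Each operation looks at one record at a time and needs no knowledge of the others, which is why operations of this kind parallelize so easily.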
The data for a TypedPipe has to be read from somewhere. For this Scalding has Source, a data source. Its only purpose is to produce a Pipe or a TypedPipe. There are a number of ready-made sources. Most of them read from files in various formats, but it is also possible to read from an arbitrary iterator (and therefore from an in-memory collection) and, of course, to write your own source. Importantly, the same Cascading and Scalding code works both on a Hadoop cluster and on local data, which is very convenient for testing.
Once all the operations are done, the data has to be saved. The last part of the pipeline, the Sink, is responsible for writing to disk in Scalding. A Sink is similar to a Source, and in many cases the same class implements both interfaces.
Grouping operations
MapReduce lets you reorganize the stream represented by a TypedPipe. First of all, there is the groupBy operation, which groups records from the whole stream by key, the equivalent of GROUP BY in SQL. After grouping, a TypedPipe[V] takes the special form Grouped[K, V], on which additional operations become available.
First, with the mapGroup and mapValueStream methods you can get at the elements of a Grouped[K, V] as the key K by which the grouping was done, together with an iterator over all the values V that occurred for that key. Scala's collection functions can then be applied to the value iterator. Usually, though, this is not necessary: Grouped has many shortcut functions that cover the most common cases.
Second, a Grouped can be sorted with the sortBy operation. After that, mapGroup, mapValueStream, and all their derivatives can still be applied.
Third, a Grouped[K, V1] can be joined with another Grouped[K, V2]. The same rules apply here as in relational databases: leftJoin, rightJoin, join (inner), and outerJoin (full) are available. The output is a Grouped[K, (V1, V2)].
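The join flavours can be pictured with two in-memory maps standing in for Grouped[K, V1] and Grouped[K, V2]. This is only an analogy written with standard Scala collections, not Scalding code; all names and data are hypothetical. The Option wrappers show one common way to represent a side that may be missing in a left or full outer join.

```scala
// Plain Scala collections only: an in-memory analogy, not Scalding code.
object JoinSketch {
  // Hypothetical keyed data sets sharing the key type String.
  val left: Map[String, Int] = Map("a" -> 1, "b" -> 2)
  val right: Map[String, String] = Map("b" -> "x", "c" -> "y")

  // Inner join: only keys present on both sides survive.
  val inner: Map[String, (Int, String)] =
    left.collect { case (k, v1) if right.contains(k) => (k, (v1, right(k))) }

  // Left join: every left key survives; the right value may be absent.
  val leftOuter: Map[String, (Int, Option[String])] =
    left.map { case (k, v1) => (k, (v1, right.get(k))) }

  // Full outer join: every key from either side survives.
  val fullOuter: Map[String, (Option[Int], Option[String])] =
    (left.keySet ++ right.keySet).map(k => (k, (left.get(k), right.get(k)))).toMap
}
```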
Note that if an ungrouped stream contains pairs, that is, it is a TypedPipe[(K, V)], the hashJoin operation can be applied to it. It is similar to the ordinary Grouped.join but is performed in memory. This works well for enriching data from a small lookup table, but with a large table it can lead to an OOM.
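The idea behind an in-memory join can be sketched with the standard library alone: the small lookup table is held as a Map, and the large side is only iterated. The code below is an analogy under that assumption, not the way Scalding implements hashJoin; the domains, topics, and names are invented for the example.

```scala
// Plain Scala collections only: an in-memory analogy, not Scalding code.
object HashJoinSketch {
  // Small side: must fit in memory, which is where the OOM risk comes from.
  val topicsByDomain: Map[String, String] =
    Map("news.example" -> "news", "shop.example" -> "shopping")

  // Large side: consumed one record at a time, never held in memory as a whole.
  def clicks: Iterator[(String, Long)] =
    Iterator(("news.example", 1L), ("unknown.example", 2L), ("shop.example", 3L))

  // Inner-join behaviour: records without a match in the lookup table are dropped.
  val joined: List[(String, (Long, String))] =
    clicks.flatMap { case (domain, ts) =>
      topicsByDomain.get(domain).map(topic => (domain, (ts, topic)))
    }.toList
}
```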
A Grouped can be turned back into a TypedPipe with the toTypedPipe, keys, or values operations. The first keeps both the key and the value; the other two return only one of them.
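A full round trip (group, sort within each group, apply a per-group function, flatten back) can be sketched with plain Scala collections. Again this is an in-memory analogy rather than Scalding code; the records and the `earliest` helper are hypothetical, and the helper only mimics the (key, iterator of values) shape described above for mapGroup.

```scala
// Plain Scala collections only: an in-memory analogy, not Scalding code.
object GroupingSketch {
  // Hypothetical records: (userId, timestamp).
  val events: List[(String, Long)] = List(("u1", 30L), ("u2", 10L), ("u1", 20L))

  // Group by key and sort the values inside each group.
  val sortedByUser: Map[String, List[Long]] =
    events.groupBy(_._1).map { case (user, recs) => (user, recs.map(_._2).sorted) }

  // Per-group function: receives the key and an iterator over that key's values.
  def earliest(user: String, timestamps: Iterator[Long]): Iterator[Long] =
    timestamps.take(1)

  // Flatten back to key/value pairs (sorted by key only to make the result stable).
  val flat: List[(String, Long)] =
    sortedByUser.toList
      .flatMap { case (user, ts) => earliest(user, ts.iterator).map(t => (user, t)) }
      .sortBy(_._1)

  // Keeping only one half of each pair.
  val keys: List[String] = flat.map(_._1)
  val values: List[Long] = flat.map(_._2)
}
```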
Scalding by example
Now that we have covered the framework's main features, let's see how it works on an example.
Suppose we are an RTB platform and we have a history of user clicks on the URLs of the sites we monitor. The history is a huge TSV file with three columns: URL, Timestamp, and UserId.
We also have sites labeled by topic. There are not that many of them, a few thousand at most. All the labels fit in a small TSV file with the columns Domain and Topic.
We want to understand how often users switch between topics. To do that, we need to keep only those events in the click history where a user moves from a site on one topic to a site on another topic.
Let's write the code that performs this transformation. The launch infrastructure is not covered here. If you are interested, the complete sample code is available on GitHub.
Scala lets you define type aliases. This is convenient: in type declarations it lets you tell one String from another.
type Domain = String
type UserId = String
type Topic = String
type Url = String
type Timestamp = Long
Declare the classes of the domain model:
case class Click(url: Url, ts: Timestamp, userId: UserId)
case class SiteInfo(domain: Domain, topic: Topic)
Case classes in Scala are a convenient way to describe classes of immutable values. The constructor, the getters, and other similar boilerplate are generated automatically.
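For readers coming from Java, a short sketch of what a case class gives you for free may help. It uses a simplified copy of the Click class with plain String and Long fields; the object name and sample values are made up for the example.

```scala
object CaseClassSketch {
  // Simplified stand-in for the article's Click class.
  case class Click(url: String, ts: Long, userId: String)

  // The generated factory method means no `new` is needed.
  val click = Click("http://news.example/a", 1L, "u1")
  val sameClick = Click("http://news.example/a", 1L, "u1")

  // copy builds a new instance with some fields replaced; the original is untouched.
  val domainOnly = click.copy(url = "news.example")

  // Pattern matching takes an instance apart again.
  val Click(_, ts, user) = click
}
```

Equality is structural, so `click == sameClick` holds even though they are two separate instances.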
Read the clicks table:
val clicksPipe: TypedPipe[Click] =
  TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks))
    .map(tuple => Click.tupled(tuple))
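Here we declared a source: a typed TSV whose columns have the types (Url, Timestamp, UserId), that is, (String, Long, String). Then we wrapped that source in a TypedPipe. Finally, for convenience, we converted each three-column tuple (Url, Timestamp, UserId) into an object of the Click class.
The result is a TypedPipe[Click].
Keep only the domain from each URL: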
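import java.net.URL

// Extract the host part of a URL, i.e. the domain the click landed on.
def url2domain(url: Url): Domain = new URL(url).getHost

// After this step the url field of every Click holds only the domain.
val domainsPipe: TypedPipe[Click] =
  clicksPipe
    .map(click => click.copy(url = url2domain(click.url)))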
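To see what the host extraction does on its own, here is a small standalone sketch. The `safeDomain` variant is not part of the article's code; it is a hypothetical helper that shows one way to tolerate malformed URLs, since `java.net.URL` throws `MalformedURLException` on input it cannot parse.

```scala
import java.net.URL
import scala.util.Try

object DomainSketch {
  // Same logic as url2domain, with plain String types to keep the sketch standalone.
  def url2domain(url: String): String = new URL(url).getHost

  val host: String = url2domain("http://news.example.com/politics/1?ref=x")

  // Hypothetical defensive variant: None for input that is not a valid URL.
  def safeDomain(url: String): Option[String] = Try(new URL(url).getHost).toOption
}
```

With a helper of that shape, a pipeline could drop bad records with flatMap instead of failing on the first malformed URL.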
Read the lookup table that maps domains to topics, and group it right away into a form suitable for hashJoin:
val sitesGroupByDomain: Grouped[Domain, SiteInfo] =
  TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites))
    .map(tuple => SiteInfo.tupled(tuple))
    .groupBy(siteInfo => siteInfo.domain)
Add the site topic information to the click stream. To do that, join the click stream with the domain lookup table:
val clicksWithSiteInfo: TypedPipe[(Domain, (Click, SiteInfo))] =
  domainsPipe
    .map(click => (click.url, click))
    .hashJoin(sitesGroupByDomain)
Next we group the click stream by user and sort it by click timestamp. We no longer care about the domain itself; the site's topic is all we need. To do this, we introduce a helper class that captures a user's active interest in a topic at a point in time.
case class TopicActivity(topic: Topic, ts: Timestamp, userId: UserId)

val topicActivityStreamPerUser: SortedGrouped[UserId, TopicActivity] =
  clicksWithSiteInfo
    .map(tuple => {
      val (domain, (click, siteInfo)) = tuple
      TopicActivity(siteInfo.topic, click.ts, click.userId)
    })
    .groupBy(activity => activity.userId)
    .sortBy(activity => activity.ts)
Now the hardest part: within a user's activity stream we need to catch the moments when the topic switches. To catch the switches, we write a Java-style function in Scala. The results accumulate in an ArrayBuffer (the counterpart of ArrayList), which in theory can lead to an OOM on a very long history.
import scala.collection.mutable.ArrayBuffer

// Keeps the first activity of every run of consecutive activities on the same topic.
// Expects the activities of a single user, already sorted by timestamp.
def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = {
  val result = ArrayBuffer[TopicActivity]()
  var firstTs = 0L
  var lastTopic: Topic = null
  for (activity <- activities) {
    if (firstTs == 0L || lastTopic != activity.topic) {
      result.append(activity)
      firstTs = activity.ts
      lastTopic = activity.topic
    }
  }
  result.iterator
}

val firstTopicActivitiesPipe: TypedPipe[TopicActivity] =
  topicActivityStreamPerUser
    .mapGroup((userId, activities) => topicSwitches(userId, activities))
    .values
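The switch detection can be tried out without Hadoop or Scalding by feeding it a small in-memory history. The sketch below repeats the buffer-based logic in a self-contained form; the type aliases are copied from the article, while the object name and the sample history are invented.

```scala
import scala.collection.mutable.ArrayBuffer

object SwitchSketch {
  type Topic = String
  type Timestamp = Long
  type UserId = String
  case class TopicActivity(topic: Topic, ts: Timestamp, userId: UserId)

  // Buffer-based version: keeps the first activity of each run of equal topics.
  def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = {
    val result = ArrayBuffer[TopicActivity]()
    var lastTopic: Option[Topic] = None
    for (activity <- activities) {
      if (!lastTopic.contains(activity.topic)) {
        result.append(activity)
        lastTopic = Some(activity.topic)
      }
    }
    result.iterator
  }

  // Hypothetical history of one user, already sorted by timestamp.
  val history: List[TopicActivity] = List(
    TopicActivity("news", 1L, "u1"),
    TopicActivity("news", 2L, "u1"),
    TopicActivity("sport", 3L, "u1"),
    TopicActivity("sport", 4L, "u1"),
    TopicActivity("news", 5L, "u1")
  )

  val switches: List[TopicActivity] = topicSwitches("u1", history.iterator).toList
}
```

For this history only the activities at timestamps 1, 3, and 5 remain: the first visit and the two later topic changes.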
Only the first activity of each interest is left in the stream. These can be used to track how the focus of a user's interest changes over time. All that remains is to write the result to a file.
firstTopicActivitiesPipe
  .map(activity => (activity.topic, activity.ts, activity.userId))
  .write(TypedTsv(args.required("output")))
That's it. In roughly 40 lines we have described a non-trivial data transformation.
The final code, the Scala way
If you follow the canonical Scala way, the code becomes even shorter. You can also rewrite the topic-switch detection function, moving from an iterative approach to a functional one, and get rid of the buffer. Now the process will not crash even on infinite input. In theory...
// Scan state. Helper is not defined in the listing; this definition is an assumption inferred from how it is used below.
case class Helper(topic: Option[Topic] = None, firstTs: Timestamp = 0L, firstVisit: Boolean = false)

def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = {
  activities
    .scanLeft(Helper())((helper, activity) => {
      if (helper.topic.isEmpty || helper.topic.get != activity.topic) {
        // Topic changed (or first activity): remember it and mark it as a first visit.
        Helper(Some(activity.topic), activity.ts, true)
      } else {
        // Same topic as before: carry the state forward, not a first visit.
        Helper(helper.topic, helper.firstTs, false)
      }
    })
    .filter(_.firstVisit)
    .map(helper => TopicActivity(helper.topic.get, helper.firstTs, userId))
}

TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks))
  .map(tuple => Click.tupled(tuple))
  .map(click => click.copy(url = new URL(click.url).getHost))
  .map(click => (click.url, click))
  .hashJoin(
    TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites))
      .map(tuple => SiteInfo.tupled(tuple))
      .groupBy(_.domain)
  )
  .map({ case (_, (click, siteInfo)) => TopicActivity(siteInfo.topic, click.ts, click.userId) })
  .groupBy(_.userId)
  .sortBy(_.ts)
  .mapGroup(topicSwitches)
  .values
  .write(TypedTsv(outputPath))
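The claim about infinite input can be checked for the function itself, outside of Scalding, because scanLeft, filter, and map on a Scala Iterator are lazy. The sketch below is self-contained: the Helper definition is an assumed shape inferred from the listing, and the endless history is invented. It says nothing about how the surrounding job behaves, only that the function does not buffer its input.

```scala
object LazySwitchSketch {
  type Topic = String
  type Timestamp = Long
  type UserId = String
  case class TopicActivity(topic: Topic, ts: Timestamp, userId: UserId)

  // Assumed shape of the scan state; not taken from the original sources.
  case class Helper(topic: Option[Topic] = None, firstTs: Timestamp = 0L, firstVisit: Boolean = false)

  // Functional version: no buffer, every step is a lazy Iterator transformation.
  def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] =
    activities
      .scanLeft(Helper()) { (helper, activity) =>
        if (helper.topic.contains(activity.topic)) helper.copy(firstVisit = false)
        else Helper(Some(activity.topic), activity.ts, firstVisit = true)
      }
      .filter(_.firstVisit)
      .map(h => TopicActivity(h.topic.get, h.firstTs, userId))

  // Hypothetical endless history: the topic flips after every two clicks.
  def endless: Iterator[TopicActivity] =
    Iterator.from(0).map { i =>
      TopicActivity(if ((i / 2) % 2 == 0) "news" else "sport", i.toLong, "u1")
    }

  // Only as much input is consumed as is needed to produce three switches.
  val firstThree: List[TopicActivity] = topicSwitches("u1", endless).take(3).toList
}
```

The buffer-based version would never return on this input, because it has to consume the whole iterator before producing anything.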
In the next article I'll discuss how to organize the code for data processing and testing. And finally, I'll explain how it all works under the hood.
Disclaimer
I tried to write the sample code as clearly as possible for Java programmers, so I avoided magic conversions of every kind. The goal was to show that adding a little Scala to your ETL processes is quick and easy. The code is not optimal. If you know how to write it more efficiently, good for you.
Resources
The complete sample code on GitHub
The Scalding wiki
The book "Programming MapReduce with Scalding"