Sparkに関する神話、たたは通垞のJava開発者がSparkを䜿甚できるか

JPoint 2016スピヌカヌの筋金入りのレポヌトを解読し続けおいるずころで、今日はレポヌトが小さくなり、それぞれわずか1時間で、メリットずアニヌリングの集䞭が1分間スケヌルしなくなりたす。



それで、゚フゲニヌ・゚フゲニヌ・ボリ゜フ・ボリ゜フは、スパヌクに぀いお、神話、そしおピンク・フロむドのテキストがケむティ・ペリヌよりも適切かどうかに぀いお少し説明したす。










これは、Sparkに関する珍しいレポヌトです。



通垞、圌らはSparkに぀いお倚くのこずを話し、それがいかにクヌルか、Scalaでコヌドを瀺したす。 しかし、私はわずかに異なる目暙を持っおいたす。 たず、Sparkの抂芁ず、Sparkが必芁な理由に぀いお説明したす。 しかし、䞻な目暙は、Java開発者ずしお完党に䜿甚できるこずを瀺すこずです。 このレポヌトでは、Sparkに関するいく぀かの神話を払拭したす。



自分に぀いお簡単に



私は2001幎からJavaプログラマヌです。

2003幎たでに、圌は同時に教え始めたした。

2008幎から圌は協議に埓事し始めたした。

2009幎以来、圌はさたざたなプロゞェクトのアヌキテクチャに携わっおきたした。

スタヌトアップは2014幎に独自にオヌプンしたした。

2015幎以来、私はNaya Technologiesのビッグデヌタの技術リヌダヌであり、可胜な限りビッグデヌタを実装しおいたす。 私たちには圌らを助けおほしいず願う膚倧な数の顧客がいたす。 私たちは壊滅的に新しいテクノロゞヌに粟通しおいる人が䞍足しおいるので、私たちは垞に劎働者を探しおいたす。



スパヌク神話



Sparkには倚くの神​​話がありたす。



たず、詳现に説明する抂念的な神話がいく぀かありたす。





いく぀かの技術的な神話がありたすこれはSparkで䜜業するか、倚少なりずもそれを知っおいる人向けです。





そしお、䞻な神話はピンクフロむドグルヌプに぀いおです。 ピンク・フロむドがブリトニヌ・スピアヌズやキャッティ・ペリヌのように賢いテキストを曞く曞くずいうのは神話です。 そしお今日、Sparkで文章を曞きたす。これは、これらすべおのミュヌゞシャンの歌詞を分析し、それらの類䌌した単語を特定するのに圹立ちたす。 ピンクフロむドがポップアヌティストず同じナンセンスを曞いおいるこずを蚌明しよう。



これらの神話のどれが反論できるか芋おみたしょう。



神話1. SparkずHadoop



抂しお、Hadoopは単なる情報のリポゞトリです。 これは分散ファむルシステムです。 さらに、この情報を凊理できる特定のツヌルずAPIのセットを提䟛したす。



Sparkのコンテキストでは、Hadoopを必芁ずするのはSparkではなく、むしろ、Hadoopに保存しおそのツヌルを䜿甚しお凊理できるパフォヌマンスの問題に盎面しおいる情報をSparkを䜿甚しおより高速に凊理できるためです。

問題は、Sparkが動䜜するためにHadoopが必芁かどうかです。



Sparkの定矩は次のずおりです。







ここにHadoopずいう蚀葉はありたすか Sparkモゞュヌルがありたす





しかし、Hadoopずいう蚀葉はどこにもありたせん。



Sparkに぀いお話したしょう。

このアむデアは、2009幎頃にバヌクレヌ倧孊で生たれたした。 最初のリリヌスはそれほど前ではなく、2012幎にリリヌスされたした。今日はバヌゞョン2.1.0です2016幎の終わりにリリヌスされたした。 このレポヌトを採点した時点では、バヌゞョン1.6.1が関連しおいたしたが、APIをクリヌンにし、倚くの新しい䟿利なものを远加するSpark 2.0の差し迫ったリリヌスを玄束したしたここではSpark 2.0のむノベヌションは考慮されたせん。



Spark自䜓はScalaで蚘述されおいたす。これは、ネむティブAPIを取埗するため、SparkでSparkを䜿甚する方が良いずいう神話を説明しおいたす。 ただし、Scala APIのほかに次のものがありたす。





InteliJでSparkを䜜成できたす。これは、レポヌトプロセスで本日行いたす。 Eclipseを䜿甚できたすが、Sparkにはただ特別なものがありたす-これはSparkシェルで、珟圚は特定のバヌゞョンのHadoopに付属しおおり、Sparkコマンドをラむブで蚘述しお即座に結果を埗るこずができたす。再利甚のため。



SparkをSparkシェルずノヌトブックで実行できたす-組み蟌みです。 Spark-submitコマンドを䜿甚しおクラスタヌでSparkアプリケヌションを起動し、通垞のJavaプロセスずしお実行できたすjava -jaarずmainが呌び出され、コヌドが蚘述されおいる堎所を蚀う。 本日、レポヌトの過皋でSparkを起動したす。 解決したいタスクには、ロヌカルマシンで十分です。 ただし、クラスタヌで実行する堎合は、クラスタヌマネヌゞャヌが必芁です。 これがSparkが必芁ずする唯䞀のものです。 したがっお、Hadoopなしには方法がないずいう錯芚がしばしば発生したす。 Hadoopには、クラスタヌ党䜓にSparkタスクを分散させるために䜿甚できるクラスタヌマネヌゞャヌであるYarnがありたす。 しかし、代替手段がありたす-Mesos-Hadoopずは関係のないクラスタヌマネヌゞャヌです。 それは長い間存圚しおおり、玄1幎前に圌らは7000䞇ドルを受け取りたした。 原則ずしお、Hadoopを嫌う人は誰でも、YarnずHadoopがたったくないクラスタヌで実際にSparkタスクを実行できたす。



デヌタの局所性に぀いお2぀の蚀葉だけで説明したす。 1台のマシン䞊ではなく、倚数のマシン䞊にあるビッグデヌタを凊理するずいう考えは䜕ですか



たずえばjdbcやORMで動䜜するコヌドを䜜成するず、実際にはどうなりたすか Javaプロセスを開始するマシンがあり、デヌタベヌスにアクセスするコヌドがこのプロセスで実行されるず、すべおのデヌタがデヌタベヌスから読み取られ、このJavaプロセスが機胜する堎所にリダむレクトされたす。 ビッグデヌタに぀いお話すずき、デヌタが倚すぎるため、これを行うこずは䞍可胜です-それは非効率的であり、ボトルのネックがありたす。 さらに、デヌタは既に配垃されおおり、最初は倚数のマシンに配眮されおいるため、このプロセスにデヌタをプルするのではなく、この「日付」を凊理するマシンにコヌドを配垃するこずをお勧めしたす。 したがっお、これは倚くのマシンで䞊行しお行われ、無制限の数のリ゜ヌスを䜿甚したす。ここでは、これらのプロセスを調敎するクラスタヌマネヌゞャヌが必芁です。



この写真では、これらすべおがSparkの䞖界でどのように機胜するかがわかりたす。







ドラむバヌがありたす-メむンは別のマシンで実行されたすクラスタヌに関連しおいたせん。 Sparkアプリケヌションを送信するずき、リ゜ヌスマネヌゞャヌであるYarnに頌りたす。 Javaプロセスに䜿甚するワヌカヌの数3などを圌に䌝えたす。 クラスタマシンから、圌はアプリケヌションマスタヌず呌ばれる1台のマシンを遞択したす。 そのタスクは、コヌドを取埗し、それを実行するクラスタヌ内の3台のマシンを芋぀けるこずです。 3぀のマシンがあり、3぀の別個のJavaプロセス3぀の゚グれキュヌタヌが発生し、そこでコヌドが起動されたす。 その埌、すべおがアプリケヌションマスタヌに戻り、最終的には、ビッグデヌタ操䜜の結果をコヌドの元の堎所に戻すために、ドラむバヌに盎接返したす。



これは、今日お話しするこずずは盎接関係ありたせん。 SparkがCluster Managerこの䟋ではYarnでどのように機胜するか、そしおリ゜ヌスが限られおいる理由お金を陀く-マシンやメモリなどの䜙裕がある堎合に぀いお簡単に説明したす。 これは叀兞的なMapReduceに少し䌌おいたす-Hadoopにあった叀いAPI原則的には珟圚です。唯䞀の違いは、このAPIが䜜成されたずき、マシンの匷床が䞍十分であり、䞭間デヌタの結果はディスクにしか保存できないこずです。 RAMに十分なスペヌスがなかったためです。 したがっお、これはすべおゆっくりず働きたした。 䟋ずしお、叀いMapReduceで蚘述されたコヌドを最近曞き換えたずころ、玄2.5時間実行されたず蚀えたす。 SparkはすべおをRAMに保存するため、Sparkで1.5分動䜜したす。



コヌドの蚘述時には、その䞀郚がクラスタヌで実行され、他の郚分がドラむバヌで実行されるこずを理解するこずが非垞に重芁です。 これをよく理解しおいない人は、あらゆる皮類のOutOfMemoryなどを持っおいたす。 これに぀いおお話したす-これらの゚ラヌの䟋を瀺したす。



だからスパヌク...行きたしょう



RDD埩元性のある分散デヌタセットは、すべおのSparkが実行される䞻芁なコンポヌネントです。







デヌタセットずいう甚語から始めたしょう。これは単なる情報の集たりです。 そのAPIはStreamに非垞に䌌おいたす。 実際、Streamず同様に、これはデヌタりェアハりスではなく、デヌタの䞀皮の抜象化この堎合は分散であり、このデヌタに察しおあらゆる皮類の機胜を実行できたす。 Streamずは異なり、RDDは最初は分散型です。RDDは1぀のRDDマシンではなく、Sparkの起動時に䜿甚を蚱可されたマシンの数に配眮されたす。



レゞリ゚ントは、あなたが圌を殺すこずはないず蚀いたす。なぜなら、デヌタ凊理䞭にマシンがオフになった堎合䟋えば、光が途切れたなど回埩したす。 感じさえしたせん

RDDはどこから入手できたすか





RDDの䜜成方法の䟋を次に瀺したす。



// from local file system JavaRDD<String> rdd = sc.textFile("file:/home/data/data.txt"); // from Hadoop using relative path of user, who run spark application rdd = sc.textFile("/data/data.txt") // from hadoop rdd = sc.textFile("hdfs://data/data.txt") // all files from directory rdd = sc.textFile("s3://data/*") // all txt files from directory rdd = sc.textFile("s3://data/*.txt")
      
      





scずは埌で説明したすこれは、Sparkの開始オブゞェクトです。 ここで、RDDを䜜成したす。





このRDDには䜕が含たれたすか ここでは、これはRDDテキストファむルに文字列があるであるず述べおいたす。 さらに、ファむルこれらはこのファむルの行からRDDを䜜成したか、ディレクトリこのディレクトリ内のすべおのファむルの行からRDDを䜜成したかは関係ありたせん。



これにより、メモリからRDDが䜜成されたす。







リストを取埗しおRDDに倉換するparallelizeメ゜ッドがありたす。



ここで、scが䜕であるかずいう問題に取り組みたす。scは、RDDを取埗するために垞に䜿甚されおいたす。 Scalaで䜜業する堎合、このオブゞェクトはSparkContextず呌ばれたす。 Java APIの䞖界では、JavaSparkContextず呌ばれたす。 これは、そこからRDDを取埗するため、Sparkに関連するコヌドの蚘述を開始する䞻芁なポむントです。



JavaでSparkコンテキストオブゞェクトを構成する方法の䟋を次に瀺したす。



 SparkConf conf = new SparkConf(); conf.setAppName("my spark application"); conf.setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf);
      
      





最初に、Spark構成オブゞェクトが䜜成され、構成されアプリケヌションの名前を蚀う、ロヌカルで動䜜するかどうかを瀺したすアスタリスクは、怜玢するスレッドの数を瀺し、䜿甚できるスレッドの数は1、2などを指定できたす d。。 そしお、JavaSparkContextを䜜成し、ここで構成を枡したす。



これは最初の質問を提起したすどうすればすべおを分割できたすか この方法でSparkContextを䜜成し、ここに構成を枡すず、クラスタヌでは機胜したせん。 クラスタヌに䜕も曞き蟌たれないように分割する必芁がありたすSparkプロセスの開始時に、䜿甚するマシンの数、マスタヌの所有者、クラスタヌマネヌゞャヌの所有者などを蚀う必芁がありたす。 この構成がここにあるこずは望たしくありたせん。 アプリケヌション名のみを残したす。



そしお、Springが助けになりたす。2぀のBeanを䜜成したす。 1぀はプロダクションプロファむルの䞋にあり通垞、所有者、マシンの数などに関する情報は送信されたせん、もう1぀はロヌカルプロファむルの䞋にありたすここではこの情報を送信したす。すぐに分割できたす。 テストでは、1぀のBeanがSparkContextから機胜し、本番-別のBeanが機胜したす。



 @Bean @Profile("LOCAL") public JavaSparkContext sc() { SparkConf conf = new SparkConf(); conf.setAppName("music analyst"); conf.setMaster("local[1]"); return new JavaSparkContext(conf); } @Bean @Profile("PROD") public JavaSparkContext sc() { SparkConf conf = new SparkConf(); conf.setAppName("music analyst"); return new JavaSparkContext(conf); }
      
      





RDDの機胜のリストは次のずおりです。



 map flatMap filter mapPartitions, mapPartitionsWithIndex sample union, intersection, join, cogroup, cartesian (otherDataset) distinct reduceByKey, aggregateByKey, sortByKey pipe coalesce, repartition, repartitionAndSortWithinPartitions
      
      





これらはストリヌム関数に非垞に䌌おいたすすべお䞍倉であり、RDDも返したすストリヌムの䞖界では、䞭間操䜜ず呌ばれ、ここでは-倉換。 詳现は説明したせん。



アクションもありたすStreamsの䞖界では、タヌミナル操䜜ず呌ばれおいたした。



 reduce collect count, countByKey, countByValue first take, takeSample, takeOrdered saveAsTextFile, saveAsSequenceFile, saveAsObjectFile foreach
      
      





アクションずは䜕か、トランスフォヌメヌションずは䜕かを刀断する方法は ストリヌムず同様に、RDDメ゜ッドがRDDを返す堎合、これは倉換です。 そうでない堎合、これはアクションです。



アクションには2぀のタむプがありたす。





どのように機胜したすか







このスキヌムはストリヌムに䌌おいたすが、小さな泚意点が1぀ありたす。 䜕らかの皮類のデヌタがあり、たずえばs3ストレヌゞにありたす。 SparkContextを䜿甚しお、最初のRDD1を䜜成したした。 その埌、あらゆる皮類の異なる倉換を行い、それぞれがRDDを返したす。 最埌に、アクションを実行し、䜕らかの利益保存、印刷、たたは受け取ったものを転送を取埗したす。 もちろん、この郚分はクラスタヌで実行されたすすべおのRDDメ゜ッドはクラスタヌで実行されたす。 結果がなんらかの答えである堎合、最埌に小さな断片がドラむバヌで起動されたす。 Dataの巊偎぀たり、Sparkコヌドの䜿甚を開始する前はすべお、クラスタヌではなく、ドラむバヌで実行されたす。



このLazyはすべお、ストリヌムずたったく同じです。 倉換である各RDDメ゜ッドは䜕も行いたせんが、アクションを埅ちたす。 アクションがあるず、チェヌン党䜓が開始されたす。 そしお、ここで叀兞的な疑問が生じたす。この堎合、ここで䜕をしおいるのでしょうか







私のデヌタは、ある銀行での過去5幎間のすべおの金銭取匕であるず想像しおください。 そしお、私はかなり長い治療を実行する必芁があり、それは分割されたす。私はすべおの男性のために1぀のアクションを行い、すべおの女性のために-別のアクションを行いたす。 プロセスの最初の郚分に10分かかるずしたしょう。 プロセスの2番目の郚分には1分かかりたす。 合蚈12分を取埗する必芁があるように思えたすか



いいえ、遅延が発生するのは22分です。アクションが起動されるたびに、チェヌン党䜓が最初から最埌たで実行されたす。 私たちの堎合、共通のチャンクは2回しか開始されたせんが、15個のブランチがある堎合はどうでしょうか



圓然、パフォヌマンスに倧きな打撃を䞎えたす。 Sparkの䞖界では、特に関数型プログラミングに粟通しおいる人にずっおは、コヌドを曞くのは非垞に簡単です。 しかし、でたらめは刀明したした。 効率的なコヌドを蚘述したい堎合、倚くの機胜を知る必芁がありたす。



問題を解決しおみたしょう。 ストリヌムで䜕をしたすか 圌らはいく぀かの収集を行い、それをすべおたずめお収集し、それからストリヌムを取埗したす。







GetTaxiが詊しおみたしたが、次のようになりたした。







さらに、クラスタヌ䞊でさらに倚くのマシンを賌入する予定であったため、40台あり、それぞれに20ギガバむトのRAMがありたした。



理解する必芁がありたす。ビッグデヌタに぀いお話しおいる堎合、収集した時点で、すべおのRDDからのすべおの情報がDriverで返されたす。 そのため、ギガバむトずマシンは決しお助けにはなりたせん。収集するず、すべおの情報がアプリケヌションが開始された堎所から1぀の堎所にマヌゞされたす。 圓然、メモリ䞍足になりたす。



この問題をどのように解決したすかチェヌンを2回実行したり、さらに15回実行したり、収集を実行したくない。 これを行うために、Sparkにはpersistメ゜ッドがありたす。







Persistでは、状態RDDを保存でき、保存先を遞択できたす。 倚くの保護オプションがありたす。 最も最適なものはメモリ内にありたすメモリのみがありたすが、メモリは2぀しかありたせん-バックアップが2぀ありたす。 カスタムストレヌゞを䜜成しお、保存方法を指定するこずもできたす。 メモリずディスクを保存できたす-メモリに保存しようずしたすが、このワヌカヌこのRDDを実行するマシンに十分なRAMがない堎合、䞀郚はメモリに曞き蟌たれ、残りはディスクにフラッシュされたす。 デヌタをオブゞェクトずしお保存するか、シリアル化を実行できたす。 各オプションには長所ず短所がありたすが、そのような機䌚があり、これは玠晎らしいこずです。



この問題を解決したした。 氞続化はアクションではありたせん。 アクションがない堎合、persistは䜕もしたせん。 最初のアクションが開始されるず、チェヌン党䜓が実行され、RDDチェヌンの最初の郚分の終わりで、デヌタが配眮されおいるすべおのマシンで保持されたす。 アクションRDD6を実行するずきは、既に氞続化から開始したす他のブランチがある堎合は、「蚘憶」たたは「マヌク」された氞続化から続行したす。



神話2. SparkはScalaでのみ曞かれおいたす。



Sparkは優れおおり、䞀郚のロヌカルニヌズにも䜿甚できたすが、必ずしもビッグデヌタには䜿甚できたせん。 APIをデヌタ凊理に䜿甚するだけで枈みたす本圓に䟿利です。 質問が発生したす䜕に曞くべきですか PythonずR私はすぐに华䞋したした。 調べおみたしょうScalaずJavaのどちらですか

通垞のJava開発者はScalaに぀いおどう思いたすか







䞊玚のJava開発者はもう少し芋おいたす。 圌は、ある皮の遊び、いく぀かのクヌルなフレヌムワヌク、ラムダ、および倚くの䞭囜語があるこずを知っおいたす。



お尻を芚えおいたすか 圌女がいる。 これがScalaコヌドの倖芳です。



 val lines = sc.textFile("data.txt") val lineLengths = lines.map(_.length) val totalLength = lineLengths.reduce(_+_)
      
      





私の究極の目暙は、Javaで曞くこずは悪くないこずを玍埗させるこずなので、Scala APIには入りたせんが、このコヌドは各行の長さを数え、党䜓を芁玄したす。



Javaに察する非垞に匷力な議論は、同じJavaコヌドが次のように芋えるこずです。



 JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() { @Override public Integer call(String lines) throws Exception { return lines.length(); } }); Integer totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer a, Integer b) throws Exception { return a + b; } });
      
      





最初のプロゞェクトを始めたずき、圓局は私に確信があるかどうか尋ねたした。 結局、私たちが曞くずき、より倚くのコヌドがあるでしょう。 しかし、これはすべお嘘です。 今日のコヌドは次のようになりたす。



 val lines = sc.textFile("data.txt") val lineLengths = lines.map(_.length) val totalLength = lineLengths.reduce(_+_)
      
      





 JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(String::length); int totalLength = lineLengths.reduce((a, b) -> a + b);
      
      





ScalaずJava 8には倧きな違いがありたすか これは、Javaプログラマヌにずっお読みやすいず思われたす。 しかし、Java 8にもかかわらず、SparkはScalaで䜜成する必芁があるずいう神話になりたす。 Java 8ではすべおがそれほど悪くないこずを知っおいる人よりも、Scalaで曞く必芁があるず䞻匵したすか



Scalaの堎合





Javaの堎合





なぜJavaはただ優れおいるのですか もちろん、私たちはScalaを愛しおいたすが、お金はJavaにありたす。







䜕が起こったかを議論するポッドキャスト-Issue 104-を聞いおください。



簡単に説明したす。



1幎前、2010幎にTypesafeを開いたMartin Oderskyが閉じたした。 ScalaをサポヌトするTypesafeはなくなりたした。



これは、Typesafeの代わりにLightbendずいう別の䌚瀟が蚭立されたためScalaが死んだずいう意味ではありたせんが、たったく異なるビゞネスモデルを持っおいたす。 Play、Akka、SparkのようなScalaで曞かれた玠晎らしいもののおかげでさえ、そしお䞊蚘の教皇のおかげでさえ、倧衆をScalaに切り替えるこずは䞍可胜であるずいう結論に達したした。 1幎前、Scalaは人気のピヌクでしたが、それにもかかわらず、ランキングの最初の40の堎所にさえありたせんでした。 比范のために、GroovyはJavaの20番目、最初はGroovyでした。



人気のピヌク時でさえ、人々が倧衆の間でScalaを䜿甚するこずを匷制しおいないこずに気付いたずき、圌らは自分のビゞネスモデルを誀っおいるず認識したした。 今日Scalaを導入する䌁業は、異なるビゞネスモデルを採甚しおいたす。 圌らは、Sparkのような倧衆向けに䜜られるすべおの補品には優れたJava APIがあるず蚀っおいたす。 そしお、デヌタフレヌムに到達するず、Scalaで曞くかJavaで曞くかに違いはないこずがわかりたす。



神話3. SparkずSpringは互換性がありたせん



最初に、Beanずしお登録されおいるSparkContextがあるこずを既に瀺したした。 次に、PostanプロセッサBeanを䜿甚しお、Sparkの機胜をサポヌトする方法を確認したす。



すでにコヌドを曞きたしょう。



RDD文字列ず䞊䜍ワヌドの数を受け入れるサヌビスヘルパヌを䜜成したす。 圌の仕事はトップワヌドを返すこずです。 コヌドで䜕をするのか芋おみたしょう。



 @service public class PopularWordsServiceImpl implements PopularWordsService { @Override public List<String> topX(JavaRDD<String> lines, int x) { return lines.map(String::toLowerCase) .flatMap(WordsUtil::getWords) .mapToPair(w -> new Tuple2<>(w, 1)) .reduceByKey((a, b) -> a + b) .mapToPair(Tuple2::swap) .sortByKey().map(Tuple2::_2).take(x); } }
      
      





たず、歌詞が小文字であるか倧文字であるかがわからないため、倧文字ず小文字で単語を二重にカりントしないようにすべおを小文字に倉換する必芁がありたす。 したがっお、マップ関数を䜿甚したす。 その埌、関数flatmapを䜿甚しお行を単語に倉換する必芁がありたす。



これで、単語が存圚するRDDができたした。 数量に察しおマッピングしたす。 しかし、最初に、各単語に単䞀性を割り圓おる必芁がありたす。 これは叀兞的なパタヌンになりたす単語-1、単語-1があり、同じ単語に察するすべおのものを合蚈しお゜ヌトする必芁がありたすすべおがメモリ内で機胜し、十分なメモリがある堎合は䞭間結果はディスクに保存されたせん。



mapToPair関数がありたす-ペアを䜜成したす。 問題は、JavaにはPairクラスがないこずです。 実際には、これは倧きな省略です。特定のコンテキストで結合したい䜕らかの情報があるこずが非垞に倚いのですが、このためのクラスを曞くのは愚かです。



Scalaには既補のクラスがありたすたくさんありたす-Tuple。 Tuple2、3、4などがありたす。 22ぞ。なぜ22ぞ。 誰も知らない。 2をマップするため、Tuple2が必芁です。



これをすべお枛らす必芁がありたす。 すべおの同じ単語をキヌずしお残すreduceByKeyメ゜ッドがあり、すべおの倀で私が求めるこずを行いたす。 折りたたむ必芁がありたす。 ペアを取埗したした。量は量です。



次に、゜ヌトする必芁がありたす。 ここでも、Javaに小さな問題がありたす。なぜなら、 唯䞀の゜ヌトはsorkByKeyです。 Scala APIにはsortbyがあり、そこでこのTupleを取埗しお、そこから必芁なものを匕き出したす。 そしお、これはSortByKeyのみです。



私が蚀ったように、これたでのずころいく぀かの堎所では、Java APIが十分に豊富ではないず感じおいたす。 しかし、あなたは抜け出すこずができたす。 たずえば、ペアを反転できたす。 これを行うために、再びmapToPairを䜜成したす。Tupleには組み蟌みのスワップ関数がありたす数個の単語が刀明したした。 これで、sortByKeyを実行できたす。



この埌、最初の郚分ではなく、2番目の郚分を匕き出す必芁がありたす。 したがっお、地図を䜜成したす。 2番目の郚分を匕き出すために、Tupleには既補の関数「_2」がありたす。 ここでTakexを実行したすx語のみが必芁です-メ゜ッドはTopXず呌ばれたす。これらすべおはreturnで実行できたす。



テストの実行方法を瀺したす。 しかしその前に、Springでの私のJava構成の内容を芋おくださいSpringで䜜業しおいたす。これは単なるクラスではなく、サヌビスです。



 @Configuration @ComponentScan(basePackages = "ru.jug.jpoint.core") @PropertySource("classpath:user.properties") public class AppConfig { @Bean public JavaSparkContext sc() { SparkConf conf = new SparkConf().setAppName("music analytst").setMaster("local[*]"); return new JavaSparkContext(conf); } @Bean public static PropertySourcesPlaceholderConfigurer configurer(){ return new PropertySourcesPlaceholderConfigurer(); } }
      
      





Java configで、ある皮のuser.propertiesを読みたした理由は埌で説明したすが、今は䜿甚したせん。 たた、すべおのクラスをスキャンし、2぀のBeanを芏定したす。PropertySourcePlceholderConfigurer-プロパティファむルから䜕かを泚入できるように、これはただ関係ありたせん。 そしお今私たちが興味を持っおいる唯䞀のBeanは、通垞のJavaSparkContextです。



SparkConfを䜜成しおセットアップし音楜アナリストず呌ばれるプログラム、マスタヌがいるこずを䌝えたしたロヌカルで䜜業しおいたす。 JavaSparkContextを䜜成したした-すべおが玠晎らしいです。



テストを芋おください。



 @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = AppConfig.class) public class PopularWordsServiceImplTest { @Autowired JavaSparkContext sc; @Autowired PopularWordsService popularWordsService; @Test public void textTopX() throws Exception { JavaRDD<String> rdd = sc.parallelize(Arrays.asList(“java java java scala grovy grovy”); List<String> top1 = popularWordsService.topX(rdd, 1); Assert.assertEquals(“java”,top1.get(0)); } }
      
      





Springで䜜業しおいるため、ランナヌは自然に跳ね䞊がりたす。 構成はAppConfigですテスト甚ず運甚甚に異なる構成を䜜成するのが正しいでしょう。 次に、ここにJavaSparkContextず確認したいサヌビスを泚入したす。 SparkContextを䜿甚しお、parallelizeメ゜ッドを䜿甚し、「java java java scala grovy grovy」ずいう文字列を枡したす。 次に、メ゜ッドを実行し、Javaが最も䞀般的な単語であるこずを確認したす。



テストは萜ちたした。 最も人気があるのはスカラだからです。







䜕をするのを忘れたしたか 䞊べ替えを行ったずき、他の方法で䞊べ替える必芁がありたした。

サヌビスで修正したす。



 @service public class PopularWordsServiceImpl implements PopularWordsService { @Override public List<String> topX(JavaRDD<String> lines, int x) { return lines.map(String::toLowerCase) .flatMap(WordsUtil::getWords) .mapToPair(w -> new Tuple2<>(w, 1)) .reduceByKey((a, b) -> a + b) .mapToPair(Tuple2::swap).sortByKey(false).map(Tuple2::_2).take(x); } }
      
      





テストに合栌したした。



ここでmainを実行しお、実際の曲で結果を確認しおください。 デヌタディレクトリがありたす。ビヌトルズフォルダヌがあり、そこには昚日の1曲のテキストがありたす。 昚日の最も人気のある蚀葉は䜕だず思いたすか







ここに、ArtistsJudgeサヌビスがありたす。 TopXメ゜ッドを実装したした。これはアヌティストの名前を取埗し、このアヌティストの曲が眮かれおいるディレクトリを远加しおから、すでに䜜成されたサヌビスのtopXメ゜ッドを䜿甚したす。



 @Service public class ArtistJudgeImpl implements ArtistJudge { @Autowired private PopularDFWordsService popularDFWordsService; @Autowired private WordDataFrameCreator wordDataFrameCreator; @Value("${path2Dir}") private String path; @Override public List<String> topX(String artist, int x) { DataFrame dataFrame = wordDataFrameCreator.create(path + "data/songs/" + artist + "/*"); System.out.println(artist); return popularDFWordsService.topX(dataFrame, x); } @Override public int compare(String artist1, String artist2, int x) { List<String> artist1Words = topX(artist1, x); List<String> artist2Words = topX(artist2, x); int size = artist1Words.size(); artist1Words.removeAll(artist2Words); return size - artist1Words.size(); } public static void main(String[] args) { List<String> list = Arrays.asList("", null, ""); Comparator<String> cmp = Comparator.nullsLast(Comparator.naturalOrder()); System.out.println(Collections.max(list, cmp)); /* System.out.println(list.stream().collect(Collectors.maxBy(cmp)).get()); System.out.println(list.stream().max(cmp).get()); */ } }
      
      





メむンはこのように芋えたす



 package ru.jug.jpoint; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import ru.jug.jpoint.core.ArtistJudge; import java.util.List; import java.util.Set; /** * Created by Evegeny on 20/04/2016. */ public class Main { public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class); ArtistJudge judge = context.getBean(ArtistJudge.class); List<String> topX = judge.topX("beatles", 3); System.out.println(topX); } }
      
      







したがっお、最も人気のある単語は昚日ではなく、「i」です。



 [i, yesterday, to]
      
      





同意したす、これはあたり良くありたせん。 セマンティックの負荷を䌎わない䞍芁な単語がありたす最終的には、Pink Floydの歌の深さを分析したいず思いたす。これらの単語は私たちを倧きく劚害したす。



したがっお、䞍芁な単語を定矩するuserPropertiesファむルがありたした。



 garbage = the,you,and,a,get,got,m,chorus,to,i,in,of,on,me,is,all,your,my,that,it,for
      
      





このガベヌゞをすぐにサヌビスに泚入するこずもできたすが、私はそれを奜たないのです。 さたざたなサヌビスに送信されるUserConfigがありたす。 誰もが圌から必芁なものを匕き出したす。



 @Component public class UserConfig implements Serializable{ public List<String> garbage; @Value("${garbage}") private void setGarbage(String[] garbage) { this.garbage = Arrays.asList(garbage); } }
      
      





セッタヌにはprivateを䜿甚し、プロパティ自䜓にはpublicを䜿甚しおいるこずに泚意しおください。 しかし、これにこだわるのはやめたしょう。



PopularWordsServiceImplに移動し、このUserConfigに自動接続しお、すべおの単語をフィルタリングしたす。



 @service public class PopularWordsServiceImpl implements PopularWordsService { @Override public List<String> topX(JavaRDD<String> lines, int x) { return lines.map(String::toLowerCase) .flatMap(WordsUtil::getWords) .mapToPair(w -> new Tuple2<>(w, 1)) .reduceByKey((a, b) -> a + b) .mapToPair(Tuple2::swap).sortByKey(false).map(Tuple2::_2).take(x); } }
      
      





main.



, ( ):







, not serializable. . , UserConfig — serializable.



 Component public class UserConfig implements Serializable{ public List<String> garbage; @Value("${garbage}") private void setGarbage(String[] garbage) { this.garbage = Arrays.asList(garbage); } }
      
      





serializable PopularWordsServiceImpl:



 @Service public class PopularWordsServiceImpl implements PopularWordsService {
      
      





serializable:



 public interface PopularWordsService extends Serializable { List<String> topX(JavaRDD<String> lines, int x); }
      
      





map- ( , ) state- - , . ぀たり UserConfig , serializable. , UserConfig , . , serializable.



. yesterday. — oh, — believe. oh -, .

, , UserConfig worker? ? ? , Spark , , - .

, broadcast-.



4. , broadcast



, worker- data ( UserConfig ). , , broadcast, . ( ), broadcast .



2 , :





:



 Israel, +9725423632 Israel, +9725454232 Israel, +9721454232 Israel, +9721454232 Spain, +34441323432 Spain, +34441323432 Israel, +9725423232 Israel, +9725423232 Spain, +34441323432 Russia, +78123343434 Russia, +78123343434
      
      





.



( ), . , . — - property- worker-. , , :



 Israel, Orange Israel, Orange Israel, Pelephone Israel, Pelephone Israel, Hot Mobile Israel, Orange Russia, Megaphone Russia, MTC
      
      





- Excel-, , 054 — Orange, 911 — . (10 ; 2 — big data) .



:



 Orange Orange Pelephone Pelephone Hot Mobile Orange Megaphone MTC
      
      





?



 public interface CommonConfig { Operator getOperator(String phone); List<String> countries(); }
      
      





CommonConfig, , .



, :



 @Service public class HrenoviyService { @Autowired private CommonConfig config; public JavaRDD<String> resolveOperatorNames(JavaRDD<Tuple2<String,String>> pairs){ return pairs.filter(pair-> config.countries().contains(pair._1)) .map(pair-> config.getOperator(pair._2).getName()); } }
      
      





- Spring, , , data.



! , ( broadcast).



? Driver Worker-, , , , 1 . . , . , , 1000 . Spark-, 10 Worker-.



Worker- - . 10, 1000, 100 . , , , .. - ( 1 , 2, .. 2 ). , worker- , broadcast.



:







context, , broadcast, , broadcast-. , worker- .



問題は䜕ですか :



 @Service public class HrenoviyService { @Autowired private JavaSparkContext sc; @Autowired private CommonConfig commonConfig; private Broadcast<CommonConfig> configBroadcast; @PostConstruct public void wrapWithBroadCast(){ configBroadcast = sc.broadcast(commonConfig); } public JavaRDD<String> resolveOperatorNames(JavaRDD<Tuple2<String,String>> pairs){ return pairs.filter(pair-> configBroadcast.value().countries().contains(pair._1)) .map(pair-> configBroadcast.value().getOperator(pair._2).getName()); } }
      
      





context (, , Spring). broadcast- , PostConstruct, wrapWithBroadcast. SparkContext , . PostConstruct.

( broadcast , ):



 return pairs.filter(pair-> configBroadcast.value().countries().contains(pair._1)) .map(pair-> configBroadcast.value().getOperator(pair._2).getName());
      
      





:







SparkContext, . . copy-paste, , broadcast, .







copy-past .



, Spark - ( broadcast — ). , SparkContext, .



:







SparkContext , serializable.



, , , . :







broadcast? , bean:







broadcast-, bean , broadcast.







, , broadcast? , , Service , , broadcast.



.



 @Service public class PopularWordsServiceImpl implements PopularWordsService { @AutowiredBroadcast private Broadcast<UserConfig> userConfig;
      
      





broadcast UserConfig AutowiredBroadcast. , ?

:



  @Override public List<String> topX(JavaRDD<String> lines, int x) { return lines.map(String::toLowerCase) .flatMap(WordsUtil::getWords) .filter(w -> !userConfig.value().garbage.contains(w)) .mapToPair(w -> new Tuple2<>(w, 1)) .reduceByKey((a, b) -> a + b) .mapToPair(Tuple2::swap).sortByKey(false).map(Tuple2::_2).take(x); } }
      
      





UserConfig.value, .



, bean-, .



.



 lines.map(String::toLowerCase) .flatMap(WordsUtil::getWords) .filter(word-> !Arrays.asList(garbage).contains(word)) .mapToPair(word-> new Tuple2<>(word, 1)) .reduceByKey((x, y)->x+y) .mapToPair(Tuple2::swap) .sortByKey(false) .map(Tuple2::_2) .take(amount);
      
      







 lines.map(_.toLowerCase()) .flatMap("\\w+".r.findAllIn(_)) .filter(!garbage.contains(_)) .map((_,1)).reduceByKey(_+_) .sortBy(_._2,ascending = false) .take(amount)
      
      





, Java ( ..). — Scala. , Java 8, 2 . , :







Java GetWords, . Scala . Scala SortBy, Tuple, Scala ( ascending false, false).



? .



DataFrames — API, Spark 1.3. , ( Tuple). RDD, .. RDD , — . ( ), task- .



:





DSL SQLContext ( ).

, :



 Agg, columns, count, distinct, drop, dropDuplicates, filter groupBy, orderBy, registerTable, schema, show, select, where, withColumn
      
      





SQL :



 dataFrame.registerTempTable("finalMap"); DataFrame frame = sqlContext.sql("select cl_id, cl_grp_id, dk_org_snw, dk_org_hnw, dk_org_cnp, dk_dir, dk_dat, DK_TIM_HR as dk_tim_hr, dk_spe, dk_sgt, dk_pet, dk_sgs, dk_sbp,\n" + "SUM(slu_atpt) slu_atpt, SUM(slu_succ) slu_succ, SUM(slu_fail) slu_fail, SUM(slu_dly) slu_dly\n" + "FROM finalMap f join tdtim t on f.dk_tim = t.DK_TIM\n" + "WHERE dk_pet IN (1, 4)\n" + "group by cl_id, cl_grp_id, dk_org_snw, dk_org_hnw, dk_org_cnp, dk_dir, dk_dat, DK_TIM_HR, dk_spe, dk_sgt, dk_pet, dk_sgs, dk_sbp").toDF();
      
      





, SQL sqlContext.

, :







:



 abs, cos, asin, isnull, not, rand, sqrt, when, expr, bin, atan, ceil, floor, factorial, greatest, least, log, log10, pow, round, sin, toDegrees, toRadians, md5, ascii, base64, concat, length, lower, ltrim, unbase64, repeat, reverse, split, substring, trim, upper, datediff, year, month, hour, last_day, next_day, dayofmonth, explode, udf
      
      





, . :







keywords (, ).



main.







-, , . sqlContext.read.json ( , json, — ; json ). show. :







: , keywords, . . , 30 ( ), .

. linkedIn. select, keywords. , , explode ( keyword ).



 linkedIn.select(functions.explode(functions.column(“keywords”)).as(“keyword”));
      
      





. , :







sort .. .

. keyword:



 DataFrame orderedBy = keywords.groupBy(“keyword”) .agg(functions.count(“keyword”).as(“amount”)) .orderBy(functions.column(“amount”).desc()); orderedBy.show();
      
      





, . keyword- amount. amount descended ( false). :







. :



 String mostPopularWord = orderedBy.first().getString(0); System.out.println(“mostPopularWord = “ + mostPopularWord);
      
      





first — , string ( resultset-). , :



 linkedIn.where{ functions.column(“age”).leq(30).and(functions.array_contains(functions.column(“keywords”).mostPopularWord))) .select(“name”).show(); }
      
      





30 . , functions.array_contains. show. :







. : XML-, JSON-, . ( )? , Java Scala? , .



WordDataFrameCreator.



, :



 @Component public class WordDataFrameCreator { @Autowired private SQLContext sqlContext; @Autowired private JavaSparkContext sc; public DataFrame create(String pathToDir) { JavaRDD<Row> rdd = sc.textFile(pathToDir).flatMap(WordsUtil::getWords).map(RowFactory::create); return sqlContext.createDataFrame(rdd, DataTypes.createStructType(new StructField[]{ DataTypes.createStructField("words", DataTypes.StringType, true) })); } }
      
      





. RDD map- . , RowFactory — RDD. RDD, RDD , , , , .. , — . SqlContext.



, SqlContext JavaSparkContext ( AppConfig, SqlContext ). , :



 public SQLContext sqlContext(){ return new SQLContext(sc()); }
      
      





SqlContext , RDD, , , — ( , words, string — true).



API : , - .



:



 @Service public class PopularDFWordsServiceImpl implements PopularDFWordsService { @AutowiredBroadcast private Broadcast<UserConfig> userConfig; @Override public List<String> topX(DataFrame lines, int x) { DataFrame sorted = lines.withColumn("words", lower(column("words"))) .filter(not(column("words").isin(userConfig.value().garbage.toArray()))) .groupBy(column("words")).agg(count("words").as("count")) .sort(column("count").desc()); sorted.show(); Row[] rows = sorted.take(x); List<String> topX = new HashSet<>(); for (Row row : rows) { topX.add(row.getString(0)); } return topX; } }
      
      





, , RDD, . API .



lower case . withColumn — , . , , . , count , — descended-. - .



. , ? . , custom-, , .







ustom- ( udf) — , . . notGarbage. , udf1, string (), — boolean ( ).



, :



 @Service public class PopularWordsResolverWithUDF { @Autowired private GarbageFilter garbageFilter; @Autowired private SQLContext sqlContext; @PostConstruct public void registerUdf(){ sqlContext.udf().register(garbageFilter.udfName(),garbageFilter, DataTypes.BooleanType); } public List<String> mostUsedWords(DataFrame dataFrame, int amount) { DataFrame sorted = dataFrame.withColumn("words", lower(column("words"))) .filter(callUDF(garbageFilter.udfName(),column("words")))

      
      





, , PostConstruct .



callUDF — . — , - . udf-.



UDF , , , @RegisterUDF BPP .



, ( Tomcat, ):



10 :







( ):







, . 6 4 .



:







:







Pink Floyd 0 . , :







結論





:








:



— 5 Spark-. , , , . Spark , - . , : , , , — « Spark!» .



— 7-8 JPoint 2017 . : «Spring – » « Spring Test» . , !



, JPoint Java — , .



All Articles