プロジェクトのコアとしてのApache Spark。 パート1

こんにちは同僚。



最近、Sparkがプロジェクトに登場しました。 開発プロセスでは、多くの困難に直面し、多くのことを学びます。 私は自分のためにこの知識を体系化し、他の人と共有したいと思っています。 そこで、Apache Sparkの使用に関する一連の記事を書くことにしました。 この記事は最初の記事であり、紹介になります。



そのため、Spark自体については、Habréについても1回2 と、かなり多くのことが書かれています。 したがって、少し繰り返す必要があります。



Apache Sparkは、分散データ処理用のアプリケーションを作成できるフレームワークです。 Sparkは、データを操作するためのソフトウェアAPIを提供します。これには、ロード、保存、変換、および集約に加えて、コードの開発とデバッグを目的としてローカルで実行する機能など、多くの小さなものが含まれます。



さらに、Sparkはアプリケーションの分散実行を担当します。 彼自身がコードをクラスターのすべてのノードに分散し、サブタスクに分割し、実行計画を作成して成功を監視します。 いずれかのノードで障害が発生し、一部のサブタスクが失敗すると、再起動されます。



Sparkの柔軟性は、さまざまな分散システムの制御下でアプリケーションを実行できるという事実にあります。





これらすべてのシステムには、さまざまなタスクや要件に関連する独自の利点があります。



Sparkが1位になったのはなぜですか?



最近Sparkの人気が高まっている理由と、古き良きHadoop MapReduce(以降、単にMR)に取って代わり始めた理由を見てみましょう。



問題は、新しいアーキテクチャアプローチです。これは、従来のMRアプリケーションよりも大幅に優れています。



これは、MRが2000年代に開発され始め、コードRAMが高価であり、64ビットシステムがまだ世界を獲得していないことです。 そのため、開発者は唯一の正しい道を進みました-ハードドライブ(正確には分散HDFSファイルシステム)を介した中間データの交換を実装しました。 つまり、MapフェーズとReduceフェーズの間のすべての中間データがHDFSにダンプされました。 その結果、Hadoopクラスターのノード間のディスクI / Oおよびデータ複製に多くの時間が費やされました。



Sparkは後で、まったく異なる条件で登場しました。 現在、中間データはシリアル化されてRAMに保存され、ノード間のデータ交換はネットワークを介して不要な抽象化なしで直接行われます。 ディスクI / Oはまだ使用されている(シャッフルステージで)と言う価値があります。 しかし、その強度ははるかに小さいです。



さらに、JVMの最適化により、Sparkタスクの初期化と起動がはるかに高速になりました。 MapReduceはタスクごとに新しいJVMを起動し、その後のすべての結果(すべてのJARファイルのダウンロード、JITコンパイルなど)を行います。一方、Sparkは各ノードで実行中のJVMを保持し、RPC呼び出しを介してタスクの起動を制御します。

最後に、SparkはRDD抽象化(Resilient Distributed Dataset)を使用します。これはMapReduceよりも汎用的です。 公平性のために、カスケードがあると言わなければなりませんが。 これは、MRのラッパーであり、柔軟性を高めるように設計されています。



さらに、もう1つの非常に重要な状況があります。Sparkを使用すると、バッチ処理タスクだけでなく、データストリームの操作(ストリーム処理)のためのアプリケーションを開発できます。 単一のアプローチと単一のAPIを提供します(ただし、わずかな違いはあります)。



そして、コードではどのように見えますか?



Spark APIはオフィスでよく文書化されています。 siteですが、ストーリーの整合性のために、ローカルで実行できる例で簡単に説明しましょう。



各ユーザーについて、ユーザーがアクセスしたサイトの総数を計算します。 そして、降順でソートされた上位10を返します。



public class UsersActivities { public static void main( String[] args ) { final JavaSparkContext sc = new JavaSparkContext( new SparkConf() .setAppName("Spark user-activity") .setMaster("local[2]") //local -     . .set("spark.driver.host", "localhost") //      ); //      sc.textFile("users-visits.log"); //        parallelize();   List<String> visitsLog = Arrays.asList( "user_id:0000, habrahabr.ru", "user_id:0001, habrahabr.ru", "user_id:0002, habrahabr.ru", "user_id:0000, abc.ru", "user_id:0000, yxz.ru", "user_id:0002, qwe.ru", "user_id:0002, zxc.ru", "user_id:0001, qwe.ru" //,    :) ); JavaRDD<String> visits = sc.parallelize(visitsLog); //    :  (user_id),  (1 -   ) // (user_id:0000 : 1) JavaPairRDD<String, Integer> pairs = visits.mapToPair( (String s) -> { String[] kv = s.split(","); return new Tuple2<>(kv[0], 1); } ); //     user_id JavaPairRDD<String, Integer> counts = pairs.reduceByKey( (Integer a, Integer b) -> a + b ); //  Value    10  List<Tuple2<String, Integer>> top10 = counts.takeOrdered( 10, new CountComparator() ); System.out.println(top10); } // ,    Serializable.  (   ),   //SparkException: Task not serializable //http://stackoverflow.com/questions/29301704/apache-spark-simple-word-count-gets-sparkexception-task-not-serializable public static class CountComparator implements Comparator<Tuple2<String, Integer>>, Serializable { @Override public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) { return o2._2()-o1._2(); } } }
      
      





はい、Spark APIはScala、Java、Pythonで利用可能であると言っておく価値があります。 それでも、それはもともとScala専用に設計されました。 なるほど、プロジェクトではJava 8を使用しており、一般的には非常に満足しています。 理由が分からなくなるまで岩に行きます。



次の記事では、ストリーム処理の詳細、それが必要な理由、プロジェクトでの使用方法、SparkSQLについて詳しく説明します。



All Articles