最近、Sparkがプロジェクトに登場しました。 開発プロセスでは、多くの困難に直面し、多くのことを学びます。 私は自分のためにこの知識を体系化し、他の人と共有したいと思っています。 そこで、Apache Sparkの使用に関する一連の記事を書くことにしました。 この記事は最初の記事であり、紹介になります。
そのため、Spark自体については、Habréについても1回と2 回と、かなり多くのことが書かれています。 したがって、少し繰り返す必要があります。
Apache Sparkは、分散データ処理用のアプリケーションを作成できるフレームワークです。 Sparkは、データを操作するためのソフトウェアAPIを提供します。これには、ロード、保存、変換、および集約に加えて、コードの開発とデバッグを目的としてローカルで実行する機能など、多くの小さなものが含まれます。
さらに、Sparkはアプリケーションの分散実行を担当します。 彼自身がコードをクラスターのすべてのノードに分散し、サブタスクに分割し、実行計画を作成して成功を監視します。 いずれかのノードで障害が発生し、一部のサブタスクが失敗すると、再起動されます。
Sparkの柔軟性は、さまざまな分散システムの制御下でアプリケーションを実行できるという事実にあります。
- スタンドアロンモード 。 このモードでは、Sparkインフラストラクチャを個別に展開でき、クラスターのすべてのリソースを管理し、アプリケーションを実行します。
- 糸 これは、Hadoopエコシステムの一部であるコンピューティングプラットフォームです。 Sparkアプリケーションは、このプラットフォームを実行しているHadoopクラスターで実行できます。
- メソス 。 別の代替クラスターリソース管理システム。
- ローカルモード 。 開発とデバッグ用に作成されたローカルモードは、私たちの生活を楽にします。
これらすべてのシステムには、さまざまなタスクや要件に関連する独自の利点があります。
Sparkが1位になったのはなぜですか?
最近Sparkの人気が高まっている理由と、古き良きHadoop MapReduce(以降、単にMR)に取って代わり始めた理由を見てみましょう。
問題は、新しいアーキテクチャアプローチです。これは、従来のMRアプリケーションよりも大幅に優れています。
これは、MRが2000年代に開発され始め、コードRAMが高価であり、64ビットシステムがまだ世界を獲得していないことです。 そのため、開発者は唯一の正しい道を進みました-ハードドライブ(正確には分散HDFSファイルシステム)を介した中間データの交換を実装しました。 つまり、MapフェーズとReduceフェーズの間のすべての中間データがHDFSにダンプされました。 その結果、Hadoopクラスターのノード間のディスクI / Oおよびデータ複製に多くの時間が費やされました。
Sparkは後で、まったく異なる条件で登場しました。 現在、中間データはシリアル化されてRAMに保存され、ノード間のデータ交換はネットワークを介して不要な抽象化なしで直接行われます。 ディスクI / Oはまだ使用されている(シャッフルステージで)と言う価値があります。 しかし、その強度ははるかに小さいです。
さらに、JVMの最適化により、Sparkタスクの初期化と起動がはるかに高速になりました。 MapReduceはタスクごとに新しいJVMを起動し、その後のすべての結果(すべてのJARファイルのダウンロード、JITコンパイルなど)を行います。一方、Sparkは各ノードで実行中のJVMを保持し、RPC呼び出しを介してタスクの起動を制御します。
最後に、SparkはRDD抽象化(Resilient Distributed Dataset)を使用します。これはMapReduceよりも汎用的です。 公平性のために、カスケードがあると言わなければなりませんが。 これは、MRのラッパーであり、柔軟性を高めるように設計されています。
さらに、もう1つの非常に重要な状況があります。Sparkを使用すると、バッチ処理タスクだけでなく、データストリームの操作(ストリーム処理)のためのアプリケーションを開発できます。 単一のアプローチと単一のAPIを提供します(ただし、わずかな違いはあります)。
そして、コードではどのように見えますか?
Spark APIはオフィスでよく文書化されています。 siteですが、ストーリーの整合性のために、ローカルで実行できる例で簡単に説明しましょう。
各ユーザーについて、ユーザーがアクセスしたサイトの総数を計算します。 そして、降順でソートされた上位10を返します。
public class UsersActivities { public static void main( String[] args ) { final JavaSparkContext sc = new JavaSparkContext( new SparkConf() .setAppName("Spark user-activity") .setMaster("local[2]") //local - . .set("spark.driver.host", "localhost") // ); // sc.textFile("users-visits.log"); // parallelize(); List<String> visitsLog = Arrays.asList( "user_id:0000, habrahabr.ru", "user_id:0001, habrahabr.ru", "user_id:0002, habrahabr.ru", "user_id:0000, abc.ru", "user_id:0000, yxz.ru", "user_id:0002, qwe.ru", "user_id:0002, zxc.ru", "user_id:0001, qwe.ru" //, :) ); JavaRDD<String> visits = sc.parallelize(visitsLog); // : (user_id), (1 - ) // (user_id:0000 : 1) JavaPairRDD<String, Integer> pairs = visits.mapToPair( (String s) -> { String[] kv = s.split(","); return new Tuple2<>(kv[0], 1); } ); // user_id JavaPairRDD<String, Integer> counts = pairs.reduceByKey( (Integer a, Integer b) -> a + b ); // Value 10 List<Tuple2<String, Integer>> top10 = counts.takeOrdered( 10, new CountComparator() ); System.out.println(top10); } // , Serializable. ( ), //SparkException: Task not serializable //http://stackoverflow.com/questions/29301704/apache-spark-simple-word-count-gets-sparkexception-task-not-serializable public static class CountComparator implements Comparator<Tuple2<String, Integer>>, Serializable { @Override public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) { return o2._2()-o1._2(); } } }
はい、Spark APIはScala、Java、Pythonで利用可能であると言っておく価値があります。 それでも、それはもともとScala専用に設計されました。 なるほど、プロジェクトではJava 8を使用しており、一般的には非常に満足しています。 理由が分からなくなるまで岩に行きます。
次の記事では、ストリーム処理の詳細、それが必要な理由、プロジェクトでの使用方法、SparkSQLについて詳しく説明します。