Sparkローカルモード:通常のラップトップで大きなファイルを処理する

画像

みなさんこんにちは。

1月4日、 Apache Spark 1.6の新しいバージョンがリリースされ 、ビッグデータを処理するための新機能を備えたバグ修正 が行われました。 Habréで、このツールの使用に関する多くの記事を、 プロジェクトの 導入から使用経験まで書きました 。 Sparkはほとんどのオペレーティングシステムで実行され、通常のラップトップでもローカルモードで実行できます。 この場合のSparkセットアップのシンプルさを使用すると、主要な機能を使用しないのは罪です。 この記事では、ラップトップで通常のSQLクエリを使用して大きなファイル(より多くのコンピューターRAM)の処理を迅速に構成する方法について説明します。 これにより、トレーニングを受けていないユーザーでもリクエストを行うことができます。 追加の接続iPython( Jupyter )ノートブックを使用すると、完全なレポートを作成できます。 この記事では、ファイル処理の簡単な例を解析しました; Pythonの他の例はこちらです。



入力データ:注文されたデータを含む数GBのファイル、空きRAM <1 GBのラップトップ。 SQLまたはファイルへの同様の単純なクエリを使用して、さまざまな分析データを取得する必要があります。 ファイルに1か月間の検索クエリの統計が含まれている場合の例を分析しましょう(スクリーンショットのデータは例として示されており、現実とは一致しません)。

画像



特定のサブジェクトのクエリについては、検索クエリの単語数の分布を取得する必要があります。 たとえば、「不動産」という単語が含まれています。 つまり、この例では、単純に検索クエリをフィルター処理し、各クエリの単語数をカウントし、単語数でグループ化し、分布を構築します。

画像



ローカルモードでのSparkのインストールは、メインのオペレーティングシステムとほぼ同じであり、次のアクションになります。

1. Sparkをダウンロードし (この例はバージョン1.6で動作します)、任意のフォルダーに解凍します。



2. Javaをインストールします(インストールされていない場合)

-WindowsおよびMACの場合、java.comからバージョン7をダウンロードしてインストールします

-Linuxの場合:$ sudo apt-get updateおよび$ sudo apt-get install openjdk-7-jdk + .bashrcにJAVAインストールアドレスを追加する必要がある場合があります:JAVA_HOME = "/ usr / lib / jvm / java-7-openjdk-i386 」

Pythonでない場合は、単純にAnacondaをインストールできます。



pySparkを実行します(sparkシェルを実行してネイティブ言語のようにScalaで作業できます):解凍したSparkアーカイブに移動し、binフォルダーでpysparkを実行します(例: spark.apache.org/docs/latest/quick-start.html )。 正常に起動すると、次のものが得られます。

画像



SQLクエリ用にファイルを「準備」する必要があります(Spark 1.6では、一部の種類のファイルでは、テーブルを作成せずに SQLクエリ直接作成できます )。 つまり、DataFrame(DataFrameには多数の便利な関数もあります )を作成し、それからSQLクエリ用のテーブルを作成します。

1.必要なライブラリをダウンロードします

>>> from pyspark.sql import SQLContext, Row >>> sqlContext = SQLContext(sc)
      
      







2.テキスト変数を処理用のソースファイルとして開始し、最初の行を確認します。

 >>> text = sc.textFile('  ') >>> text.first() u'2015-09-01\tu' '\t101753'
      
      







このファイルでは、行はタブで区切られています。 列を正しく分離するために、タブをセパレーターとして使用して、MapおよびSplit関数を使用します:map(lambda l:l.split( '\ t'))。 パーティション結果から必要な列を選択します。 このタスクでは、特定の検索クエリの単語数を知る必要があります。 したがって、クエリ(クエリ列)とその中の単語数(wc列)のみを使用します。map(lambda l:Row(query = l [1]、wc = len(l [1] .split( ''))))。



将来的に任意のSQLクエリを作成するために、テーブルのすべての列を取得できます。

map(lambda l:Row(date = l [0]、query = l [1]、stat = l [2]、wc = len(l [1] .split( ''))))



これらの手順を1行で実行します

 >>> schema = text.map(lambda l: l.split('\t')).map(lambda l: Row(query=l[1], wc=len(l[1].split(' '))))
      
      







スキーマをDataFrameに変換し、多くの有用な処理操作を実行できます( spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operationsの例):

 >>> df = sqlContext.createDataFrame(schema) >>> df.show() +--------------------+---+ | query| wc| +--------------------+---+ | ...| 2| |  | 2| |  | 3| ...
      
      







3. DataFrameをテーブルに変換して、SQLクエリを作成しましょう。

  >>> df.registerTempTable('queryTable')
      
      







4.ファイル全体のSQLクエリを作成し、結果を出力変数にアップロードします。

 >>> output = sqlContext.sql('SELECT wc, COUNT(*) FROM queryTable GROUP BY wc').collect()
      
      







700MBの空きRAMを持つ2GBファイルの場合、このようなリクエストには9分かかりました。 プロセス処理は、フォームの行(... of 53)で確認できます。

INFO TaskSetManager:localhost(1/53)で11244ミリ秒でステージ8.0(TID 61)でタスク1.0を完了しました



追加の制限を追加できます。

 >>> outputRealty = sqlContext.sql('SELECT wc, COUNT(*) FROM queryTable WHERE query like "%%" GROUP BY wc').collect()
      
      







この分布のヒストグラムを描くことは残っています。 たとえば、出力結果をファイル 'output.txt'に書き込み、分布をExcelで簡単に描画できます。



 >>> with open('output.txt', 'w') as f: ... f.write('wc \t count \n') ... for line in output: ... f.write(str(line[0]) + '\t' + str(line[1]) + '\n')
      
      






All Articles