みなさんこんにちは。
1月4日、 Apache Spark 1.6の新しいバージョンがリリースされ 、ビッグデータを処理するための新機能を備えた
入力データ:注文されたデータを含む数GBのファイル、空きRAM <1 GBのラップトップ。 SQLまたはファイルへの同様の単純なクエリを使用して、さまざまな分析データを取得する必要があります。 ファイルに1か月間の検索クエリの統計が含まれている場合の例を分析しましょう(スクリーンショットのデータは例として示されており、現実とは一致しません)。
特定のサブジェクトのクエリについては、検索クエリの単語数の分布を取得する必要があります。 たとえば、「不動産」という単語が含まれています。 つまり、この例では、単純に検索クエリをフィルター処理し、各クエリの単語数をカウントし、単語数でグループ化し、分布を構築します。
ローカルモードでのSparkのインストールは、メインのオペレーティングシステムとほぼ同じであり、次のアクションになります。
1. Sparkをダウンロードし (この例はバージョン1.6で動作します)、任意のフォルダーに解凍します。
2. Javaをインストールします(インストールされていない場合)
-WindowsおよびMACの場合、java.comからバージョン7をダウンロードしてインストールします
-Linuxの場合:$ sudo apt-get updateおよび$ sudo apt-get install openjdk-7-jdk + .bashrcにJAVAインストールアドレスを追加する必要がある場合があります:JAVA_HOME = "/ usr / lib / jvm / java-7-openjdk-i386 」
Pythonでない場合は、単純にAnacondaをインストールできます。
pySparkを実行します(sparkシェルを実行してネイティブ言語のようにScalaで作業できます):解凍したSparkアーカイブに移動し、binフォルダーでpysparkを実行します(例: spark.apache.org/docs/latest/quick-start.html )。 正常に起動すると、次のものが得られます。
SQLクエリ用にファイルを「準備」する必要があります(Spark 1.6では、一部の種類のファイルでは、テーブルを作成せずに SQLクエリを直接作成できます )。 つまり、DataFrame(DataFrameには多数の便利な関数もあります )を作成し、それからSQLクエリ用のテーブルを作成します。
1.必要なライブラリをダウンロードします
>>> from pyspark.sql import SQLContext, Row >>> sqlContext = SQLContext(sc)
2.テキスト変数を処理用のソースファイルとして開始し、最初の行を確認します。
>>> text = sc.textFile(' ') >>> text.first() u'2015-09-01\tu' '\t101753'
このファイルでは、行はタブで区切られています。 列を正しく分離するために、タブをセパレーターとして使用して、MapおよびSplit関数を使用します:map(lambda l:l.split( '\ t'))。 パーティション結果から必要な列を選択します。 このタスクでは、特定の検索クエリの単語数を知る必要があります。 したがって、クエリ(クエリ列)とその中の単語数(wc列)のみを使用します。map(lambda l:Row(query = l [1]、wc = len(l [1] .split( ''))))。
将来的に任意のSQLクエリを作成するために、テーブルのすべての列を取得できます。
map(lambda l:Row(date = l [0]、query = l [1]、stat = l [2]、wc = len(l [1] .split( ''))))
これらの手順を1行で実行します
>>> schema = text.map(lambda l: l.split('\t')).map(lambda l: Row(query=l[1], wc=len(l[1].split(' '))))
スキーマをDataFrameに変換し、多くの有用な処理操作を実行できます( spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operationsの例):
>>> df = sqlContext.createDataFrame(schema) >>> df.show() +--------------------+---+ | query| wc| +--------------------+---+ | ...| 2| | | 2| | | 3| ...
3. DataFrameをテーブルに変換して、SQLクエリを作成しましょう。
>>> df.registerTempTable('queryTable')
4.ファイル全体のSQLクエリを作成し、結果を出力変数にアップロードします。
>>> output = sqlContext.sql('SELECT wc, COUNT(*) FROM queryTable GROUP BY wc').collect()
700MBの空きRAMを持つ2GBファイルの場合、このようなリクエストには9分かかりました。 プロセス処理は、フォームの行(... of 53)で確認できます。
INFO TaskSetManager:localhost(1/53)で11244ミリ秒でステージ8.0(TID 61)でタスク1.0を完了しました
追加の制限を追加できます。
>>> outputRealty = sqlContext.sql('SELECT wc, COUNT(*) FROM queryTable WHERE query like "%%" GROUP BY wc').collect()
この分布のヒストグラムを描くことは残っています。 たとえば、出力結果をファイル 'output.txt'に書き込み、分布をExcelで簡単に描画できます。
>>> with open('output.txt', 'w') as f: ... f.write('wc \t count \n') ... for line in output: ... f.write(str(line[0]) + '\t' + str(line[1]) + '\n')