Rとスパーク

画像 Sparkは、クラスターコンピューティング用のApacheプロジェクトであり、機械学習を含むデータ処理用の高速で汎用性の高い環境です。 SparkにはRの APISparkRパッケージ)もあり、これはSparkディストリビューション自体の一部です。 ただし、このAPIでの作業に加えて、 RSparkを使用するためのさらに2つの方法があります 合計で、Sparkクラスターと対話する3つの異なる方法があります。 この投稿では、各方法の主な機能の概要を説明し、いずれかのオプションを使用して、 Azure HDInsightにデプロイされたSparkクラスター上の少量のテキストファイル(3.5 GB、1400万行)で最も単純な機械学習モデルを構築します。



Sparkインタラクションの概要



機械学習機能が弱い公式のSparkRパッケージ(バージョン1.6.2には1つのモデルのみがあり、バージョン2.0.0には4つのモデルがあります)に加えて、 Sparkへのアクセスにはさらに2つのオプションがあります。



最初のオプションは、 Microsoftの製品-Microsoft R Server for Hadoopを使用することです。これは、最近Sparkサポートを統合しました。 この製品を使用すると、ローカルコンピューティング、 Hadoopmap-reduce )またはSparkのコンテキストで同じR関数を使用して計算を実行できます。 RのローカルインストールとSparkクラスターへのアクセスに加えて、 Microsoft Azure HDInsightクラウドサービスを使用すると、既製のクラスターを展開でき、通常のSparkクラスターに加えて、 RサーバーをSparkクラスターに展開できます。 このサービスは、追加の境界ノードにHadoop用RサーバーがプリインストールされたSparkクラスターです。これにより、このサーバー上でローカルに計算を実行するか、 SparkまたはHadoopコンテキストに切り替えることができます。 この製品の使用については、 Microsoft WebサイトのHDInsightの公式ドキュメントに詳しく説明されています



2番目のオプションは、まだ開発中の新しいsparklyrパッケージを使用することです。 この製品は、最も有用で必要なパッケージの一部がリリースされているRStudio(knitr、ggplot2、tidyr、lubridate、dplyrなど)の後援の下で開発されているため、このパッケージは別のリーダーになります。 このパッケージはまだ正式にリリースされていないため、まだ十分に文書化されていません。



これらのSparkの各作業方法に関するドキュメントと実験に基づいて、各メソッドの一般化された機能を備えた次の表(表1)を準備しました(もう少し可能性があったSparkR 2.0.0も追加しました)。



画像

表1. Sparkと対話するさまざまな方法の可能性の概要



表からわかるように、すぐに必要なニーズを完全に実現するツールはありませんが、 sparklyrパッケージはSparkRおよびR Server比較して有利です。 その主な利点は、 csvjsonhdfsから寄木細工のファイルを読み取ることです。 dplyrデータ操作構文と完全に互換性があります-フィルタリング操作、列選択、集約関数、データのマージ、列名の変更などを実行する機能など。 これらのタスクの一部が実行されないか、非常に不便なHadoopの SparkRまたはRサーバーとは異なり( HadoopのRサーバーでは、オブジェクトのデータマージはまったくなく、組み込みのxdfデータタイプでのみサポートされます)。 このパッケージのもう1つの利点は、Rコードから直接Javaメソッドを実行する関数を作成できることです。







count_lines <- function(sc, file) { spark_context(sc) %>% invoke("textFile", file, 1L) %>% invoke("count") } count_lines(sc, "/text.csv")
      
      





これにより、 Sparkの既存のjavaメソッドを使用するか、自分で実装することで、パッケージにない機能を実装できます。



そして、もちろん、機械学習モデルの数は、 SparkR (バージョン2.0であっても)およびHadoop用Rサーバーの数よりもはるかに多くなっています 。 したがって、このパッケージを最も有望で使いやすいものとして選択しましょう。 Sparkクラスターは、5種類のクラスター( HBaseStormHadoopSparkRpark on Spark )の展開を提供するAzure HDInsightクラウドサービスを使用して、最小限の労力でさまざまな構成で展開されました。



使用したリソース





環境設定



最初に、 Sparkクラスターを展開します-2つのD12v2ヘッドノードと4つのD12v2作業ノードを含む構成を選択しました。 (D12v2:4コア/ 28 GBのRAM、200 GBのディスク。この構成は完全に最適ではありませんが、 sparklyrは構文のデモに適しています)。 さまざまな種類のクラスターを展開する方法とそれらを操作する方法の説明は、 HDInsightのドキュメントに記載されています。 作業ノードへのSSH接続を使用してクラスターを正常にデプロイした後、RとRStudioを必要な依存関係とともにそこにインストールします。 sparklyrパッケージの追加機能(Sparkの元のデータフレームを表示する追加のウィンドウ、およびプロパティまたは自身を表示する機能)があるため、RStudioのプレビューエディターを使用することをお勧めします。 R、R Studioをインストールした後、 localhost:8787へのトンネリングを使用して接続をリセットします。



したがって、 localhost:8787のブラウザーで RStudioに接続し、作業を続行します。



データ準備



このタスクのすべてのコードは、この投稿の最後に記載されています。



このテストタスクでは、 NYC Taxi TripsにあるNYC Taxiデータセットのcsvファイルを使用します。 データは、タクシーの乗車とその支払いに関する情報です。 情報提供を目的として、1か月に制限します。 同じ完全なデータセット上で、 Rad for Hadoopを使用して( Hadoopのコンテキストで)モデルを構築する方法については、 Microsoft R ServerとHDInsightを使用したNYCタクシーデータの探索を参照してください 。 ただし、ファイルの読み取り、すべての前処理(データフィルタリング、テーブルマージ)はHiveで実行され、Rサーバーではモデルを作成したばかりで、ここではすべてがsparklyrを使用して通常のRで実行されます。



両方のファイルをSparkクラスターのhdfsに移動し、 sparklyr関数を使用して、これらのファイルを読み取ります。



データ操作



旅行と運賃のファイルはキーで接続されています-「 メダリオン 」、「 hack_licence 」、「 pickup_datetime 」の列なので、 データデータフレームの左側、つまり運賃 データフレームに添付します。 データと操作を組み合わせた後、データフレームを寄せ木細工の形式で保存します。 モデルを構築する前に、データを見てみましょう。このために、2000個のランダムな観測値のサンプルを作成し、collectを使用してRに渡します。 この小さなサンプルを使用して、標準ダイアグラムggplot2 (チップ対料金、ポイントのサイズを示します-乗客の数によってルートの距離とポイントの色を示し、支払いタイプとタクシーオペレーターによってグリッドパネルに分割されます)(図1)。



画像

図1主な依存関係を示す図



運賃のチップサイズには依存性(請求書の「標準」%として線形)があり、ほとんどの支払いはクレジットカード(CRDパネル)と現金(CSHパネル)を使用して行われ、現金で支払う場合、チップは常に欠席(これはおそらく、現金で支払う場合、チップはすでに価格に含まれていますが、カードで支払う場合は含まれていないためです)。 したがって、トレーニングのサンプルでは、​​クレジットカードで支払われた旅行のみを残しています。 便利なdplyr構文とmagrittr パイピングを使用して、結合されたデータフレームはチェーンに渡されます:行(外れ値と非論理値を除く)と列(モデルの構築に必要なものだけを残して)の後続の選択、最終的なデータセットを線形回帰関数に渡します。 モデルをトレーニングするために、すべてのデータの70%を使用し、テストでは残りの30%を使用します。 このタスクでは、単純な線形回帰を使用します。 検出したい依存関係は、トリップのパラメーターに対するチップサイズです。 このデータのこのモデルはかなり退化しており、まったく正確ではありません(多数のヒントが0にあります)が、単純であり、モデルの解釈された係数を表示し、 sparklyrの基本的な機能を示すことができます。 モデルでは、次の予測変数を使用します。vendor_id-タクシーオペレーターの識別子、 パッセンジャー_count-乗客数、 trip_time_in_secs-トリップ時間、 trip_distance-トリップ距離、 payment_type-支払タイプ、 fare_amount-トリップ価格、 surcharg e-料金 トレーニングの結果、モデルは次の形式になります。



 Call: ml_linear_regression(., response = "tip_amount", features = c("vendor_id", "passenger_count", "trip_time_in_secs", "trip_distance", "fare_amount", "surcharge")) Deviance Residuals: (approximate): Min 1Q Median 3Q Max -27.55253 -0.33134 0.09786 0.34497 31.35546 Coefficients: Estimate Std. Error t value Pr(>|t|) (Intercept) 3.2743e-01 1.4119e-03 231.9043 < 2e-16 *** vendor_id_VTS -1.0557e-01 1.1408e-03 -92.5423 < 2e-16 *** passenger_count -1.0542e-03 4.1838e-04 -2.5197 0.01175 * trip_time_in_secs 1.3197e-04 2.0299e-06 65.0140 < 2e-16 *** trip_distance 1.0787e-01 4.7152e-04 228.7767 < 2e-16 *** fare_amount 1.3266e-01 1.9204e-04 690.7842 < 2e-16 *** surcharge 1.4067e-01 1.4705e-03 95.6605 < 2e-16 *** --- Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1 R-Squared: 0.6456 Root Mean Squared Error: 1.249
      
      





このモデルを使用して、テストサンプルの値を予測します。



結論



この記事では、RでSparkを操作する3つの方法の主な機能について説明し、 sparklyrパッケージを使用してファイルの読み取り、その前処理、操作、および単純な機械学習モデルの構築を実装する例を示します。



ソースコード
 devtools::install_github("rstudio/sparklyr") library(sparklyr) library(dplyr) spark_disconnect_all() sc <- spark_connect(master = "yarn-client") data_tbl<-spark_read_csv(sc, "data", "taxi/data") fare_tbl<-spark_read_csv(sc, "fare", "taxi/fare") fare_tbl <- rename(fare_tbl, medallionF = medallion, hack_licenseF = hack_license, pickup_datetimeF=pickup_datetime) taxi.join<-data_tbl %>% left_join(fare_tbl, by = c("medallion"="medallionF", "hack_license"="hack_licenseF", "pickup_datetime"="pickup_datetimeF", )) taxi.filtered <- taxi.join %>% filter(passenger_count > 0 , passenger_count < 8 , trip_distance > 0 , trip_distance <= 100 , trip_time_in_secs > 10 , trip_time_in_secs <= 7200 , tip_amount >= 0 , tip_amount <= 40 , fare_amount > 0 , fare_amount <= 200, payment_type=="CRD" ) %>% select(vendor_id,passenger_count,trip_time_in_secs,trip_distance, fare_amount,surcharge,tip_amount)%>% sdf_partition(training = 0.7, test = 0.3, seed = 1234) spark_write_parquet(taxi.filtered$training, "taxi/parquetTrain") spark_write_parquet(taxi.filtered$test, "taxi/parquetTest") for_plot<-sample_n(taxi.filtered$training,1000)%>%collect() ggplot(data=for_plot, aes(x=fare_amount, y=tip_amount, color=passenger_count, size=trip_distance))+ geom_point()+facet_grid(vendor_id~payment_type) model.lm <- taxi.filtered$training %>% ml_linear_regression(response = "tip_amount", features = c("vendor_id", "passenger_count", "trip_time_in_secs", "trip_distance", "fare_amount", "surcharge")) print(model.lm) summary(model.lm) predicted <- predict(model.lm, newdata = taxi.filtered$test) actual <- (taxi.filtered$test %>% select(tip_amount) %>% collect())$tip_amount data <- data.frame(predicted = predicted,actual = actual)
      
      








All Articles