spark.mlでのパイピング

今日は、バージョン1.2に登場したspark.mlという新しいパッケージについてお話したいと思います。 機械学習アルゴリズム用の単一の高レベルAPIを提供するように設計されており、作成と構成を簡素化し、複数のアルゴリズムを単一のパイプラインまたはワークフローに結合します。 現在、ヤードにはバージョン1.4.1があり、開発者はパッケージがアルファ版ではないと主張していますが、多くのコンポーネントにはまだExperimentalまたはDeveloperApiというラベルが付いています。



さて、新しいパッケージで何ができるのか、どれだけ良いのかを確認しましょう。 まず、spark.mlで導入された基本的な概念を理解する必要があります。



1. MLデータセット -spark.mlはspark.sqlを使用して、 spark.sqlパッケージのDataFrameデータを操作します。 DataFrameは、名前付き列としてデータが保存される分散コレクションです。 概念的には、DataFrameはリレーショナルデータベースのテーブルまたはRやPythonのフレームなどのデータ型に相当しますが、内部ではより高度な最適化が行われます。 (作業の例と方法は、この記事の後半で説明します)。



2. トランスフォーマー (修飾子)は、1つのDataFrameを別のDataFrameに変換できる任意のアルゴリズムです。 たとえば、一連の特性(機能)を予測(予測)に変換するため、訓練されたモデルは修飾子です。



3. Estimator (評価アルゴリズム)は、DataFrameからTransformerへの変換を実行できるアルゴリズムです。 たとえば、学習アルゴリズムは評価アルゴリズムでもあります。これは、学習用のデータセットを取得し、出力で学習済みモデルを作成するためです。



4. パイプライン -任意の数の修飾子と評価アルゴリズムを組み合わせて機械学習ワークフローを作成するパイプライン。



5. Paramは、修飾子と推定アルゴリズムがパラメーターの設定に使用する一般的なタイプです。



説明されているインターフェイスによると、各Estimatorには、DataFrameを受け入れ、Transformerを返すfitメソッドが必要です。 次に、Transformerには、1つのDataFrameを別のDataFrameに変換する変換メソッドが必要です。



スケーラブルな機械学習コースでは、実験室の作業の1つで、線形回帰について話している教師が、「オーディオ特性のセットによって曲が作成された年を決定する」という問題を解決しました。 データ処理と最適なモデルの評価と検索の両方のために、非常に多くのメソッドを実装しています。 これは、機械学習の主なプロセスをより詳細に学生に理解させるために行われましたが、spark.mlパッケージがどのように私たちの生活を楽にしているかを確認しましょう。



実験室での作業では、準備されたわずかにトリミングされたデータが提供されました。 しかし、すべての方法に関心があるため、 生データセットを取得することをお勧めします 。 フォームの各行:

2007, 45.17809 46.34234 -40.65357 -2.47909 1.21253 -0.65302 -6.95536 -12.20040 17.02512 2.00002 -1.87785 9.85499 25.59837 1905.18577 3676.09074 1976.85531 913.11216 1957.52415 955.98525 942.72667 439.85991 591.66138 493.40770 496.38516 33.94285 -255.90134 -762.28079 -66.10935 -128.02217 198.12908 -34.44957 176.00397 -140.80069 -22.56380 12.77945 193.30164 314.20949 576.29519 -429.58643 -72.20157 59.59139 -5.12110 -182.15958 31.80120 -10.67380 -8.13459 -122.96813 208.69408 -138.66307 119.52244 -17.48938 75.58779 93.29243 85.83507 47.13972 312.85482 135.50478 -32.47886 49.67063 -214.73180 -77.83503 -47.26902 7.58366 -352.56581 -36.15655 -53.39933 -98.60417 -82.37799 45.81588 -16.91676 18.35888 -315.68965 -3.14554 125.45269 -130.18808 -3.06337 42.26602 -9.04929 26.41570 23.36165 -4.36742 -87.55285 -70.79677 76.57355 -7.71727 3.26926 -298.49845 11.49326 -89.21804 -15.09719
      
      



年が最初になり、12個の数字が平均トーンで、最後の78個がトーンの共分散です。



まず、このデータをDataFrameに取り込む必要がありますが、最初にデータ形式をわずかに変換します。

  val sc = new SparkContext("local[*]", "YearPrediction") val rawData: RDD[(Double, linalg.Vector, linalg.Vector)] = sc.textFile("data/YearPredictionMSD.txt") .map(_.split(',')) .map(x => ( x.head.toDouble, Vectors.dense(x.tail.take(12).map(_.toDouble)), Vectors.dense(x.takeRight(78).map(_.toDouble)) ))
      
      





各RDD要素は、年と2つの特性ベクトルを含むタプルです。DataFrameを取得するには、もう1つの変換を実行する必要があります。

  val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val rawDF: DataFrame = labeledPointsRDD.toDF("label", "avg", "cov")
      
      





sqlContextを作成し、暗黙的な変換メソッドをプルアップしたことに注意してください(この場合、 import sqlContext.implicits.rddToDataFrameHolder





記述できますimport sqlContext.implicits.rddToDataFrameHolder





import sqlContext.implicits.rddToDataFrameHolder





)toDFメソッドを使用します。 列名も指定しましたが、データ構造は次のようになります。

  label | avg | cov -------|-----------------------------------------|--------------------------------------------- 2001 | [49.94357 21.47114 73.07750 8.74861... | [10.20556 611.10913 951.08960 698.11428... -------|-----------------------------------------|--------------------------------------------- 2007 | [50.57546 33.17843 50.53517 11.5521... | [44.38997 2056.93836 605.40696 457.4117...
      
      





線形回帰で使用される勾配法は、特性の値のばらつきの影響を受けやすいため、トレーニングの前にデータを正規化または標準化する必要があります。 これらの目的のために、spark.ml.featureパッケージにはStandardScalerとNormalizerの2つのクラスがあります。

  import org.apache.spark.ml.feature.{Normalizer, StandardScalerModel, StandardScaler} val scalerAvg: StandardScalerModel = new StandardScaler() .setWithMean(true) .setWithStd(true) .setInputCol("avg") .setOutputCol("features") //    ,    //   (    ) .fit(rawDF) val normAvg: Normalizer = new Normalizer() .setP(2.0) .setInputCol("avg") .setOutputCol("features")
      
      





StandardScalerは推定器であることに注意してください。これは、Transformer(この場合はStandardScalerModel)を取得するためにfitメソッドを呼び出す必要があることを意味します。 DataFrameで動作するすべてのクラスには、2つの一般的なメソッドがあります。

setInputCol-データを読み取る列の名前を設定します

setOutputCol-変換されたデータが書き込まれる列の名前を示します。



この場合のこれらのクラスの動作の結果としての違いは、スケーラーが-1から1の範囲のデータを返し、ノーマライザーが0から1の範囲のデータを返すことです。操作アルゴリズムの詳細については、 こちらこちらをご覧ください



トレーニングサンプルを準備しました(またはデータ処理に使用する修飾子を受け取りました)。次に、出力でトレーニングモデルを提供する評価アルゴリズム(Estimator)を作成する必要があります。 ほぼ標準の設定を行っていますが、この段階では特に興味深いものではありません。

  import org.apache.spark.ml.regression.LinearRegression val linReg = new LinearRegression() .setFeaturesCol("features") .setLabelCol("label") .setElasticNetParam(0.5) .setMaxIter(500) .setRegParam(1e-10) .setTol(1e-6)
      
      





これで、簡単なコンベアを組み立てるのに必要なすべてが揃いました。

  import org.apache.spark.ml.Pipeline val pipeline = new Pipeline().setStages(Array( normAvg, linReg ))
      
      





Pipelineには、トレーニングサンプルが到着したときに指定された順序で実行されるステップの配列を受け入れるsetStagesメソッドがあります。 今、私たちに残っているのは、データをトレーニングとテストのサンプルに分割することを忘れないことです:

  val splitedData = rawDF.randomSplit(Array(0.8, 0.2), 42).map(_.cache()) val trainData = splitedData(0) val testData = splitedData(1)
      
      





作成したパイプラインを実行して、その作業の結果を評価しましょう。

  val pipelineModel = pipeline.fit(trainData) val fullPredictions = pipelineModel.transform(testData) val predictions = fullPredictions.select("prediction").map(_.getDouble(0)) val labels = fullPredictions.select("label").map(_.getDouble(0)) val rmseTest = new RegressionMetrics(predictions.zip(labels)).rootMeanSquaredError > (2003.0,1999.6153819348176) (1997.0,2000.9207184703566) (1996.0,2000.4171327880172) (1997.0,2002.022142263423) (2000.0,1997.6327888556184) RMSE: 10,552024
      
      





この段階では、すべてが既に明確になっているはずです。モデルを評価するために、既製のRegressionMetricsクラスを使用し、おなじみのRMSE推定とともに、他の基本的な推定も実装されていることに注意してください。



先に進む:スケーラブルな機械学習コースでは、ソースを次数2の多項式に変換することで新しい特性を作成しました。spark.ml開発者もこれを処理しました。 主なことは、このプロセスで混乱せず、列の名前を正しく示すことです。

  import org.apache.spark.ml.feature.PolynomialExpansion //  ,      "features"       "polyFeatures" val polynomAvg = new PolynomialExpansion() .setInputCol("features") .setOutputCol("polyFeatures") .setDegree(2) //         linReg.setFeaturesCol("polyFeatures") //       val pipeline = new Pipeline().setStages(Array( normAvg, polynomAvg, linReg ))
      
      







これまで、トレーニングに使用したのは12の特性のみでしたが、生データにはさらに78があったことを覚えています。おそらくそれらを組み合わせてみますか? この場合、spark.mlにはVectorAssemblerソリューションがあります。 決定したら、次のことを行います。

  import org.apache.spark.ml.feature.VectorAssembler val assembler = new VectorAssembler() .setInputCols(Array("avg", "cov")) .setOutputCol("united") normAvg.setInputCol("united") val pipeline = new Pipeline().setStages(Array( assembler, normAvg, polynomAvg, linReg ))
      
      





データの準備を少し整理しましたが、アルゴリズムに最適なパラメーターを選択するという疑問が残りました。私は本当にそれを手動でやりたくないのです。 この目的のために、spark.mlはCrossValidatorクラスを実装します。 CrossValidatorは、評価アルゴリズム(この場合はlinReg)、テストするパラメーターのセット、および評価ツール(モデルを手動で評価する場合、RMSEを使用しました)を受け入れます。 CrossValidatorは、データセットをいくつかのサンプル(デフォルトではk)に分割し、トレーニングおよび検証サンプルをランダムに選択することで作業を開始します(検証サンプルのサイズは元の1 / kになります)。 次に、各サンプルのパラメーターセットごとに、モデルがトレーニングされ、その有効性が評価され、最適なモデルが選択されます。 CrossValidatorを介したモデルの選択は、かなり時間がかかる操作ですが、ヒューリスティックな手動選択よりも統計的に合理的であることに注意してください。



便宜上、spark.mlに一連のパラメーターを作成するには、ユーティリティクラスParamGridBuilderを使用します。

  import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} val paramGrid: Array[ParamMap] = new ParamGridBuilder() .addGrid(linReg.maxIter, Array(5, 500)) .addGrid(linReg.regParam, Array(1e-15, 1e-10)) .addGrid(linReg.tol, Array(1e-9, 1e-6, 1e-3)) .addGrid(linReg.elasticNetParam, Array(0, 0.5, 1)) .build() val crossVal = new CrossValidator() .setEstimator(pipeline) .setEvaluator(new RegressionEvaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(3) val bestModel = crossVal.fit(trainData) > Best set of parameters: { linReg_3a964d0300fd-elasticNetParam: 0.5, linReg_3a964d0300fd-maxIter: 500, linReg_3a964d0300fd-regParam: 1.0E-15, linReg_3a964d0300fd-tol: 1.0E-9 } Best cross-validation metric: -10.47433119891316
      
      





まあ、それはおそらく線形回帰に関するすべてです。spark.mlの分類およびクラスタリングアルゴリズムには、ワークフローを便利に整理するのに役立つ一連のソリューションも用意されています。



使用した材料:

公式文書

UCI機械学習リポジトリ

スケーラブルな機械学習



All Articles