Apache Sparkの非線形回帰。 自分でやる





信号処理の問題を解決する際、生データを回帰モデルで近似する方法がよく使用されます。 構造に基づいて、モデルは、線形、線形に縮小、および非線形の3つのタイプに分類できます。 Spark ML機械学習モジュールでは、最初の2つのタイプのApache Spark機能は、それぞれLinearRegressionクラスとGeneralizedLinearRegressionクラスで表されます。 標準ライブラリの非線形モデルのトレーニングは提示されておらず、独立した開発が必要です。



まず、非線形モデルを構築するための理論的基礎を簡単に確認し、次にSpark ML拡張機能の実用的な開発に進みます。



ちょっとした数学



線形モデルと比較した非線形モデルの学習は、より複雑なタスクです。 これは、複数の極値の存在および/または応答面の「渓谷」の性質による可能性があります。 非線形関数を使用する主な刺激は、よりコンパクトなモデルを取得できる可能性です。 また、物理学および工学の分野からの多くの分析方程式は、最初は非線形であるため、適切なモデルの使用が強制される場合があることに注意する必要があります。



非線形モデルのトレーニングには、さまざまなツールがあり、その選択は特定の関数の種類、適用される制限の種類や種類などに依存します。この記事では、二次誤差関数と1次の準アルゴリズムであるニュートンガウス法の組み合わせを使用しますニュートン型。 ほとんどの場合、このアルゴリズムの収束はかなり良好です。



ニュートンガウス法の反復ステップは、次の関係によって決定されます。



d =-(J ^ T J)^ {-1} J ^ T r ここで、 Jはヤコビ行列、 rは剰余の列ベクトルです r_i = y_i-f(w; x_i)



示された式は論理的に2つの部分で構成されます:ヘッセ行列の近似 H \約(J ^ T J) および勾配近似 \ nabla f \およそJ ^ T r



ヤコビ行列の行数は、トレーニング例の数n 、列の数は重みベクトルmのサイズによって決まります。 [1]に示すように、ヘッセ行列の近似は、ヤコビ行列の行列の2行を読み取り、それらを乗算することで計算できます。 受け取った n ^ 2 行列サイズ m \回m 折りたたむだけです。 提案された操作の順列は、全体的な計算の複雑さを変更しませんが、ヤコビ行列全体をロードせず、並列に計算を実行することを許可しません。 同様に、勾配の近似が計算され、長さmの n個のベクトルのみが加算されます。 得られたヘッセ行列の反転は、サイズが比較的小さいため、それほど難しくありません。 アルゴリズムの収束を保証するには、計算された行列の正定性を監視する必要があります (J ^ T J) 、固有値とベクトルを計算することで実現されます。



記事[2、3]で、Apache Sparkに上記のアプローチを適用するための一般的なスキームが提案されました。 私の意見では、これらの作品の唯一の欠点は、既存のSpark ML APIとの明確なリンクがないことです。 次のセクションでは、このギャップを埋めようとします。



Spark ML APIの実装



非線形モデルの実装を成功させるには、基本クラスの構造と目的を理解する必要があります。 Sparkシステムの機械学習APIには2つのバージョンがあります。1.xmllibパッケージ内にあり、2.xはmlパッケージ内にあります。 Spark MLモジュール[4]のドキュメントでは、APIバージョン1.xからバージョン2.xへの移行は、チェーン(パイプライン)に埋め込み、型指定されたDataFrame構造で動作する可能性を提供することを目的としています。 この例では、新しいクラス構造を使用しますが、必要に応じて、古い構造の下で非常に簡単に実装できます。



重要なSpark APIクラス





回帰クラスの拡張



org.apache.spark.ml.regression.NonLinearRegressionクラスはRegressorコントラクトを拡張し、トレーニングの結果としてNonLinearRegressionModelのインスタンスを返します。 非線形モデルを作成するには、一意の文字列識別子とNonLinearFunctionモデルのカーネル関数を指定する必要があります。 オプションのパラメーターの中には、トレーニング反復の最大数、係数ベクトルの初期近似、および必要な精度をリストできます。 非線形関数には多くの極値があり、特定の核関数の振る舞いに関する先験的な考え方に基づいた初期近似の選択により、グローバル極値の領域に正確に検索を向けることができます。 Breezeライブラリのメモリ消費が制限された(LBFGS)Broyden-Fletcher-Goldfarb-Shannoアルゴリズムの既製の実装を使用することは注目に値します。 モデルトレーニングコードを以下に示します。



コードorg.apache.spark.ml.regression.NonLinearRegression#train
override protected def train(dataset: Dataset[_]): NonLinearRegressionModel = { // set instance weight to 1 if not defined the column val instanceWeightCol: Column = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) val instances: RDD[Instance] = dataset.select( col($(labelCol)).cast(DoubleType), instanceWeightCol, col($(featuresCol))).rdd.map { case Row(label: Double, weight: Double, features: Vector) => Instance(label, weight, features) } // persists dataset if defined any storage level val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) val costFunc = new SquaresLossFunctionRdd(kernel, instances) val optimizer = new LBFGS[BDV[Double]]($(maxIter), 10, $(tol)) // checks and assigns the initial coefficients val initial = { if (!isDefined(initCoeffs) || $(initCoeffs).length != kernel.dim) { if ($(initCoeffs).length != kernel.dim) logWarning(s"Provided initial coefficients vector (${$(initCoeffs)}) not corresponds with model dimensionality equals to ${kernel.dim}") BDV.zeros[Double](kernel.dim) } else BDV($(initCoeffs).clone()) } val states = optimizer.iterations(new CachedDiffFunction[BDV[Double]](costFunc), initial) val (coefficients, objectiveHistory) = { val builder = mutable.ArrayBuilder.make[Double] var state: optimizer.State = null while (states.hasNext) { state = states.next builder += state.adjustedValue } // checks is method failed if (state == null) { val msg = s"${optimizer.getClass.getName} failed." logError(msg) throw new SparkException(msg) } (Vectors.dense(state.x.toArray.clone()).compressed, builder.result) } // unpersists the instances RDD if (handlePersistence) instances.unpersist() // produces the model and saves training summary val model = copyValues(new NonLinearRegressionModel(uid, kernel, coefficients)) val (summaryModel: NonLinearRegressionModel, predictionColName: String) = model.findSummaryModelAndPredictionCol() val trainingSummary = new NonLinearRegressionSummary( summaryModel.transform(dataset), predictionColName, $(labelCol), $(featuresCol), objectiveHistory, model ) model.setSummary(trainingSummary) }
      
      







trainメソッドの与えられたコードは3つの部分に分けることができます:データセットからトレーニング例を取得する。 ペナルティ関数の開始と最適解の検索。 係数のベクトルと学習結果をモデルインスタンスに保存します。



RegressionModelクラス拡張



org.apache.spark.ml.regression.NonLinearRegressionModelクラスの実装は非常に簡単です。 predictメソッドは、カーネル関数を使用して、ポイントで値を取得します。



 override protected def predict(features: Vector): Double = { kernel.eval(coefficients.asBreeze.toDenseVector, features.asBreeze.toDenseVector) }
      
      





Spark ML APIの注意すべき唯一の要件は、モデルの直列化可能性の要件です。 抽象クラスorg.apache.spark.ml.util。{MLReader、MLWriter} [6]を拡張することにより、コンパニオンオブジェクトで目的の動作が保証されます。 トレーニング済みモデルの状態は、係数ベクトルとカーネル関数の2つの部分で構成されています。 すべてが係数ベクトルですでに発明されている場合、カーネル関数では少し複雑になります。 カーネル関数を直接DataFrameに保存することはできませんが、いくつかの代替オプションがあります。



簡単にするために、関数をBase64文字列にバイナリシリアル化するオプションが選択されました。 欠点には、結果の文字列を読む人がアクセスできないこと、および実装のバージョン管理をサポートする必要があることが含まれます。



より有望なアプローチは、関数をシンボリック形式で保存することです。 これは、R言語の式クラスclass statsパッケージのオブジェクトのイメージで実行できます。たとえば、 log(y)〜a + log(x)です。 この方法は、最初の方法よりも複雑ですが、多くの問題を解決します。機能の人間が読める表現と、下位互換性を維持しながら、異なるバージョンのパーサーでの逆シリアル化の可能性です。 ここで重要な複雑さは、関数のシンボリック式の十分に柔軟なパーサーの開発です。



おそらく有用な改善点は、コア機能を数値的に差別化するためのステップを選択できることでしょう。 モデルの保存の複雑さはそれほど影響を受けません。



二次損失関数の実装



トレーニングに必要な最後の要素は損失関数です。 この例では、2つの実現の形で2次損失関数を使用します。 1つはトレーニング例がBreezeマトリックスの形式で指定され、もう1つはRDD [インスタンス] Spark構造の形式で指定されています。 最初の実装は理解しやすく(行列式を直接使用)、小規模なトレーニングセットに適しています。 それは私たちのテストベンチマークとして機能します。



コードorg.apache.spark.ml.regression.SquaresLossFunctionBreeze
 package org.apache.spark.ml.regression import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV} /** * Breeze implementation for the squares loss function. * * @param fitmodel concrete model implementation * @param xydata labeled data combined into the matrix: * the first n-th columns consist of feature values, (n+1)-th columns contains labels */ class SquaresLossFunctionBreeze(val fitmodel: NonLinearFunction, xydata: BDM[Double]) extends SquaresLossFunction { /** * The number of instances. */ val instanceCount: Int = xydata.rows /** * The number of features. */ val featureCount: Int = xydata.cols - 1 /** * Feature matrix. */ val X: BDM[Double] = xydata(::, 0 until featureCount) /** * Label vector. */ val y: BDV[Double] = xydata(::, featureCount) /** * The model dimensionality (the number of weights). * * @return dimensionality */ override def dim: Int = fitmodel.dim /** * Evaluates loss function value and the gradient vector * * @param weights weights * @return (loss function value, gradient vector) */ override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = { val r: BDV[Double] = diff(weights) (0.5 * (rt * r), gradient(weights)) } /** * Calculates a positive definite approximation of the Hessian matrix. * * @param weights weights * @return Hessian matrix approximation */ override def hessian(weights: BDV[Double]): BDM[Double] = { val J: BDM[Double] = jacobian(weights) posDef(Jt * J) } /** * Calculates the Jacobian matrix * * @param weights weights * @return the Jacobian */ def jacobian(weights: BDV[Double]): BDM[Double] = { val gradData = (0 until instanceCount) map { i => fitmodel.grad(weights, X(i, ::).t).toArray } BDM(gradData: _*) } /** * Calculates the difference vector between the label and the approximated values. * * @param weights weights * @return difference vector */ def diff(weights: BDV[Double]): BDV[Double] = { val diff = (0 until instanceCount) map (i => fitmodel.eval(weights, X(i, ::).t) - y(i)) BDV(diff.toArray) } /** * Calculates the gradient vector * * @param weights weights * @return gradient vector */ def gradient(weights: BDV[Double]): BDV[Double] = { val J: BDM[Double] = jacobian(weights) val r = diff(weights) 2.0 * Jt * r } }
      
      







2番目のオプションは、分散環境で実行するように設計されています。 計算には、 RDD.treeAggregate関数が使用されます 。これにより、「Map-Reduce」スタイルのアルゴリズムを実装できます。



コードorg.apache.spark.ml.regression.SquaresLossFunctionRdd
 package org.apache.spark.ml.regression import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV} import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.feature.Instance import org.apache.spark.rdd.RDD /** * Spark RDD implementation for the squares loss function. * * @param fitmodel concrete model implementation * @param xydata RDD with instances */ class SquaresLossFunctionRdd(val fitmodel: NonLinearFunction, val xydata: RDD[Instance]) extends SquaresLossFunction { /** * The model dimensionality (the number of weights). * * @return dimensionality */ override def dim: Int = fitmodel.dim /** * Evaluates loss function value and the gradient vector * * @param weights weights * @return (loss function value, gradient vector) */ override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = { val bcW: Broadcast[BDV[Double]] = xydata.context.broadcast(weights) val (f: Double, grad: BDV[Double]) = xydata.treeAggregate((0.0, BDV.zeros[Double](dim)))( seqOp = (comb, item) => (comb, item) match { case ((loss, oldGrad), Instance(label, _, features)) => val featuresBdv = features.asBreeze.toDenseVector val w: BDV[Double] = bcW.value val prediction = fitmodel.eval(w, featuresBdv) val addedLoss: Double = 0.5 * math.pow(label - prediction, 2) val addedGrad: BDV[Double] = 2.0 * (prediction - label) * fitmodel.grad(w, featuresBdv) (loss + addedLoss, oldGrad + addedGrad) }, combOp = (comb1, comb2) => (comb1, comb2) match { case ((loss1, grad1: BDV[Double]), (loss2, grad2: BDV[Double])) => (loss1 + loss2, grad1 + grad2) }) (f, grad) } /** * Calculates a positive definite approximation of the Hessian matrix. * * @param weights weights * @return Hessian matrix approximation */ override def hessian(weights: BDV[Double]): BDM[Double] = { val bcW = xydata.context.broadcast(weights) val (hessian: BDM[Double]) = xydata.treeAggregate(new BDM[Double](dim, dim, Array.ofDim[Double](dim * dim)))( seqOp = (comb, item) => (comb, item) match { case ((oldHessian), Instance(_, _, features)) => val grad = fitmodel.grad(bcW.value, features.asBreeze.toDenseVector) val subHessian: BDM[Double] = grad * grad.t oldHessian + subHessian }, combOp = (comb1, comb2) => (comb1, comb2) match { case (hessian1, hessian2) => hessian1 + hessian2 } ) posDef(hessian) } }
      
      







プロジェクトの組み立て



元のSpark MLプロジェクトからの開発とテストを簡素化するために、 pom.xmlを少し変更して借用しました。 Sparkのバージョンをリリースされたバージョンの1つ、この場合は2.0.1に修正します。 org.apache.spark:spark-parent_2.11:2.0.1からのPOMファイルの継承に注意する必要があります。これにより、Mavenプラグインの構成を再配置できなくなります。



SparkContextを必要とするテストを実行するには、 org.apache.sparkを追加します。spark -mllib_2.11:2.0.1:test-jar :traits org.apache.spark.mllib.util.MLlibTestSparkContextorg.apache.sparkを依存関係をテストします.ml.util.TempDirectoryは 、対応するテストクラスに実装されています。 また、テストに便利なのは、 SparkFunSuiteなどのコンテキストでの作業に役立つorg.apache.sparkパッケージのSuiteクラスの拡張です



結論の権利について



この記事では取り上げていない点がいくつかありますが、それらの研究は非常に興味深いようです。





現時点では、上記の側面に関する十分な情報はありませんが、すべての共有ソースに感謝します。



完全なコードはGithubで表示できます。



このソリューションの完全かつ包括的なテストはまだ実施されていないため、改善を検討するためのコンセプトおよびトピックとして資料を扱ってください。



コメントや提案については、プライベートメッセージを使用することをお勧めします。コメントはディスカッションに適しています。



ご清聴ありがとうございました。



使用材料



  1. ieeexplore.ieee.org/document/5451114
  2. www.nodalpoint.com/nonlinear-regression-using-spark-part-1-nonlinear-models
  3. www.nodalpoint.com/non-linear-regression-using-spark-part2-sum-of-squares
  4. spark.apache.org/docs/latest/ml-guide.html
  5. github.com/scalanlp/breeze
  6. jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-mllib/spark-mllib-pipelines-persistence.html



All Articles