テストとして、タスク「B. 11月のSberbank Data Science Journeyコンテストの顧客支出予測」。
この記事では、モデルの品質と予測ではなく、プラットフォームのパフォーマンスを比較する側面のみを説明していることをすぐに強調しておく必要があります。
そのため、最初に、C#で実装されたアクションのシーケンスの簡単な説明(コードの一部は以下になります):
1. csvからデータをダウンロードします。 使用されたライブラリはFast Csv Readerでした。
2.消耗品を除外し、月ごとにグループ化します。
3.操作を行っていないカテゴリを各クライアントに追加します。 長い列挙を避けるために、ループインループはブルームフィルターを使用しました 。 ここでC#の実装を見つけました。
4. ハッシュトリックアレイの形成。 C#での完成した実装が見つからなかったため、自分で実装する必要がありました。 これを行うには、 murmurhash3のハッシュ実装をダウンロードして終了します
5.実際に回帰を計算します。
Jupyter Notebook(以降JN)のソリューションは次のようになります(これは測定時間の一部ではないため、ライブラリの接続は省略します)。
%%time # transactions = pd.read_csv('.//JN//SBSJ//transactions.csv') all_cuses = transactions.customer_id.unique() # mcc = pd.read_csv('.//JN//SBSJ//tr_mcc_codes.csv', sep=';') all_mcc = mcc.mcc_code.unique() # transactions = transactions[transactions.amount < 0].copy() transactions['day'] = transactions.tr_day.apply(lambda dt: dt.split()[0]).astype(int) transactions.day += 29 - transactions['day'].max()%30 # transactions['month_num'] = (transactions.day) // 30 train_transactions = transactions[transactions.month_num < 15] # ( ) grid = list(product(*[all_cuses, all_mcc, range(11, 15)])) train_grid = pd.DataFrame(grid, columns = ['customer_id', 'mcc_code', 'month_num']) train = pd.merge(train_grid, train_transactions.groupby(['month_num', 'customer_id', 'mcc_code'])[['amount']].sum().reset_index(), how='left').fillna(0) # for month_shift in range(1, 3): train_shift = train.copy() train_shift['month_num'] = train_shift['month_num'] + month_shift train_shift = train_shift.rename(columns={"amount" : 'amount_{0}'.format(month_shift)}) train_shift = train_shift[['month_num', 'customer_id', 'mcc_code', 'amount_{0}'.format(month_shift)]] train = pd.merge(train, train_shift, on=['month_num', 'customer_id', 'mcc_code'], how='left').fillna(0) train['year_num'] = (train.month_num) // 12 # hashier trick hasher = FeatureHasher(n_features=6, input_type='string') train_sparse = \ hasher.fit_transform(train[['year_num', 'month_num', 'customer_id', 'mcc_code']].astype(str).as_matrix()) train_sparse2 = sparse.hstack([train_sparse, np.log(np.abs(train[['amount_1', 'amount_2']]) + 1).as_matrix(),]) # d = list(train_sparse2.toarray()) # clf = LinearRegression() clf.fit(d, np.log(np.abs(train['amount']) + 1)) # print('Coefficients: \n', clf.coef_) print('Intercept: \n', clf.intercept_) print("\nRMSLE: ") np.sqrt(mse(np.log(np.abs(train['amount']) + 1),clf.predict(d)))
C#の実装について詳しく説明します。 実験により、DataTableなどのクラスはメモリに関して非常に無駄が多いことが示されています。 したがって、Clientクラスの要素の単純なリストが使用されました。
[Serializable] public class Client { private Int32 name; private Int16 period; private Int16 year; private Int16 mcc; private double amount; private double amount1; private double amount2; // get/set ...
さらに、データの読み取りとグループ化:
// List<Client> lTransGrouped = lClientsTrans.AsParallel() .Where(row => row.getAmount() < 0) .GroupBy(row => new { month = (row.getPeriod() + 29 - Convert.ToInt16(maxNumDay) % 30) / 30, // mcc = row.getMcc(), cid = row.getName() }) .Select(grp => new Client( grp.Key.cid, Convert.ToInt16(grp.Key.month), grp.Key.mcc, Math.Log(Math.Abs(grp.Sum(r => r.getAmount())) + 1))).ToList(); lClientsTrans = null;
次に、ブルームフィルターを使用して不足しているタイプの操作を追加します。 それなしでも可能ですが、実行時間は増加します(各タイプの完全な列挙)または使用されるメモリの量(すべてのタイプを行に追加してから集計する場合)。
public static List<Client> addPeriodMcc(List<Client> lTransGrouped, Int16 maxNumMon) { List<Client> lMcc = new List<Client>(); string fnameMcc = @"j:\hadoop\Contest\Contest\tr_mcc_codes.csv"; // mcc_code CsvReader csvMccReader = new CsvReader(new StreamReader(fnameMcc), true, ';'); // while (csvMccReader.ReadNextRecord()) { Int16 mcc = Convert.ToInt16(csvMccReader[0]); lMcc.Add(new Client(0, 0, mcc, 0)); } // mcc List<Client> lNewMcc = new List<Client>(); // ID var lTransCID = lTransGrouped.AsParallel().Select(a => a.getName()).Distinct(); Console.WriteLine("Unique CID: " + lTransCID.Count()); // int capacity = lTransGrouped.Count() * 6; // , var filter = new Filter<string>(capacity); // // foreach (var i in lTransGrouped) filter.Add(i.getName().ToString() + i.getPeriod() + i.getMcc()); // , foreach (var cid in lTransCID) for (Int16 m = 0; m <= maxNumMon; m++) foreach (var mcc in lMcc) if (filter.Contains(cid.ToString() + m.ToString() + mcc.getMcc().ToString()) != true) lNewMcc.Add(new Client(cid, m, mcc.getMcc(), 0)); lTransCID = lMcc = null; Console.WriteLine("Count lNewMcc: " + lNewMcc.Count); Console.WriteLine("Count lTransGrouped: " + lTransGrouped.Count); // List<Client> lTransFull = lNewMcc.Union(lTransGrouped).ToList(); Console.WriteLine("Count lTransFull: " + lTransFull.Count); lTransGrouped = lNewMcc = null; return lTransFull; }
前月のオペレーションを追加する手順:
public static List<Client> addAmounts(List<Client> lTransFull) { List<Client> lTransFullA2; // lTransFullA2 = lTransFull.OrderBy(a => a.getName()) .ThenBy(a => a.getMcc()) .ThenBy(a => a.getYear()) .ThenBy(a => a.getPeriod()).ToList(); int name = 0; int month = 0; int year = 0; int mcc = 0; int i = 0; foreach (var l in lTransFullA2) { name = l.getName(); mcc = l.getMcc(); year = l.getYear(); month = l.getPeriod(); // if (i > 0 && name == lTransFullA2[i - 1].getName() && mcc == lTransFullA2[i - 1].getMcc() && year == lTransFullA2[i - 1].getYear() && month == lTransFullA2[i - 1].getPeriod() + 1) { l.setAmount1(lTransFullA2[i - 1].getAmount()); } // if (i > 1 && name == lTransFullA2[i - 2].getName() && mcc == lTransFullA2[i - 2].getMcc() && year == lTransFullA2[i - 2].getYear() && month == lTransFullA2[i - 2].getPeriod() + 2) { l.setAmount2(lTransFullA2[i - 2].getAmount()); } i++; } return lTransFullA2; }
次に、ハッシュトリック配列に入力し、理解可能なモデル形式でデータを準備し、実際に計算します
int n_features = 6; // Extreme.Mathematics.LinearAlgebra.SparseVector<double> v = Vector.CreateSparse<double>(lTransFullA2.Count); // (hash + ) md = Matrix.Create<double>(lTransFullA2.Count, n_features + 2); // Hashing trick Parallel.For(0, lTransFullA2.Count(), i => hashing_vectorizer(lTransFullA2[i], i, n_features)); for (int i = 0; i < lTransFullA2.Count; i++) { md[i, n_features] = lTransFullA2[i].getAmount1(); md[i, n_features + 1] = lTransFullA2[i].getAmount2(); v.AddAt(i, lTransFullA2[i].getAmount()); } lTransFullA2 = null; GC.Collect(2, GCCollectionMode.Forced); var model = new LinearRegressionModel(v, md); // model.MaxDegreeOfParallelism = 8; model.Compute(); // Console.WriteLine(model.Summarize()); // GC.Collect(2, GCCollectionMode.Forced);
最後に、ハッシュトリックの実装:
public static void hashing_vectorizer(Client f, int i, int n) { int[] x = new int[n]; string s = f.getYear().ToString(); // int idx = getIndx(s, n); x[idx] += calcBit(s); md[i,idx] = x[idx]; s = f.getPeriod().ToString(); idx = getIndx(s, n); x[idx] += calcBit(s); md[i, idx] = x[idx]; s = f.getName().ToString(); idx = getIndx(s, n); x[idx] += calcBit(s); md[i, idx] = x[idx]; s = f.getMcc().ToString(); idx = getIndx(s, n); x[idx] += calcBit(s); md[i, idx] = x[idx]; } // public static int calcBit(string s) { byte b = 0; b = Convert.ToByte(s[0]); for (int i = 1; i < s.Count(); i++) b ^= Convert.ToByte(s[0]); bool result = true; while (b >= 1) { result ^= (b & 0x01) != 0; b = Convert.ToByte(b >> 1); } if (result) return -1; else return 1; } public static int getIndx(string str, int n) { Encoding encoding = new UTF8Encoding(); byte[] input = encoding.GetBytes(str); uint h = MurMurHash3.Hash(input); return Convert.ToInt32(h % n); }
プログラムの結果はほぼ同じです(RMSLE約1.6)。 これは次のようなものです。
次に、最も興味深いテスト結果に進みます。 すべてのテストはi7-2600で実行されました(8スレッドですが、ほとんどの場合1-2が機能しました)。 RAM 12 GB、OS Win7。
実行時間のデータ量への依存性を判断するために、1.7、3.4、5.1、および680万のソースレコード(transactions.csvファイルの内容)で計算が実行されました。 ただし、データ準備中に11〜14か月にわたってフィルタリングが行われたため、グラフにはフィルタリング後のデータ量が表示されます。
ご覧のとおり、C#バージョンは約2倍高速です。 同様の状況は、メモリ消費です。 これは、Visual Studio(C#がデバッグモードで起動された)およびブラウザ(localhost:8888)が占有するメモリを考慮しません。 評価のために、ピーク値が取得されました。
サンプルがさらに増加すると、JNはすでにページファイルの使用を開始していました。その結果、すべてが大幅に遅くなりました。
したがって、ここではRAMがハードリミッターであるため、C#を使用するとJNよりもはるかに高速に大量のデータを処理できることがわかります。
一方、matplotlib視覚化ツールを使用すると、ほとんどその場でデータを分析でき、C#コードではより多くの記述が必要になります。 したがって、メモリ/速度が不足している場合、JNスタックを使用して、限られた選択でモデルをデバッグすることが最善のオプションであり、最終的な実装はすでにC#にあります。