O(1)追加メモリを使用した外部ソート

この記事を読んだ後、O(1)外部メモリを使用した外部ソートを書いたことを思い出しました。 関数は、バイナリファイルと、配列に割り当てることができる最大メモリサイズを受け取りました。



void ext_sort(const std::string filename, const size_t memory)
      
      





追加のディスク領域なしで外部ソートの効果的なパフォーマンスのアルゴリズムを使用しました:



  1. ファイルを使用可能なメモリに収まるブロックに分割します。 これらのブロックBlock_1、Block_2、...、Block_(S-1)、Block_Sを示します。 P = 1に設定します。
  2. Block_Pをメモリに読み込みます。
  3. メモリ内のデータをソートし、Block_Pに書き戻します。 P = P + 1に設定し、P≤Sの場合、Block_Pをメモリに読み込み、この手順を繰り返します。 つまり、ファイルの各ブロックを並べ替えます。
  4. 各ブロックを小さなブロックB_1とB_2に分割します。 これらの各ブロックは、使用可能なメモリの半分を占有します。
  5. 利用可能なメモリの前半でブロックBlock_1のブロックB_1を読み取ります。 Q = 2に設定します。
  6. 利用可能なメモリの後半にあるBlock_QブロックのブロックB_1を読み取ります。
  7. インプレースマージを使用してメモリ内の配列を結合し、メモリの後半をBlock_QブロックのブロックB_1に書き込み、Q≤Sの場合、Q = Q + 1に設定し、Block_QブロックのブロックB_1を使用可能なメモリの後半に読み取り、この手順を繰り返します。
  8. 利用可能なメモリの前半をブロックBlock_1のブロックB_1に書き込みます。 常にメモリ内の要素の半分未満を残し、すべてのブロックとマージするため、ファイル全体のM個の最小要素がメモリのこの部分に格納されます。
  9. Block_SブロックのブロックB_2を使用可能なメモリの後半に読み込みます。 Q = S −1と設定します。
  10. 利用可能なメモリの前半のBlock_QブロックのブロックB_2を読み取ります。
  11. インプレースマージを使用してメモリ内の配列を結合し、使用可能なメモリの前半をBlock_QのブロックB_2に書き込み、Q = Q -1に設定します。 Q≥1の場合、使用可能なメモリの前半のBlock_QブロックのブロックB_2を読み取り、この手順を繰り返します。
  12. 利用可能なメモリの後半をBlock_SブロックのブロックB_2に書き込みます。 手順8と同様に、ファイル全体の最大要素がここに保存されます。
  13. Block_1のブロックB_2からBlock_SのブロックB_1まで、ファイルに新しいブロックを定義し、Block_1からBlock_Sに再び番号を付けます。 各ブロックをブロックB_1とB_2に分割します。 P = 1に設定します。
  14. Block_PブロックのB_1とB_2をメモリに読み込みます。 メモリ内の配列を結合します。 ソートされた配列をBlock_Pに書き込み、P = P +1に設定します。 P≤Sの場合、この手順を繰り返します。
  15. S> 1の場合、ステップ5に戻ります。M個の最小要素と最大要素を選択するたびに、それぞれファイルの先頭と末尾に書き込み、ファイルの中央に到達するまで残りの要素で同じことを行います。


このアルゴリズムの利点は、ディスク上にバッファがないことに加えて、比較的大きな部分でディスクからデータを読み取るため、アルゴリズムが高速化されることです。



C ++でアルゴリズムを実装します。



まず、ブロック数とブロックサイズをバイト単位と要素単位で決定し、メモリを割り当てます。



  const size_t type_size = sizeof(T); const uint64_t filesize = file_size(filename); std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary); const uint64_t chunk_number = filesize / memory; const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size = chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2; std::vector<T> *chunk = new std::vector<T>(chunk_size);
      
      





次にポイント2〜3-各ブロックをソートします。



  for (uint64_t i = 0; i < chunk_number; ++i) { data.seekg(chunk_byte_size * i); data.read((char *) chunk->data(), chunk_byte_size); flat_quick_sort(chunk->begin(), chunk->end()); data.seekp(chunk_byte_size * i); data.write((char *) chunk->data(), chunk_byte_size); }
      
      





ソートは少し後で記述します。



合併に進みます。 下半分:



  int64_t s = chunk_number, start = 0; while (s > 0) { data.seekp(half_chunk_byte_size * start); data.read((char *) chunk->data(), half_chunk_byte_size); for (int64_t q = 1; q < s; ++q) { data.seekg(half_chunk_byte_size * start + chunk_byte_size * q); data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekp(half_chunk_byte_size * start + chunk_byte_size * q); data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); } data.seekp(half_chunk_byte_size * start); data.write((char *) chunk->data(), half_chunk_byte_size);
      
      





そして同様にトップ:



  data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size); data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); for (int64_t q = s - 2; q >= 0; --q) { data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q); data.read((char *) chunk->data(), half_chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q); data.write((char *) chunk->data(), half_chunk_byte_size); } data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size); data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size);
      
      





ブロックを再配布し、サイクルを終了し、メモリを解放することを忘れないでください。



  --s; ++start; for (int64_t p = 0; p < s; ++p) { data.seekp(half_chunk_byte_size * start + chunk_byte_size * p); data.read((char *) chunk->data(), chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekg(half_chunk_byte_size * start + chunk_byte_size * p); data.write((char *) chunk->data(), chunk_byte_size); } } delete chunk; }
      
      





全機能
 template<typename T> void ext_sort(const std::string filename, const size_t memory) { const size_t type_size = sizeof(T); const uint64_t filesize = file_size(filename); std::fstream data(filename, std::ios::in | std::ios::out | std::ios::binary); const uint64_t chunk_number = filesize / memory; const size_t chunk_size = memory / type_size - (memory / type_size) % 2, chunk_byte_size = chunk_size * type_size, half_chunk_byte_size = chunk_byte_size / 2, half_chunk_size = chunk_size / 2; std::vector<T> *chunk = new std::vector<T>(chunk_size); for (uint64_t i = 0; i < chunk_number; ++i) { data.seekg(chunk_byte_size * i); data.read((char *) chunk->data(), chunk_byte_size); flat_quick_sort(chunk->begin(), chunk->end()); data.seekp(chunk_byte_size * i); data.write((char *) chunk->data(), chunk_byte_size); } int64_t s = chunk_number, start = 0; while (s > 0) { data.seekp(half_chunk_byte_size * start); data.read((char *) chunk->data(), half_chunk_byte_size); for (int64_t q = 1; q < s; ++q) { data.seekg(half_chunk_byte_size * start + chunk_byte_size * q); data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekp(half_chunk_byte_size * start + chunk_byte_size * q); data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); } data.seekp(half_chunk_byte_size * start); data.write((char *) chunk->data(), half_chunk_byte_size); data.seekp(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size); data.read((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); for (int64_t q = s - 2; q >= 0; --q) { data.seekg(half_chunk_byte_size * (start + 1) + chunk_byte_size * q); data.read((char *) chunk->data(), half_chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekp(half_chunk_byte_size * (start + 1) + chunk_byte_size * q); data.write((char *) chunk->data(), half_chunk_byte_size); } data.seekg(half_chunk_byte_size * start + chunk_byte_size * s - half_chunk_byte_size); data.write((char *) (chunk->data() + half_chunk_size), half_chunk_byte_size); --s; ++start; for (int64_t p = 0; p < s; ++p) { data.seekp(half_chunk_byte_size * start + chunk_byte_size * p); data.read((char *) chunk->data(), chunk_byte_size); in_place_merge(chunk->begin(), chunk->begin() + half_chunk_size - 1, chunk->begin() + chunk_size); data.seekg(half_chunk_byte_size * start + chunk_byte_size * p); data.write((char *) chunk->data(), chunk_byte_size); } } delete chunk; }
      
      







関数flat_quick_sortおよびin_place_mergeの実装は残ります。 ripatti habraiserの 記事で、フラットクイックソートのアイデア(およびほとんどの実装)を取り上げました 。 中央値の中央値(平均的なケースでは過剰と見なされます)を中央値9に置き換え、配列の小さな部分に挿入を使用した並べ替えを追加しました。



flat_quicksort.h
 #ifndef EXTERNAL_SORT_FLAT_QUICKSORT_H #define EXTERNAL_SORT_FLAT_QUICKSORT_H template<class ForwIt> void insertion_sort(ForwIt be, ForwIt en) { for (ForwIt ii = be + 1; ii != en; ii++) { ForwIt jj = ii - 1; auto val = *ii; while (jj >= be and *jj > val) { *(jj + 1) = *jj; --jj; } *(jj + 1) = val; } } template<class ForwIt> ForwIt median_of_3(ForwIt it1, ForwIt it2, ForwIt it3) { return (*it1 > *it2) ? (*it3 > *it2) ? (*it1 > *it3) ? it3 : it1 : it2 : (*it3 > *it1) ? (*it2 > *it3) ? it3 : it2 : it1; } template<class ForwIt> ForwIt choose_pivot(ForwIt be, ForwIt en) { ptrdiff_t s = (en - be) / 8; ForwIt mid = be + (en - be) / 2; ForwIt best1 = median_of_3(be, be + s, be + 2 * s), best2 = median_of_3(mid - s, mid, mid + s), best3 = median_of_3( en - 2 * s, en - s, en); return median_of_3(best1, best2, best3); } // search for the end of the current block template<class ForwIt> ForwIt block_range(ForwIt be, ForwIt en) { ForwIt it = be; while (it != en) { if (*be < *it) return it; ++it; } return it; } // warning: after the partition outer pivot may point to random element template<class ForwIt> std::pair<ForwIt, ForwIt> partition_range(ForwIt be, ForwIt en, ForwIt pivot) { std::pair<ForwIt, ForwIt> re; ForwIt j = be; for (ForwIt i = be; i != en; ++i) if (*i < *pivot) { if (pivot == i) pivot = j; else if (pivot == j) pivot = i; std::swap(*j, *i); ++j; } re.first = j; for (ForwIt i = j; i != en; ++i) if (*pivot == *i) { if (pivot == i) pivot = j; else if (pivot == j) pivot = i; std::swap(*j, *i); ++j; } re.second = j; return re; } // makes largest element the first template<class ForwIt> void blockify(ForwIt be, ForwIt en) { for (ForwIt it = be; it != en; ++it) if (*be < *it) std::swap(*be, *it); } template<class ForwIt> void flat_quick_sort(ForwIt be, ForwIt en) { ForwIt tmp = en; // the current end of block while (be != en) { if (std::is_sorted(be, tmp)) { be = tmp; tmp = block_range(be, en); continue; } if (tmp - be < 32) insertion_sort(be, tmp); else { ForwIt pivot = choose_pivot(be, tmp); std::pair<ForwIt, ForwIt> range = partition_range(be, tmp, pivot); blockify(range.second, tmp); tmp = range.first; } } } #endif //EXTERNAL_SORT_FLAT_QUICKSORT_H
      
      







マージはより困難でした。 まず、O(n)メモリを使用してスタブを使用しました。



 template<typename T> void merge(std::vector<T> &chunk, size_t s, size_t q, size_t r) { std::vector<T> *chunk2 = new std::vector<T>(q - s + 1); auto it2 = chunk2->begin(), it1 = chunk.begin() + q + 1, it = chunk.begin() + s; std::copy(it, it1, it2); while (it2 != chunk2->end() && it1 != chunk.begin() + r + 1) { if (*it1 > *it2) { *it = *it2; ++it2; } else { *it = *it1; ++it1; } ++it; } if (it1 == chunk.begin() + r + 1) std::copy(it2, chunk2->end(), it); else std::copy(it1, chunk.begin() + r + 1, it); delete chunk2; }
      
      





スタブをインプレースバージョンに置き換えたいと思ったとき、高速インプレースマージアルゴリズムはほとんど紛らわしいことがわかりました(たとえば、 オンプレースおよび最適インプレースマージを参照)。 もっとシンプルなものが必要だったので、インプレースマージ用のシンプルなアルゴリズムの記事からアルゴリズムを選択しました



インプレースマージ
 template<typename Iter> void merge_B_and_Y(Iter z, Iter y, Iter yn) { for (; z < y && y < yn; ++z) { Iter j = std::min_element(z, y); if (*j <= *y) std::swap(*z, *j); else { std::swap(*z, *y); ++y; } } if (z < y) flat_quick_sort(z, yn); } template<typename Iter> Iter find_next_X_block(Iter x0, Iter z, Iter y, size_t k, size_t f, Iter b1, Iter b2, auto max) { auto min1 = max, min2 = max; Iter m = x0 + (ptrdiff_t) floor((z - x0 - f) / k) * k + f, x = x0; if (m <= z) m += k; for (auto i = m; i + k <= y; i += k) { if (i != b1 && i != b2) { Iter j = (i < b1 && b1 < i + k) ? m - 1 : i + k - 1; if (*i <= min1 && *j <= min2) { x = i; min1 = *i; min2 = *j; } } } return x; } template<typename Iter> void in_place_merge(Iter x0, Iter y0, Iter yn, int64_t k, bool rec) { if (k == -1) k = (int64_t) sqrt(yn - x0); size_t f = (y0 - x0) % k; Iter x = (f == 0) ? y0 - 2 * k : y0 - k - f; auto t = *x, max = *std::max_element(x0, yn); *x = *x0; Iter z = x0, y = y0, b1 = x + 1, b2 = y0 - k; int i = 0; while (y - z > 2 * k) { ++i; if (*x <= *y || y >= yn) { *z = *x; *x = *b1; ++x; if ((x - x0) % k == f) if (z < x - k) b2 = x - k; x = find_next_X_block(x0, z, y, k, f, b1, b2, max); } else { *z = *y; *y = *b1; ++y; if ((y - y0) % k == 0) b2 = y - k; } ++z; *b1 = *z; if (z == x) x = b1; if (z == b2) b2 = yn + 1; ++b1; if ((b1 - x0) % k == f) b1 = (b2 == yn + 1) ? b1 - k : b2; } *z = t; if (rec) merge_B_and_Y(z, y, yn); else { flat_quick_sort(z, y); in_place_merge(z,y,yn,(int64_t)sqrt(k),true); } }
      
      







しかし、私のコンピューターでは、マージをインプレースマージに置き換えると、アルゴリズムがほぼ1桁遅くなりました。 実装を間違えたか、単純さを追求して遅いアルゴリズムを選択しただけかもしれません。 いつものように、それを整理する時間がありませんでした。さらに、gprofが何らかの形で落ちました。 そして、それは私に夜明けをもたらしました。 Mバイトの動的メモリを割り当てる場合、どのように使用しても問題ありませんが、O(1)が得られます。 次に、データに2/3を選択し、マージバッファに3番目を選択します。 スローダウンはずっと少なくなります。 そして真実は:

アルゴリズム 時間(7.5MBのメモリで75MB int64) 速度(7.5MBのメモリで75MB int64) 時間(75KBのメモリに7.5MB int64) 速度(75KBのメモリで7.5MB int64) 時間(75MBのメモリで750MB int64) 速度(75MBのメモリで750MB int64)
インプレースマージ 6.04329秒 1,241,045 B / s 24.2993秒 3 086 508 B / s - -
マージ 0.932663秒 8 041 489 B / s 2.73895秒 27382756 B / s 47.7946秒 15 691 689 B / s
アルゴリズムSLY_G 1.79629秒 4175272 B / s 3.84775秒 19 491 910 B / s 39.77秒 18858436 B / s
残念ながら、大容量の場合、バッファがまったく使用されないため、アルゴリズムの速度が低下します。 それでも、アルゴリズムの速度は非常に適切であり、改善できると確信しています。



すべてのソースはこちらです。



All Articles