LevelDBを使用する

同僚がSQLite、MemcacheDB、Redisを使用して、LevelDB、Sophia、HamsterDBなどの埋め込みリポジトリを無視し、ローカルの永続的なキーと値のストレージを整理する状況に直面しました。



この記事を2つの部分に分けました。

  1. LevelDB APIの簡単な紹介
  2. LevelDBを使用して時系列を保存します。






LevelDBとそのAPI



LevelDBのいくつかのプロパティ:





開閉


発見:

#include <assert> #include "leveldb/db.h" leveldb::DB* db; leveldb::Options options; options.create_if_missing = true; leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db); assert(status.ok());
      
      







閉鎖:

 delete db;
      
      







オプション:

説明 デフォルト値
コンパレータ キーコンパレータ BytewiseComparator、 memmcpを内部で使用
create_if_missing 欠落している場合はベースを作成します
error_if_exists ベースが存在する場合、エラーをスローします
paranoid_checks 積極的なデータベース整合性チェック
env I / O操作が実行される環境 Env ::デフォルト()
write_buffer_size 書き込みバッファサイズ 4MB
max_open_files 開いているファイルの数 1000
block_cache ブロックに特別なキャッシュを使用する NULL、8MBの内部キャッシュを作成して使用します
block_size ブロック内のユーザーデータのおおよその量 4K
圧縮 ブロック圧縮 kSnappyCompression
filter_policy (ブルーム)フィルターは、ディスクの読み取り操作を削減します。 ヌル




スライス


スライスは、多くのメソッドでキーと値を表す構造です。 Sliceにはデータへのポインターとデータのサイズが含まれていますが、Sliceにはデータ用のバッファーが含まれていないため、このような状況は許可しないでください。

 leveldb::Slice slice; if (...) { std::string str = ...; slice = str; } Use(slice);
      
      





leveldb ::スライスには、使いやすいようにいくつかのコンストラクタがあります。

 Slice(const char* d, size_t n) : data_(d), size_(n) { } Slice(const std::string& s) : data_(s.data()), size_(s.size()) { } Slice(const char* s) : data_(s), size_(strlen(s)) { }
      
      





データを取得する方法

 const char* leveldb::Slice::data() const; char leveldb::Slice::operator[](size_t n) const; std::string leveldb::Slice::ToString() const;
      
      







ステータス




起こりうるエラーについて通知するために、LevelDBのほとんどの関数はステータスを返します。

ステータスごとに、関数が正常に完了したかどうかを確認し、エラーの説明を取得できます。

 leveldb::Status s = ...; if (!s.ok()) cerr << s.ToString() << endl;
      
      







読み取り、書き込み、削除


署名の追加、取得、削除:

 Status leveldb::DB::Put(const WriteOptions& options, const Slice& key, const Slice& value); Status leveldb::DB::Get(const ReadOptions& options, const Slice& key, std::string* value); Status leveldb::DB::Delete(const WriteOptions& options, const Slice& key);
      
      





使用例:

 std::string value; leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value); if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value); if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1);
      
      







イテレータ




イテレータは、leveldb :: Iteratorクラスで表され、次のインターフェイスを備えています。

 bool leveldb::Iterator::Valid() const; void leveldb::Iterator::SeekToFirst(); void leveldb::Iterator::SeekToLast(); void leveldb::Iterator::Seek(const Slice& target); void leveldb::Iterator::Next(); void leveldb::Iterator::Prev(); Slice leveldb::Iterator::key() const; Slice leveldb::Iterator::value() const; Status leveldb::Iterator::status() const;
      
      







イテレータインターフェイスは、順次および任意のメソッドを提供します

アクセス。 シリアルアクセスは、順方向または逆方向のいずれかです。

方向。



 leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { cout << it->key().ToString() << ": " << it->value().ToString() << endl; } assert(it->status().ok()); // Check for any errors found during the scan delete it;
      
      





 for (it->Seek(start); it->Valid() && it->key().ToString() < limit; it->Next()) { ... }
      
      





 for (it->SeekToLast(); it->Valid(); it->Prev()) { ... }
      
      







LevelDBを使用して時系列を保存します。





私の仕事は監視システムに関連しているため、時系列と時系列データベースが登場しました



次の2つの操作に限定しています。





データスキーマ




次のスキームは、キーと値のタイプのストレージにメトリックを格納するために使用されます:キー=メトリック+タイムスタンプ+タグ(タグはオプションです)。

HBase上で実行されるOpenTSDBも同様に構成されます

OpenTSDBの中には、メトリックuidスキームとデータスキームがあります。 ここでも同じ原則が関係します。



1つのデータベースを使用して、メトリック識別子を保存します。 ここでのキーはsize_tの数値で、値はCスタイルの文字列です。



3番目のベースはデータに使用され、ここでのキーはフォームの構造です:

 struct Key { size_t muid; time_t timestamp; };
      
      





値はdoubleとして保存されます。 ここでは、LevelDBのキーと値がバイトの配列であるという事実が最大限に使用され、

つまり、シリアル化なしで単純なデータ構造を使用できます。



ストレージインターフェース




 #pragma once #include <ctime> #include <cstdint> #include <memory> #include <unordered_map> namespace leveldb { class DB; class Iterator; class Slice; class Comparator; class Cache; } /*! *   */ class Storage { public: class Iterator; typedef size_t MetricUid; /*! *   */ struct Key { MetricUid muid; //!< uid  time_t timestamp; //!<  }; /*! *  * @param dir         * @param cacheSizeMb    */ Storage(const std::string& dir, size_t cacheSizeMb = 16); /*! * @brief  . * @param name   * @return    * *     UID'   UID . *     ,   UID  */ MetricUid addMetric(const std::string& name); /*! *   * @param muid   * @param timestamp   * @param value  * @return true    */ bool put(MetricUid muid, time_t timestamp, double value); /*! *       * @param muid   * @param from   * @param to   * @return  */ Iterator get(MetricUid muid, time_t from, time_t to); Storage(const Storage&) = delete; Storage& operator=(const Storage&) = delete; private: /*! *   uid  */ void initUID(); /*! *   */ void initData(); private: /*! *    UID */ MetricUid m_currentIndx; /*! *   */ std::string m_dir; /*! *    */ size_t m_cacheSizeMb; /*! *    */ std::shared_ptr<leveldb::Cache> m_dataCache; /*! *  UID' */ std::shared_ptr<leveldb::DB> m_uid; /*! *   */ std::shared_ptr<leveldb::DB> m_data; /*! *   -> uid */ std::unordered_map<std::string, MetricUid> m_metric2uid; }; /*! *      */ class Storage::Iterator { public: typedef std::tuple<time_t, double> Value; typedef std::shared_ptr<leveldb::Iterator> IteratorPrivate; Iterator(); Iterator(const IteratorPrivate& iter, const Key& limit); /*! *     * @return true    */ bool valid() const; /*! *   * @return  <, > */ Value value() const; /*! *     */ void next(); private: IteratorPrivate m_iter; //!<  LevelDB Key m_limit; //!<      };
      
      







ストレージコンストラクターは、uidを含むベースとデータを含むデータベースが配置されるディレクトリへのパス、つまりキャッシュブロックのサイズを受け入れます。



実装




コンパレーターから始めましょう。 memcmpは数値の比較には適していません。 構造をキーとして使用することにより、コードはシンプルで読みやすくなります。

 namespace { class TimeMeasurementComporator: public leveldb::Comparator { public: int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const { const char* dataA = a.data(); const char* dataB = b.data(); const Storage::Key* keyA = reinterpret_cast<const Storage::Key*>(dataA); const Storage::Key* keyB = reinterpret_cast<const Storage::Key*>(dataB); if (keyA->muid < keyB->muid) { return -1; } else if (keyA->muid > keyB->muid) { return 1; } if (keyA->timestamp < keyB->timestamp) { return -1; } else if (keyA->timestamp > keyB->timestamp) { return 1; } return 0; } // Ignore the following methods for now: const char* Name() const { return "TimeMeasurementComporator"; } void FindShortestSeparator(std::string*, const leveldb::Slice&) const { } void FindShortSuccessor(std::string*) const { } }; TimeMeasurementComporator GLOBAL_COMPORATOR; }
      
      







データのデータベースのさらなる初期化/作成:



 void Storage::initData() { DB* data; Options options; options.create_if_missing = true; options.compression = kNoCompression; options.comparator = &GLOBAL_COMPORATOR; if (m_cacheSizeMb) { options.block_cache = leveldb::NewLRUCache(m_cacheSizeMb * 1048576); m_dataCache.reset(options.block_cache); } Status status = DB::Open(options, m_dir + "/data", &data); if (!status.ok()) { LOG(ERROR)<<"Error opening database "<<status.ToString(); exit(1); } m_data.reset(data); }
      
      





グローバルコンパレーターはオプションで渡され、圧縮は無効になります。 LelelDBはSnappyなしで行っています。



メトリック識別子を使用したデータベースの初期化:

 void Storage::initUID() { Options options; options.create_if_missing = true; options.compression = kNoCompression; DB* cfg; Status status = DB::Open(options, m_dir + "/conf", &cfg); if (!status.ok()) { LOG(ERROR)<<"Error opening database "<<status.ToString(); exit(1); } m_uid.reset(cfg); std::unique_ptr<leveldb::Iterator> it( m_uid->NewIterator(leveldb::ReadOptions())); for (it->SeekToFirst(); it->Valid(); it->Next()) { const size_t* index = reinterpret_cast<const size_t*>(it->key().data()); m_metric2uid[it->value().ToString()] = *index; m_currentIndx = *index; } }
      
      





ここで、データベースが初期化され、UIDのメトリック表示が埋められます。



データの追加は非常に簡単です。

 Storage::MetricUid Storage::addMetric(const std::string& name) { auto result = m_metric2uid.find(name); if (result != m_metric2uid.end()) { return result->second; } ++m_currentIndx; m_metric2uid[name] = m_currentIndx; const auto s = m_uid->Put(WriteOptions(), Slice(reinterpret_cast<const char*>(&m_currentIndx), sizeof(m_currentIndx)), name); if (!s.ok()) { LOG(ERROR)<<"Error put "<<s.ToString(); } return m_currentIndx; } bool Storage::put(MetricUid muid, time_t timestamp, double value) { const Key key = {muid, timestamp}; const auto s = m_data->Put(WriteOptions(), Slice(reinterpret_cast<const char*>(&key), sizeof(key)), Slice(reinterpret_cast<char*>(&value), sizeof(value))); if (!s.ok()) { LOG(ERROR)<<"Error put "<<s.ToString(); } return s.ok(); }
      
      







データ取得は、LevelDBイテレーターのラッパーを作成することにより実装されます。

 Storage::Iterator Storage::get(MetricUid muid, time_t from, time_t to) { const Key begin = {muid, from}; const Key end = { muid, to }; Storage::Iterator::IteratorPrivate iter(m_data->NewIterator(ReadOptions())); iter->Seek(Slice(reinterpret_cast<const char*>(&begin), sizeof(begin))); return Storage::Iterator(iter, end); } Storage::Iterator::Iterator(): m_iter(nullptr) { memset(&m_limit, 0, sizeof(m_limit)); } Storage::Iterator::Iterator(const IteratorPrivate& iter, const Key& limit) : m_iter(iter), m_limit(limit) { } bool Storage::Iterator::valid() const { if(!m_iter) { return false; } const Slice right(reinterpret_cast<const char*>(&m_limit), sizeof(m_limit)); return m_iter->Valid() && (GLOBAL_COMPORATOR.Compare(m_iter->key(),right) < 0); } Storage::Iterator::Value Storage::Iterator::value() const { if(!m_iter) { return Value(0,0); } const Key* data =reinterpret_cast<const Key*>(m_iter->key().data()); double val = *reinterpret_cast<const double*>(m_iter->value().data()); return Value(data->timestamp, val); } void Storage::Iterator::next() { if(m_iter && m_iter->Valid()) { m_iter->Next(); } }
      
      







プロトタイプのソースコードはGitHubにあります



興味深いから:

ディスクにデータを保存するための一般的な最新のアルゴリズムの概要:LevelDB、TokuDB、LMDB、Sophia

ディスクデータ構造、Bツリー、LSMツリー、フラクタルツリーへの深い没入



All Articles