MySQLからPostgreSQLへのデータの移行

データベースを操作し、その長所と短所に慣れると、あるDBMSから別のDBMSに移行する決定が下される瞬間が生じます。 この場合、サービスをMySQLからPostgreSQLに転送するという問題が発生しました。 PostgreSQLバージョン9.2への移行から期待される利点の短いリストを以下に示します(機能の詳細なリストはこちらにあります )。



原則として、既存のソリューションは、ターゲットデータベースの構文に従って変換される既製のSQLダンプを使用することに依存しています。 ただし、場合によっては(大量の情報を持つアクティブに使用されるWebアプリケーション)、このオプションは、DBMSからSQLダンプを作成し、変換し、結果のダンプをDBMSに再度ロードするための一定の時間コストがかかります。 したがって、コンバーターのオンラインオプション(DBMSからDBMSへのストレート)がより最適になり、サービスのダウンタイムを大幅に削減できます。



実装の言語はC ++(C ++ 11xの機能を含む)で、MySQLとPostgreSQLに接続するためのライブラリはネイティブで使用され、Qt CreatorはIDEとして使用されました。



移行アルゴリズムは次のとおりです。 受信者データベースは、ソースデータベースの構造に対応するテーブル構造を既に作成していることがわかります。 データ転送用のテーブルのリストが生成され、スレッドプールに配布されます。 各スレッドには、ソースデータベースと宛先データベースへの接続があります。 つまり いくつかのテーブルが並行して転送されます。 利益!



従来、アプリケーションには特定のフレームワーク(他のコンポーネントが依存するシステムコンポーネントのセット)があり、構成ファイル、ログ、エラーハンドラ、メモリマネージャなどを操作します。 この場合、最も必要なものだけが問題の解決に使用されます。 まず、いくつかの基本型および複合型が(便宜上)のみ再定義されました(はい、私は知っています、aliテンプレート使用できましたが、次のようになりました):

単純型
typedef bool t_bool; typedef char t_char; typedef unsigned char t_uchar; typedef signed char t_schar; typedef int t_int; typedef unsigned int t_uint; typedef float t_float; typedef double t_double;
      
      





地図
 template<typename T, typename U> class CMap : public std::map<T, U> { public: CMap(); virtual ~CMap(); }; template<typename T, typename U> CMap<T, U>::CMap() { } template<typename T, typename U> CMap<T, U>::~CMap() { }
      
      





ベクトル
 template<typename T> class CVector : public std::vector<T> { public: CVector(); virtual ~CVector(); }; template<typename T> CVector<T>::CVector() { } template<typename T> CVector<T>::~CVector() { }
      
      





fstream
 class CFileStream : public std::fstream { public: CFileStream(); virtual ~CFileStream(); };
      
      





明示的なパターンのうち、シングルトンのみが使用されます。

クラシックシングルトンマイヤーズ
 template<typename T> class CSingleton { public: static T* instance(); void free(); protected: CSingleton(); virtual ~CSingleton(); }; template<typename T> T* CSingleton<T>::instance() { static T *instance = new T(); return instance; } template<typename T> void CSingleton<T>::free() { delete this; } template<typename T> CSingleton<T>::CSingleton() { } template<typename T> CSingleton<T>::~CSingleton() { }
      
      





タスク(別のスレッドで実行される)およびシステム(タスクの実行を開始する)の基本クラス:

task.h
 class CTask { public: CTask(); virtual ~CTask(); void execute(); t_uint taskID(); t_bool isExecuted(); protected: virtual void executeEvent() = 0; private: t_uint m_task_id; t_bool m_executed; };
      
      





task.cpp
 CTask::CTask() : m_executed(false) { static t_uint task_id = 0; m_task_id = task_id++; } CTask::~CTask() { } void CTask::execute() { executeEvent(); m_executed = true; } t_uint CTask::taskID() { return m_task_id; } t_bool CTask::isExecuted() { return m_executed; }
      
      





system.h
 class CSystem { public: CSystem(); virtual ~CSystem() = 0; protected: void executeTask(CTask *task); };
      
      





system.cpp
 CSystem::CSystem() { } CSystem::~CSystem() { } void CSystem::executeTask(CTask *task) { CTask& task_ref = *task; std::thread thread([&]() { task_ref.execute(); }); thread.detach(); }
      
      





基本型の検討の最後に、文字列クラスを最初から記述する必要があるため、一部の操作(部分文字列の置換と連結)で、追加のメモリ割り当てやいくつかのこと(文字列を数値に変換)せずに送信バッファ(少し後)を操作できるようになりましたおよび文字列内の数字)からクラスのメンバーを作成します(クラス宣言のみが提供されます)。

string.h
 class CString { public: CString(const t_char *data = nullptr); CString(const CString& s); ~CString(); const t_char* ptr() const; void setPtr(t_char *p); CString& operator= (const CString& s); CString operator+ (const t_char *p) const; CString operator+ (t_char c) const; CString operator+ (const CString& s) const; friend CString operator+ (const t_char *p, const CString& s); CString& operator+= (const t_char *p); CString& operator+= (t_char c); CString& operator+= (const CString& s); t_bool operator== (const CString& s) const; t_bool operator!= (const CString& s) const; t_bool operator< (const CString& s) const; t_bool operator> (const CString& s) const; t_bool operator<= (const CString& s) const; t_bool operator>= (const CString& s) const; t_char& at(t_uint index); t_char at(t_uint index) const; t_uint length() const; t_bool isEmpty() const; void clear(); t_int search(const CString& s, t_uint from = 0) const; CString substr(t_uint from, t_int count = -1) const; CString replace(const CString& before, const CString& after) const; static CString fromNumber(t_uint value); static t_uint toUnsignedInt(const CString& s, t_bool *good = nullptr); CVector<CString> split(const CString& splitter) const; t_bool match(const CString& pattern) const; static t_uint replacePtr(const t_char *src, const t_char *before, const t_char *after, char *buffer); static t_uint lengthPtr(const t_char *src); static t_uint concatenatePtr(const t_char *src, char *buffer); private: t_char *m_data; t_uint length(const t_char *src) const; t_char* copy(const t_char *src) const; t_char* concatenate(const t_char *src0, t_char c) const; t_char* concatenate(const t_char *src0, const t_char *src1) const; t_int compare(const t_char *src0, const t_char *src1) const; }; CString operator+ (const t_char *p, const CString& s);
      
      





必然的に、アプリケーションにとっては、「Hello、world」よりも少しだけ、ログと構成ファイルです。 各タスクは、テーブルの処理時にメッセージをログに書き込むため、ログにメッセージを書き込む方法ではミューテックスが使用されました。 ログへの書き込みがアプリケーションのボトルネックからはほど遠いため、小さな粒度のロックとロックフリーアルゴリズムは考慮されませんでした。

log.h
 class CLog : public CSingleton<CLog> { public: enum MessageType { Information, Warning, Error }; CLog(); virtual ~CLog(); void information(const CString& message); void warning(const CString& message); void error(const CString& message); private: std::mutex m_mutex; CFileStream m_stream; void writeTimestamp(); void writeHeader(); void writeFooter(); void writeMessage(MessageType type, const CString& message); };
      
      





log.cpp
 CLog::CLog() { m_stream.open("log.txt", std::ios_base::out); writeHeader(); } CLog::~CLog() { writeFooter(); m_stream.flush(); m_stream.close(); } void CLog::information(const CString& message) { writeMessage(Information, message); } void CLog::warning(const CString& message) { writeMessage(Warning, message); } void CLog::error(const CString& message) { writeMessage(Error, message); } void CLog::writeTimestamp() { time_t rawtime; tm *timeinfo; t_char buffer[32]; time(&rawtime); timeinfo = localtime(&rawtime); strftime(buffer, 32, "%Y/%m/%d %H:%M:%S", timeinfo); m_stream << buffer << " "; } void CLog::writeHeader() { writeMessage(Information, "Log started"); } void CLog::writeFooter() { writeMessage(Information, "Log ended"); } void CLog::writeMessage(MessageType type, const CString& message) { std::lock_guard<std::mutex> guard(m_mutex); writeTimestamp(); switch (type) { case Information: { m_stream << "Information " << message.ptr(); break; } case Warning: { m_stream << "Warning " << message.ptr(); break; } case Error: { m_stream << "Error " << message.ptr(); break; } default: { break; } } m_stream << "\n"; m_stream.flush(); }
      
      





config.h
 class CConfig : public CSingleton<CConfig> { public: CConfig(); virtual ~CConfig(); CString value(const CString& name, const CString& defvalue = "") const; private: CFileStream m_stream; CMap<CString, CString> m_values; };
      
      





config.cpp
 CConfig::CConfig() { m_stream.open("mysql2psql.conf", std::ios_base::in); if (m_stream.is_open()) { CString line; const t_uint buffer_size = 256; t_char buffer[buffer_size]; while (m_stream.getline(buffer, buffer_size)) { line = buffer; if (!line.isEmpty() && line.at(0) != '#') { t_int pos = line.search("="); CString name = line.substr(0, pos); CString value = line.substr(pos + 1); m_values.insert(std::pair<CString, CString>(name, value)); } } m_stream.close(); CLog::instance()->information("Config loaded"); } else { CLog::instance()->warning("Can't load config"); } } CConfig::~CConfig() { } CString CConfig::value(const CString& name, const CString& defvalue) const { CMap<CString, CString>::const_iterator iter = m_values.find(name); if (iter != m_values.end()) { return iter->second; } return defvalue; }
      
      





mysql2psql.conf
 # MySQL connection mysql_host=localhost mysql_port=3306 mysql_database=mysqldb mysql_username=root mysql_password=rootpwd mysql_encoding=UTF8 # PostgreSQL connection psql_host=localhost psql_port=5432 psql_database=psqldb psql_username=postgres psql_password=postgrespwd psql_encoding=UTF8 # Migration # (!) Note: source_schema == mysql_database source_schema=mysqldb destination_schema=public tables=* use_insert=0 # Other settings threads=16
      
      





さて、PostgreSQLにデータを追加するのはどうですか。 2つのオプションがあります-INSERTクエリを使用するには、パフォーマンス(トランザクションメカニズムの機能)の点で大きなデータセットであまりうまく表示されませんでした。または、送信の最後に特別なマーカー(ターミネータ文字)を送信してデータの一部を継続的に送信できるCOPYコマンドを使用します。 別のニュアンスは、PostgreSQLのタイプ決定(テーブルのフィールド)に関連しています。 ドキュメントは、ドキュメントの行間に読み物がないことを示していなかったため、人間が読み取れる型の識別子を返す方法が示されていなかったため、対応するoid(データベース内の各オブジェクトのほぼ一意の識別子)が型でコンパイルされました:



 case 20: // int8 case 21: // int2 case 23: // int4 case 1005: // int2 case 1007: // int4 case 1016: // int8 case 700: // float4 case 701: // float8 case 1021: // float4 case 1022: // float8 case 1700: // numeric case 18: // char case 25: // text case 1002: // char case 1009: // text case 1015: // varchar case 1082: // date case 1182: // date case 1083: // time case 1114: // timestamp case 1115: // timestamp case 1183: // time case 1185: // timestamptz case 16: // bool case 1000: // bool
      
      







タスクの準備と実行は次のとおりです。



各タスクでは、50 MBの3つの静的バッファーが割り当てられ、COPYコマンド用のデータが準備されます(特殊文字のエスケープとフィールド値の連結)。

タスク準備を含むコードスニペット
 // create connection pool t_uint threads = CString::toUnsignedInt(CConfig::instance()->value("threads", "1")); CLog::instance()->information("Count of working threads: " + CString::fromNumber(threads)); if (!createConnectionPool(threads - 1)) { return false; } // create tasks CString destination_schema = CConfig::instance()->value("destination_schema"); t_uint range_begin = 0; t_uint range_end = 0; t_uint range = m_tables.size() / threads; for (t_uint i = 0, j = 0; i < m_tables.size() - range; i += range + 1, ++j) { range_begin = i; range_end = i + range; std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(m_source_pool.at(j), m_destination_pool.at(j), destination_schema, m_tables, range_begin, range_end)); m_migration_tasks.push_back(std::move(task)); } range_begin = range_end + 1; range_end = m_tables.size() - 1; std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(std::move(m_source), std::move(m_destination), destination_schema, m_tables, range_begin, range_end)); // executing tasks for (t_uint i = 0; i < m_migration_tasks.size(); ++i) { executeTask(m_migration_tasks.at(i).get()); } task->execute(); // wait for completion for (t_uint i = 0; i < m_migration_tasks.size(); ++i) { while (!m_migration_tasks.at(i)->isExecuted()) { } }
      
      







COPYのデータタスクの準備を含むコードスニペット
 t_uint count = 0; t_char *value; CString copy_query = "COPY " + m_destination_schema + "." + table + " ( "; m_buffer[0] = '\0'; m_buffer_temp0[0] = '\0'; m_buffer_temp1[0] = '\0'; if (result->nextRecord()) { for (t_uint i = 0; i < result->columnCount(); ++i) { if (i != 0) { copy_query += ", "; CString::concatenatePtr("\t", m_buffer); } copy_query += result->columnName(i); if (!result->isColumnNull(i)) { value = result->columnValuePtr(i); CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0); CString::concatenatePtr(m_buffer_temp0, m_buffer); } else { CString::concatenatePtr("\\N", m_buffer); } } copy_query += " ) FROM STDIN"; if (!m_destination_connection->copyOpen(copy_query)) { CLog::instance()->error("Can't execute query '" + copy_query + "', error: " + m_destination_connection->lastError()); return false; } CString::concatenatePtr("\n", m_buffer); if (!m_destination_connection->copyDataPtr(m_buffer)) { CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError()); return false; } ++count; while (result->nextRecord()) { m_buffer[0] = '\0'; for (t_uint i = 0; i < result->columnCount(); ++i) { if (i != 0) { CString::concatenatePtr("\t", m_buffer); } if (!result->isColumnNull(i)) { value = result->columnValuePtr(i); CString::replacePtr(value, "\\", "\\\\", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\b", "\\b", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\f", "\\f", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\n", "\\n", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\r", "\\r", m_buffer_temp0); CString::replacePtr(m_buffer_temp0, "\t", "\\t", m_buffer_temp1); CString::replacePtr(m_buffer_temp1, "\v", "\\v", m_buffer_temp0); CString::concatenatePtr(m_buffer_temp0, m_buffer); } else { CString::concatenatePtr("\\N", m_buffer); } } CString::concatenatePtr("\n", m_buffer); if (!m_destination_connection->copyDataPtr(m_buffer)) { CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError()); return false; } ++count; if (count % 250000 == 0) { CLog::instance()->information("Working task #" + CString::fromNumber(taskID()) + ":\t\ttable " + table + " processing, record count: " + CString::fromNumber(count)); } } }
      
      









結果


2 GBのデータをPostgreSQLに転送するのに約10分かかり、WALアーカイブを有効にしました(16スレッドが作成されました)。



考えるべきこと




ソースコード


ソースコードはgithubで入手できます。



All Articles