BagriとMongoDBを友達にする方法

約1か月前、HabrにBagriプロジェクトについて説明しました。NoSQL は、分散キャッシュの上に構築されたオープンソースデータベースです。



かなり良い反応をした後、システムの組み込みAPIを使用して拡張機能を作成することでBagriの機能を拡張する方法に関する記事を書くことにしました。



画像








Bagriは現在、外部システムに接続するための2つのAPI、DataFormat APIとDataStore APIを公開しています。



1つ目は、新しい形式のドキュメントを解析してシステムの内部形式に変換すること、およびシステムの内部表現から新しい形式のドキュメントを逆構築することを目的としています。



2番目のAPIは、外部ストレージシステムからドキュメントをロード、保存、削除します。 多くの場合、ドキュメントの新しいソースに接続するには、両方のインターフェイスを実装する必要があります。



MongoDBへのDataStoreコネクタを実装し、ドキュメントストレージシステムとして使用する方法を示します。 この場合、MongoがJSON形式でドキュメントを提供するため、DataFormat APIの実装は必要ありません。これはシステムで最初にサポートされています。



いくつかコメントをしたいだけです。



  1. そのようなコネクタの実際的な利点は? 明らかに、Mongoは集中化されたドキュメントリポジトリとして単純に使用できます。 データが既にMongoに保存されているが、その機能がシステムの機能の開発には不十分になっている場合、 この記事で説明されているスクリプトでも役立ちます。
  2. 私はMongoDBの専門家ではありません。それを使用する最適な方法があれば、喜んで聞きます。


それでは始めましょう。



DataStore APIは、com.bagri.xdm.cache.api.DocumentStoreインターフェースの実装を前提としています。



public interface DocumentStore { /** * Lifecycle method. Invoked when the store initialized. * * @param context the environment context */ void init(Map<String, Object> context); /** * Lifecycle method. Invoked when parent schema is closing */ void close(); /** * Load document from persistent store * * @param key the document key * @return XDM Document instance if corresponding document found, null otherwise */ Document loadDocument(DocumentKey key); /** * Load bunch of documents from persistent store * * @param keys the collection of document keys to load * @return the map of loaded documents with their keys */ Map<DocumentKey, Document> loadAllDocuments(Collection<DocumentKey> keys); /** * Load document keys. Can do it in synch or asynch way. * * @return iterator over found document keys */ Iterable<DocumentKey> loadAllDocumentKeys(); /** * Stores document to persistent store. * * @param key the document key * @param value the XDM document instance */ void storeDocument(DocumentKey key, Document value); /** * Stores bunch of documents to persistent store * * @param entries the map of document keys and corresponding document instances */ void storeAllDocuments(Map<DocumentKey, Document> entries); /** * Deletes document from persistent store * * @param key the document key */ void deleteDocument(DocumentKey key); /** * Deletes bunch o documents from persistent store * * @param keys the keys identifying documents to be deleted */ void deleteAllDocuments(Collection<DocumentKey> keys); }
      
      





最も簡単な方法は、システムコンテキストにアクセスするためのヘルパーメソッドのセットを提供する抽象DocumentStoreBaseクラスからDocumentStoreインターフェイスを実装するクラスを継承することです。 まず、外部システムに接続して初期化するためのパラメーターを処理します。



Mongoに接続するには、mongodサーバーのアドレス、データベースの名前、およびドキュメントをロードするコレクションの名前が必要です。 これらのパラメーターの名前を定義します:mongo.db.uri、mongo.db.database、mongo.db.collections。 その後、mongoサーバーに接続するための初期化コードは次のようになります。



 public class MongoDBStore extends DocumentStoreBase implements DocumentStore { private MongoClient client; private MongoDatabase db; private Map<String, MongoCollection<org.bson.Document>> clMap = new HashMap<>(); @Override public void init(Map<String, Object> context) { super.setContext(context); String uri = (String) context.get("mongo.db.uri"); MongoClientURI mcUri = new MongoClientURI(uri); client = new MongoClient(mcUri); String dbName = (String) context.get("mongo.db.database"); db = client.getDatabase(dbName); String clNames = (String) context.get("mongo.db.collections"); boolean all = "*".equals(clNames); List<String> clns = Arrays.asList(clNames.split(",")); for (String clName: db.listCollectionNames()) { if (all || clns.contains(clName)) { MongoCollection<org.bson.Document> cln = db.getCollection(clName); clMap.put(clName, cln); } } } @Override public void close() { client.close(); } }
      
      





initメソッドは、コンテキストからmongodサーバーに接続するためのパラメーターを受け取り、接続を確立し、ダウンロード用に宣言されている各コレクションのMongoCollectionオブジェクトをキャッシュします。 ここで、すべてのドキュメントキーをロードするメソッドを実装する必要があります。



  private String getMappingKey(String id, String cln) { return id + "::" + cln; } private String[] getMappingParts(String keyMap) { return keyMap.split("::"); } @Override public Iterable<DocumentKey> loadAllDocumentKeys() { SchemaRepository repo = getRepository(); if (repo == null) { return null; } String id; DocumentKey key; Map<DocumentKey, String> keyMap = new HashMap<>(); for (MongoCollection<org.bson.Document> cln: clMap.values()) { String clName = cln.getNamespace().getCollectionName(); // load _ids only MongoCursor<org.bson.Document> cursor = cln.find().projection(include(“_id”)).iterator(); while (cursor.hasNext()) { org.bson.Document doc = cursor.next(); id = doc.get(“_id”).toString(); // TODO: handle possible duplicates via revisions key = repo.getFactory().newDocumentKey(id, 0, 1); keyMap.put(key, getMappingKey(id, clName)); } } PopulationManagement popManager = repo.getPopulationManagement(); popManager.setKeyMappings(keyMap); return keyMap.keySet(); }
      
      





そして、ロードされたキーに一致するドキュメントをロードするためのメソッド:



  @Override public Map<DocumentKey, Document> loadAllDocuments(Collection<DocumentKey> keys) { Map<DocumentKey, Document> entries = new HashMap<>(keys.size()); for (DocumentKey key: keys) { Document doc = loadDocument(key); if (doc != null) { entries.put(key, doc); } } return entries; } @Override public Document loadDocument(DocumentKey key) { SchemaRepository repo = getRepository(); Document doc = null; PopulationManagement popManager = repo.getPopulationManagement(); String id = popManager.getKeyMapping(key); if (id == null) { return null; } String[] mParts = getMappingParts(id); Document newDoc = null; int[] clns = null; com.bagri.xdm.system.Collection xcl = repo.getSchema().getCollection(mParts[1]); if (xcl != null) { clns = new int[] {xcl.getId()}; } MongoCollection<org.bson.Document> cln = clMap.get(mParts[1]); Object oid; Date creDate; try { oid = new ObjectId(mParts[0]); creDate = ((ObjectId) oid).getDate(); } catch (IllegalArgumentException ex) { oid = mParts[0]; creDate = new Date(); } org.bson.Document mongoDoc = cln.find(eq("_id", oid)).first(); String content = mongoDoc.toJson(new JsonWriterSettings(true)); try { DocumentManagementl docMgr = (DocumentManagement) repo.getDocumentManagement(); newDoc = docMgr.createDocument(key, mParts[0], content, “JSON”, creDate, "owner", 1, clns, true); } catch (XDMException ex) { // TODO: log error, but do not stop the whole loading } return doc; }
      
      





そもそもこれで十分です;読者自身がメソッドstoreDocument / storeAllDocumentsおよびdeleteDocument / deleteAllDocumentsを実装することを提案します。 また、上記のコードはコネクタの実装プロセスを示すことのみを目的としており、さまざまな例外的な状況や可能な追加の構成パラメータを処理しないことに注意してください。 完全なコネクタコードは、 bagri-extensionsリポジトリから表示およびコンパイルできます。



ここで、DataStoreコネクタを登録し、それを使用するスキームを宣言する必要があります。 これを行うには、コネクタ構成を<BAGRI_HOME> /config/config.xmlファイルのdataStoresセクションに追加する必要があります。



 <dataStore name="mongo"> <version>1</version> <createdAt>2016-08-01T16:17:20.542+03:00</createdAt> <createdBy>admin</createdBy> <description>MongoDB data store</description> <enabled>true</enabled> <storeClass>com.bagri.samples.MongoDBStore</storeClass> <properties> <entry name="mongo.db.uri">mongodb://localhost:27017</entry> <entry name="mongo.db.database">test</entry> <entry name="mongo.db.collections">*</entry> </properties> </dataStore>
      
      





MongoDBを使用した多くの例で説明されているレストランコレクションの例を使用して、コネクタの動作をテストします。 ここに示すように、Mongoでテストドキュメントのコレクションをダウンロードします: docs.mongodb.com/getting-started/shell/import-data 次に、MongoDBを操作するスキームを登録し、このコレクションからデータをロードするように構成します。 同じconfig.xmlファイルで、スキーマのセクションに新しいスキームを追加します。



以下のパラメーターを追加します。



 <entry name="xdm.schema.store.enabled">true</entry> <entry name="xdm.schema.store.type">mongo</entry> <entry name="mongo.db.collections">restaurants</entry>
      
      





新しいパラメーターを含むconfig.xmlのスキーマセクション
 <schema name="Mongo" active="true"> <version>1</version> <createdAt>2016-08-01T21:30:58.096+04:00</createdAt> <createdBy>admin</createdBy> <description>Schema for MongoDB</description> <properties> <entry name="xdm.schema.store.tx.buffer.size">1024</entry> <entry name="xdm.schema.data.backup.read">false</entry> <entry name="xdm.schema.trans.backup.async">0</entry> <entry name="xdm.schema.store.enabled">true</entry> <entry name="xdm.schema.thread.pool">10</entry> <entry name="xdm.schema.data.stats.enabled">true</entry> <entry name="xdm.schema.query.cache">true</entry> <entry name="xdm.schema.store.type">mongo</entry> <entry name="mongo.db.collections">restaurants</entry> <entry name="xdm.schema.format.default">JSON</entry> <entry name="xdm.schema.ports.first">10300</entry> <entry name="xdm.schema.ports.last">10400</entry> <entry name="xdm.schema.population.size">1</entry> <entry name="xdm.schema.population.buffer.size">1000000</entry> <entry name="xdm.schema.data.backup.async">1</entry> <entry name="xdm.schema.store.data.path">../data/mongo</entry> <entry name="xdm.schema.dict.backup.sync">0</entry> <entry name="xdm.schema.trans.backup.sync">1</entry> <entry name="xdm.schema.query.backup.sync">0</entry> <entry name="xdm.schema.buffer.size">16</entry> <entry name="xdm.schema.dict.backup.async">1</entry> <entry name="xdm.schema.dict.backup.read">true</entry> <entry name="xdm.schema.trans.backup.read">false</entry> <entry name="xdm.schema.query.backup.async">0</entry> <entry name="xdm.schema.members">localhost</entry> <entry name="xdm.schema.data.backup.sync">0</entry> <entry name="xdm.schema.partition.count">157</entry> <entry name="xdm.schema.query.backup.read">true</entry> <entry name="xdm.schema.transaction.timeout">0</entry> <entry name="xdm.schema.health.threshold.low">25</entry> <entry name="xdm.schema.health.threshold.high">0</entry> <entry name="xdm.schema.query.parallel">true</entry> <entry name="xdm.schema.partition.pool">32</entry> <entry name="xqj.schema.baseUri">file:///../data/mongo/</entry> <entry name="xqj.schema.orderingMode">2</entry> <entry name="xqj.schema.queryLanguageTypeAndVersion">1</entry> <entry name="xqj.schema.bindingMode">0</entry> <entry name="xqj.schema.boundarySpacePolicy">1</entry> <entry name="xqj.schema.scrollability">1</entry> <entry name="xqj.schema.holdability">2</entry> <entry name="xqj.schema.copyNamespacesModePreserve">1</entry> <entry name="xqj.schema.queryTimeout">0</entry> <entry name="xqj.schema.defaultFunctionNamespace"></entry> <entry name="xqj.schema.defaultElementTypeNamespace"></entry> <entry name="xqj.schema.copyNamespacesModeInherit">1</entry> <entry name="xqj.schema.defaultOrderForEmptySequences">2</entry> <entry name="xqj.schema.defaultCollationUri"></entry> <entry name="xqj.schema.constructionMode">1</entry> </properties> <collections> <collection id="1" name="restaurants"> <version>1</version> <createdAt>2016-08-01T01:01:26.965+03:00</createdAt> <createdBy>admin</createdBy> <description>Mongo restaurants collection</description> <enabled>true</enabled> </collection> </collections> <fragments/> <indexes/> <triggers/> </schema>
      
      







さらに、サーバープロファイルを使用して新しいファイルを作成する必要があります。 <BAGRI_HOME> / configディレクトリで、mongo.propertiesファイルを作成し、そのサーバーが使用するスキームを指定します。



 xdm.cluster.node.schemas=Mongo
      
      





MongoDBサーバーが実行中であり、コネクター設定で指定されたアドレスで接続を待機していることを確認してください。 これでBagriサーバーを起動できます。 <BAGRI_HOME> / binディレクトリで、コマンド> bgcache.cmd mongo(Windowsの場合)または> ./ bgcache.sh mongo(Linuxの場合)を実行します。 このスクリプトは、mongo.propertiesプロファイルの設定で単一のBagriサーバーを起動します。 ダウンロードの最後に、サーバーログに次の行が含まれている必要があります。



画像



コネクタがMongoスキーマを初期化し、外部MongoDBサーバーから25359ドキュメントをロードしたことを示しています。



次に、XQueryクエリを使用してJSONドキュメントを操作する方法を示します。



XQueryクエリをインタラクティブに実行するには、これを実行できるクライアントが必要です。 Bagriには、この機能を提供するVisualVMプラグインが付属しています。 こちらのインストール手順をご覧ください



Bagri管理サーバー<BAGRI_HOME> / bin / bgadminを実行します。 VisualVMアプリケーションを開き、Bagri Manager管理サーバーに接続して、Mongoスキーマを選択します。 [ドキュメント管理]タブでは、ドキュメントとコレクションを操作できます。



画像



、およびXQueryクエリを含む[クエリ管理]タブ。 次の簡単なクエリを実行して、識別子でレストランを選択します。



 declare namespace m="http://www.w3.org/2005/xpath-functions/map"; let $props := map{'method': 'json', 'indent': fn:true()} for $uri in fn:uri-collection("restaurants") let $map := fn:json-doc($uri) where m:get($map, 'restaurant_id') = '40362098' return (fn:serialize($map, $props), '\&\#xa;')
      
      





* Habrは有効な改行に変換するため、最後の行の改行文字は\でエスケープされることに注意してください。そのため、クエリを実行するときに\文字を削除する必要があります。



画像



または、料理の種類によるレストランの選択の場合:



 declare namespace m="http://www.w3.org/2005/xpath-functions/map"; let $props := map{'method': 'json'} for $uri in fn:uri-collection("restaurants") let $map := fn:json-doc($uri) where m:get($map, 'cuisine') = 'Bakery' return (fn:serialize($map, $props), '\&\#xa;')
      
      





画像



XQueryを使用すると、Mongoで選択項目を簡単に使用できるようになります(地理インデックスのクエリを除き、ボックスから直接サポートされていません)。



次に、MongoDB:JOINでサポートされていないクエリを示します。 これを行うには、レストランコレクションのデータをより正規化された形式にすることができます。たとえば、レストラン自体のデータからレストランのレビューを分けて、異なるコレクションに保存できます。



このクエリを実行して結果をファイルに保存し、受信したデータをMongoDBのrest-shortコレクションにインポートします。



 declare namespace m="http://www.w3.org/2005/xpath-functions/map"; let $props := map{'method': 'json'} for $uri in fn:uri-collection("restaurants") let $rest := fn:json-doc($uri) let $rest := m:remove($rest, '_id') let $rest := m:remove($rest, 'grades') return (fn:serialize($rest, $props), '\&\#xa;')
      
      





次のクエリはレビューを表示します。 また、それらを別のファイルに保存してから、MongoDBにグレードコレクションにインポートします。



 declare namespace a="http://www.w3.org/2005/xpath-functions/array"; declare namespace m="http://www.w3.org/2005/xpath-functions/map"; let $props := map{'method': 'json'} for $uri in fn:uri-collection("restaurants") let $rest := fn:json-doc($uri) let $grades := m:get($rest, 'grades') return for $i in (1 to a:size($grades)) let $grade := a:get($grades, $i) let $date := m:get($grade, 'date') return ('{"restaurant_id": "', m:get($rest, 'restaurant_id'), '", "date": ', fn:serialize($date, $props), ', "grade": "', m:get($grade, 'grade'), '", "score": "', m:get($grade, 'score'), '"}', '\&\#xa;')
      
      





次に、スキーマ設定を調整して、ダウンロードする新しいコレクションを宣言します。



 <schema name="Mongo" active="true"> ………... <properties> <entry name="xdm.schema.store.collections">rest-short, grades</entry> ……... </properties> <collections> <collection id="2" name="rest-short"> <version>1</version> <createdAt>2016-08-01T01:01:26.965+03:00</createdAt> <createdBy>admin</createdBy> <description>Restaurant headers collection</description> <enabled>true</enabled> </collection> <collection id="3" name="grades"> <version>1</version> <createdAt>2016-08-01T01:01:26.965+03:00</createdAt> <createdBy>admin</createdBy> <description>Restaurant grades collection</description> <enabled>true</enabled> </collection> </collections> <fragments/> <indexes/> <triggers/> </schema>
      
      





Bagriサーバーを再起動して、新しいデータのコレクションをダウンロードします。 これで、結合の仕組みを確認できます。 次のクエリを実行して、2つのコレクションから完全なレストラン構造を形成します。



 declare namespace m="http://www.w3.org/2005/xpath-functions/map"; let $props := map{'method': 'json'} for $ruri in fn:uri-collection("rest-short") let $rest := fn:json-doc($ruri) let $rid := m:get($rest, 'restaurant_id') let $addr := m:get($rest, 'address') let $txt := ('{"restaurant_id": "', $rid, '", "cuisine": "', m:get($rest, 'cuisine'), '", "name": "', m:get($rest, 'name'), '", "borough": "', m:get($rest, 'borough'), '", "address": ', fn:serialize($addr, $props), ', "grades": [') return ($txt, fn:string-join( for $guri in fn:uri-collection("grades") let $grade := fn:json-doc($guri) let $gid := m:get($grade, 'restaurant_id') where $gid = $rid return fn:serialize(m:remove(m:remove($grade, '_id'), 'restaurant_id'), $props), ', '), ']}\&\#xa;')
      
      





そのため、MongoDBへのDataStoreコネクタを実装し、ドキュメントストレージシステムとして使用する方法を検討しました。 この記事が他のBagree拡張機能を記述するための出発点になるか、この興味深い製品をより詳しく理解することをお勧めします。 プロジェクトには常にBagriの開発に関心のあるJava開発者が必要です。プロジェクトコードの詳細についてはGithubを参照してください。



All Articles