CDRSの概要、Rustで完全に記述されたApache Cassandraドライバー

CDRSRusで書かれたApache C assandra d river)は、RustエコシステムでCassandraのドライバーが不足していることを発見した後、開発することにした私自身のオープンソースプロジェクトです。







もちろん、そうではないとは言いません。 しかし、1つの部分はHello Worldの初期段階で放棄されたパッケージであり、2番目の部分はおそらくC ++で記述されたDataStaxドライバーへの唯一のバインディングです







CDRSに関しては、Rustで、彼は第4プロトコルバージョンの仕様を完全に実装しています







cargo.toml



通常どおり、プロジェクトにドライバーを含めるには、次のものが必要です。







まず、 cargo.toml



ファイルのdependencies



セクションにCDRSを追加します。







 [dependencies] cdrs = "1.0.0-beta.1"
      
      





これにより、暗号化せずにTCP接続を使用できます。







データベースでSSL暗号化接続を作成する場合は、「ssl」機能を使用してCDRSを有効にする必要があります。







 [dependencies] openssl = "0.9.6" [dependencies.cdrs] version = "1.0.0-beta.1" features = ["ssl"]
      
      





次に、 lib.rs



に追加しlib.rs









 extern crate CDRS
      
      





接続を確立する



TCP接続



暗号化されていない接続を確立するには、次のモジュールが必要です。







 use cdrs::client::CDRS; use cdrs::authenticators::{NoneAuthenticator, PasswordAuthenticator}; use cdrs::transport::TransportPlain;
      
      





クラスターでパスワード認証が不要な場合は、次のように接続を確立できます。







 let authenticator = NoneAuthenticator; let addr = "127.0.0.1:9042"; let tcp_transport = TransportPlain::new(addr).unwrap(); // pass authenticator and transport into CDRS' constructor let client = CDRS::new(tcp_transport, authenticator); use cdrs::compression; // start session without compression let mut session = try!(client.start(compression::None));
      
      





パスワード認証を必要とする接続を確立するには、 NoneAuthenticator



代わりにPasswordAuthenticator



を使用します。







 let authenticator = PasswordAuthenticator::new("user", "pass");
      
      





TLS接続



TLS接続の確立は、SSLトランスポートを作成するためにPEM証明書が必要であることを除いて、前のステップで説明したプロセスによく似ています。







 use cdrs::client::CDRS; use cdrs::authenticators::PasswordAuthenticator; use cdrs::transport::TransportTls; use openssl::ssl::{SslConnectorBuilder, SslMethod}; use std::path::Path;
      
      





 let authenticator = PasswordAuthenticator::new("user", "pass"); let addr = "127.0.0.1:9042"; // here needs to be a path of your SSL certificate let path = Path::new("./node0.cer.pem"); let mut ssl_connector_builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap(); ssl_connector_builder.builder_mut().set_ca_file(path).unwrap(); let connector = ssl_connector_builder.build(); let ssl_transport = TransportTls::new(addr, &connector).unwrap(); // pass authenticator and SSL transport into CDRS' constructor let client = CDRS::new(ssl_transport, authenticator);
      
      





接続プール



既存の接続の管理を容易にするために、CDRSにはConnectionManager



が含まれていConnectionManager



。これは、本質的にr2d2のアダプターです。







 use cdrs::connection_manager::ConnectionManager; //... let config = r2d2::Config::builder() .pool_size(3) .build(); let transport = TransportPlain::new(ADDR).unwrap(); let authenticator = PasswordAuthenticator::new(USER, PASS); let manager = ConnectionManager::new(transport, authenticator, Compression::None); let pool = r2d2::Pool::new(config, manager).unwrap(); for _ in 0..20 { let pool = pool.clone(); thread::spawn(move || { let conn = pool.get().unwrap(); // use the connection // it will be returned to the pool when it falls out of scope. }); }
      
      





圧縮-lz4およびsnappy



lz4



およびlz4



圧縮を使用するには、目的のデコーダーをセッションコンストラクターに渡すだけです。







 // session without compression let mut session_res = client.start(compression::None); // session with lz4 compression let mut session_res = client.start(compression::Lz4); // session with snappy compression let mut session_res = client.start(compression::Snappy);
      
      





さらに、CDRSは、選択したデコーダーで圧縮形式の情報を受信する準備ができたことをクラスターに個別に通知します。 それ以降のアンパックは自動的に行われ、開発者による追加のアクションは必要ありません。







クエリ実行



Cassandraサーバーへのリクエストの実行は、認証方法、圧縮、およびトランスポートのタイプを選択した後、既存のセッションのフレームワーク内でのみ行われます。







特定のクエリを実行するには Query



オブジェクトを作成する必要があります Query



オブジェクトには 、おそらくあまり使用されないパラメータが多数含まれているため、一見単純なクエリでは冗長に見えるかもしれません。







このため、要求を構成するプロセスを簡素化するbuilder



が作成されました。 たとえば、単純な ' USE my_namespace;



'十分にシンプル







 let create_query: Query = QueryBuilder::new("USE my_namespace;").finalize(); let with_tracing = false; let with_warnings = false; let switched = session.query(create_query, with_tracing, with_warnings).is_ok();
      
      





新しいテーブルを作成する



以前のように、Cassandraクラスターに新しいテーブルを作成するには、最初にQuery



を構成してからQuery



を実行する必要があります。







 use std::default::Default; use cdrs::query::{Query, QueryBuilder}; use cdrs::consistency::Consistency; let mut create_query: Query = QueryBuilder::new("CREATE TABLE keyspace.authors ( id int, name text, messages list<text>, PRIMARY KEY (id) );") .consistency(Consistency::One) .finalize(); let with_tracing = false; let with_warnings = false; let table_created = session.query(create_query, with_tracing, with_warnings).is_ok();
      
      





新しいテーブルを作成するCQLクエリ自体については、より完全な情報を得るために、 DataStaxなどの特殊なリソースを使用することをお勧めします。







SELECTクエリと結果のマッピング



データベースに著者のテーブルがあり、各著者が自分の投稿のリストを持っているとします。 これらのメッセージをリスト列内に保存します。 Rustの用語では、作成者は次のようになります。







 struct Author { pub name: String, pub messages: Vec<String> }
      
      





テーブルを作成する場合と同様に、クエリ自体はSession::query



メソッドを介して実行できます。 当然、この場合、CQLは ' SELECT * FROM keyspace.authors;



ようなものでなければなりませんSELECT * FROM keyspace.authors;



'。 テーブルに一部の著者に関するデータが含まれている場合、受信したデータを ' Vec<Author>



'などのRust構造のコレクションにマップしようとすることができます。







 //... use cdrs::error::{Result as CResult}; let res_body = parsed.get_body(); let rows = res_body.into_rows().unwrap(); let messages: Vec<Author> = rows .iter() .map(|row| { let name: String = row.get_by_name("name").unwrap(); let messages: Vec<String> = row // unwrap Option<CResult<T>>, where T implements AsRust .get_by_name("messages").unwrap().unwrap() .as_rust().unwrap(); return Author { author: name, text: messages }; }) .collect();
      
      





結果を表示するときは、次の特性に注意する必要があります。







  1. IntoRustByName 。 簡単に言えば、この特性は、row(厳密には、仕様で定義された別個の型ではありませんが、内部構造ではユーザー定義型に近いものと見なすことができます)などの複雑なCassandra型に適用されますUDT。 大まかに言うと、 get_by_name



    は名前で「プロパティ」を見つけようとし、見つかった場合は、このプロパティをRustタイプまたはList



    、 'Map'、 UDT



    などのCDRSタイプに変換した結果を返します。 これらの型自体は、仕様で定義されている対応するデータ型のマッピングです。







  2. AsRust 。 この特性は、Rustタイプへの最終的なマッピングを目的としています。 インプリケーターの完全なリストは、提供されているリンクで確認できます。


準備と実行



最初は複雑なクエリを一度準備してから、異なるデータを異なるタイミングで複数回実行すると便利な場合があります。 準備と実行はこれに最適です。







 // prepare just once let insert_table_cql = " insert into user_keyspace.users (user_name, password, gender, session_token, state) values (?, ?, ?, ?, ?)"; let prepared = session.prepare(insert_table_cql.to_string(), true, true) .unwrap() .get_body() .into_prepared() .unwrap(); // execute later and possible few times with different values let v: Vec<Value> = vec![Value::new_normal(String::from("john").into_bytes()), Value::new_normal(String::from("pwd").into_bytes()), Value::new_normal(String::from("male").into_bytes()), Value::new_normal(String::from("09000").into_bytes()), Value::new_normal(String::from("FL").into_bytes())]; let execution_params = QueryParamsBuilder::new(Consistency::One).values(v).finalize(); // without tracing and warnings let executed = session.execute(prepared.id, execution_params, false, false);
      
      





また、準備とバッチを組み合わせて、複数の準備されたクエリを一度に実行することも理にかなっています。 最も単純なバッチの例は、 例にもあります。







Cassandraイベント



上記のすべてに加えて、CDRSは、サーバーが発行するイベントをサブスクライブして追跡する機能を提供します。







 let (mut listener, stream) = session.listen_for(vec![SimpleServerEvent::SchemaChange]).unwrap(); thread::spawn(move || listener.start(&Compression::None).unwrap()); let topology_changes = stream // inspects all events in a stream .inspect(|event| println!("inspect event {:?}", event)) // filter by event's type: topology changes .filter(|event| event == &SimpleServerEvent::TopologyChange) // filter by event's specific information: new node was added .filter(|event| { match event { &ServerEvent::TopologyChange(ref event) => { event.change_type == TopologyChangeType::NewNode }, _ => false } }); println!("Start listen for server events"); for change in topology_changes { println!("server event {:?}", change); }
      
      





イベントの完全なリストを見つけるには、 ドライバーのドキュメントだけでなく、 仕様自体を参照することをお勧めします







将来的には、スマートロードバランシングにイベントを使用する計画があります。







便利なリンク






All Articles