このドキュメントは、Rustプログラミング言語であるfutures
のコンテナを学習するのに役立ちます。これは、futuresとゼロコストスレッドの実装を提供します。 Futuresは、 C++
、 Java
、 Scala
など、他の多くのプログラミング言語で利用でき、 futures
コンテナはこれらの言語のライブラリからインスピレーションを得ています。 ただし、人間工学に基づいており、Rustに固有のゼロコスト抽象化の哲学にも準拠しています。つまり、未来を作成および構成するためにメモリ割り当てを必要とせず、それらを管理するTask
必要な割り当ては1つだけです。 FutureはRustの非同期で構成可能な高性能I / Oの基盤である必要があり、初期のパフォーマンス測定では、futureで構築された単純なHTTPサーバーが非常に高速であることを示しています。
この投稿は、 futures-rsの公式チュートリアルの翻訳です。
このドキュメントはいくつかのセクションに分かれています。
- 「Hello world!」;
- 特性の未来;
- タイプ
Stream
; - 具体的な先物とストリーム(
Stream
); - 先物を返します。
-
Task
と将来; - ローカルタスクデータ。
こんにちは世界!
futures
コンテナにはRustバージョン1.10.0以降が必要です。これはRustup
を使用して簡単にインストールできます。 コンテナはテスト済みで、Windows、macOS、Linuxで正常に動作しますが、他のプラットフォームのPRはいつでも歓迎します。 次のようCargo.toml
、プロジェクトのCargo.toml
追加できます。
[dependencies] futures = { git = "https://github.com/alexcrichton/futures-rs" } tokio-core = { git = "https://github.com/tokio-rs/tokio-core" } tokio-tls = { git = "https://github.com/tokio-rs/tokio-tls" }
注:このライブラリはアクティブに開発中であり、gitでソースを直接取得する必要がありますが、後でコンテナを取得する必要があります
crates.ioで公開されます。
ここでは、3つのコンテナを追加します:
- futures -
Future
およびStream
実装の定義とコア。 - tokio-core-特定を提供する
mio
コンテナーバインディング
TCPおよびUDPのFuture
およびStream
実装。 - tokio-tls-先物ベースのSSL / TLS実装。
futuresコンテナは、ランタイムや入力/出力レイヤーを一切持たないfuturesの低レベル実装です。 以下の例では、 tokio-coreで利用可能な特定の実装を使用して、 futureとスレッドを使用して、オーバーヘッドなしで複雑なI / Oを実行する方法を示します。
必要なものはすべて揃ったので、最初のプログラムを作成します。 ハローワールドの例として、ホームをダウンロードしてください
錆ページ:
extern crate futures; extern crate tokio_core; extern crate tokio_tls; use std::net::ToSocketAddrs; use futures::Future; use tokio_core::reactor::Core; use tokio_core::net::TcpStream; use tokio_tls::ClientContext; fn main() { let mut core = Core::new().unwrap(); let addr = "www.Rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap(); let socket = TcpStream::connect(&addr, &core.handle()); let tls_handshake = socket.and_then(|socket| { let cx = ClientContext::new().unwrap(); cx.handshake("www.Rust-lang.org", socket) }); let request = tls_handshake.and_then(|socket| { tokio_core::io::write_all(socket, "\ GET / HTTP/1.0\r\n\ Host: www.Rust-lang.org\r\n\ \r\n\ ".as_bytes()) }); let response = request.and_then(|(socket, _)| { tokio_core::io::read_to_end(socket, Vec::new()) }); let (_, data) = core.run(response).unwrap(); println!("{}", String::from_utf8_lossy(&data)); }
src/main.rs
沿ってこのコンテンツを含むファイルを作成し、 cargo run
コマンドを実行すると、RustメインページのHTMLが表示されます。
注:Rustc 1.10はこの例をゆっくりコンパイルします。 1.11ではコンパイルが高速になりました。
このコードは大きすぎてすぐに理解できないため、行を見ていきましょう。
main()
関数を見てください:
let mut core = Core::new().unwrap(); let addr = "www.Rust-lang.org:443".to_socket_addrs().unwrap().next().unwrap();
これにより、すべての入力/出力が実行されるイベントループが作成されます。 標準ライブラリのto_socket_addrs
メソッドを使用して、ホスト名「www.Rust-lang.org」を変換した後。
次:
let socket = TcpStream::connect(&addr, &core.handle());
イベントループハンドルを取得し 、 TcpStream :: connectを使用してホストに接続します。 特に、 TcpStream :: connectはfutureを返します。 実際、ソケットは接続されていませんが、接続は後で行われます。
ソケットが使用可能になったら、3つの手順に従ってRust-lang.orgのホームページをロードする必要があります。
TLSハンドシェイクを実行します。 このWebサイトはHTTPS経由でのみアクセスできるため、ポート443に接続し、TLSプロトコルに従う必要があります。
HTTP
GET
リクエストを送信します。 このガイドの一部として、リクエストを手動で作成しますが、戦闘プログラムでは、futures
構築されたHTTPクライアントを使用する必要があります。
- 結論として、ソケットからすべてのデータを読み取って応答をダウンロードします。
これらの各ステップを詳細に検討してください。
最初のステップ:
let tls_handshake = socket.and_then(|socket| { let cx = ClientContext::new().unwrap(); cx.handshake("www.Rust-lang.org", socket) });
ここでは、将来の特性のand_thenメソッドが使用され、 TcpStream :: connectメソッドの結果で呼び出します。 and_thenメソッドは、前のフューチャーの値を受け取るクロージャーを受け入れます。 この場合、 socket
のタイプはTcpStreamになります。
TcpStream :: connectがエラーを返した場合、 and_thenに渡されたクロージャーは実行されないことに注意してください。
socket
受信したら、 ClientContext :: newを使用してクライアントTLSコンテキストを作成します。 tokio-tls
このタイプは、TLS接続のクライアント側を表します。 次に、 ハンドシェイクメソッドを呼び出してTLS ハンドシェイクを実行します。 最初の引数は接続するドメイン名で、2番目はI / Oオブジェクト(この場合はsocket
オブジェクト)です。
TcpStream ::以前に接続するように、 ハンドシェイクメソッドはfutureを返します。 クライアントとサーバーはI / O、証明書の確認などを行う必要があるため、TLSハンドシェイクには時間がかかる場合があります。 実行後、 futureは上記のTcpStreamに似たTlsStreamを返します。
and_thenコンビネータは多くの秘密の作業を行い、先物が正しい順序で実行され、その場で追跡されるようにします。 同時に、 and_thenによって返される値はFuture トレイトを実装するため、 一連の計算を構成できます。
次に、HTTPリクエストを送信します。
let request = tls_handshake.and_then(|socket| { tokio_core::io::write_all(socket, "\ GET / HTTP/1.0\r\n\ Host: www.Rust-lang.org\r\n\ \r\n\ ".as_bytes()) });
ここで、前のステップ( tls_handshake
)から未来を取得し、再度and_thenを使用して計算を続行しました。 write_allコンビネーターは完全なHTTP要求を書き込み、必要に応じて複数の書き込みを生成します。
write_allメソッドによって返されるfutureは、すべてのデータがソケットに書き込まれるとすぐに実行されます。 TlsStreamがソケットに送信する前に書き込んだすべてのデータを秘密に暗号化することは注目に値します。
リクエストの3番目と最後の部分は次のようになります。
let response = request.and_then(|(socket, _)| { tokio_core::io::read_to_end(socket, Vec::new()) });
前の将来のrequest
、今度はread_to_endコンビネーターの結果に再びリンクされます。 このFutureは、ソケットからすべてのデータを読み取り、提供されたバッファーに入れ、処理中の接続がEOFを送信したときにバッファーを返します。
前と同じように、ソケットからの読み取りは実際にサーバーから受信したデータを秘密に解読するため、解読されたバージョンを読み取ります。
この場所で中断が中断されると、何も起こらないので驚くでしょう。 これは、私たちが行ったことはすべて将来の計算に基づいており、実際に実行していないためです。 ここまでは、I / Oを実行しておらず、HTTP要求なども実行していません。
先物を本当に実行し、それらを完了まで管理するには、イベントループを実行する必要があります。
let (_, data) = core.run(response).unwrap(); println!("{}", String::from_utf8_lossy(&data));
ここで、将来のresponse
はイベントループに配置され、futureを実行するように要求します 。 結果が取得されるまで、イベントループが実行されます。
特に、 core.run(..)
の呼び出しは、futureが返されるまで呼び出しスレッドをブロックします。 これは、 data
のタイプがVec<u8>
であることを意味しdata
。 その後、通常どおり標準出力で印刷できます。
ふう! TCP接続を初期化し、 計算チェーンを作成 し、ソケットからデータを読み取る Futureを調べました。 しかし、これは先物の可能性のほんの一例に過ぎず、ニュアンスを考慮します。
未来のキャラクター
futureは、 futures
コンテナの中核です。 この特性は、非同期コンピューティングとその結果を表します。
次のコードを見てください。
trait Future { type Item; type Error; fn poll(&mut self) -> Poll<Self::Item, Self::Error>; // ... }
この定義には、疑問を投げかける多くのポイントが含まれていると確信しています。
-
Item
とError
; -
poll
- 将来のコンビネーター。
それらを詳細に分析します。
Item
とError
type Item; type Error;
ご存じのとおり、将来の特性の最初の特徴は、2つの関連するタイプが含まれていることです。 それらは未来が受け取ることができる価値のタイプです。 各Future
インスタンスは、 Result<Self::Item, Self::Error>
として処理できます。
これら2つのタイプは、futureを渡すwhere
条件と、futureが返されるタイプシグネチャwhere
非常に頻繁に使用されます。
たとえば、futureを返す場合、次のように記述できます。
fn foo() -> Box<Future<Item = u32, Error = io::Error>> { // ... }
または、未来を受け入れるとき:
fn foo<F>(future: F) where F: Future<Error = io::Error>, F::Item: Clone, { // ... }
poll
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
将来の特性は、このメソッドに基づいています。 pollメソッドは、将来計算される値を取得するための唯一のエントリポイントです。 将来のユーザーとして、このメソッドを直接呼び出す必要はほとんどありません。 ほとんどの場合、先物の周りに高レベルの抽象化を作成するコンビネータを介して先物とやり取りします。 ただし、内部で先物がどのように機能するかを知ることは役立ちます。
&mut self
引数に注意してください。これにより、多くの制限とプロパティが発生します。
- 先物は、一度に1つのスレッドによってのみポーリングできます。
-
poll
メソッドの実行中、futureは状態を変更できます。 -
poll
コミットされた後、先物の所有権を別のエンティティに譲渡できます。
実際には、 ポーリングタイプはエイリアスです。
type Poll<T, E> = Result<Async<T>, E>;
また、 非同期列挙が何であるかを見てください:
pub enum Async<T> { Ready(T), NotReady, }
この列挙により、futureは、将来の値を使用する準備ができたときに相互作用できます。 エラーが発生した場合、 Err
がすぐに返されます。 それ以外の場合、Future値が完全に受信されるか、まだ準備ができていないときに、 非同期列挙が表示されます。
Iterator
ようなFutureトレイトは、futureがすでに処理されている場合にpollメソッドが呼び出された後に何が起こるかを決定しません。 つまり、 Futureトレイトを実装する人は、 pollメソッドが正常に返されたかどうかを確認するために状態を維持する必要はありません。
ポーリング呼び出しがNotReady
返した場合、将来は再び実行するタイミングを知る必要があります。 この目標を達成するために、将来は次のメカニズムを提供する必要がありますNotReady
を受信NotReady
現在のタスクは値が使用可能になったときに通知を受信できる必要があります。
パークメソッドは、通知配信の主要なエントリポイントです。 この関数は、 Send
および'static
型を実装するタスクを返し、メインメソッドunparkを持ちます。 unparkメソッドの呼び出しは、futureが計算を実行して値を返すことができることを示します。
より詳細なドキュメントはこちらにあります 。
将来のコンビネーター
今ではpollメソッドはワークフローに少し苦痛を加えることができるようです。 String
を返すu32
があり、 u32
を返すu32
に変換したい場合はどうしますか? そのような構成を得るために、将来のキャラクターは多数のコンビネーターを提供します。
これらのコンビネータはイテレータコンビネータに似ており、すべて未来を受け入れ、新しい未来を返します。
たとえば、次のように書くことができます。
fn parse<F>(future: F) -> Box<Future<Item=u32, Error=F::Error>> where F: Future<Item=String> + 'static, { Box::new(future.map(|string| { string.parse::<u32>().unwrap() })) }
ここでは、将来を変換するために、 String
型を返し、将来的にはu32
返すmapが使用されます。 ボックスのパッケージは必ずしも必要ではなく、 先物返品のセクションで詳細に説明します 。
コンビネータを使用すると、次の概念を表現できます。
- タイプfutureの変更( map 、 map_err );
- ソースが実行されたときに別のフューチャーを起動する( then 、 and_then 、 or_else );
- 少なくとも1つの先物が完了したときの実行の継続( 選択 );
- 2つの未来( join )が完了するのを待っています。
- 計算後の
poll
動作の定義( fuse )。
コンビネータの使用は、RustでIterator
を使用するか、Scalaでfutures
を使用することに似ています。 先物でのほとんどの操作は、これらのコンビネータを使用することになります。 すべてのコンビネータのコストはゼロです。つまり、メモリ割り当てはなく、実装は手動で記述したような方法で最適化されます。
Stream
タイプ
以前は、 Future traitを調べました。これは、1つの値のみを計算するときに役立ちます。 ただし、計算を値のストリームとして提示した方が良い場合もあります。 たとえば、TCPリスナーは、その存続期間にわたって多くのTCP接続を確立します。 標準ライブラリのどのエンティティがFutureおよびStreamと同等かを見てみましょう:
#個のアイテム | 同期する | 非同期 | 一般的な操作 |
---|---|---|---|
1 | [結果] | [今後] | [map]、[and_then] |
∞ | [イテレーター] | [ストリーム] | [map] [stream-map]、[fold]、[collect] |
ストリームタイプを見てみましょう。
trait Stream { type Item; type Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>; }
StreamのタイプがFutureのタイプと非常に似ていることに気づいたかもしれません。 主な違いは、 pollメソッドがOption<Self::Item>
ではなくOption<Self::Item>
返すことです。
時間の経過に伴うストリームは多くのオプション値を生成し、 Ready(None)
返すことでストリームの終わりを通知します。 コアは、特定の順序で値を生成する非同期ストリームです。
実際、 StreamはFuture トレイトの特別なインスタンスであり、 into_futureメソッドを使用してFutureに変換できます。
返されたフューチャーは、ストリームとストリーム自体から次の値を取得します。これにより、後でより多くの値を取得できます。 また、基本的なフューチャーコンビネーターを使用して、ストリームやその他の任意のフューチャーを作成できます。
Futureトレイトと同様に、 Streamトレイトは多数のコンビネーターを提供します。 futureのようなコンビネーター(例: then )に加えて、 foldなどのストリーム固有のコンビネーターがサポートされています。
Stream
特性の例
先物の使用例については、このチュートリアルの最初で検討しました。次に、 受信メソッドの実装を使用してストリームを使用する例を見てみましょう。 接続を受け入れるこの単純なサーバーは、「Hello!」という単語を書き込みます。 ソケットを閉じます:
extern crate futures; extern crate tokio_core; use futures::stream::Stream; use tokio_core::reactor::Core; use tokio_core::net::TcpListener; fn main() { let mut core = Core::new().unwrap(); let address = "127.0.0.1:8080".parse().unwrap(); let listener = TcpListener::bind(&address, &core.handle()).unwrap(); let addr = listener.local_addr().unwrap(); println!("Listening for connections on {}", addr); let clients = listener.incoming(); let welcomes = clients.and_then(|(socket, _peer_addr)| { tokio_core::io::write_all(socket, b"Hello!\n") }); let server = welcomes.for_each(|(_socket, _welcome)| { Ok(()) }); core.run(server).unwrap(); }
前と同じように、行を見てみましょう。
let mut core = Core::new().unwrap(); let address = "127.0.0.1:8080".parse().unwrap(); let listener = TcpListener::bind(&address, &core.handle()).unwrap();
ここでは、 LoopHandleでTcpListener :: bindメソッドを呼び出して、ソケットを受け入れるTCPリスナーを作成することにより、イベントループを初期化しました。
次に、次のコードを見てください。
let server = listener.and_then(|listener| { // ... });
ここでは、 TcpStream::connect
ようなTcpListener::bind
がTcpListener::bind
返さTcpListener
、むしろ将来がそれを計算することがTcpListener
ます。 次に、 Futureの and_thenメソッドを使用して、TCPリスナーが使用可能になったときに何が起こるかを判断します。
TCPリスナーを取得し、その状態を判断できます。
let addr = listener.local_addr().unwrap(); println!("Listening for connections on {}", addr);
local_addrメソッドを呼び出して 、リスナーが関連付けられているアドレスを出力します。 この時点から、クライアントが接続できるようにポートが正常に接続されます。
let clients = listener.incoming();
ここで、 着信メソッドはTcpListenerとSocketAddrの Streamペアを返します。 これは、標準ライブラリのTcpListenerおよびacceptメソッドに似ていますが、この場合のみ、すべてのイベントをストリームとして受信し、ソケットを手動で受け入れない可能性が高くなります。
clients
スレッドは、常にソケットを生成しclients
。 これはサーバーの動作を反映します-クライアントをループに入れて直接
それらを処理のためにシステムの残りに渡します。
クライアント接続のストリームができたので、標準のStream特性を使用して操作できます。
let welcomes = clients.and_then(|(socket, _peer_addr)| { tokio_core::io::write_all(socket, b"Hello!\n") });
ここでは、 Streamタイプのand_thenメソッドを使用して、 ストリームの各要素に対してアクションを実行します。 この場合、ストリームの各要素( TcpStream
)の計算のチェーンを形成します。 以前にwrite_allメソッドを見ました。これは、送信されたデータバッファを送信されたソケットに書き込みます。
このブロックは、 welcomes
一連の文字「Hello!」が書き込まれたソケットのストリームであることwelcomes
意味します。 このチュートリアルの一部として、接続を終了するため、 for_eachメソッドを使用して、 welcomes
ストリーム全体welcomes
将来に変換します。
welcomes.for_each(|(_socket, _welcome)| { Ok(()) })
ここで、前の未来の結果write_allを取得し 、それらを破棄して、ソケットが閉じられるようにします。
このサーバーの重要な制限は、並列性の欠如であることに注意してください。 ストリームはデータの整然とした処理であり、この場合、ソースストリームの順序はソケットが受信された順序であり、 and_thenおよびfor_eachメソッドはこの順序を維持します。 したがって、チェーンは、各ソケットがストリームから取得され、それに関連するすべての操作が次のソケットに移動する前に処理されるときに効果を作成します。
代わりに、すべてのクライアントを並行して管理する場合、 spawnメソッドを使用できます。
let clients = listener.incoming(); let welcomes = clients.map(|(socket, _peer_addr)| { tokio_core::io::write_all(socket, b"hello!\n") }); let handle = core.handle(); let server = welcomes.for_each(|future| { handle.spawn(future.then(|_| Ok(()))); Ok(()) });
and_thenメソッドの代わりに、クライアントストリームをfuturesストリームに変換するmapメソッドが使用されます。 次に、 spawnメソッドを使用してfor_eachに渡されるクロージャーを変更します 。これにより、イベントループでfutureを並列に実行できます。 spawnはtype ()
を持つitem / errorを持つfutureを必要とすることに注意してください。
先物とスレッドの具体的な実装
この段階では、 Future
およびStream
タイプ、それらの実装方法、およびそれらを組み合わせる方法について明確に理解しています。 しかし、これらすべての未来はどこから来たのでしょうか?
先物とスレッドのいくつかの特定の実装を見てください。
まず、利用可能な将来の価値はすべて「準備完了」状態です。 これには、 done 、 failed、およびfinishedの機能で十分です。 done関数はResult<T,E>
を受け取りResult<T,E>
Future<Item=T, Error=E>
を返します。 失敗しfinished
機能とfinished
機能については 、 T
またはE
を指定し、別の関連するタイプをテンプレート(ワイルドカード)のままにすることができます。
スレッドの場合、「完成した」ストリーム値の同等の概念はiter関数であり、結果のイテレータの要素を返すストリームを作成します。 値が「準備完了」状態にない状況では、 Future
およびStream
多くの一般的な実装もあります。最初の実装はoneshot関数です。
extern crate futures; use std::thread; use futures::Future; fn expensive_computation() -> u32 { // ... 200 } fn main() { let (tx, rx) = futures::oneshot(); thread::spawn(move || { tx.complete(expensive_computation()); }); let rx = rx.map(|x| x + 3); }
, oneshot , , , mpsc::channel . tx
("transmitter") Complete oneshot
, future . Complete::complete .
, rx
("receiver"), Oneshot , Future . Item
T
, Oneshot
. Error
Canceled
, , Complete .
future ( ) . Send
. , , , future , .
Stream channel . , , , Stream
, .
Sender : , , future, , , . , .
futures
futures — Future . Iterator , .
:
- - ;
- ;
- ;
- impl Trait .
-
, , - :
fn foo() -> Box<Future<Item = u32, Error = io::Error>> { // ... }
. future, future , .
, boxed BoxFuture
, Box<Future + Send>
:
fn foo() -> BoxFuture<u32, u32> { finished(1).boxed() }
, future . Box
, future . , , , future . , , future (. , , ), Box
.
Box
, future .
例:
struct MyFuture { inner: Oneshot<i32>, } fn foo() -> MyFuture { let (tx, rx) = oneshot(); // ... MyFuture { inner: tx } } impl Future for MyFuture { // ... }
MyFuture
Future
. future Oneshot<i32>
, future .
, , Box
- . MyFuture
, .
, . , futures .
— :
fn add_10<F>(f: F) -> Map<F, fn(i32) -> i32> where F: Future<Item = i32>, { fn do_map(i: i32) -> i32 { i + 10 } f.map(do_map) }
future, Box
, .
. - . ( fn(i32) -> i32
), . , .
impl Trait
Rust, impl Trait , future.
例:
fn add_10<F>(f: F) -> impl Future<Item = i32, Error = F::Error> where F: Future<Item = i32>, { f.map(|i| i + 10) }
, — ", Future
" . future .
: Box
, , future , Box
.
, impl Trait Rust. , , , futures, . -, Box
impl Trait
.
Task
Future
, futures, , . , poll
, , poll
NotReady
, , ? , poll
?
Task .
Task , futures. future , . ", !" future, . Task , " " , future .
future . (poll), , future. spawn , puPool::spawn Handle::spawn . spawn poll
.
Task
futures
: Task
, Future
. , futures
. Task
, , .
, future . , futures , , , .
Futures 'static
, futures:
future , future, ;
- , (
Arc / Rc
) , , (Arc<Mutex>
), .
, , .
Task
Future
, Task
, poll
, . API Task
Task
. Task
:
task_local!
,thread_local!
. , ,Task
, ,Task
;
- TaskRc , . ,
Rc
.
, , , .
!
"" — - "", . "" . — .
: "future" ?