非同期モナドの例

2つのプログラムがネットワークを介して互いに通信しているが、回答を待つことを望まないため、回答はランダムな順序で到着するとします。 何が起こっているのかを理解するために、メッセージと共に番号が送信され、応答は元の(応答する)メッセージの番号と、その後の通信のための応答番号を送信します。



私たちの目標は、ある対話者と通信するときのメッセージの送受信のシーケンスを記述し、メッセージの送受信間でI / O(たとえば、データベースへのアクセス)を使用できるようにすることです。



たとえば、あなたの好みの言語でのコード内のダイアログのように見えます。これは、いつでも(これらのアイテムの間で)他のリクエストも処理される必要があるが、誤ってこのダイアログに混入しないという事実を考慮したものです:

1.番号を送信する

2.番号が応答します

3.第2項の番号を2乗して送信します

4.答えは再び数字です

5.条項2と条項4の数値の合計をコンソールに表示します



これはHaskellでどのように見えるかです(もちろん、関数のexample



はノンブロッキングです):

example :: Int -> AIO () <br>

example v = do <br>

x <- request v<br>

y <- request ( x * x ) <br>

io $ print ( x + y ) <br>







これを、たとえばユーザーからの応答を要求する同様のブロック機能と比較してください。

example :: Int -> IO () <br>

example v = do <br>

x <- request v<br>

y <- request ( x * x ) <br>

print ( x + y ) <br>









不要な詳細に気を取られず、複数のプログラムを実行する必要がないように、この記事のタスクを簡略化します。 チャネル( Chan a



)の読み取りと書き込みを行い、メッセージのタイプは(Int, String)



、つまり メッセージ番号とシリアル化された値。



必要なすべてのモジュールを接続します。



> module Test ( <br>

> ) where <br>

> <br>

> import Control . Arrow <br>

> import Control . Monad <br>

> import Control . Concurrent . MVar <br>

> import Control . Concurrent . Chan <br>

> import Control . Concurrent <br>

> import Data . List <br>

> import Data . Maybe <br>







モナドを書く前に、まずコールバックですべてを実行してみてください。

メッセージを送信するとき、何らかの種類の番号を生成し、リストにコールバックを追加する必要があります。 つまり 変数番号とペアのリストnumber- > callbackが必要です。 実際、チャネル自体も必要です。 ソケットとは異なり、一方に書き込み、他方から読み取るため、2つ必要です。 これらすべてを個別のタイプとして配置します。



> data AState = AState { <br>

> aCurrent :: MVar Int , <br>

> aWait :: MVar [ ( Int , String -> IO () ) ] , <br>

> aChanOut :: Chan ( Int , String ) , <br>

> aChanIn :: Chan ( Int , String ) } <br>

> <br>

> newA = liftM4 AState ( newMVar 0 ) ( newMVar [] ) newChan newChan<br>







クライアント側のメッセージハンドラは、チャネルから読み取り、メッセージ番号に従ってコールバックを呼び出す必要があります。

チャンネルから読み取り、適切なコールバックを探して(リストから削除しながら)呼び出します。 簡単です:



> listener ( AState _ w _ chIn ) = forever $ do <br>

> ( i , s ) <- readChan chIn<br>

> -- modifyMVar a -> IO (a, b) <br>

> -- .. , . <br>

> -- callback. <br>

> callback <- modifyMVar w $ \ callbacks -> do <br>

> -- callback' . <br>

> let ( past , ok ) = partition ( ( /= i ) . fst ) callbacks<br>

> -- ( ). <br>

> case ok of <br>

> ( ( _ , f ) : _ ) -> return ( past , f ) -- callback ( ). <br>

> _ -> return ( past , \ s -> return () ) -- , <br>

> callback s -- callback. <br>







「サーバー」に到着したメッセージを直接観察できるように、すべての着信メッセージを対話者( aChanOut



チャネル)に出力するハンドラーを作成します。

aChanOut



チャネルから読み取り、表示します。



> tracer ( AState _ _ chOut _ ) = forever $ readChan chOut >>= print<br>







宣伝的な方法


そもそも、モナドなしでやろう。 メッセージ送信関数を書きましょう。

メッセージ番号を生成し、メッセージを文字列にシリアル化し、コールバックを登録する必要があります。

sendAndReceive1 :: AState -> String -> ( String -> IO () ) -> IO () <br>

sendAndReceive1 ( AState cur w chOut _ ) msg onMsg = do <br>

i <- modifyMVar cur ( return . ( succ &&& id ) ) -- 1 . <br>

modifyMVar_ w ( return . ( ( i , onMsg ) : ) ) -- callback. <br>

writeChan chOut ( i , msg ) -- . <br>







原則として使用は許容されますが、いくつかの欠陥があります。

sendAndReceive1 a ( show 123 ) $ \ ans -> do <br>

let x = read ans -- . <br>

print x<br>

sendAndReceive1 a ( show x ) $ \ ans2 -> do <br>

-- ... <br>







まず、シリアル化と逆シリアル化の機能を渡すことができます。これにより、たとえばデフォルトで標準のread



show



を使用して、コールバックに書き込まsendAndReceive2



を書き込むことがsendAndReceive2



ます。

sendAndReceive1 :: AState -> a -> ( a -> String ) -> ( String -> b ) -> ( b -> IO () ) -> IO () <br>

sendAndReceive1 ( AState cur w chOut _ ) msg show_ read_ onMsg = do <br>

i <- modifyMVar cur ( return . ( succ &&& id ) ) <br>

modifyMVar_ w ( return . ( ( i , onMsg . read_ ) : ) ) <br>

writeChan chOut ( i , show_ msg ) <br>

<br>

sendAndReceive2 :: ( Show a , Read b ) => AState -> a -> ( b -> IO () ) -> IO () <br>

sendAndReceive2 a msg onMsg = sendAndReceive1 a msg show read onMsg<br>

<br>

-- . <br>

sendAndReceive2 a 23 $ \ x -> do <br>

print x<br>

sendAndReceive2 a ( x + 10 ) $ \ z -> ... <br>







これについて詳しく説明することは可能ですが、アセンブラーを完全に使用することは可能です。そこで、より強力な抽象化を使用してみませんか。



ユーモナディックウェイ


モナドのメイン関数(TCではなくHaskell)のタイプがma -> (a -> mb) -> mb



であることを思い出すと、コールバックは2番目の引数として始まります。 しかし、そこにはprint



型の定期的な計算を渡す必要もありprint





それらを何らかの方法で区別するには、2つのオプションを使用して新しいタイプを作成します。

1.メッセージ+コールバック

2.純粋な価値



> data AS a = Send String ( String -> AIO a ) | Pure a<br>







そして、それをIO



モナドでラップします。



> data AIO a = AIO { aio :: IO ( AS a ) } <br>







したがって、計算は2つのキャンプに分割されます。メッセージの送信とその他のすべてです。



通常のIO



をモナドに「レイズ」する関数を作成します。 IO



と同じ値を返すだけですが、 Pure



コンストラクターでラップする必要があります



> io :: IO a -> AIO a<br>

<br>

io act = AIO $ do <br>

v <- act<br>

return ( Pure v ) <br>







または簡単:



> io = AIO . liftM Pure <br>







メッセージを送信するための関数は、2番目のコンストラクタSend



を使用します。基本的には、引数をコンストラクタにパックするだけです。



> sendAndReceive :: a -> ( a -> String ) -> ( String -> b ) -> AIO b<br>

> sendAndReceive msg to from = AIO $ return $ Send ( to msg ) ( return . from ) <br>







そして、それに似たrequest



show



を使用して、シリアル化のためにread







> request :: ( Show a , Read b ) => a -> AIO b<br>

> request msg = sendAndReceive msg show read<br>







いくつかのトリックは、モナドでは何も計算せず、計算ツリーのようなもののみを構築することです。 これらの関数自体は、 AIO



タイプのみを作成します。

このガベージをすべて計算できる関数を使用します。 つまり 私たちによって記述されたダイアログを実行するために(たとえば、 example



)。 作成されるダイアログには、2つのオプションがあります。

1. Pure



-値を削除して返すだけです。

2. Send



-ここで主な作業は完了です-番号を生成し、コールバックを登録してメッセージを送信します。



> run :: AState -> AIO () -> IO () <br>

> run a @ ( AState cur w chOut chIn ) act = run' act where <br>

> run' ( AIO actIO ) = do <br>

> as <- actIO<br>

> case as of <br>

> Pure value -> return value<br>

> Send msg onMsg -> do <br>

> i <- modifyMVar cur ( return . ( succ &&& id ) ) -- <br>

> modifyMVar_ w ( return . ( ( i , run' . onMsg ) : ) ) -- callback. <br>

> writeChan chOut ( i , msg ) -- . <br>







これで、 instance



モナドを作成する準備がすべて整いました。



> instance Monad AIO where <br>

> return = AIO . return . Pure -- <br>

> AIO v >>= f = AIO $ do <br>

> x <- v -- AS, Send Pure? <br>

> case x of <br>

> -- Pure, callback . <br>

> Pure value -> aio $ f value<br>

> -- "" callback . <br>

> Send msg onMsg -> return $ Send msg ( \ s -> onMsg s >>= f ) <br>







最後に必要なのは、操作性をチェックする関数です。これは、 listener



スレッドを起動してクライアントへの着信メッセージとtracer



を処理し、着信メッセージをサーバーに出力し、サーバーからクライアントにメッセージを送信する関数を返します。 つまり この場合、私たち自身が対談者として行動し、クライアントに送信したいものを印刷します。



> start :: IO ( AState , ( Int , String ) -> IO () ) <br>

> start = newA >>= forks where <br>

> forks a = mapM_ forkIO [ listener a , tracer a ] >> return ( a , writeChan ( aChanIn a ) ) <br>







千!


これで、元の例を使用してインタープリターでこれを確認できます。

-- , - <br>

-- , <br>

ghci > ( a , f ) <- start<br>

-- <br>

ghci > run a ( example 10 ) <br>

-- <br>

( 0 , "10" ) <br>

-- <br>

ghci > run a ( example 20 ) <br>

-- , <br>

( 1 , "20" ) <br>

-- "" <br>

ghci > f ( 0 , "11" ) <br>

-- <br>

( 2 , "121" ) <br>

-- "" <br>

ghci > f ( 1 , "21" ) <br>

-- <br>

( 3 , "441" ) <br>

-- "", , "" <br>

ghci > f ( 3 , "444" ) <br>

-- <br>

465 <br>

-- "" <br>

ghci > f ( 2 , "122" ) <br>

133 <br>

ghci > <br>







ご覧のとおり、2つのexample



同時に起動しても、それらのダイアログは交差しません。



ところで、このメッセージはすべてtest.lhs



Haskellのプログラムです。これをtest.lhs



コピーして、自分でテストできます。



PSこの記事の改善に協力してくれたpechlambdaに感謝します。



All Articles