マルチスレッドアプリケーションの作成が、シングルスレッドプログラムの開発にない多くの問題に関連していることは誰にとっても秘密ではないと思います。
1つの問題は、アプリケーションのテストです。
操作の実行順序を制御することはできないため、プログラム実行の結果も制御できません。 エラーが発生した場合でも、同じレーキをもう一度踏むのはそれほど簡単ではありません。
マルチスレッドアプリケーションのテスト方法に関する小さなレシピを提供したいと思います。
必要な成分のうち、
haskell
、
QuickCheck
、いくつかのモナド、味のある塩/
haskell
taste。
実施例
実用的な例として、食事する哲学者の問題を取り上げます。
MVar a
は、タイプaの値を含むか、空の参照です。
putMVar ref x
は、値xを参照リンクに配置します。
takeMVar ref
はリンクのコンテンツを読み取り、その後は空のままにします。
既に空だった場合、ストリームは何か他の書き込みが行われるまでスリープ状態になります。
()
単一の値を持つ型で、型自体と同じ方法で示されます-
()
。
MVar ()
ようなリンクを持つフォークをモデル化します。
したがって、フォークには2つの状態があります。フォークが哲学者によって占有されている場合、フォークは空です。 プラグが空いている場合、値
()
が含まれます。
import System.Random import Control.Monad import Control.Concurrent import Control.Monad.Cont import Control.Monad.Trans import Data.IORef import Test.QuickCheck import Test.QuickCheck.Gen import Test.QuickCheck.Monadic -- sleep ( 0 0.3) sleep :: IO () sleep = randomRIO (0, 300000) >>= threadDelay phil :: Int -- . -> MVar () -- . -> MVar () -- . -> IO () phil n leftFork rightFork = forever $ do putStrLn $ show n ++ " is awaiting" sleep takeMVar leftFork putStrLn $ show n ++ " took left fork" -- sleep takeMVar rightFork putStrLn $ show n ++ " took right fork" sleep putMVar leftFork () putMVar rightFork () putStrLn $ show n ++ " put forks" sleep runPhil :: Int -> IO () runPhil n = do -- , . forks <- replicateM n $ newMVar () -- 5 , phil. forM_ [1..n] $ \i -> forkIO $ phil i (forks !! (i - 1)) (forks !! (i `mod` n)) main = do runPhil 5 -- , , . forever (threadDelay 1000000000)
このプログラムではデッドロックが発生する可能性があります。
それを楽しむために、あなたはラインのコメントを外すことができます-
sleep
して少し待ってください。
私たちの目標は、このエラーを検出するテストを作成することです。
しかし、これを行う前に、運用の順序をどのように管理するかを理解する価値があります。 このために、IOの代わりに、別のモナドを使用します。
sleep
、
phil
runPhil
の定義をまとめて、他のモナドでも
runPhil
ようにします。
sleep :: MonadIO m => m () sleep = do r <- liftIO $ randomRIO (0, 100) r `times` liftIO (threadDelay 300) where times :: Monad m => Int -> m () -> m () times ra = mapM_ (\_ -> a) [1..r]
これで、
sleep
機能はIO操作をサポートする任意のモナドで動作できます。
MonadIO
クラスでは、これを許可する
liftIO
関数が1つだけ
liftIO
ています。
ランダムな秒数で1回スリープする代わりに、0.3ミリ秒間ランダムな数回スリープすることに注意してください。 その理由は、モナドでは、
liftIO
内の
liftIO
がアトミックに実行されるためです。 したがって、私たちが眠りにつく時間は何にも影響を与えません。何回それをするかが重要です。
モナドは1つのスレッドで
MVar
ため、
MVar
は役に立たず、後で
phil
関数が
MVar
および他のタイプのリンクで機能できるという事実に基づいて、リンクのタイプを決定します。
これを行うには、
MonadConcurrent
モナド
MonadConcurrent
を定義します。この
MonadConcurrent
では、参照による作成、読み取り、書き込みの操作、およびスレッドの作成が行われます。
class Monad m => MonadConcurrent m where type CVar m :: * -> * newCVar :: a -> m (CVar ma) takeCVar :: CVar ma -> ma putCVar :: CVar ma -> a -> m () fork :: m () -> m ()
ここでは、言語の拡張であるタイプファミリを使用しました。
この場合、異なるモナドに対して異なるタイプのリンクを定義できるように、この拡張が必要です。
拡張機能を使用するには、次の行をファイルの先頭に追加する必要があります(同時に、後で必要になる拡張機能を接続します)。
{-# LANGUAGE TypeFamilies, ExistentialQuantification, GeneralizedNewtypeDeriving #-}
IOモナドのこのクラスの
instance
定義し
instance
。
ここではすべてが簡単です
MVar
適切な操作を使用するだけ
MVar
。
instance MonadConcurrent IO where type CVar IO = MVar newCVar = newMVar takeCVar = takeMVar putCVar = putMVar fork m = forkIO m >> return ()
関数
phil
と
runPhil
要約します。
phil :: (MonadIO m, MonadConcurrent m) => Int -> CVar m () -> CVar m () -> m () phil n leftFork rightFork = forever $ do liftIO $ putStrLn $ show n ++ " is awaiting" sleep takeCVar leftFork liftIO $ putStrLn $ show n ++ " took left fork" takeCVar rightFork liftIO $ putStrLn $ show n ++ " took right fork" sleep putCVar leftFork () putCVar rightFork () liftIO $ putStrLn $ show n ++ " put forks" sleep runPhil :: (MonadIO m, MonadConcurrent m) => Int -> m () runPhil n = do forks <- replicateM n $ newCVar () forM_ [1..n] $ \i -> fork $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))
プログラムを実行し、以前と同じように機能することを確認します。
モナドコンカレント
そして今、楽しみが始まります。
作業するモナドを定義します(先を見て、
Cont
と呼ばれます)。 また、
Cont
が同時に最も複雑で強力なモナドの1つであることを提案しようと思います。
このモナドを使用すると、制御フローで何でもできます。たとえば、アクションを実行する代わりに、構造に保存して(このため、タイプ
Action
宣言します)、後で異なる順序で実行することができます。
data Action = Atom (IO Action) | forall a. ReadRef (MaybeRef a) (a -> Action) | forall a. WriteRef (MaybeRef a) a Action | Fork Action Action | Stop
各コンストラクターを個別に扱いましょう。
Stop
アクションは、計算が完了したことを意味します。
Fork
アクションは、計算ブランチ、つまり、同時に実行できる2つのスレッドがあることを意味します。
Atom
アクションはアトミックIO操作を実行し、次のステップで実行する必要がある
Action
を含む新しい
Action
を返します。
例:
getSum
関数は、キーボードから2つの数値を読み取り、それらの合計を出力して終了するアクションを定義します。
getSum :: Action getSum = Atom $ do x <- readLn -- return $ Atom $ do -- y <- readLn -- return $ Atom $ do -- print (x + y) -- return Stop --
次:
WriteRef ref val act
アクション
WriteRef ref val act
は、
ref
リンクで
val
の値を記録します。継続は
act
です。
ReadRef ref act
アクション
ReadRef ref act
は、参照
ref
によって値を読み取り、この値を取得して継続を返します。
Action
任意のタイプのリンクを保存できるように、言語の別の拡張機能である実存的数量化を使用します。
MaybeRef
タイプは、
MaybeRef
代わりに使用するリンクのタイプを表し、
Maybe
への参照として定義されます。
newtype MaybeRef a = MaybeRef (IORef (Maybe a))
これでモナドを定義できます。
約束したとおり、
Cont
モナドをラップするだけです。
newtype Concurrent a = Concurrent (Cont Action a) deriving Monad
Cont Action
モナドは次のように構成されています。
タイプaの値を返す代わりに、タイプ
a
継続
(a -> Action)
a-
(a -> Action)
を取得し、この関数に値を渡し、結果を返します。
つまり、
Cont Action a = (a -> Action) -> Action
と仮定できます。
具体的には、
(a -> Action) -> Action
を
Cont Action a
またはその逆に変換する次の関数ペアがあります。
cont :: ((a -> Action) -> Action) -> Cont Action a. runCont :: Cont Action a -> (a -> Action) -> Action
これで、
MonadIO
および
MonadConcurrent
インスタンスを定義できます。
instance MonadIO Concurrent where liftIO m = Concurrent $ cont $ \c -> Atom $ do a <- m return (ca)
ここで何が起こるか見てみましょう。
liftIO
はIO操作を受け入れ、それをアトミックアクションにラップします。 つまり、継続して(つまり、cのタイプ
a -> Action
)関数を渡し、IO操作
m
を実行するアトミックアクションを返します。
アトミック操作が
Action
を返すように
Atom
を定義しました。これは継続です。
実際、これは私たちがやっていることです:
m
を実行した後、
c
を呼び出して必要な継続を返します。
次に、
instance MonadConcurrent
定義し
instance MonadConcurrent
。
定義したばかりの
liftIO
関数を使用して、
newCVar
リンクを作成します。
takeCVar
および
putCVar
、対応するアクション
putCVar
返し、この構造内で継続を継続します。
forkでは、両方のスレッドが格納されているアクションを返します。1つは
fork
関数への引数で渡され、もう1つは継続から取得されます。
instance MonadConcurrent Concurrent where type CVar Concurrent = MaybeRef newCVar a = liftIO $ liftM MaybeRef $ newIORef (Just a) takeCVar v = Concurrent $ cont (ReadRef v) putCVar va = Concurrent $ cont $ \c -> WriteRef va $ c () fork (Concurrent m) = Concurrent $ cont $ \c -> Fork (runCont m $ \_ -> Stop) $ c ()
私たちのモナドはほとんど準備ができています、それを起動する方法を学ぶためだけに残ります。
始めるために、
Action
を起動する関数を書きましょう。 アクションのリストを取り、各要素は個別のスレッドです。
アクションを起動するための戦略は異なる場合があります。 2つのポイントを決定します。スレッドを実行する順序と、空の変数から値を読み取ろうとした場合の対処方法です。 変数に何も含まれていない可能性があることを思い出させてください。その後、他のスレッドが変数に何かを入れるのを待つ必要があります。
最初に、スレッドを順番に実行する単純なバージョンを作成しましょう。 空の変数から読み取ろうとするスレッドはキューの最後に移動します。
runAction :: [Action] -> IO () -- , . runAction [] = return () -- , , , . runAction (Atom m : as) = do a' <- m runAction $ as ++ [a'] -- . runAction (Fork a1 a2 : as) = runAction $ as ++ [a1,a2] -- . runAction (Stop : as) = runAction as runAction (ReadRef (MaybeRef ref) c : as) = do -- . ma <- readIORef ref case ma of -- -, Just a -> do -- . writeIORef ref Nothing -- . runAction (as ++ [ca]) -- , , . Nothing -> runAction (as ++ [ReadRef (MaybeRef ref) c]) -- , . runAction (WriteRef (MaybeRef ref) val a : as) = do writeIORef ref (Just val) runAction (as ++ [a])
putMVar
動作は、
WriteRef
実装とは若干異なることに注意してください。
リンクにすでに値がある場合、
putMVar
変数が解放されるまでスレッド
putMVar
フリーズします。 この場合、値を書き換えます。
コードを過度に複雑にしないために、この状況では
putMVar
として
putMVar
するバージョンを作成する価値はありません。
次に、
Concurrent
を起動する関数を記述し、
main
を再定義します。
runConcurrent :: Concurrent () -> IO () runConcurrent (Concurrent c) = runAction [runCont c $ \_ -> Stop] main = runConcurrent (runPhil 5)
現在、同じスレッドで作業しており、
threadDelay
がすべての作業を停止しているため、速度はわずかに低下しています。
テストを書く
「料理に調味料を加える」時が来ました-私たちの例のためにテストを書きます。
これを行うには、テスト用のランダム入力を生成する
QuickCheck
ライブラリを使用します。 スレッドをさまざまな順序で実行するため、テストの入力はリストから次のスレッドを選択する順序です。
入力を数字のリストでエンコードすることは可能ですが、問題は、スレッドの数が変化する可能性があるため、これらの数字がどの範囲から選択されるべきかを事前に知らないことです。
したがって、入力データを
Int -> Int
型の関数のリストでエンコードします。これらの関数は、数値
n
を取得し、区間
[0,n-1]
から数値を返します。
newtype Route = Route [Int -> Int]
QuickCheck
ライブラリーによって提供される
Arbitrary
クラスは、ランダム要素を生成できるタイプを記述することを目的としています。
このクラスでは、2つの関数が宣言されています-
shrink
と
arbitrary
。
shrink
にはデフォルトの実装があるため、再定義しません。
arbitrary
関数では、各関数が区間
[0,n-1]
から数値を返すランダム関数のリストを生成します。
instance Arbitrary Route where arbitrary = fmap Route (listOf arbitraryFun) where arbitraryFun = MkGen $ \qsn -> unGen (choose (0, n - 1)) qs
QuickCheck
が必要
QuickCheck
するため、
Route
instance Show
も定義し
instance Show
。
残念ながら、あまり有用な
show
書くことはできません。 さらに、この関数は使用されないため、未定義のままにします。
instance Show Route where show = undefined
これで、
runAction
よりスマートなバージョンの作成を開始できます。
最初の違いは、アトミックアクションの実行とリンクを使用した作業を分離することです。
まず、アトミックアクションを実行する補助関数
skipAtoms
を
skipAtoms
します。この関数はアクションのリストを受け入れ、
Atom
、
Fork
、および
Stop
実行し、残りは結果として返されます。
skipAtoms :: [Action] -> IO [Action] skipAtoms [] = return [] skipAtoms (Atom m : as) = do a <- m skipAtoms (as ++ [a]) skipAtoms (Fork a1 a2 : as) = skipAtoms (as ++ [a1,a2]) skipAtoms (Stop : as) = skipAtoms as skipAtoms (a : as) = fmap (a:) (skipAtoms as)
runAction
の新しいバージョンと以前のバージョンとの2番目の違いは、デッドロックの受信を追跡することです。
これを行うには、アクションの2つのリストを開始します。 最初は、アクティブな(当社が実行した)スレッドを保存します。 2番目には、リンクの更新を待機しているスレッドがあります。
アクティブなスレッドのリストが空で、待機リストがない場合、デッドロックが発生し、この場合は例外がスローされます。
3番目のイノベーションは、現在のステップで実行するストリーム番号を選択するために使用される
Route
タイプの引数です。
runAction :: Route -> [Action] -> [Action] -> IO () runAction _ [] [] = return () runAction _ [] _ = fail "Deadlock" runAction (Route []) _ _ = return () runAction (Route (r:rs)) as bs = do as <- skipAtoms as let n = length as case splitAt (rn) as of (as1, ReadRef (MaybeRef ref) c : as2) -> do ma <- readIORef ref case ma of Just a -> do writeIORef ref Nothing runAction (Route rs) (as1 ++ [ca] ++ as2) bs Nothing -> runAction (Route rs) (as1 ++ as2) (bs ++ [ReadRef (MaybeRef ref) c]) (as1, WriteRef (MaybeRef ref) xc : as2) -> do writeIORef ref (Just x) runAction (Route rs) (as1 ++ [c] ++ as2 ++ bs) []
runConcurrent
関数はあまり変更されていません。
runConcurrent :: Route -> Concurrent () -> IO () runConcurrent r (Concurrent c) = runAction r [runCont c $ \_ -> Stop] []
round_robin
を最初の引数として渡すことで、新しいバージョンがどのように機能するかを確認できます。 これは、以前の
runAction
関数の
runAction
似た単純な実行戦略です。 ここでは、単に無限リストを生成し、各要素について、スレッド数を法とする剰余を取得します。
round_robin :: Route round_robin = Route $ map rem [0..]
この入力データに対して計算を実行すると、この例の作業は乱数ジェネレーターに基づいているため、すぐにデッドロックが発生する可能性があります。したがって、入力データが常に同じであるにもかかわらず、実行順序は次のようになります。ランダム。
この例がより確定的である場合、入力データをランダムに変化させる必要がありますが、これは今から行います。
main = quickCheck $ monadicIO $ do r <- pick arbitrary run $ runConcurrent r (runPhil 5)
先ほど実装した
arbitrary
関数を使用して、
Route
タイプの任意の要素を選択します。 次に、この入力で計算を開始します。
QuickCheck
は
QuickCheck
処理します。つまり、入力データのサイズを増やすたびにテストを100回実行します。
プログラムを実行すると、次のように表示されます。
... 3 took left fork 4 put forks 4 is awaiting 5 took left fork 4 took left fork 1 took right fork 1 put forks 1 is awaiting 1 took left fork 2 took left fork *** Failed! Exception: 'user error (Deadlock)' (after 36 tests):
取得に必要なもの!
おわりに
マルチスレッドアプリケーションでデッドロック状態を検出できるテストの作成方法を学びました。
その過程で、
Cont
モナド、型族、実存的数量化、および
QuickCheck
ライブラリーの使用例を見ました。
さらに、即興の素材からマルチスレッドプログラム実行のモデルを組み立てる方法を学びました。