मेरा मानना है कि यह किसी के लिए कोई रहस्य नहीं है कि मल्टीथ्रेडेड एप्लिकेशन लिखना कई समस्याओं से जुड़ा है जो एकल-थ्रेडेड प्रोग्राम के विकास में अनुपस्थित हैं।
एक समस्या अनुप्रयोग का परीक्षण कर रही है।
हम उस क्रम को नियंत्रित नहीं कर सकते हैं जिसमें संचालन किया जाता है; इसलिए, कार्यक्रम निष्पादन का परिणाम भी नियंत्रित करने के लिए उत्तरदायी नहीं है। यहां तक कि अगर हमें कोई त्रुटि मिलती है, तो दूसरी बार उसी रेक पर कदम रखना इतना आसान नहीं होगा।
मैं एक multithreaded एप्लिकेशन का परीक्षण करने के तरीके के बारे में एक छोटा सा नुस्खा देना चाहता हूं।
हमें जिन सामग्रियों की आवश्यकता होती है उनमें:
haskell
,
QuickCheck
, कुछ मोनाड्स, नमक / काली मिर्च स्वाद के लिए।
काम करने का उदाहरण
एक काम के उदाहरण के रूप में, हम भोजन दार्शनिकों की समस्या को लेते हैं।
MVar a
ऐसा संदर्भ है जिसमें या तो टाइप का मान होता है या खाली होता है।
putMVar ref x
रेफरी लिंक पर वैल्यू
putMVar ref x
डालता है।
takeMVar ref
लिंक की सामग्री को पढ़ता है, इसके बाद इसे खाली छोड़ देता है।
यदि यह पहले से ही खाली था, तो धारा सो जाती है जब तक कि कुछ और इसे नहीं लिखता।
()
एक प्रकार है जिसका एकल मान है, जिसे उसी प्रकार से दर्शाया जाता है जैसे -
()
।
हम
MVar ()
जैसे लिंक के साथ कांटे
MVar ()
।
इस प्रकार, एक कांटा में दो राज्य हो सकते हैं: यदि किसी भी दार्शनिक द्वारा कांटा पर कब्जा किया जाता है, तो यह खाली है; यदि प्लग मुक्त है, तो इसमें मूल्य
()
।
import System.Random import Control.Monad import Control.Concurrent import Control.Monad.Cont import Control.Monad.Trans import Data.IORef import Test.QuickCheck import Test.QuickCheck.Gen import Test.QuickCheck.Monadic -- sleep ( 0 0.3) sleep :: IO () sleep = randomRIO (0, 300000) >>= threadDelay phil :: Int -- . -> MVar () -- . -> MVar () -- . -> IO () phil n leftFork rightFork = forever $ do putStrLn $ show n ++ " is awaiting" sleep takeMVar leftFork putStrLn $ show n ++ " took left fork" -- sleep takeMVar rightFork putStrLn $ show n ++ " took right fork" sleep putMVar leftFork () putMVar rightFork () putStrLn $ show n ++ " put forks" sleep runPhil :: Int -> IO () runPhil n = do -- , . forks <- replicateM n $ newMVar () -- 5 , phil. forM_ [1..n] $ \i -> forkIO $ phil i (forks !! (i - 1)) (forks !! (i `mod` n)) main = do runPhil 5 -- , , . forever (threadDelay 1000000000)
इस कार्यक्रम में गतिरोध हो सकता है।
इसका आनंद लेने के लिए, आप लाइन को असहज कर सकते हैं -
sleep
और थोड़ा इंतजार करो।
हमारा लक्ष्य परीक्षण लिखना है जो इस त्रुटि का पता लगाएगा।
लेकिन इससे पहले कि हम ऐसा कर सकें, यह समझने लायक है कि हम संचालन के क्रम को कैसे प्रबंधित करेंगे। इसके लिए, IO के बजाय, हम एक और monad का उपयोग करते हैं।
हम
sleep
,
phil
और
runPhil
की परिभाषा को संक्षेप में प्रस्तुत करते हैं ताकि वे अन्य
runPhil
लिए भी काम करें।
sleep :: MonadIO m => m () sleep = do r <- liftIO $ randomRIO (0, 100) r `times` liftIO (threadDelay 300) where times :: Monad m => Int -> m () -> m () times ra = mapM_ (\_ -> a) [1..r]
अब
sleep
समारोह किसी भी मोनोड के साथ काम कर सकता है जो IO संचालन का समर्थन करता है।
MonadIO
वर्ग में, केवल एक
liftIO
फ़ंक्शन को
liftIO
जाता
liftIO
जो इसे अनुमति देता है।
ध्यान दें कि यादृच्छिक संख्या में एक बार सो जाने के बजाय, हम 0.3 मिलीसेकंड के लिए यादृच्छिक संख्या में सो जाते हैं। कारण यह है कि हमारे
liftIO
,
liftIO
अंदर की क्रियाएं परमाणु रूप से की जाती हैं। तदनुसार, जो समय हम सोते हैं वह किसी भी चीज को प्रभावित नहीं करता है, यह केवल महत्वपूर्ण है कि हम इसे कितनी बार करते हैं।
चूंकि हमारा
MVar
एक धागे में काम करेगा,
MVar
हमारे लिए बेकार है, और हम बाद में अपने प्रकार के लिंक को परिभाषित करेंगे, इस तथ्य के आधार पर कि
MVar
और अन्य प्रकार के लिंक के साथ
phil
फ़ंक्शन काम कर सकता है।
ऐसा करने के लिए, हम
MonadConcurrent
को परिभाषित करते हैं, जिसमें संदर्भ बनाने, पढ़ने और लिखने के साथ-साथ थ्रेड बनाने के लिए ऑपरेशन होंगे।
class Monad m => MonadConcurrent m where type CVar m :: * -> * newCVar :: a -> m (CVar ma) takeCVar :: CVar ma -> ma putCVar :: CVar ma -> a -> m () fork :: m () -> m ()
यहां हमने टाइप परिवारों का इस्तेमाल किया, जो भाषा का विस्तार हैं।
इस मामले में, हमें इस विस्तार की आवश्यकता है ताकि हम विभिन्न साधुओं के लिए विभिन्न प्रकार के लिंक परिभाषित कर सकें।
एक्सटेंशन का उपयोग करने के लिए, आपको फ़ाइल की शुरुआत में निम्नलिखित पंक्ति को जोड़ना होगा (और उसी समय उन एक्सटेंशनों को कनेक्ट करें जिनकी आवश्यकता होगी):
{-# LANGUAGE TypeFamilies, ExistentialQuantification, GeneralizedNewtypeDeriving #-}
IO सनक के लिए इस वर्ग के एक
instance
परिभाषित करें।
यहां सब कुछ आसान है: हम सिर्फ
MVar
लिए उपयुक्त संचालन का उपयोग करते हैं।
instance MonadConcurrent IO where type CVar IO = MVar newCVar = newMVar takeCVar = takeMVar putCVar = putMVar fork m = forkIO m >> return ()
कार्यों को सारांशित करें और
runPhil
।
phil :: (MonadIO m, MonadConcurrent m) => Int -> CVar m () -> CVar m () -> m () phil n leftFork rightFork = forever $ do liftIO $ putStrLn $ show n ++ " is awaiting" sleep takeCVar leftFork liftIO $ putStrLn $ show n ++ " took left fork" takeCVar rightFork liftIO $ putStrLn $ show n ++ " took right fork" sleep putCVar leftFork () putCVar rightFork () liftIO $ putStrLn $ show n ++ " put forks" sleep runPhil :: (MonadIO m, MonadConcurrent m) => Int -> m () runPhil n = do forks <- replicateM n $ newCVar () forM_ [1..n] $ \i -> fork $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))
प्रोग्राम को चलाएं और सुनिश्चित करें कि यह पहले की तरह काम करता है।
मोनाड समवर्ती
और अब मजा शुरू होता है।
उस मोनाड को परिभाषित करें जिसमें हम काम करेंगे (आगे देखते हुए, मैं कहूंगा कि इसे
Cont
कहा जाता है)। मैं यह सुझाव देने के लिए भी उद्यम करता हूं कि
Cont
एक ही समय में सबसे जटिल और सबसे शक्तिशाली साधुओं में से एक है।
इस सनक का उपयोग करते हुए, आप नियंत्रण प्रवाह के साथ कुछ भी कर सकते हैं: उदाहरण के लिए, क्रिया करने के बजाय, आप उन्हें एक संरचना में सहेज सकते हैं (इस उद्देश्य के लिए, एक
Action
प्रकार घोषित करें) और उन्हें बाद में निष्पादित करें, संभवतः एक अलग क्रम में।
data Action = Atom (IO Action) | forall a. ReadRef (MaybeRef a) (a -> Action) | forall a. WriteRef (MaybeRef a) a Action | Fork Action Action | Stop
आइए प्रत्येक निर्माता के साथ अलग से व्यवहार करें।
Stop
एक्शन का मतलब है कि गणना पूरी हो गई है।
Fork
कार्रवाई का अर्थ है कि गणना शाखा, यानी अब हमारे पास दो धागे हैं जिन्हें एक साथ निष्पादित किया जा सकता है।
Atom
एक्शन एक परमाणु आईओ ऑपरेशन करता है, हमारे पास नया
Action
होता है जिसमें एक्शन होता है, जिसे अगले चरण में किया जाना चाहिए।
उदाहरण के लिए:
getSum
फ़ंक्शन एक क्रिया को परिभाषित करता है जो कीबोर्ड से दो नंबर पढ़ता है, उनकी राशि प्रिंट करता है, और समाप्त होता है।
getSum :: Action getSum = Atom $ do x <- readLn -- return $ Atom $ do -- y <- readLn -- return $ Atom $ do -- print (x + y) -- return Stop --
अगला:
WriteRef ref val act
एक्शन
WriteRef ref val act
ref
लिंक पर
val
के मूल्य को रिकॉर्ड करता है, निरंतरता
act
।
ReadRef ref act
कार्रवाई
ReadRef ref act
संदर्भ
ref
द्वारा मूल्य पढ़ता है,
act
इस मूल्य को लेता है और एक निरंतरता देता है।
ताकि
Action
आप अनियंत्रित प्रकार के लिंक को बचा सकें, हम भाषा के एक और विस्तार का उपयोग करते हैं - अस्तित्वगत मात्रा का ठहराव।
MaybeRef
प्रकार उन लिंक के प्रकार का प्रतिनिधित्व करता है,
MVar
उपयोग हम
MVar
बजाय करेंगे, और इसे
Maybe
संदर्भ के रूप में परिभाषित किया गया है।
newtype MaybeRef a = MaybeRef (IORef (Maybe a))
अब हम अपने मठ को परिभाषित कर सकते हैं।
जैसा कि मैंने वादा किया था, हम सिर्फ
Cont
मोनड लपेटते हैं।
newtype Concurrent a = Concurrent (Cont Action a) deriving Monad
Cont Action
मोनड का आयोजन निम्नानुसार है।
एक प्रकार के मान को वापस करने के बजाय, यह प्रकार
(a -> Action)
की निरंतरता लेता है, इस फ़ंक्शन के लिए एक मान पारित करता है, और परिणाम लौटाता है।
अर्थात्, हम यह मान सकते हैं कि
Cont Action a = (a -> Action) -> Action
।
विशेष रूप से, हमारे पास निम्नलिखित फ़ंक्शन हैं जो अनुवाद करते हैं
(a -> Action) -> Action
इन
Cont Action a
और इसके विपरीत।
cont :: ((a -> Action) -> Action) -> Cont Action a. runCont :: Cont Action a -> (a -> Action) -> Action
अब हम
MonadIO
और
MonadConcurrent
एक उदाहरण को परिभाषित कर सकते हैं।
instance MonadIO Concurrent where liftIO m = Concurrent $ cont $ \c -> Atom $ do a <- m return (ca)
देखते हैं कि यहां क्या होता है।
liftIO
एक IO ऑपरेशन को स्वीकार करता है और इसे परमाणु कार्रवाई में लपेटता है। अर्थात्: हम
cont
एक फंक्शन पास करते हैं जो एक निरंतरता लेता है (वह यह है कि, c
a -> Action
प्रकार का
a -> Action
) और एक परमाणु क्रिया करता है जो IO ऑपरेशन
m
।
हमने
Atom
परिभाषित किया ताकि एक परमाणु ऑपरेशन एक
Action
लौटाए, जो एक निरंतरता है।
वास्तव में, यह वही है जो हम कर रहे हैं: जब हमने
m
निष्पादित किया है, तो हम
c
कॉल करते हैं, जो आवश्यक निरंतरता को वापस करता है।
अब
instance MonadConcurrent
को परिभाषित करें
instance MonadConcurrent
।
बस परिभाषित
liftIO
फ़ंक्शन का उपयोग करके
newCVar
में एक लिंक बनाएं।
takeCVar
और
putCVar
इसी कार्रवाई को वापस करते हैं, और हम इस संरचना के अंदर जारी रखते हैं।
कांटा में, हम उस कार्रवाई को वापस करते हैं जिसमें दोनों धागे संग्रहीत होते हैं: एक को
fork
फ़ंक्शन के तर्कों में पारित किया जाता है, दूसरा निरंतरता से आता है।
instance MonadConcurrent Concurrent where type CVar Concurrent = MaybeRef newCVar a = liftIO $ liftM MaybeRef $ newIORef (Just a) takeCVar v = Concurrent $ cont (ReadRef v) putCVar va = Concurrent $ cont $ \c -> WriteRef va $ c () fork (Concurrent m) = Concurrent $ cont $ \c -> Fork (runCont m $ \_ -> Stop) $ c ()
हमारा मोनाड लगभग तैयार है, यह केवल यह सीखना है कि इसे कैसे लॉन्च किया जाए।
शुरुआत करने के लिए, हम एक फ़ंक्शन लिखेंगे जो
Action
लॉन्च करेगा। यह क्रियाओं की एक सूची लेता है, प्रत्येक तत्व जिसमें एक अलग धागा होता है।
कार्रवाई शुरू करने के लिए रणनीतियाँ अलग हो सकती हैं। हम दो बिंदुओं पर निर्णय लेंगे: थ्रेड्स को निष्पादित करने के लिए किस क्रम में, और यदि हम किसी वैरिएबल से मान को खाली करने का प्रयास करते हैं तो क्या करें। आपको याद दिला दूं कि चर में कुछ भी नहीं हो सकता है, और फिर हमें दूसरे धागे के लिए कुछ करने की प्रतीक्षा करनी होगी।
चलो पहले एक सरल संस्करण लिखते हैं जहां हम बदले में धागे निष्पादित करेंगे; और एक खाली चर से पढ़ने की कोशिश कर रहे धागे को कतार के अंत में ले जाया जाएगा।
runAction :: [Action] -> IO () -- , . runAction [] = return () -- , , , . runAction (Atom m : as) = do a' <- m runAction $ as ++ [a'] -- . runAction (Fork a1 a2 : as) = runAction $ as ++ [a1,a2] -- . runAction (Stop : as) = runAction as runAction (ReadRef (MaybeRef ref) c : as) = do -- . ma <- readIORef ref case ma of -- -, Just a -> do -- . writeIORef ref Nothing -- . runAction (as ++ [ca]) -- , , . Nothing -> runAction (as ++ [ReadRef (MaybeRef ref) c]) -- , . runAction (WriteRef (MaybeRef ref) val a : as) = do writeIORef ref (Just val) runAction (as ++ [a])
ध्यान दें कि
putMVar
हमारे
WriteRef
कार्यान्वयन की तुलना में थोड़ा अलग काम करता है।
यदि लिंक में पहले से ही कुछ मूल्य था, तो
putMVar
धागे को तब तक फ्रीज
putMVar
जब तक कि चर को मुक्त नहीं किया जाता है। इस स्थिति में, मान को पुन: लिखें।
यह एक संस्करण बनाने के लायक नहीं है जो इस स्थिति में
putMVar
रूप में काम करता है, ताकि कोड को
putMVar
न करें।
अगला, एक फ़ंक्शन लिखें जो
Concurrent
, और
main
फिर से परिभाषित करता है।
runConcurrent :: Concurrent () -> IO () runConcurrent (Concurrent c) = runAction [runCont c $ \_ -> Stop] main = runConcurrent (runPhil 5)
चूंकि अब हम एक ही थ्रेड में काम कर रहे हैं, और
threadDelay
सभी काम करना बंद कर देता है, गति थोड़ी कम हो गई है।
लेखन परीक्षण
समय आ गया है कि "पकवान के लिए मसाला जोड़ें" - हमारे उदाहरण के लिए परीक्षण लिखें।
ऐसा करने के लिए, हम
QuickCheck
लाइब्रेरी का उपयोग करते हैं, जो परीक्षणों के लिए यादृच्छिक इनपुट उत्पन्न करता है। चूंकि हम अपने थ्रेड को अलग-अलग क्रम में चलाना चाहते हैं, हमारे परीक्षणों का इनपुट वह क्रम है जिसमें हम सूची से अगले थ्रेड का चयन करते हैं।
इनपुट को संख्याओं की सूची के साथ एनकोड करना संभव है, लेकिन समस्या यह है कि हम पहले से नहीं जानते हैं कि इन नंबरों को किस सीमा से चुना जाना चाहिए, क्योंकि थ्रेड्स की संख्या भिन्न हो सकती है।
इसलिए, हम इनपुट डेटा को
Int -> Int
के प्रकारों की एक सूची के साथ एन्कोड करेंगे, जो नंबर
n
लेते हैं और अंतराल से एक नंबर लौटाते हैं
[0,n-1]
।
newtype Route = Route [Int -> Int]
QuickCheck
लाइब्रेरी द्वारा प्रदान की जाने वाली
Arbitrary
क्लास का उद्देश्य उन प्रकारों का वर्णन करना है जो यादृच्छिक तत्वों को उत्पन्न करने की अनुमति देते हैं।
इस वर्ग में दो कार्य बताए गए हैं -
shrink
और
arbitrary
।
shrink
का डिफ़ॉल्ट कार्यान्वयन है, इसलिए हमने इसे फिर से परिभाषित नहीं किया है।
arbitrary
कार्य में, हम यादृच्छिक कार्यों की एक सूची तैयार करते हैं, जहाँ प्रत्येक फ़ंक्शन अंतराल से एक नंबर देता है
[0,n-1]
।
instance Arbitrary Route where arbitrary = fmap Route (listOf arbitraryFun) where arbitraryFun = MkGen $ \qsn -> unGen (choose (0, n - 1)) qs
हम
Route
लिए
instance Show
को भी परिभाषित करते हैं, क्योंकि
QuickCheck
को
QuickCheck
आवश्यकता
QuickCheck
।
दुर्भाग्य से, हम एक
show
भी उपयोगी नहीं लिख सकते
show
। इसके अलावा, इस फ़ंक्शन का उपयोग नहीं किया जाएगा, इसलिए हम इसे अपरिभाषित छोड़ देते हैं।
instance Show Route where show = undefined
अब आप
runAction
का एक स्मार्ट संस्करण लिखना शुरू कर सकते हैं।
पहला अंतर यह है कि हम परमाणु कार्यों के निष्पादन और लिंक के साथ काम को अलग करते हैं।
शुरू करने के लिए, हम एक सहायक फ़ंक्शन
skipAtoms
जो परमाणु क्रियाएं करता है: फ़ंक्शन क्रियाओं की एक सूची को स्वीकार करता है,
Atom
,
Fork
और
Stop
, और परिणामस्वरूप एक परिणाम देता है।
skipAtoms :: [Action] -> IO [Action] skipAtoms [] = return [] skipAtoms (Atom m : as) = do a <- m skipAtoms (as ++ [a]) skipAtoms (Fork a1 a2 : as) = skipAtoms (as ++ [a1,a2]) skipAtoms (Stop : as) = skipAtoms as skipAtoms (a : as) = fmap (a:) (skipAtoms as)
runAction
के नए संस्करण और पिछले एक के बीच दूसरा अंतर यह है कि हम गतिरोध की प्राप्ति को ट्रैक करते हैं।
ऐसा करने के लिए, क्रियाओं की दो सूचियाँ प्रारंभ करें। पहले भंडार सक्रिय (हमारे द्वारा निष्पादित) धागे। दूसरे में, किसी भी लिंक को अपडेट करने के लिए थ्रेड इंतजार कर रहे हैं।
यदि सक्रिय थ्रेड्स की सूची रिक्त है, लेकिन कोई प्रतीक्षा सूची नहीं है, तो हमें एक गतिरोध मिला, और इस मामले में हम एक अपवाद फेंकते हैं।
तीसरा नवाचार
Route
नंबर का एक तर्क है जिसका उपयोग धारा संख्या को चुनने के लिए किया जाता है जिसे वर्तमान चरण में किया जाना चाहिए।
runAction :: Route -> [Action] -> [Action] -> IO () runAction _ [] [] = return () runAction _ [] _ = fail "Deadlock" runAction (Route []) _ _ = return () runAction (Route (r:rs)) as bs = do as <- skipAtoms as let n = length as case splitAt (rn) as of (as1, ReadRef (MaybeRef ref) c : as2) -> do ma <- readIORef ref case ma of Just a -> do writeIORef ref Nothing runAction (Route rs) (as1 ++ [ca] ++ as2) bs Nothing -> runAction (Route rs) (as1 ++ as2) (bs ++ [ReadRef (MaybeRef ref) c]) (as1, WriteRef (MaybeRef ref) xc : as2) -> do writeIORef ref (Just x) runAction (Route rs) (as1 ++ [c] ++ as2 ++ bs) []
runConcurrent
फ़ंक्शन बहुत अधिक नहीं बदला गया है।
runConcurrent :: Route -> Concurrent () -> IO () runConcurrent r (Concurrent c) = runAction r [runCont c $ \_ -> Stop] []
आप पहले तर्क के रूप में
round_robin
पास करके यह जांच सकते हैं कि नया संस्करण कैसे काम करता है। यह एक साधारण निष्पादन रणनीति है, जो
runAction
फ़ंक्शन से पहले काम करता है। यहां हम बस एक अनंत सूची बनाते हैं और प्रत्येक तत्व के लिए हम शेष मॉडुलो को थ्रेड्स की संख्या लेते हैं।
round_robin :: Route round_robin = Route $ map rem [0..]
इस इनपुट पर गणनाओं को चलाने से, हमें जल्दी से गतिरोध प्राप्त होने की संभावना है - इस तथ्य के कारण कि हमारे उदाहरण का काम यादृच्छिक संख्या जनरेटर पर आधारित है - इसलिए, इस तथ्य के बावजूद कि इनपुट हमेशा समान होता है, निष्पादन आदेश निकलता है बेतरतीब ढंग से।
यदि हमारे उदाहरण अधिक निर्धारक थे, तो हमें इनपुट डेटा को यादृच्छिक रूप से अलग करना होगा, जो अब हम करेंगे।
main = quickCheck $ monadicIO $ do r <- pick arbitrary run $ runConcurrent r (runPhil 5)
हम पूर्व में लागू किए गए
arbitrary
फ़ंक्शन का उपयोग करके
Route
प्रकार का एक मनमाना तत्व चुनते हैं। फिर हम इस इनपुट पर अपनी गणना शुरू करते हैं।
QuickCheck
का ध्यान रखेगा, अर्थात्: यह हमारे परीक्षण को 100 बार चलाएगा, हर बार इनपुट डेटा के आकार को बढ़ाएगा।
कार्यक्रम को चलाने पर, हम निम्नलिखित देखेंगे:
... 3 took left fork 4 put forks 4 is awaiting 5 took left fork 4 took left fork 1 took right fork 1 put forks 1 is awaiting 1 took left fork 2 took left fork *** Failed! Exception: 'user error (Deadlock)' (after 36 tests):
क्या पाना जरूरी था!
निष्कर्ष
हमने उन परीक्षणों को लिखना सीखा जो एक बहु-थ्रेडेड एप्लिकेशन में गतिरोध की स्थिति का पता लगा सकते हैं।
इस प्रक्रिया में, हमने
Cont
मोनड, टाइप परिवारों, अस्तित्वगत मात्रा का उपयोग और
QuickCheck
लाइब्रेरी के उदाहरणों को देखा।
इसके अलावा, हमने सीखा कि कैसे कामचलाऊ सामग्री से बहु-थ्रेडेड प्रोग्राम निष्पादन के एक मॉडल को इकट्ठा करना है।