Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds random scheduling via threadDelay #130

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion io-sim/io-sim.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ library
si-timers ^>=1.3,
time >=1.9.1 && <1.13,
quiet,
random,
QuickCheck,


Expand Down Expand Up @@ -113,7 +114,8 @@ test-suite test
tasty,
tasty-quickcheck,
tasty-hunit,
time
time,
random
ghc-options: -fno-ignore-asserts
-rtsopts
if impl(ghc >= 9.8)
Expand Down
21 changes: 11 additions & 10 deletions io-sim/src/Control/Monad/IOSim.hs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ import Test.QuickCheck

import System.IO.Unsafe
import qualified Debug.Trace as Debug
import System.Random (StdGen)


selectTraceEvents
Expand Down Expand Up @@ -394,28 +395,28 @@ instance Exception Failure where
, "please report the issue at\n"
, "https://github.com/input-output-hk/io-sim/issues"
]


-- | 'IOSim' is a pure monad.
--
runSim :: forall a. (forall s. IOSim s a) -> Either Failure a
runSim mainAction = traceResult False (runSimTrace mainAction)
runSim :: forall a. StdGen -> (forall s. IOSim s a) -> Either Failure a
runSim stdGen mainAction = traceResult False (runSimTrace stdGen mainAction)

-- | For quick experiments and tests it is often appropriate and convenient to
-- simply throw failures as exceptions.
--
runSimOrThrow :: forall a. (forall s. IOSim s a) -> a
runSimOrThrow mainAction =
case runSim mainAction of
runSimOrThrow :: forall a. StdGen -> (forall s. IOSim s a) -> a
runSimOrThrow stdGen mainAction =
case runSim stdGen mainAction of
Left e -> throw e
Right x -> x

-- | Like 'runSim' but fail when the main thread terminates if there are other
-- threads still running or blocked. If one is trying to follow a strict thread
-- clean-up policy then this helps testing for that.
--
runSimStrictShutdown :: forall a. (forall s. IOSim s a) -> Either Failure a
runSimStrictShutdown mainAction = traceResult True (runSimTrace mainAction)
runSimStrictShutdown :: forall a. StdGen -> (forall s. IOSim s a) -> Either Failure a
runSimStrictShutdown stdGen mainAction = traceResult True (runSimTrace stdGen mainAction)

-- | Fold through the trace and return either a 'Failure' or the simulation
-- result, i.e. the return value of the main thread.
Expand Down Expand Up @@ -484,8 +485,8 @@ ppEvents events =

-- | See 'runSimTraceST' below.
--
runSimTrace :: forall a. (forall s. IOSim s a) -> SimTrace a
runSimTrace mainAction = runST (runSimTraceST mainAction)
runSimTrace :: forall a. StdGen -> (forall s. IOSim s a) -> SimTrace a
runSimTrace stdGen mainAction = runST (runSimTraceST stdGen mainAction)

--
-- IOSimPOR
Expand Down
105 changes: 84 additions & 21 deletions io-sim/src/Control/Monad/IOSim/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ module Control.Monad.IOSim.Internal

import Prelude hiding (read)

import Data.Deque.Strict (Deque)
import qualified Data.Deque.Strict as Deque
import Data.Dynamic
import Data.Foldable (foldlM, toList, traverse_)
import qualified Data.List as List
Expand All @@ -60,8 +62,6 @@ import qualified Data.OrdPSQ as PSQ
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Time (UTCTime (..), fromGregorian)
import Data.Deque.Strict (Deque)
import qualified Data.Deque.Strict as Deque

import Control.Exception (NonTermination (..), assert, throw)
import Control.Monad (join, when)
Expand All @@ -76,12 +76,14 @@ import Control.Monad.Class.MonadSTM hiding (STM)
import Control.Monad.Class.MonadSTM.Internal (TMVarDefault (TMVar))
import Control.Monad.Class.MonadThrow hiding (getMaskingState)
import Control.Monad.Class.MonadTime
import Control.Monad.Class.MonadTimer.SI (TimeoutState (..))
import Control.Monad.Class.MonadTimer.SI (DiffTime, TimeoutState (..),
diffTimeToMicrosecondsAsInt, microsecondsAsIntToDiffTime)

import Control.Monad.IOSim.InternalTypes
import Control.Monad.IOSim.Types hiding (SimEvent (SimPOREvent),
Trace (SimPORTrace))
import Control.Monad.IOSim.Types (SimEvent)
import System.Random (StdGen, randomR, split)

--
-- Simulation interpreter
Expand Down Expand Up @@ -150,19 +152,21 @@ data SimState s a = SimState {
-- | list of clocks
clocks :: !(Map ClockId UTCTime),
nextVid :: !TVarId, -- ^ next unused 'TVarId'
nextTmid :: !TimeoutId -- ^ next unused 'TimeoutId'
nextTmid :: !TimeoutId, -- ^ next unused 'TimeoutId'
stdGen :: !StdGen
}

initialState :: SimState s a
initialState =
initialState :: StdGen -> SimState s a
initialState stdGen =
SimState {
runqueue = mempty,
threads = Map.empty,
curTime = Time 0,
timers = PSQ.empty,
clocks = Map.singleton (ClockId []) epoch1970,
nextVid = TVarId 0,
nextTmid = TimeoutId 0
nextTmid = TimeoutId 0,
stdGen = stdGen
}
where
epoch1970 = UTCTime (fromGregorian 1970 1 1) 0
Expand All @@ -189,6 +193,42 @@ invariant Nothing SimState{runqueue,threads,clocks} =
timeSinceEpoch :: Time -> NominalDiffTime
timeSinceEpoch (Time t) = fromRational (toRational t)

-- | This function receives a delay and adds jitter to it. The amount of
-- jitter added is proportional to how large the delay is so to not greatly
-- affect the indended behaviour of the function that calls it.
--
-- This function is used in order to introduce random delays between
-- concurrent threads so that different thread schedulings might be found.
--
-- This approach is nice because, since time is perfect (due to infinite
-- processing power of IOSim), IOSim will be able to introduce slight delays
-- that might lead to threads being scheduled differently.
--
-- Note that this only enables IOSim to explore different thread schedules for
-- concurrent threads blocked on 'threadDelay'. For threads blocked on STM
-- IOSim employs a way to awake threads in a pseudo random way.
--
-- Also note that it is safe to add jitter to 'threadDelay' because we only
-- have to guarantee that the thread is not woken up earlier than the delay
-- specified.
--
jitterDelay :: StdGen -> DiffTime -> DiffTime
jitterDelay stdGen d =
let -- Convert delay from DiffTime to picoseconds
delayInMicrosecondsAsInt = diffTimeToMicrosecondsAsInt d

-- Define the maximum jitter as a percentage of the delay
-- For example, 10% of the delay
maxJitter = delayInMicrosecondsAsInt `div` 10

-- Generate a random jitter value within the range
(jitterInMicrosecondsAsInt, _) = randomR (0, maxJitter) stdGen

-- Convert jitter back to DiffTime
jitter = microsecondsAsIntToDiffTime jitterInMicrosecondsAsInt

in -- Add jitter to the original delay
d + jitter

-- | Schedule / run a thread.
--
Expand All @@ -205,7 +245,8 @@ schedule !thread@Thread{
timers,
clocks,
nextVid, nextTmid,
curTime = time
curTime = time,
stdGen
} =
invariant (Just thread) simstate $
case action of
Expand Down Expand Up @@ -390,12 +431,15 @@ schedule !thread@Thread{
!tvar <- execNewTVar nextVid
(Just $! "<<timeout " ++ show (unTimeoutId nextTmid) ++ ">>")
False
let !expiry = d `addTime` time
let !expiry = jitterDelay stdGen d `addTime` time
!timers' = PSQ.insert nextTmid expiry (TimerRegisterDelay tvar) timers
!thread' = thread { threadControl = ThreadControl (k tvar) ctl }
(_, !stdGen') = split stdGen
trace <- schedule thread' simstate { timers = timers'
, nextVid = succ nextVid
, nextTmid = succ nextTmid }
, nextTmid = succ nextTmid
, stdGen = stdGen'
}
return (SimTrace time tid tlbl
(EventRegisterDelayCreated nextTmid nextVid expiry) trace)

Expand All @@ -409,11 +453,14 @@ schedule !thread@Thread{
trace)

ThreadDelay d k -> do
let !expiry = d `addTime` time
let !expiry = jitterDelay stdGen d `addTime` time
!timers' = PSQ.insert nextTmid expiry (TimerThreadDelay tid nextTmid) timers
!thread' = thread { threadControl = ThreadControl (Return ()) (DelayFrame nextTmid k ctl) }
(_, !stdGen') = split stdGen
!trace <- deschedule (Blocked BlockedOnDelay) thread' simstate { timers = timers'
, nextTmid = succ nextTmid }
, nextTmid = succ nextTmid
, stdGen = stdGen'
}
return (SimTrace time tid tlbl (EventThreadDelay nextTmid expiry) trace)

-- we treat negative timers as cancelled ones; for the record we put
Expand All @@ -432,13 +479,16 @@ schedule !thread@Thread{
!tvar <- execNewTVar nextVid
(Just $! "<<timeout-state " ++ show (unTimeoutId nextTmid) ++ ">>")
TimeoutPending
let !expiry = d `addTime` time
let !expiry = jitterDelay stdGen d `addTime` time
!t = Timeout tvar nextTmid
!timers' = PSQ.insert nextTmid expiry (Timer tvar) timers
!thread' = thread { threadControl = ThreadControl (k t) ctl }
(_, !stdGen') = split stdGen
trace <- schedule thread' simstate { timers = timers'
, nextVid = succ nextVid
, nextTmid = succ nextTmid }
, nextTmid = succ nextTmid
, stdGen = stdGen'
}
return (SimTrace time tid tlbl (EventTimerCreated nextTmid nextVid expiry) trace)

CancelTimeout (Timeout tvar tmid) k -> do
Expand Down Expand Up @@ -800,12 +850,13 @@ reschedule !simstate@SimState{ threads, timers, curTime = time } =
timeoutSTMAction TimerTimeout{} = return ()

unblockThreads :: Bool -> [IOSimThreadId] -> SimState s a -> ([IOSimThreadId], SimState s a)
unblockThreads !onlySTM !wakeup !simstate@SimState {runqueue, threads} =
unblockThreads !onlySTM !wakeup simstate@SimState {runqueue, threads, stdGen} =
-- To preserve our invariants (that threadBlocked is correct)
-- we update the runqueue and threads together here
(unblocked, simstate {
runqueue = runqueue <> Deque.fromList unblocked,
threads = threads'
runqueue = Deque.fromList shuffledUnblocked <> runqueue,
threads = threads',
stdGen = stdGen''
})
where
-- can only unblock if the thread exists and is blocked (not running)
Expand All @@ -818,12 +869,24 @@ unblockThreads !onlySTM !wakeup !simstate@SimState {runqueue, threads} =
-> not onlySTM
_ -> False
]

(!shuffledUnblocked, !stdGen'') = shuffle unblocked stdGen

-- and in which case we mark them as now running
!threads' = List.foldl'
(flip (Map.adjust (\t -> t { threadStatus = ThreadRunning })))
threads
unblocked

shuffle :: [a] -> StdGen -> ([a], StdGen)
shuffle xs0 gen0 = go (length xs0) xs0 gen0
where
go 0 xs gen = (xs, gen)
go n xs gen = let (k, newGen) = randomR (0, n-1) gen
(left, selected:right) = splitAt k xs
(shuffled, finalGen) = go (n-1) (left ++ right) newGen
in (selected:shuffled, finalGen)

-- | This function receives a list of TimerTimeout values that represent threads
-- for which the timeout expired and kills the running thread if needed.
--
Expand Down Expand Up @@ -861,9 +924,9 @@ forkTimeoutInterruptThreads timeoutExpired simState =
where
-- we launch a thread responsible for throwing an AsyncCancelled exception
-- to the thread which timeout expired
throwToThread :: [(Thread s a, TMVar (IOSim s) IOSimThreadId)]
throwToThread :: [(Thread s a, TMVar (IOSim s) IOSimThreadId)]

(simState', throwToThread) = List.mapAccumR fn simState timeoutExpired
(simState', throwToThread) = List.mapAccumR fn simState timeoutExpired
where
fn :: SimState s a
-> (IOSimThreadId, TimeoutId, TMVar (IOSim s) IOSimThreadId)
Expand Down Expand Up @@ -997,8 +1060,8 @@ lookupThreadLabel tid threads = join (threadLabel <$> Map.lookup tid threads)
-- computation with 'Control.Monad.IOSim.traceEvents'. A slightly more
-- convenient way is exposed by 'Control.Monad.IOSim.runSimTrace'.
--
runSimTraceST :: forall s a. IOSim s a -> ST s (SimTrace a)
runSimTraceST mainAction = schedule mainThread initialState
runSimTraceST :: forall s a. StdGen -> IOSim s a -> ST s (SimTrace a)
runSimTraceST stdGen mainAction = schedule mainThread (initialState stdGen)
where
mainThread =
Thread {
Expand Down
3 changes: 3 additions & 0 deletions io-sim/src/Data/Deque/Strict.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ uncons = \case

filter :: (a -> Bool) -> Deque a -> Deque a
filter f (Deque head tail) = Deque (List.filter f head) (List.filter f tail)

toList :: Deque a -> [a]
toList = foldr (:) []
Loading
Loading