Skip to content

Commit ef1cec9

Browse files
committed
Catch exceptions in jobs, so worker continues.
For now, we just print to stdout what the exception was. Relates to #1.
1 parent 51f99a4 commit ef1cec9

File tree

3 files changed

+37
-29
lines changed

3 files changed

+37
-29
lines changed

periodic.cabal

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ test-suite periodic-test
3434
, time
3535
, hedis
3636
, hspec
37+
, cereal
3738
ghc-options: -threaded -rtsopts -with-rtsopts=-N
3839
default-language: Haskell2010
3940

src/System/Periodic.hs

+11-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
{-# LANGUAGE OverloadedStrings #-}
22
{-# LANGUAGE PartialTypeSignatures #-}
3+
{-# LANGUAGE ScopedTypeVariables #-}
34

45
module System.Periodic where
56

6-
import Control.Concurrent (threadDelay)
7+
import Control.Concurrent (forkIO, threadDelay)
78
import Control.Concurrent.MVar
9+
import Control.Exception (SomeException, catch)
810
import Control.Monad (forever, join, when)
911
import Data.Monoid
1012
import Data.Ratio
@@ -126,9 +128,13 @@ tryRunTask timeout pname rconn now (name, period, task) =
126128
,maybe "0" renderUnixTime lastStarted
127129
,renderUnixTime now])
128130
when gotLock $
129-
do task
130-
-- TODO(dbp 2016-05-26): Run task in
131-
-- thread to handle failure, log it
132-
-- somehow...
131+
do x <- newEmptyMVar
132+
jt <- forkIO (catch (task >> putMVar x Nothing)
133+
(\(e::SomeException) ->
134+
putMVar x (Just e)))
135+
res <- takeMVar x
136+
case res of
137+
Just e -> print $ "Exception raised: " <> show e
138+
Nothing -> return ()
133139
R.runRedis rconn $ R.del [lockedKey pname name]
134140
return ()

test/Spec.hs

+25-24
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
{-# LANGUAGE DeriveGeneric #-}
21
{-# LANGUAGE MultiParamTypeClasses #-}
32
{-# LANGUAGE OverloadedStrings #-}
43
import Control.Concurrent (forkIO, killThread, threadDelay)
54
import Control.Concurrent.MVar (MVar, modifyMVarMasked_, newMVar,
65
readMVar, takeMVar)
6+
import Control.Monad (replicateM)
77
import qualified Data.Text as T
88
import Data.Time.Calendar
99
import Data.Time.Clock
@@ -19,6 +19,10 @@ date y m d = UTCTime (fromGregorian y m d) 0
1919
time :: UTCTime -> Int -> Int -> Int -> UTCTime
2020
time t h m s = t { utctDayTime = fromIntegral $ h * 60 * 60 + m * 60 + s }
2121

22+
runNThreads :: Int -> Int -> Scheduler -> IO ()
23+
runNThreads n delay sched = do threads <- replicateM n (forkIO $ run sched)
24+
threadDelay delay
25+
mapM_ killThread threads
2226
main :: IO ()
2327
main = hspec $
2428
do describe "shouldRun" $
@@ -39,9 +43,8 @@ main = hspec $
3943
scheduler <- create (Name "simple-1") rconn (CheckInterval (Seconds 1)) (LockTimeout (Seconds 1000))
4044
addTask scheduler "job" (Every (Seconds 100)) (modifyMVarMasked_ mvar (return . (+1)))
4145

42-
wthread <- forkIO (run scheduler)
43-
threadDelay 30000
44-
killThread wthread
46+
runNThreads 1 30000 scheduler
47+
4548
destroy scheduler
4649
v <- takeMVar mvar
4750
1 `shouldBe` v
@@ -50,13 +53,7 @@ main = hspec $
5053
rconn <- R.connect R.defaultConnectInfo
5154
scheduler <- create (Name "simple-2") rconn (CheckInterval (Seconds 1)) (LockTimeout (Seconds 1000))
5255
addTask scheduler "job" (Every (Seconds 100)) (modifyMVarMasked_ mvar (return . (+1)))
53-
wthread1 <- forkIO (run scheduler)
54-
wthread2 <- forkIO (run scheduler)
55-
wthread3 <- forkIO (run scheduler)
56-
threadDelay 100000
57-
killThread wthread1
58-
killThread wthread2
59-
killThread wthread3
56+
runNThreads 3 100000 scheduler
6057
destroy scheduler
6158
v <- takeMVar mvar
6259
v `shouldBe` 1
@@ -65,13 +62,7 @@ main = hspec $
6562
rconn <- R.connect R.defaultConnectInfo
6663
scheduler <- create (Name "simple-3") rconn (CheckInterval (Seconds 1)) (LockTimeout (Seconds 1000))
6764
addTask scheduler "job" (Every (Seconds 2)) (modifyMVarMasked_ mvar (return . (+1)))
68-
wthread1 <- forkIO (run scheduler)
69-
wthread2 <- forkIO (run scheduler)
70-
wthread3 <- forkIO (run scheduler)
71-
threadDelay 4000000
72-
killThread wthread1
73-
killThread wthread2
74-
killThread wthread3
65+
runNThreads 3 4000000 scheduler
7566
destroy scheduler
7667
v <- takeMVar mvar
7768
-- NOTE(dbp 2016-05-26): Precise timing is hard
@@ -83,9 +74,7 @@ main = hspec $
8374
scheduler <- create (Name "simple-4") rconn (CheckInterval (Seconds 1)) (LockTimeout (Seconds 1000))
8475
seconds <- utctDayTime <$> getCurrentTime
8576
addTask scheduler "job" (Daily (Time seconds)) (modifyMVarMasked_ mvar (return . (+1)))
86-
wthread <- forkIO (run scheduler)
87-
threadDelay 4000000
88-
killThread wthread
77+
runNThreads 1 4000000 scheduler
8978
destroy scheduler
9079
v <- takeMVar mvar
9180
v `shouldBe` 1
@@ -96,9 +85,21 @@ main = hspec $
9685
now <- getCurrentTime
9786
let seconds = utctDayTime $ addUTCTime 3600 now
9887
addTask scheduler "job" (Daily (Time seconds)) (modifyMVarMasked_ mvar (return . (+1)))
99-
wthread <- forkIO (run scheduler)
100-
threadDelay 2000000
101-
killThread wthread
88+
runNThreads 1 2000000 scheduler
10289
destroy scheduler
10390
v <- takeMVar mvar
10491
v `shouldBe` 0
92+
describe "error handling" $
93+
do it "should keep running jobs if one throws an exception" $
94+
do mvar <- newMVar 0
95+
rconn <- R.connect R.defaultConnectInfo
96+
scheduler <- create (Name "error-1") rconn (CheckInterval (Seconds 1)) (LockTimeout (Seconds 1000))
97+
addTask scheduler "job-error" (Every (Seconds 100)) (error "blowing up")
98+
thread <- forkIO $ run scheduler
99+
threadDelay 300000
100+
addTask scheduler "job-success" (Every (Seconds 100)) (modifyMVarMasked_ mvar (return . (+1)))
101+
threadDelay 2000000
102+
killThread thread
103+
destroy scheduler
104+
v <- takeMVar mvar
105+
1 `shouldBe` v

0 commit comments

Comments
 (0)