1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE NumericUnderscores #-}
module Main where
import Control.Concurrent
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Reader
import Data.Aeson
import Data.Either
import Data.List
import Data.Sequence (Seq)
import GHC.Generics
import GHC.Stack
import Prelude
import System.IO.Unsafe
import Network.HTTP.Client.TLS (newTlsManager)
import Network.HTTP.Client (Manager)
import Test.Hspec
import qualified Servant.Job.Types as SJ
import qualified Servant.Job.Core as SJ
import Gargantext.Utils.Jobs.Internal (newJob)
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad hiding (withJob)
import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
import Gargantext.Utils.Jobs.State
data JobT = A | B deriving (Eq, Ord, Show, Enum, Bounded)
data Counts = Counts { countAs :: Int, countBs :: Int }
deriving (Eq, Show)
inc, dec :: JobT -> Counts -> Counts
inc A cs = cs { countAs = countAs cs + 1 }
inc B cs = cs { countBs = countBs cs + 1 }
dec A cs = cs { countAs = countAs cs - 1 }
dec B cs = cs { countBs = countBs cs - 1 }
jobDuration, initialDelay :: Int
jobDuration = 100000
initialDelay = 20000
testMaxRunners :: IO ()
testMaxRunners = do
-- max runners = 2 with default settings
k <- genSecret
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
runningJs <- newTVarIO []
let j num _jHandle _inp _l = do
atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
threadDelay jobDuration
atomically $ modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
jobs = [ j n | n <- [1..4::Int] ]
_jids <- forM jobs $ \f -> pushJob A () f settings st
threadDelay initialDelay
r1 <- readTVarIO runningJs
sort r1 `shouldBe` ["Job #1", "Job #2"]
threadDelay jobDuration
r2 <- readTVarIO runningJs
sort r2 `shouldBe` ["Job #3", "Job #4"]
threadDelay jobDuration
r3 <- readTVarIO runningJs
r3 `shouldBe` []
testPrios :: IO ()
testPrios = do
k <- genSecret
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings $
applyPrios [(B, 10)] defaultPrios -- B has higher priority
runningJs <- newTVarIO (Counts 0 0)
let j jobt _jHandle _inp _l = do
atomically $ modifyTVar runningJs (inc jobt)
threadDelay jobDuration
atomically $ modifyTVar runningJs (dec jobt)
jobs = [ (A, j A)
, (A, j A)
, (B, j B)
, (B, j B)
]
_jids <- forM jobs $ \(t, f) -> do
pushJob t () f settings st
threadDelay (2*initialDelay)
r1 <- readTVarIO runningJs
r1 `shouldBe` (Counts 0 2)
threadDelay jobDuration
r2 <- readTVarIO runningJs
r2 `shouldBe` (Counts 2 0)
threadDelay jobDuration
r3 <- readTVarIO runningJs
r3 `shouldBe` (Counts 0 0)
testExceptions :: IO ()
testExceptions = do
k <- genSecret
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
jid <- pushJob A ()
(\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
settings st
threadDelay initialDelay
mjob <- lookupJob jid (jobsData st)
case mjob of
Nothing -> error "boo"
Just je -> case jTask je of
DoneJ _ r -> isLeft r `shouldBe` True
_ -> error "boo2"
return ()
testFairness :: IO ()
testFairness = do
k <- genSecret
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
runningJs <- newTVarIO (Counts 0 0)
let j jobt _jHandle _inp _l = do
atomically $ modifyTVar runningJs (inc jobt)
threadDelay jobDuration
atomically $ modifyTVar runningJs (dec jobt)
jobs = [ (A, j A)
, (A, j A)
, (B, j B)
, (A, j A)
, (A, j A)
]
_jids <- forM jobs $ \(t, f) -> do
pushJob t () f settings st
threadDelay initialDelay
r1 <- readTVarIO runningJs
r1 `shouldBe` (Counts 2 0)
threadDelay jobDuration
r2 <- readTVarIO runningJs
r2 `shouldBe` (Counts 1 1) -- MOST IMPORTANT CHECK: the B got picked after the
-- two As, because it was inserted right after them
-- and has equal priority.
threadDelay jobDuration
r3 <- readTVarIO runningJs
r3 `shouldBe` (Counts 1 0)
threadDelay jobDuration
r4 <- readTVarIO runningJs
r4 `shouldBe` (Counts 0 0)
data MyDummyJob
= MyDummyJob
deriving (Show, Eq, Ord, Enum, Bounded)
data MyDummyError
= SomethingWentWrong JobError
deriving (Show)
instance Exception MyDummyError where
toException _ = toException (userError "SomethingWentWrong")
instance ToJSON MyDummyError where
toJSON (SomethingWentWrong _) = String "SomethingWentWrong"
type Progress = Int
data MyDummyLog =
Step_0 !Progress
| Step_1 !Progress
deriving (Show, Eq, Ord, Generic)
instance Monoid MyDummyLog where
mempty = Step_0 0
instance Semigroup MyDummyLog where
_ <> _ = error "not needed"
instance ToJSON MyDummyLog
newtype MyDummyEnv = MyDummyEnv { _MyDummyEnv :: JobEnv MyDummyJob (Seq MyDummyLog) () }
newtype MyDummyMonad a =
MyDummyMonad { _MyDummyMonad :: ReaderT MyDummyEnv IO a }
deriving (Functor, Applicative, Monad, MonadIO, MonadReader MyDummyEnv)
runMyDummyMonad :: MyDummyEnv -> MyDummyMonad a -> IO a
runMyDummyMonad env = flip runReaderT env . _MyDummyMonad
instance MonadJob MyDummyMonad MyDummyJob (Seq MyDummyLog) () where
getJobEnv = asks _MyDummyEnv
instance MonadJobStatus MyDummyMonad where
type JobType MyDummyMonad = MyDummyJob
type JobOutputType MyDummyMonad = ()
type JobEventType MyDummyMonad = MyDummyLog
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
{-# NOINLINE testTlsManager #-}
shouldBeE :: (MonadIO m, HasCallStack, Show a, Eq a) => a -> a -> m ()
shouldBeE a b = liftIO (shouldBe a b)
type TheEnv = JobEnv MyDummyJob (Seq MyDummyLog) ()
withJob :: TheEnv
-> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
-> IO (SJ.JobStatus 'SJ.Safe MyDummyLog)
withJob myEnv f = runMyDummyMonad (MyDummyEnv myEnv) $
newJob @_ @MyDummyError getJobEnv MyDummyJob (\env hdl input ->
runMyDummyMonad (MyDummyEnv myEnv) $ f env hdl input) (SJ.JobInput () Nothing)
withJob_ :: TheEnv
-> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
-> IO ()
withJob_ env f = void (withJob env f)
testFetchJobStatus :: IO ()
testFetchJobStatus = do
k <- genSecret
let settings = defaultJobSettings 2 k
myEnv <- newJobEnv settings defaultPrios testTlsManager
evts <- newMVar []
withJob_ myEnv $ \_ hdl _input -> do
mb_status <- getLatestJobStatus hdl
-- now let's log something
updateJobProgress hdl (const $ Step_0 20)
mb_status' <- getLatestJobStatus hdl
updateJobProgress hdl (\(Step_0 x) -> Step_0 (x + 5))
mb_status'' <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
pure $ Right ()
threadDelay 500_000
-- Check the events
readMVar evts >>= \expected -> expected `shouldBe` [Nothing, Just (Step_0 20), Just (Step_0 25)]
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
k <- genSecret
let settings = defaultJobSettings 2 k
myEnv <- newJobEnv settings defaultPrios testTlsManager
evts1 <- newMVar []
evts2 <- newMVar []
let job1 = \() -> withJob_ myEnv $ \_ hdl _input -> do
updateJobProgress hdl (const $ Step_1 100)
mb_status <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
pure $ Right ()
let job2 = \() -> withJob_ myEnv $ \_ hdl _input -> do
updateJobProgress hdl (const $ Step_0 50)
mb_status <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
pure $ Right ()
Async.forConcurrently_ [job1, job2] ($ ())
threadDelay 500_000
-- Check the events
readMVar evts1 >>= \expected -> expected `shouldBe` [Just (Step_1 100)]
readMVar evts2 >>= \expected -> expected `shouldBe` [Just (Step_0 50)]
main :: IO ()
main = hspec $ do
describe "job queue" $ do
it "respects max runners limit" $
testMaxRunners
it "respects priorities" $
testPrios
it "can handle exceptions" $
testExceptions
it "fairly picks equal-priority-but-different-kind jobs" $
testFairness
describe "job status update and tracking" $ do
it "can fetch the latest job status" $
testFetchJobStatus
it "can spin two separate jobs and track their status separately" $
testFetchJobStatusNoContention