1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
{-|
Module : Test.Utils.Jobs
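Description : Tests for the job queue (max runners, priorities, exceptions, fairness) and for job status tracking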
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Test.Utils.Jobs (test) where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM
import Data.Sequence ((|>), fromList)
import Data.Time
import Debug.RecoverRTTI (anythingToString)
import Gargantext.API.Admin.EnvTypes as EnvTypes
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Errors.Types
import Gargantext.API.Prelude
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Internal (newJob)
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad hiding (withJob)
import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
import Gargantext.Utils.Jobs.State
import Network.HTTP.Client (Manager)
import Network.HTTP.Client.TLS (newTlsManager)
import Prelude qualified
import Servant.Job.Core qualified as SJ
import Servant.Job.Types qualified as SJ
import System.IO.Unsafe
import Test.Hspec
import Test.Hspec.Expectations.Contrib (annotate)
import Test.Utils (waitUntil)
-- | The job types used by the queue tests in this module. The constructors
-- carry no payload: they only tag the jobs pushed to the queue, so that
-- priorities can be attached to them and the order in which they were picked
-- can be compared.
data JobT = A | B | C | D
  deriving (Eq, Ord, Show, Enum, Bounded)
-- | This type models the schedule picked up by the orchestrator: the job
-- types, in the order in which they were appended by 'addJobToSchedule'.
newtype JobSchedule = JobSchedule { _JobSchedule :: Seq JobT } deriving (Eq, Show)
-- | Append the given job type to the right end of the schedule stored in the
-- 'MVar'.
addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
addJobToSchedule jobt mvar =
  modifyMVar_ mvar $ \(JobSchedule picked) ->
    pure (JobSchedule (picked |> jobt))
-- | A record holding two 'Int' counters.
--
-- NOTE(review): judging by the field names these presumably tally 'A' and
-- 'B' jobs; no use of this type is visible in this module -- confirm whether
-- it is still needed.
data Counts = Counts
  { countAs :: Int
  , countBs :: Int
  }
  deriving (Eq, Show)
-- | A duration in milliseconds. In this module it is only used as the second
-- argument of 'waitUntil' in 'testPrios' and 'testFairness'.
--
-- NOTE(review): presumably 'waitUntil' treats it as a timeout -- confirm
-- against "Test.Utils".
jobDuration :: Int
jobDuration = 100
-- | The timers used in this module are the 'TVar's returned by
-- 'registerDelay'; 'waitTimerSTM' blocks until the flag they hold is 'True'.
type Timer = TVar Bool
-- | Use in conjunction with 'registerDelay' to create an 'STM' transaction
-- that simulates the duration of a job: the transaction retries until the
-- timer registered by 'registerDelay' has fired (i.e. the 'TVar' holds
-- 'True') and only then continues.
waitTimerSTM :: Timer -> STM ()
waitTimerSTM tv = readTVar tv >>= check
-- | Once the given timer has fired, read the list of running jobs from the
-- 'TVar' and write it to the queue as one sample. Empty lists are not
-- recorded.
sampleRunningJobs :: Timer -> TVar [Prelude.String] -> TQueue [Prelude.String] -> STM ()
sampleRunningJobs timer runningJs samples =
  waitTimerSTM timer >> readTVar runningJs >>= recordSample
  where
    -- Ignore empty runs, which happen when the system is kickstarting.
    recordSample [] = pure ()
    recordSample xs = writeTQueue samples xs
-- | The aim of this test is to ensure that the \"max runners\" setting is
-- respected, i.e. we have no more than \"N\" jobs running at the same time.
--
-- Four 'A' jobs, each lasting one second, are pushed to a queue created with
-- @defaultJobSettings 2@. While they run, a background sampler snapshots the
-- list of running jobs every 100ms. Once all the jobs are done, at least one
-- snapshot must have been taken, and every (sorted) snapshot must be a
-- contiguous part of either jobs #1 and #2 or jobs #3 and #4.
testMaxRunners :: IO ()
testMaxRunners = do
  -- max runners = 2 with default settings
  let num_jobs = 4
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
  now <- getCurrentTime
  runningJs   <- newTVarIO []
  samples     <- newTQueueIO
  remainingJs <- newTVarIO num_jobs
  let -- How long each job pretends to work, in microseconds (one second).
      duration = 1_000_000
      -- Not the most elegant solution, but in order to test the "max runners"
      -- parameter we run an asynchronous computation that continuously reads
      -- the content of 'runningJs', so that at the end we can ensure that this
      -- value was always <= "max runners" (but crucially not 0).
      sampler = forever $ do
        samplingFrequency <- registerDelay 100_000
        atomically $ sampleRunningJobs samplingFrequency runningJs samples
      j num _jHandle _inp _l = do
        let jobLabel = "Job #" ++ show num
        durationTimer <- registerDelay duration
        -- NOTE: We do the modification of the 'runningJs' and the rest
        -- in two transactions on purpose, to give a chance to the async
        -- sampler to sample the status of the world.
        atomically $ modifyTVar' runningJs (jobLabel :)
        atomically $ do
          waitTimerSTM durationTimer
          modifyTVar' runningJs (filter (/= jobLabel))
          modifyTVar' remainingJs pred
      jobs = [ (A, j n) | n <- [1..num_jobs::Int] ]
      waitFinished = atomically $ do
        x <- readTVar remainingJs
        check (x == 0)

  -- 'withAsync' cancels the sampler when the inner action returns /or throws/,
  -- so the sampling thread can never outlive this test.
  withAsync sampler $ \_ -> do
    atomically $ forM_ jobs $ \(t, f) -> void $
      pushJobWithTime now t () f settings st
    -- Wait for the jobs to finish; leaving this scope then stops the sampler.
    waitFinished

  -- Check that we got /some/ samples and for each of them,
  -- let's check only two runners at max were alive.
  allSamples <- atomically $ flushTQueue samples
  length allSamples `shouldSatisfy` (> 0)
  forM_ allSamples $ \runLog -> do
    annotate "predicate to satisfy: (x `isInfixOf` [\"Job #1\", \"Job #2\"] || x `isInfixOf` [\"Job #3\", \"Job #4\"])" $
      shouldSatisfy (sort runLog)
                    (\x -> x `isInfixOf` ["Job #1", "Job #2"]
                        || x `isInfixOf` ["Job #3", "Job #4"])
-- | Checks that queued jobs are picked according to the configured
-- priorities.
--
-- Jobs of type 'A', 'C', 'B' and 'D' are pushed, in that order, within a
-- single transaction; each job appends its own type to a shared schedule,
-- which is expected to end up being @[B, D, C, A]@.
testPrios :: IO ()
testPrios = do
  k <- genSecret
  -- Use a single runner, so that we can check the order of execution
  -- without worrying about the runners competing with each other.
  let settings = defaultJobSettings 1 k
      -- B has the highest priority
      prios    = [(B, 10), (C, 1), (D, 5)]
  st :: JobsState JobT [Prelude.String] () <-
    newJobsState settings (applyPrios prios defaultPrios)
  pickedSchedule <- newMVar (JobSchedule mempty)
  let recordJob jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
      jobs             = [ (t, recordJob t) | t <- [A, C, B, D] ]
      expectedSchedule = JobSchedule (fromList [B, D, C, A])
  now <- getCurrentTime
  -- All the pushes share one STM transaction, so that every job is already
  -- stored in the queue by the time 'popQueue' gets called.
  atomically $ forM_ jobs $ \(t, f) ->
    void (pushJobWithTime now t () f settings st)
  -- Rather than sleeping for a fixed amount of time, poll the recorded
  -- schedule until it matches the expected one.
  waitUntil ((== expectedSchedule) <$> readMVar pickedSchedule) jobDuration
-- | Checks that a job which throws is still tracked by the queue.
--
-- The job tries to read a file that does not exist. After waiting one second
-- the job must be found via 'lookupJob' and its task must be a 'DoneJ' whose
-- result is a 'Left'.
testExceptions :: IO ()
testExceptions = do
  k <- genSecret
  let settings = defaultJobSettings 1 k
  st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
  jid <- pushJob A ()
           (\_jHandle _inp _log -> putStrLn =<< readFile "/doesntexist.txt")
           settings st
  -- Wait 1 second to make sure the job is finished.
  threadDelay 1_000_000
  mjob <- lookupJob jid (jobsData st)
  case jTask <$> mjob of
    Nothing          -> Prelude.fail "lookupJob failed, job not found!"
    Just (DoneJ _ r) -> isLeft r `shouldBe` True
    Just unexpected  -> Prelude.fail $ "Expected job to be done, but got: " <> anythingToString unexpected
-- | Checks that jobs of different types but equal priority are picked in
-- order of arrival.
--
-- Five jobs (@A, A, B, A, A@) are pushed with arrival times two seconds
-- apart; the schedule recorded by the jobs is expected to match the arrival
-- order.
testFairness :: IO ()
testFairness = do
  k <- genSecret
  let settings = defaultJobSettings 1 k
  st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
  pickedSchedule <- newMVar (JobSchedule mempty)
  time <- getCurrentTime
  let arrivalOrder = [A, A, B, A, A]
      recordJob jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
      -- Pair every job with its arrival time: 0, 2, 4, ... seconds after 'time'.
      timedJobs = [ (addUTCTime (fromInteger timeDelta) time, t, recordJob t)
                  | (timeDelta, t) <- zip [0,2 ..] arrivalOrder
                  ]
  -- in this scenario we simulate two types of jobs all with
  -- the same level of priority: our queue implementation
  -- will behave as a classic FIFO, taking into account the
  -- time of arrival.
  atomically $ forM_ timedJobs $ \(arrival, t, f) ->
    void (pushJobWithTime arrival t () f settings st)
  waitUntil ((== JobSchedule (fromList arrivalOrder)) <$> readMVar pickedSchedule) jobDuration
-- | A newtype around @GargM Env BackendInternalError@, used by the tests in
-- this module as the monad in which jobs run. The 'MonadJob' and
-- 'MonadJobStatus' instances for it are defined in this module.
newtype MyDummyMonad a =
  MyDummyMonad { _MyDummyMonad :: GargM Env BackendInternalError a }
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
-- | The job environment is the one of the wrapped 'GargM' computation.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv = MyDummyMonad getJobEnv
-- | Job status tracking for 'MyDummyMonad'. Most methods simply run the
-- method of the same name in the wrapped @GargM Env BackendInternalError@
-- monad and re-wrap the result in the 'MyDummyMonad' newtype.
instance MonadJobStatus MyDummyMonad where
  type JobHandle MyDummyMonad = EnvTypes.ConcreteJobHandle BackendInternalError
  type JobType MyDummyMonad = GargJob
  type JobOutputType MyDummyMonad = JobLog
  type JobEventType MyDummyMonad = JobLog
  -- Reuse the handle provided by the 'GargM' instance, selected via the proxy.
  noJobHandle _ = noJobHandle (Proxy :: Proxy (GargM Env BackendInternalError))
  getLatestJobStatus jId = MyDummyMonad (getLatestJobStatus jId)
  -- The first argument is ignored: the continuation is applied to the handle
  -- directly.
  withTracer _ jh n = n jh
  markStarted n jh = MyDummyMonad (markStarted n jh)
  markProgress steps jh = MyDummyMonad (markProgress steps jh)
  markFailure steps mb_msg jh = MyDummyMonad (markFailure steps mb_msg jh)
  markComplete jh = MyDummyMonad (markComplete jh)
  markFailed mb_msg jh = MyDummyMonad (markFailed mb_msg jh)
  addMoreSteps steps jh = MyDummyMonad (addMoreSteps steps jh)
-- | Run a 'MyDummyMonad' computation in 'IO' against the given environment.
-- If the computation ends with a 'Left' error, that error is thrown with
-- 'throwIO'; otherwise the result is returned.
runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
runMyDummyMonad env m =
  runExceptT (runReaderT (_MyDummyMonad m) env) >>= either throwIO pure
-- | A single TLS-enabled HTTP 'Manager' shared by the test environments built
-- in this module (see 'newTestEnv').
--
-- It is created with 'unsafePerformIO' as a top-level value; the @NOINLINE@
-- pragma keeps GHC from inlining the definition, which could otherwise
-- duplicate the 'newTlsManager' call at each use site.
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
{-# NOINLINE testTlsManager #-}
-- | Create a new job through 'newJob' whose task runs the given action.
--
-- The task runs @f@ on the job handle and input inside 'runMyDummyMonad',
-- then returns the latest job status wrapped in 'Right'. The result of
-- 'withJob' itself is whatever status 'newJob' hands back.
withJob :: Env
        -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
        -> IO (SJ.JobStatus 'SJ.Safe JobLog)
withJob env f =
  runMyDummyMonad env . MyDummyMonad $
    -- the job type doesn't matter in our tests, we use a random one, as long as it's of type 'GargJob'.
    newJob @_ mkJobHandle (pure env) RecomputeGraphJob
      (\_ hdl input -> runMyDummyMonad env $ do
          f hdl input
          Right <$> getLatestJobStatus hdl)
      (SJ.JobInput () Nothing)
-- | Like 'withJob', but discards the returned job status.
withJob_ :: Env
         -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
         -> IO ()
withJob_ env = void . withJob env
-- | Build an 'Env' for the job tests in this module.
--
-- Only the jobs environment (one runner, default priorities) and the HTTP
-- manager are real values. Every other field is a 'Prelude.error' call that
-- names the field, so a test forcing one of them fails with a message
-- pointing at the culprit.
newTestEnv :: IO Env
newTestEnv = do
  k <- genSecret
  jobsEnv <- newJobEnv (defaultJobSettings 1 k) defaultPrios testTlsManager
  pure $ Env
    { _env_settings         = Prelude.error "env_settings not needed, but forced somewhere (check StrictData)"
    , _env_logger           = Prelude.error "env_logger not needed, but forced somewhere (check StrictData)"
    , _env_pool             = Prelude.error "env_pool not needed, but forced somewhere (check StrictData)"
    , _env_nodeStory        = Prelude.error "env_nodeStory not needed, but forced somewhere (check StrictData)"
    , _env_manager          = testTlsManager
    , _env_self_url         = Prelude.error "self_url not needed, but forced somewhere (check StrictData)"
    , _env_scrapers         = Prelude.error "scrapers not needed, but forced somewhere (check StrictData)"
    , _env_jobs             = jobsEnv
    , _env_config           = Prelude.error "config not needed, but forced somewhere (check StrictData)"
    , _env_mail             = Prelude.error "mail not needed, but forced somewhere (check StrictData)"
    , _env_nlp              = Prelude.error "nlp not needed, but forced somewhere (check StrictData)"
    , _env_central_exchange = Prelude.error "central exchange not needed, but forced somewhere (check StrictData)"
    , _env_dispatcher       = Prelude.error "dispatcher not needed, but forced somewhere (check StrictData)"
    }
-- | Checks that the latest job status can be fetched while a job runs.
--
-- The job takes three snapshots of its own status: before being started,
-- after @markStarted 10@ and after @markProgress 5@. The remaining-steps
-- field of those snapshots is expected to be @[Nothing, Just 10, Just 5]@.
testFetchJobStatus :: IO ()
testFetchJobStatus = do
  myEnv <- newTestEnv
  evts  <- newMVar []
  withJob_ myEnv $ \hdl _input -> do
    beforeStart   <- getLatestJobStatus hdl
    -- now let's log something
    markStarted 10 hdl
    afterStart    <- getLatestJobStatus hdl
    markProgress 5 hdl
    afterProgress <- getLatestJobStatus hdl
    liftIO $ modifyMVar_ evts (pure . ([beforeStart, afterStart, afterProgress] ++))
  -- Check the events, polling until they match rather than sleeping.
  waitUntil ((== [Nothing, Just 10, Just 5]) . map _scst_remaining <$> readMVar evts) 1000
-- | Checks that two jobs running concurrently keep separate statuses.
--
-- One job is started with 100 steps and the other with 50; each stores the
-- status it reads back in its own 'MVar'. The first must report
-- @Just 100@ remaining steps and the second @Just 50@.
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
  myEnv <- newTestEnv
  evts1 <- newMVar []
  evts2 <- newMVar []
  let -- A job that is started with the given number of steps and then
      -- prepends the status it reads back to the given 'MVar'.
      trackedJob steps sink () = withJob_ myEnv $ \hdl _input -> do
        markStarted steps hdl
        latest <- getLatestJobStatus hdl
        liftIO $ modifyMVar_ sink (pure . (latest :))
      remainingIn sink = map _scst_remaining <$> readMVar sink
  Async.forConcurrently_ [trackedJob 100 evts1, trackedJob 50 evts2] ($ ())
  -- Check the events
  waitUntil (do
    seen1 <- remainingIn evts1
    seen2 <- remainingIn evts2
    pure (seen1 == [Just 100] && seen2 == [Just 50])
    ) 500
-- | Checks the effect of the various @mark*@ operations on the job status.
--
-- The job calls @getStatus@ seven times, each call pushing the latest status
-- to a bounded queue of capacity 7. The test blocks until that queue is full,
-- then compares the seven snapshots, in order, with the expected 'JobLog's.
testMarkProgress :: IO ()
testMarkProgress = do
  myEnv <- newTestEnv
  evts  <- newTBQueueIO 7
  let -- Wait 100ms, then record the latest status of the job.
      getStatus hdl =
        liftIO (threadDelay 100_000)
          >>  getLatestJobStatus hdl
          >>= liftIO . atomically . writeTBQueue evts
      -- Retry until the queue is full, i.e. all the events have arrived.
      readAllEvents = isFullTBQueue evts >>= check >> flushTBQueue evts
  withJob_ myEnv $ \hdl _input -> do
    markStarted 10 hdl
    getStatus hdl
    markProgress 1 hdl
    getStatus hdl
    markFailureNoErr 1 hdl
    getStatus hdl
    markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl
    getStatus hdl
    markComplete hdl
    getStatus hdl
    markStarted 5 hdl
    markProgress 1 hdl
    getStatus hdl
    markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl
    getStatus hdl
  [jl0, jl1, jl2, jl3, jl4, jl5, jl6] <- atomically readAllEvents
  -- Check the events are what we expect
  jl0 `shouldBe` expectedLog 0 0 10 []
  jl1 `shouldBe` expectedLog 1 0 9  []
  jl2 `shouldBe` expectedLog 1 1 8  []
  jl3 `shouldBe` expectedLog 1 2 7  [errorEvt "boom"]
  jl4 `shouldBe` expectedLog 8 2 0  [errorEvt "boom"]
  jl5 `shouldBe` expectedLog 1 0 4  []
  jl6 `shouldBe` expectedLog 1 4 0  [errorEvt "kaboom"]
  where
    -- A 'JobLog' with every field present: succeeded, failed, remaining, events.
    expectedLog nSucceeded nFailed nRemaining evs =
      JobLog { _scst_succeeded = Just nSucceeded
             , _scst_failed    = Just nFailed
             , _scst_remaining = Just nRemaining
             , _scst_events    = Just evs
             }
    -- An undated event at level "ERROR" carrying the given message.
    errorEvt msg =
      ScraperEvent { _scev_message = Just msg
                   , _scev_level   = Just "ERROR"
                   , _scev_date    = Nothing }
-- | The hspec entry point of this module: one group for the job queue
-- itself and one for job status updates and tracking.
test :: Spec
test = do
  describe "job queue" $ do
    it "respects max runners limit" testMaxRunners
    it "respects priorities" testPrios
    it "can handle exceptions" testExceptions
    it "fairly picks equal-priority-but-different-kind jobs" testFairness
  describe "job status update and tracking" $ do
    it "can fetch the latest job status" testFetchJobStatus
    it "can spin two separate jobs and track their status separately" testFetchJobStatusNoContention
    it "marking stuff behaves as expected" testMarkProgress