1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE NumericUnderscores #-}
module Main where
import Control.Concurrent
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Reader
import Control.Monad.Except
import Data.Either
import Data.List
import Data.Sequence (Seq)
import GHC.Stack
import Prelude
import System.IO.Unsafe
import Network.HTTP.Client.TLS (newTlsManager)
import Network.HTTP.Client (Manager)
import Test.Hspec
import qualified Servant.Job.Types as SJ
import qualified Servant.Job.Core as SJ
import Gargantext.Utils.Jobs.Internal (newJob)
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad hiding (withJob)
import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
import Gargantext.Utils.Jobs.State
import Gargantext.API.Prelude
import Gargantext.API.Admin.EnvTypes as EnvTypes
import Gargantext.API.Admin.Orchestrator.Types
-- | The two kinds of job the queue tests submit.
data JobT = A | B deriving (Eq, Ord, Show, Enum, Bounded)

-- | How many jobs of each kind are currently running.
data Counts = Counts { countAs :: Int, countBs :: Int }
  deriving (Eq, Show)

-- | Bump the running counter of the given job kind up ('inc') or down ('dec')
-- by one.
inc, dec :: JobT -> Counts -> Counts
inc = adjustJobCount 1
dec = adjustJobCount (-1)

-- | Add @delta@ to the counter that corresponds to the given job kind,
-- leaving the other counter unchanged.
adjustJobCount :: Int -> JobT -> Counts -> Counts
adjustJobCount delta jobKind cs = case jobKind of
  A -> cs { countAs = countAs cs + delta }
  B -> cs { countBs = countBs cs + delta }
-- | How long every test job sleeps, in microseconds (the unit taken by
-- 'threadDelay'): 100 ms.
jobDuration :: Int
jobDuration = 100 * 1000

-- | How long the tests wait after submitting jobs before the first check,
-- in microseconds: 20 ms.
initialDelay :: Int
initialDelay = 20 * 1000
-- | Four 'A' jobs are submitted to a queue allowing two concurrent runners.
-- Each job records its label in @runningJs@ while it runs.
--
-- Expected timeline (job length 'jobDuration'):
--
--   * after 'initialDelay': jobs #1 and #2 are running;
--   * one 'jobDuration' later: #1/#2 are done, #3 and #4 are running;
--   * one more 'jobDuration' later: nothing is running.
testMaxRunners :: IO ()
testMaxRunners = do
  -- max runners = 2 with default settings
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  runningJs <- newTVarIO []
  let label num = "Job #" ++ show num
      -- Strict modifyTVar' so the TVar holds an evaluated list, not a thunk chain.
      j num _jHandle _inp _l = do
        atomically $ modifyTVar' runningJs (label num :)
        threadDelay jobDuration
        atomically $ modifyTVar' runningJs (filter (/= label num))
      jobs = [ j n | n <- [1 .. 4 :: Int] ]
  -- The returned job ids are not needed by this test.
  forM_ jobs $ \f -> pushJob A () f settings st
  threadDelay initialDelay
  r1 <- readTVarIO runningJs
  sort r1 `shouldBe` ["Job #1", "Job #2"]
  threadDelay jobDuration
  r2 <- readTVarIO runningJs
  sort r2 `shouldBe` ["Job #3", "Job #4"]
  threadDelay jobDuration
  r3 <- readTVarIO runningJs
  r3 `shouldBe` []
-- | Two 'A' jobs and two 'B' jobs are submitted (As first). 'B' is given a
-- higher priority with 'applyPrios', and the queue allows two runners.
--
-- Expected timeline:
--
--   * after @2 * initialDelay@: both Bs are running, no As;
--   * one 'jobDuration' later: both As are running;
--   * one more 'jobDuration' later: nothing is running.
testPrios :: IO ()
testPrios = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings $
    applyPrios [(B, 10)] defaultPrios -- B has higher priority
  runningJs <- newTVarIO (Counts 0 0)
  let -- Strict modifyTVar' keeps the counters evaluated inside the TVar.
      j jobt _jHandle _inp _l = do
        atomically $ modifyTVar' runningJs (inc jobt)
        threadDelay jobDuration
        atomically $ modifyTVar' runningJs (dec jobt)
      jobs = [ (A, j A)
             , (A, j A)
             , (B, j B)
             , (B, j B)
             ]
  -- The returned job ids are not needed by this test.
  forM_ jobs $ \(t, f) ->
    pushJob t () f settings st
  threadDelay (2*initialDelay)
  r1 <- readTVarIO runningJs
  r1 `shouldBe` (Counts 0 2)
  threadDelay jobDuration
  r2 <- readTVarIO runningJs
  r2 `shouldBe` (Counts 2 0)
  threadDelay jobDuration
  r3 <- readTVarIO runningJs
  r3 `shouldBe` (Counts 0 0)
-- | A job whose body throws (reading a non-existent file) must still end up
-- in the 'DoneJ' state, with a 'Left' result.
testExceptions :: IO ()
testExceptions = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  jid <- pushJob A ()
    (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
    settings st
  threadDelay initialDelay
  mjob <- lookupJob jid (jobsData st)
  -- Use hspec failures with descriptive messages rather than bare 'error'
  -- calls, so a failing run says what went wrong.
  case mjob of
    Nothing ->
      expectationFailure "testExceptions: pushed job was not found in the jobs map"
    Just je -> case jTask je of
      DoneJ _ r -> isLeft r `shouldBe` True
      _ -> expectationFailure "testExceptions: job has not reached the DoneJ state"
-- | Jobs of equal priority but different kinds are submitted in the order
-- A, A, B, A, A to a queue allowing two runners. The B must be scheduled in
-- submission order and must not starve behind the trailing As.
--
-- Expected timeline: 2 As, then 1 A + 1 B, then 1 A, then nothing.
testFairness :: IO ()
testFairness = do
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
  runningJs <- newTVarIO (Counts 0 0)
  let -- Strict modifyTVar' keeps the counters evaluated inside the TVar.
      j jobt _jHandle _inp _l = do
        atomically $ modifyTVar' runningJs (inc jobt)
        threadDelay jobDuration
        atomically $ modifyTVar' runningJs (dec jobt)
      jobs = [ (A, j A)
             , (A, j A)
             , (B, j B)
             , (A, j A)
             , (A, j A)
             ]
  -- The returned job ids are not needed by this test.
  forM_ jobs $ \(t, f) ->
    pushJob t () f settings st
  threadDelay initialDelay
  r1 <- readTVarIO runningJs
  r1 `shouldBe` (Counts 2 0)
  threadDelay jobDuration
  r2 <- readTVarIO runningJs
  r2 `shouldBe` (Counts 1 1) -- MOST IMPORTANT CHECK: the B got picked after the
                             -- two As, because it was inserted right after them
                             -- and has equal priority.
  threadDelay jobDuration
  r3 <- readTVarIO runningJs
  r3 `shouldBe` (Counts 1 0)
  threadDelay jobDuration
  r4 <- readTVarIO runningJs
  r4 `shouldBe` (Counts 0 0)
-- | Test monad: a newtype wrapper around @GargM Env GargError@, so this test
-- module can give it its own 'MonadJob' / 'MonadJobStatus' instances.
-- '_MyDummyMonad' unwraps back to the underlying 'GargM' computation.
--
-- NOTE(review): deriving these classes through a newtype needs
-- GeneralizedNewtypeDeriving, which is not among this file's LANGUAGE
-- pragmas — presumably enabled via the package's default-extensions; confirm.
newtype MyDummyMonad a =
  MyDummyMonad { _MyDummyMonad :: GargM Env GargError a }
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
-- | Job-environment access for the test monad. It reuses the wrapped 'GargM'
-- computation's own 'getJobEnv' and re-wraps it in 'MyDummyMonad'.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv = MyDummyMonad getJobEnv
-- | Job-status operations for the test monad.
--
-- The associated types pin the job handle to 'EnvTypes.ConcreteJobHandle'
-- and both output and event types to 'JobLog'. Every status operation except
-- 'withTracer' simply re-wraps the corresponding 'GargM' action in
-- 'MyDummyMonad'.
instance MonadJobStatus MyDummyMonad where
  type JobHandle MyDummyMonad = EnvTypes.ConcreteJobHandle GargError
  type JobType MyDummyMonad = GargJob
  type JobOutputType MyDummyMonad = JobLog
  type JobEventType MyDummyMonad = JobLog
  getLatestJobStatus jId = MyDummyMonad (getLatestJobStatus jId)
  -- The tracer argument is ignored: the continuation is run with the same handle.
  withTracer _ jh n = n jh
  markStarted n jh = MyDummyMonad (markStarted n jh)
  markProgress steps jh = MyDummyMonad (markProgress steps jh)
  markFailure steps mb_msg jh = MyDummyMonad (markFailure steps mb_msg jh)
  markComplete jh = MyDummyMonad (markComplete jh)
  markFailed mb_msg jh = MyDummyMonad (markFailed mb_msg jh)
-- | Run a 'MyDummyMonad' computation against the given 'Env' in 'IO'.
-- A 'GargError' produced by the computation is rethrown with 'throwIO';
-- otherwise the computation's result is returned.
runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
runMyDummyMonad env action =
  runExceptT (runReaderT (_MyDummyMonad action) env) >>= either throwIO pure
-- | A single TLS 'Manager' used by every test 'Env' built in 'newTestEnv'.
--
-- It is created with 'unsafePerformIO' so it can be a plain top-level value.
-- The NOINLINE pragma is essential: it stops GHC from inlining the definition
-- at each use site, which could otherwise create more than one manager.
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
{-# NOINLINE testTlsManager #-}
-- | 'shouldBe' lifted into any 'MonadIO', so equality expectations can be
-- stated from inside monadic test code. The 'HasCallStack' constraint keeps
-- failure locations pointing at the caller.
shouldBeE :: (MonadIO m, HasCallStack, Show a, Eq a) => a -> a -> m ()
shouldBeE actual expected = liftIO $ actual `shouldBe` expected
-- | Submit a test job built from @f@ and return the 'SJ.JobStatus' that
-- 'newJob' yields.
--
-- The job body runs @f@ with the job handle and the unit input, then reports
-- the handle's latest status ('getLatestJobStatus') wrapped in 'Right'. The
-- job is submitted with input @SJ.JobInput () Nothing@.
--
-- NOTE(review): callers sleep after 'withJob_', which suggests 'newJob' runs
-- the job asynchronously rather than before returning — confirm against
-- "Gargantext.Utils.Jobs.Internal".
withJob :: Env
        -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
        -> IO (SJ.JobStatus 'SJ.Safe JobLog)
withJob env f = runMyDummyMonad env $ MyDummyMonad $
  -- the job type doesn't matter in our tests, we use a random one, as long as it's of type 'GargJob'.
  -- The second type application fixes the job's error type to 'GargError'.
  newJob @_ @GargError mkJobHandle (pure env) RecomputeGraphJob (\_ hdl input ->
    runMyDummyMonad env $ (Right <$> (f hdl input >> getLatestJobStatus hdl))) (SJ.JobInput () Nothing)
-- | Like 'withJob', but discards the returned job status.
withJob_ :: Env
         -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
         -> IO ()
withJob_ env = void . withJob env
-- | Build a minimal 'Env' for the job-status tests.
--
-- Only the job environment (a fresh one with two runners and default
-- priorities) and the shared 'testTlsManager' are real. Every other field is
-- an 'error' thunk, so the test fails loudly if anything evaluates it.
newTestEnv :: IO Env
newTestEnv = do
  secret <- genSecret
  jobEnv <- newJobEnv (defaultJobSettings 2 secret) defaultPrios testTlsManager
  pure Env
    { _env_jobs      = jobEnv
    , _env_manager   = testTlsManager
    , _env_settings  = error "env_settings not needed, but forced somewhere (check StrictData)"
    , _env_logger    = error "env_logger not needed, but forced somewhere (check StrictData)"
    , _env_pool      = error "env_pool not needed, but forced somewhere (check StrictData)"
    , _env_nodeStory = error "env_nodeStory not needed, but forced somewhere (check StrictData)"
    , _env_self_url  = error "self_url not needed, but forced somewhere (check StrictData)"
    , _env_scrapers  = error "scrapers not needed, but forced somewhere (check StrictData)"
    , _env_config    = error "config not needed, but forced somewhere (check StrictData)"
    , _env_mail      = error "mail not needed, but forced somewhere (check StrictData)"
    , _env_nlp       = error "nlp not needed, but forced somewhere (check StrictData)"
    }
-- | 'getLatestJobStatus' reflects the most recent marking. Before any
-- marking, the remaining count is 'Nothing'. After @markStarted 10@ it is
-- @Just 10@, and after @markProgress 5@ it is @Just 5@.
testFetchJobStatus :: IO ()
testFetchJobStatus = do
  myEnv <- newTestEnv
  evts <- newMVar []
  withJob_ myEnv $ \hdl _input -> do
    statusInitial <- getLatestJobStatus hdl
    -- now let's log something
    markStarted 10 hdl
    statusStarted <- getLatestJobStatus hdl
    markProgress 5 hdl
    statusProgressed <- getLatestJobStatus hdl
    liftIO $ modifyMVar_ evts $ \xs ->
      pure ([statusInitial, statusStarted, statusProgressed] ++ xs)
  -- Give the job time to run before inspecting what it recorded.
  threadDelay 500_000
  recorded <- readMVar evts
  map _scst_remaining recorded `shouldBe` [Nothing, Just 10, Just 5]
-- | Two jobs run concurrently, each started with a different step count. Each
-- must observe only its own status, with no cross-talk between handles.
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
  myEnv <- newTestEnv
  evts1 <- newMVar []
  evts2 <- newMVar []
  -- Submit a job that starts with @total@ steps and records the status it
  -- then observes into @sink@.
  let trackJob total sink = withJob_ myEnv $ \hdl _input -> do
        markStarted total hdl
        latest <- getLatestJobStatus hdl
        liftIO $ modifyMVar_ sink (pure . (latest :))
  Async.forConcurrently_ [trackJob 100 evts1, trackJob 50 evts2] id
  threadDelay 500_000
  -- Check the events
  readMVar evts1 >>= \seen -> map _scst_remaining seen `shouldBe` [Just 100]
  readMVar evts2 >>= \seen -> map _scst_remaining seen `shouldBe` [Just 50]
-- | Exercises every marking operation in sequence inside one job, then checks
-- the 'JobLog' snapshot taken after each step.
--
-- The job stores its seven snapshots in an 'MVar'. Previously the list was
-- destructured with a partial pattern bind, so a job that had not finished
-- within the sleep made the test die with an opaque pattern-match failure in
-- IO. It is now matched totally, and a missing or short list is reported as
-- an explicit expectation failure.
testMarkProgress :: IO ()
testMarkProgress = do
  myEnv <- newTestEnv
  evts <- newMVar []
  withJob_ myEnv $ \hdl _input -> do
    markStarted 10 hdl
    jl0 <- getLatestJobStatus hdl
    markProgress 1 hdl
    jl1 <- getLatestJobStatus hdl
    markFailure 1 Nothing hdl
    jl2 <- getLatestJobStatus hdl
    markFailure 1 (Just "boom") hdl
    jl3 <- getLatestJobStatus hdl
    markComplete hdl
    jl4 <- getLatestJobStatus hdl
    markStarted 5 hdl
    markProgress 1 hdl
    jl5 <- getLatestJobStatus hdl
    markFailed (Just "kaboom") hdl
    jl6 <- getLatestJobStatus hdl
    liftIO $ modifyMVar_ evts (const (pure [jl0, jl1, jl2, jl3, jl4, jl5, jl6]))
  threadDelay 500_000
  statuses <- readMVar evts
  case statuses of
    [jl0, jl1, jl2, jl3, jl4, jl5, jl6] -> do
      -- Check the events are what we expect
      jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
                            , _scst_failed = Just 0
                            , _scst_remaining = Just 10
                            , _scst_events = Just []
                            }
      jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed = Just 0
                            , _scst_remaining = Just 9
                            , _scst_events = Just []
                            }
      jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed = Just 1
                            , _scst_remaining = Just 8
                            , _scst_events = Just []
                            }
      jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed = Just 2
                            , _scst_remaining = Just 7
                            , _scst_events = Just [
                                ScraperEvent { _scev_message = Just "boom"
                                             , _scev_level = Just "ERROR"
                                             , _scev_date = Nothing }
                              ]
                            }
      jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
                            , _scst_failed = Just 2
                            , _scst_remaining = Just 0
                            , _scst_events = Just [
                                ScraperEvent { _scev_message = Just "boom"
                                             , _scev_level = Just "ERROR"
                                             , _scev_date = Nothing }
                              ]
                            }
      jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed = Just 0
                            , _scst_remaining = Just 4
                            , _scst_events = Just []
                            }
      jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
                            , _scst_failed = Just 4
                            , _scst_remaining = Just 0
                            , _scst_events = Just [
                                ScraperEvent { _scev_message = Just "kaboom"
                                             , _scev_level = Just "ERROR"
                                             , _scev_date = Nothing }
                              ]
                            }
    other ->
      expectationFailure $
        "testMarkProgress: expected 7 recorded job statuses, got "
          ++ show (length other)
-- | Test entry point with two groups: scheduling behaviour of the job queue,
-- and job-status updates/tracking through 'MyDummyMonad'.
main :: IO ()
main = hspec $ do
  describe "job queue" $ do
    it "respects max runners limit" testMaxRunners
    it "respects priorities" testPrios
    it "can handle exceptions" testExceptions
    it "fairly picks equal-priority-but-different-kind jobs" testFairness
  describe "job status update and tracking" $ do
    it "can fetch the latest job status" testFetchJobStatus
    it "can spin two separate jobs and track their status separately"
      testFetchJobStatusNoContention
    it "marking stuff behaves as expected" testMarkProgress