{-|
Module      : Test.Utils.Jobs
Description :
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX
-}

{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}

module Test.Utils.Jobs ( test ) where

import Gargantext.Prelude
import Test.Hspec
import Test.Instances () -- arbitrary instances


-- | Spec entry point for the job utilities. It is currently a no-op:
-- every example listed below is commented out.
--
-- TODO: this suite used to exercise old-style worker internals, and that
-- functionality has since moved to new-style workers, so some of the
-- disabled tests are obsolete. New tests could be limited to checking that
-- things are wired properly, e.g. that job progress is reflected correctly
-- via the dispatcher notifications mechanism.
test :: Spec
test = pure ()
  -- describe "job queue" $ do
  --   it "respects max runners limit" $
  --     testMaxRunners
  --   it "respects priorities" $
  --     testPrios
  --   it "can handle exceptions" $
  --     testExceptions
  --   it "fairly picks equal-priority-but-different-kind jobs" $
  --     testFairness
  -- describe "job status update and tracking" $ sequential $ do
  --   it "can fetch the latest job status" $
  --     testFetchJobStatus
  --   it "can spin two separate jobs and track their status separately" $
  --     testFetchJobStatusNoContention
  --   it "marking stuff behaves as expected" $
  --     testMarkProgress

-- | Job kinds referenced by the (currently disabled) queue tests.
data JobT
  = A
  | B
  | C
  | D
  deriving (Eq, Ord, Show, Enum, Bounded)

-- | This type models the schedule picked up by the orchestrator.
newtype JobSchedule = JobSchedule
  { _JobSchedule :: Seq JobT
    -- ^ The recorded jobs; the disabled 'addJobToSchedule' helper below
    -- appended to the right end of this sequence with '|>'.
  }
  deriving (Eq, Show)

-- addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
-- addJobToSchedule jobt mvar = do
--   modifyMVar_ mvar $ \js -> do
--     let js' = js { _JobSchedule = _JobSchedule js |> jobt }
--     pure js'

-- | A pair of integer counters.
--
-- NOTE(review): not referenced by any code visible in this file; presumably
-- tallies of 'A' and 'B' jobs -- confirm against history or remove.
data Counts = Counts
  { countAs :: Int
  , countBs :: Int
  }
  deriving (Eq, Show)

-- -- | In ms
-- jobDuration :: Int
-- jobDuration = 100

-- type Timer = TVar Bool

-- -- | Use in conjunction with 'registerDelay' to create an 'STM' transaction
-- -- that will simulate the duration of a job by waiting the timeout registered
-- -- by 'registerDelay' before continuing.
-- waitTimerSTM :: Timer -> STM ()
-- waitTimerSTM tv = do
--   v <- readTVar tv
--   check v

-- -- | Samples the running jobs from the first 'TVar' and write them
-- -- in the queue.
-- sampleRunningJobs :: Timer -> TVar [Prelude.String] -> TQueue [Prelude.String]-> STM ()
-- sampleRunningJobs timer runningJs samples = do
--   waitTimerSTM timer
--   runningNow <- readTVar runningJs
--   case runningNow of
--     [] -> pure () -- ignore empty runs, when the system is kickstarting.
--     xs -> writeTQueue samples xs

-- testPrios :: IO ()
-- testPrios = do
--   k <- genSecret
--   -- Use a single runner, so that we can check the order of execution
--   -- without worrying about the runners competing with each other.
--   let settings = defaultJobSettings 1 k
--       prios = [(B, 10), (C, 1), (D, 5)]
--   st :: JobsState JobT [Prelude.String] () <- newJobsState settings $
--     applyPrios prios defaultPrios -- B has the highest priority
--   pickedSchedule <- newMVar (JobSchedule mempty)
--   let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
--       jobs = [ (A, j A)
--              , (C, j C)
--              , (B, j B)
--              , (D, j D)
--              ]
--   -- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
--   -- the time 'popQueue' gets called.
--   now <- getCurrentTime
--   atomically $ forM_ jobs $ \(t, f) -> void $ pushJobWithTime now t () f settings st
--   -- wait for the jobs to finish, waiting for more than the total duration,
--   -- so that we are sure that all jobs have finished, then check the schedule.
--   -- threadDelay jobDuration
--   waitUntil (do
--     finalSchedule <- readMVar pickedSchedule
--     pure $ finalSchedule == JobSchedule (fromList [B, D, C, A])) jobDuration

-- testExceptions :: IO ()
-- testExceptions = do
--   k <- genSecret
--   let settings = defaultJobSettings 1 k
--   st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
--   jid <- pushJob A ()
--            (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
--            settings st
--   -- Wait 1 second to make sure the job is finished.
--   threadDelay $ 1_000_000
--   mjob <- lookupJob jid (jobsData st)
--   case mjob of
--     Nothing -> Prelude.fail "lookupJob failed, job not found!"
--     Just je -> case jTask je of
--       DoneJ _ r -> isLeft r `shouldBe` True
--       unexpected -> Prelude.fail $ "Expected job to be done, but got: " <> anythingToString unexpected
--   return ()

-- testFairness :: IO ()
-- testFairness = do
--   k <- genSecret
--   let settings = defaultJobSettings 1 k
--   st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
--   pickedSchedule <- newMVar (JobSchedule mempty)
--   let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
--       jobs = [ (A, j A)
--              , (A, j A)
--              , (B, j B)
--              , (A, j A)
--              , (A, j A)
--              ]
--   time <- getCurrentTime
--   -- in this scenario we simulate two types of jobs, all with
--   -- the same level of priority: our queue implementation
--   -- will behave as a classic FIFO, taking into account the
--   -- time of arrival.
--   atomically $ forM_ (zip [0,2 ..] jobs) $ \(timeDelta, (t, f)) -> void $
--     pushJobWithTime (addUTCTime (fromInteger timeDelta) time) t () f settings st
--   -- threadDelay jobDuration
--   waitUntil (do
--     finalSchedule <- readMVar pickedSchedule
--     pure $ finalSchedule == JobSchedule (fromList [A, A, B, A, A])) jobDuration

-- testTlsManager :: Manager
-- testTlsManager = unsafePerformIO newTlsManager
-- {-# NOINLINE testTlsManager #-}

-- withJob :: Env
--         -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
--         -> IO (SJ.JobStatus 'SJ.Safe JobLog)
-- withJob env f = runMyDummyMonad env $ MyDummyMonad $
--   -- the job type doesn't matter in our tests, we use a random one, as long as it's of type 'GargJob'.
--   newJob @_ mkJobHandle (pure env) RecomputeGraphJob (\_ hdl input ->
--     runMyDummyMonad env $ (Right <$> (f hdl input >> getLatestJobStatus hdl))) (SJ.JobInput () Nothing)

-- withJob_ :: Env
--          -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
--          -> IO ()
-- withJob_ env f = void (withJob env f)

-- newTestEnv :: IO Env
-- newTestEnv = do
--   let fmt_error v = Prelude.error $ "[Test.Utils.Jobs.Env] " <> v <> " not needed, but forced somewhere (check StrictData)"
--   let _gc_notifications_config =
--         NotificationsConfig { _nc_central_exchange_bind = fmt_error "nc_central_exchange_bind"
--                             , _nc_central_exchange_connect = "tcp://localhost:15510"
--                             , _nc_dispatcher_bind = fmt_error "nc_dispatcher_bind"
--                             , _nc_dispatcher_connect = fmt_error "nc_dispatcher_connect" }
--   let _env_config =
--         GargConfig { _gc_datafilepath = fmt_error "gc_datafilepath"
--                    , _gc_frontend_config = fmt_error "gc_frontend_config"
--                    , _gc_mail_config = fmt_error "gc_mail_config"
--                    , _gc_database_config = fmt_error "gc_database_config"
--                    , _gc_nlp_config = fmt_error "gc_nlp_config"
--                    , _gc_notifications_config
--                    , _gc_frames = fmt_error "gc_frames not needed"
--                    , _gc_jobs = fmt_error "gc_jobs not needed"
--                    , _gc_secrets = fmt_error "gc_secrets"
--                    , _gc_apis = fmt_error "gc_apis"
--                    , _gc_log_level = fmt_error "gc_log_level"
--                    }
--   pure $ Env
--     { _env_logger = fmt_error "env_logger"
--     , _env_pool = fmt_error "env_pool"
--     , _env_nodeStory = fmt_error "env_nodeStory"
--     , _env_manager = testTlsManager
--     , _env_self_url = fmt_error "self_url"
--     , _env_scrapers = fmt_error "scrapers"
--     , _env_config
--     , _env_central_exchange = fmt_error "central exchange"
--     , _env_dispatcher = fmt_error "dispatcher"
--     , _env_jwt_settings = fmt_error "jwt_settings"
--     }

-- testFetchJobStatus :: IO ()
-- testFetchJobStatus = do
--   myEnv <- newTestEnv
--   evts <- newMVar []
--   withJob_ myEnv $ \hdl _input -> do
--     mb_status <- getLatestJobStatus hdl
--     -- now let's log something
--     markStarted 10 hdl
--     mb_status' <- getLatestJobStatus hdl
--     markProgress 5 hdl
--     mb_status'' <- getLatestJobStatus hdl
--     liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
--     pure ()
--   -- threadDelay 500_000
--   -- Check the events
--   -- readMVar evts >>= \expected -> map _scst_remaining expected `shouldBe` [Nothing, Just 10, Just 5]
--   waitUntil (do
--     evts' <- readMVar evts
--     pure $ map _scst_remaining evts' == [Nothing, Just 10, Just 5]
--     ) 1000

-- testFetchJobStatusNoContention :: IO ()
-- testFetchJobStatusNoContention = do
--   myEnv <- newTestEnv
--   evts1 <- newMVar []
--   evts2 <- newMVar []
--   let job1 = \() -> withJob_ myEnv $ \hdl _input -> do
--         markStarted 100 hdl
--         mb_status <- getLatestJobStatus hdl
--         liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
--         pure ()
--   let job2 = \() -> withJob_ myEnv $ \hdl _input -> do
--         markStarted 50 hdl
--         mb_status <- getLatestJobStatus hdl
--         liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
--         pure ()
--   Async.forConcurrently_ [job1, job2] ($ ())
--   -- threadDelay 500_000
--   -- Check the events
--   waitUntil (do
--     evts1' <- readMVar evts1
--     evts2' <- readMVar evts2
--     pure $ (map _scst_remaining evts1' == [Just 100]) &&
--            (map _scst_remaining evts2' == [Just 50])
--     ) 500

-- testMarkProgress :: IO ()
-- testMarkProgress = do
--   myEnv <- newTestEnv
--   -- evts <- newTBQueueIO 7
--   evts <- newTVarIO []
--   let expectedEvents = 7
--   let getStatus hdl = do
--         liftIO $ threadDelay 100_000
--         st <- getLatestJobStatus hdl
--         -- liftIO $ atomically $ writeTBQueue evts st
--         liftIO $ atomically $ modifyTVar evts (\xs -> xs ++ [st])
--       readAllEvents = do
--         -- We will get thread blocking if there is ANY error in the job
--         -- Hence we assert the `readAllEvents` test doesn't take too long
--         mRet <- timeout 5_000_000 $ atomically $ do
--           -- allEventsArrived <- isFullTBQueue evts
--           evts' <- readTVar evts
--           -- STM retry if things failed
--           -- check allEventsArrived
--           check (length evts' == expectedEvents)
--           -- flushTBQueue evts
--           pure evts'
--         case mRet of
--           Nothing -> Prelude.fail $ "testMarkProgress: timeout exceeded, but didn't receive all 7 required events."
--           Just xs
--             | length xs == expectedEvents
--             -> pure xs
--             | otherwise
--             -> Prelude.fail $ "testMarkProgress: received some events, but they were not of the expected number (" <> show expectedEvents <> "): " <> show xs
--   withJob_ myEnv $ \hdl _input -> do
--     markStarted 10 hdl
--     getStatus hdl
--     markProgress 1 hdl
--     getStatus hdl
--     markFailureNoErr 1 hdl
--     getStatus hdl
--     markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl
--     getStatus hdl
--     markComplete hdl
--     getStatus hdl
--     markStarted 5 hdl
--     markProgress 1 hdl
--     getStatus hdl
--     markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl
--     getStatus hdl
--   evts' <- readAllEvents
--   -- This pattern match should never fail, because the precondition is
--   -- checked in 'readAllEvents'.
--   let [jl0, jl1, jl2, jl3, jl4, jl5, jl6] = evts'
--   -- Check the events are what we expect
--   jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
--                         , _scst_failed = Just 0
--                         , _scst_remaining = Just 10
--                         , _scst_events = Just []
--                         }
--   jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
--                         , _scst_failed = Just 0
--                         , _scst_remaining = Just 9
--                         , _scst_events = Just []
--                         }
--   jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
--                         , _scst_failed = Just 1
--                         , _scst_remaining = Just 8
--                         , _scst_events = Just []
--                         }
--   jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
--                         , _scst_failed = Just 2
--                         , _scst_remaining = Just 7
--                         , _scst_events = Just [
--                             ScraperEvent { _scev_message = Just "boom"
--                                          , _scev_level = Just "ERROR"
--                                          , _scev_date = Nothing }
--                           ]
--                         }
--   jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
--                         , _scst_failed = Just 2
--                         , _scst_remaining = Just 0
--                         , _scst_events = Just [
--                             ScraperEvent { _scev_message = Just "boom"
--                                          , _scev_level = Just "ERROR"
--                                          , _scev_date = Nothing }
--                           ]
--                         }
--   jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
--                         , _scst_failed = Just 0
--                         , _scst_remaining = Just 4
--                         , _scst_events = Just []
--                         }
--   jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
--                         , _scst_failed = Just 4
--                         , _scst_remaining = Just 0
--                         , _scst_events = Just [
--                             ScraperEvent { _scev_message = Just "kaboom"
--                                          , _scev_level = Just "ERROR"
--                                          , _scev_date = Nothing }
--                           ]
--                         }