{-|
Module      : Test.Utils.Jobs
Description : Tests for the asynchronous job queue and job status tracking
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX
-}

{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications    #-}
{-# LANGUAGE TypeFamilies        #-}

module Test.Utils.Jobs (test) where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM
import Data.Sequence ((|>), fromList)
import Data.Time
import Debug.RecoverRTTI (anythingToString)
import Gargantext.API.Admin.EnvTypes as EnvTypes
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Errors.Types
import Gargantext.API.Prelude
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Internal (newJob)
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad hiding (withJob)
import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
import Gargantext.Utils.Jobs.State
import Network.HTTP.Client (Manager)
import Network.HTTP.Client.TLS (newTlsManager)
import Prelude qualified
import Servant.Job.Core qualified as SJ
import Servant.Job.Types qualified as SJ
import System.IO.Unsafe
import Test.Hspec
import Test.Hspec.Expectations.Contrib (annotate)
import Test.Utils (waitUntil)


data JobT = A
          | B
          | C
          | D
          deriving (Eq, Ord, Show, Enum, Bounded)

-- | This type models the schedule picked up by the orchestrator.
newtype JobSchedule = JobSchedule { _JobSchedule :: Seq JobT } deriving (Eq, Show)

addJobToSchedule :: JobT -> MVar JobSchedule -> IO ()
addJobToSchedule jobt mvar = do
  modifyMVar_ mvar $ \js -> do
    let js' = js { _JobSchedule = _JobSchedule js |> jobt }
    pure js'

data Counts = Counts { countAs :: Int, countBs :: Int }
  deriving (Eq, Show)

-- | In ms
jobDuration :: Int
jobDuration = 100

type Timer = TVar Bool

-- | Use in conjunction with 'registerDelay' to create an 'STM' transaction
-- that will simulate the duration of a job by waiting the timeout registered
-- by 'registerDelay' before continuing.
waitTimerSTM :: Timer -> STM ()
waitTimerSTM tv = do
  v <- readTVar tv
  check v
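
-- A minimal usage sketch for 'waitTimerSTM' (illustrative only; 'simulateWork'
-- is a hypothetical helper, not used by the tests below):
--
-- > simulateWork :: Int -> IO ()
-- > simulateWork micros = do
-- >   timer <- registerDelay micros    -- fires after roughly @micros@ microseconds
-- >   atomically $ waitTimerSTM timer  -- block until the timer has fired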

-- | Samples the currently running jobs from the first 'TVar' and writes them
-- into the queue.
sampleRunningJobs :: Timer -> TVar [Prelude.String] -> TQueue [Prelude.String] -> STM ()
sampleRunningJobs timer runningJs samples = do
  waitTimerSTM timer
  runningNow <- readTVar runningJs
  case runningNow of
    [] -> pure () -- ignore empty runs, when the system is kickstarting.
    xs -> writeTQueue samples xs
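
-- A sketch of how the sampler is driven (this mirrors what 'testMaxRunners'
-- does below; 'samplerLoop' itself is hypothetical and not used):
--
-- > samplerLoop :: TVar [Prelude.String] -> TQueue [Prelude.String] -> IO ()
-- > samplerLoop runningJs samples = forever $ do
-- >   freq <- registerDelay 100_000  -- sample roughly every 100ms
-- >   atomically $ sampleRunningJobs freq runningJs samples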

-- | The aim of this test is to ensure that the \"max runners\" setting is
-- respected, i.e. we have no more than \"N\" jobs running at the same time.
testMaxRunners :: IO ()
testMaxRunners = do
  -- max runners = 2 with default settings
  let num_jobs = 4
  k <- genSecret
  let settings = defaultJobSettings 2 k
  st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
  now <- getCurrentTime
  runningJs   <- newTVarIO []
  samples     <- newTQueueIO
  remainingJs <- newTVarIO num_jobs

  -- Not the most elegant solution, but in order to test the \"max runners\"
  -- parameter we start an asynchronous computation that continuously samples
  -- the content of 'runningJs'; at the end we check that the number of
  -- concurrently running jobs never exceeded \"max runners\" (and, crucially,
  -- that we did observe some running jobs).

  asyncReader <- async $ forever $ do 
    samplingFrequency <- registerDelay 100_000
    atomically $ sampleRunningJobs samplingFrequency runningJs samples
  
  let duration = 1_000_000
      j num _jHandle _inp _l = do
        durationTimer <- registerDelay duration
        -- NOTE: We modify 'runningJs' and do the rest in two separate
        -- transactions on purpose, to give the async sampler a chance to
        -- observe the state of the world in between.
        atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
        atomically $ do
          waitTimerSTM durationTimer
          modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
          modifyTVar remainingJs pred
      jobs = [ (A, j n) | n <- [1..num_jobs::Int] ]

  atomically $ forM_ jobs $ \(t, f) -> void $
    pushJobWithTime now t () f settings st

  let waitFinished = atomically $ do
        x <- readTVar remainingJs
        check (x == 0)

  -- Wait for the jobs to finish, then stop the sampler.
  waitFinished
  cancel asyncReader

  -- Check that we got /some/ samples and, for each of them, that at most
  -- two runners were alive.
  allSamples <- atomically $ flushTQueue samples
  length allSamples `shouldSatisfy` (> 0)

  forM_ allSamples $ \runLog -> do
    annotate "predicate to satisfy: (x `isInfixOf` [\"Job #1\", \"Job #2\"] || x `isInfixOf` [\"Job #3\", \"Job #4\"]" $
      shouldSatisfy (sort runLog)
                    (\x -> x `isInfixOf` ["Job #1", "Job #2"]
                      || x `isInfixOf` ["Job #3", "Job #4"])

testPrios :: IO ()
testPrios = do
  k <- genSecret
  -- Use a single runner, so that we can check the order of execution
  -- without worrying about the runners competing with each other.
  let settings = defaultJobSettings 1 k
      prios    = [(B, 10), (C, 1), (D, 5)]
  st :: JobsState JobT [Prelude.String] () <- newJobsState settings $
    applyPrios prios defaultPrios -- B has the highest priority
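  -- With these priorities the expected pop order is B (10), then D (5), then
  -- C (1), and finally A (which keeps its default priority): exactly the
  -- schedule asserted at the end of this test.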
  pickedSchedule <- newMVar (JobSchedule mempty)
  let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
      jobs = [ (A, j A)
             , (C, j C)
             , (B, j B)
             , (D, j D)
             ]

  -- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
  -- the time 'popQueue' gets called.
  now <- getCurrentTime
  atomically $ forM_ jobs $ \(t, f) -> void $ pushJobWithTime now t () f settings st

  -- Wait for the jobs to finish, then check that the final schedule respects
  -- the priorities.
  waitUntil (do
    finalSchedule <- readMVar pickedSchedule
    pure $ finalSchedule == JobSchedule (fromList [B, D, C, A])) jobDuration

testExceptions :: IO ()
testExceptions = do
  k <- genSecret
  let settings = defaultJobSettings 1 k
  st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
  jid <- pushJob A ()
    (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
    settings st
  -- Wait 1 second to make sure the job is finished.
  threadDelay $ 1_000_000
  mjob <- lookupJob jid (jobsData st)
  case mjob of
    Nothing ->  Prelude.fail "lookupJob failed, job not found!"
    Just je ->  case jTask je of
      DoneJ _ r   -> isLeft r `shouldBe` True
      unexpected  -> Prelude.fail $ "Expected job to be done, but got: " <> anythingToString unexpected
  return ()

testFairness :: IO ()
testFairness = do
  k <- genSecret
  let settings = defaultJobSettings 1 k
  st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
  pickedSchedule <- newMVar (JobSchedule mempty)
  let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
      jobs = [ (A, j A)
             , (A, j A)
             , (B, j B)
             , (A, j A)
             , (A, j A)
             ]
  time <- getCurrentTime
  -- In this scenario we simulate two kinds of jobs, all with the same
  -- priority: our queue implementation should behave as a classic FIFO,
  -- taking the time of arrival into account.
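  -- Submission timestamps are spaced two (simulated) seconds apart, so the
  -- arrival order is unambiguous.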
  atomically $ forM_ (zip [0,2 ..] jobs) $ \(timeDelta, (t, f)) -> void $
    pushJobWithTime (addUTCTime (fromInteger timeDelta) time) t () f settings st

  -- Wait for the jobs to finish, then check that the schedule is a plain FIFO.
  waitUntil (do
    finalSchedule <- readMVar pickedSchedule
    pure $ finalSchedule == JobSchedule (fromList [A, A, B, A, A])) jobDuration


newtype MyDummyMonad a =
  MyDummyMonad { _MyDummyMonad :: GargM Env BackendInternalError a }
  deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)

-- | Delegates to the job environment of the underlying 'GargM' monad.
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
  getJobEnv = MyDummyMonad getJobEnv

-- | The status-tracking operations mostly just unwrap 'MyDummyMonad',
-- delegate to the underlying 'GargM' instance and wrap the result back up.
instance MonadJobStatus MyDummyMonad where
  type JobHandle      MyDummyMonad = EnvTypes.ConcreteJobHandle BackendInternalError
  type JobType        MyDummyMonad = GargJob
  type JobOutputType  MyDummyMonad = JobLog
  type JobEventType   MyDummyMonad = JobLog

  noJobHandle _               = noJobHandle (Proxy :: Proxy (GargM Env BackendInternalError))
  getLatestJobStatus jId      = MyDummyMonad (getLatestJobStatus jId)
  withTracer _ jh n           = n jh
  markStarted n jh            = MyDummyMonad (markStarted n jh)
  markProgress steps jh       = MyDummyMonad (markProgress steps jh)
  markFailure steps mb_msg jh = MyDummyMonad (markFailure steps mb_msg jh)
  markComplete jh             = MyDummyMonad (markComplete jh)
  markFailed mb_msg jh        = MyDummyMonad (markFailed mb_msg jh)
  addMoreSteps steps jh       = MyDummyMonad (addMoreSteps steps jh)
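
-- A job in these tests is just a function of the shape below: it receives a
-- 'JobHandle' and reports its progress through it ('exampleJob' is a
-- hypothetical sketch, not referenced by the tests):
--
-- > exampleJob :: JobHandle MyDummyMonad -> () -> MyDummyMonad ()
-- > exampleJob hdl _input = do
-- >   markStarted 2 hdl   -- declare two remaining steps
-- >   markProgress 1 hdl  -- one step done
-- >   markComplete hdl    -- mark all remaining steps as succeeded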

runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
runMyDummyMonad env m = do
  res <- runExceptT . flip runReaderT env $ _MyDummyMonad m
  case res of
    Left e -> throwIO e
    Right x -> pure x

-- | A single, shared TLS 'Manager' for the test environment. The NOINLINE
-- pragma ensures the 'unsafePerformIO' call is evaluated at most once.
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
{-# NOINLINE testTlsManager #-}

withJob :: Env
        -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
        -> IO (SJ.JobStatus 'SJ.Safe JobLog)
withJob env f = runMyDummyMonad env $ MyDummyMonad $
  -- The job type doesn't matter in these tests; we use an arbitrary one, as long as it's a 'GargJob'.
  newJob @_ mkJobHandle (pure env) RecomputeGraphJob (\_ hdl input ->
    runMyDummyMonad env $ (Right <$> (f hdl input >> getLatestJobStatus hdl))) (SJ.JobInput () Nothing)

withJob_ :: Env
         -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
         -> IO ()
withJob_ env f = void (withJob env f)

-- | Builds a test 'Env' in which only the fields these tests actually use
-- ('_env_jobs' and '_env_manager') are populated; every other field is an
-- 'error' thunk.
newTestEnv :: IO Env
newTestEnv = do
  k <- genSecret
  let settings = defaultJobSettings 1 k
  myEnv <- newJobEnv settings defaultPrios testTlsManager
  pure $ Env
       { _env_settings  = Prelude.error "env_settings not needed, but forced somewhere (check StrictData)"
       , _env_logger    = Prelude.error "env_logger not needed, but forced somewhere (check StrictData)"
       , _env_pool      = Prelude.error "env_pool not needed, but forced somewhere (check StrictData)"
       , _env_nodeStory = Prelude.error "env_nodeStory not needed, but forced somewhere (check StrictData)"
       , _env_manager   = testTlsManager
       , _env_self_url  = Prelude.error "self_url not needed, but forced somewhere (check StrictData)"
       , _env_scrapers  = Prelude.error "scrapers not needed, but forced somewhere (check StrictData)"
       , _env_jobs      = myEnv
       , _env_config    = Prelude.error "config not needed, but forced somewhere (check StrictData)"
       , _env_mail      = Prelude.error "mail not needed, but forced somewhere (check StrictData)"
       , _env_nlp       = Prelude.error "nlp not needed, but forced somewhere (check StrictData)"
       , _env_central_exchange = Prelude.error "central exchange not needed, but forced somewhere (check StrictData)"
       , _env_dispatcher = Prelude.error "dispatcher not needed, but forced somewhere (check StrictData)"
       }

testFetchJobStatus :: IO ()
testFetchJobStatus = do
  myEnv <- newTestEnv
  evts <- newMVar []

  withJob_ myEnv $ \hdl _input -> do
    mb_status <- getLatestJobStatus hdl

    -- now let's log something
    markStarted 10 hdl
    mb_status' <- getLatestJobStatus hdl
    markProgress 5 hdl
    mb_status'' <- getLatestJobStatus hdl

    liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
    pure ()

  -- Check that the recorded statuses match what we logged above.
  waitUntil (do
    evts' <- readMVar evts
    pure $ map _scst_remaining evts' == [Nothing, Just 10, Just 5]
    ) 1000

testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
  myEnv <- newTestEnv

  evts1 <- newMVar []
  evts2 <- newMVar []

  let job1 = \() -> withJob_ myEnv $ \hdl _input -> do
        markStarted 100 hdl
        mb_status <- getLatestJobStatus hdl
        liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
        pure ()

  let job2 = \() -> withJob_ myEnv $ \hdl _input -> do
        markStarted 50 hdl
        mb_status <- getLatestJobStatus hdl
        liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
        pure ()

  Async.forConcurrently_ [job1, job2] ($ ())
  -- Check that each job recorded only its own status.
  waitUntil (do
    evts1' <- readMVar evts1
    evts2' <- readMVar evts2
    pure $ (map _scst_remaining evts1' == [Just 100]) &&
           (map _scst_remaining evts2' == [Just 50])
            ) 500

testMarkProgress :: IO ()
testMarkProgress = do
  myEnv <- newTestEnv
  evts  <- newTBQueueIO 7
  let getStatus hdl = do
        -- Give the previous 'mark*' update time to be processed before sampling.
        liftIO $ threadDelay 100_000
        st <- getLatestJobStatus hdl
        liftIO $ atomically $ writeTBQueue evts st
      readAllEvents = do
        -- Retry until all 7 expected events have arrived, then flush the queue.
        allEventsArrived <- isFullTBQueue evts
        if allEventsArrived then flushTBQueue evts else retry

  withJob_ myEnv $ \hdl _input -> do
    markStarted 10 hdl
    getStatus hdl

    markProgress 1 hdl
    getStatus hdl

    markFailureNoErr 1 hdl
    getStatus hdl

    markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl

    getStatus hdl
    markComplete hdl

    getStatus hdl
    markStarted 5 hdl
    markProgress 1 hdl

    getStatus hdl
    markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl

    getStatus hdl

  [jl0, jl1, jl2, jl3, jl4, jl5, jl6] <- atomically readAllEvents

  -- Check the events are what we expect
  jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
                        , _scst_failed    = Just 0
                        , _scst_remaining = Just 10
                        , _scst_events    = Just []
                        }
  jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
                        , _scst_failed    = Just 0
                        , _scst_remaining = Just 9
                        , _scst_events    = Just []
                        }
  jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
                        , _scst_failed    = Just 1
                        , _scst_remaining = Just 8
                        , _scst_events    = Just []
                        }
  jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
                        , _scst_failed    = Just 2
                        , _scst_remaining = Just 7
                        , _scst_events    = Just [
                            ScraperEvent { _scev_message = Just "boom"
                                         , _scev_level = Just "ERROR"
                                         , _scev_date = Nothing }
                        ]
                        }
  jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
                        , _scst_failed    = Just 2
                        , _scst_remaining = Just 0
                        , _scst_events    = Just [
                            ScraperEvent { _scev_message = Just "boom"
                                         , _scev_level = Just "ERROR"
                                         , _scev_date = Nothing }
                        ]
                        }
  jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
                        , _scst_failed    = Just 0
                        , _scst_remaining = Just 4
                        , _scst_events    = Just []
                        }
  jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
                        , _scst_failed    = Just 4
                        , _scst_remaining = Just 0
                        , _scst_events    = Just [
                            ScraperEvent { _scev_message = Just "kaboom"
                                         , _scev_level = Just "ERROR"
                                         , _scev_date = Nothing }
                        ]
                        }

test :: Spec
test = do
  describe "job queue" $ do
    it "respects max runners limit" $
      testMaxRunners
    it "respects priorities" $
      testPrios
    it "can handle exceptions" $
      testExceptions
    it "fairly picks equal-priority-but-different-kind jobs" $
      testFairness
  describe "job status update and tracking" $ do
    it "can fetch the latest job status" $
      testFetchJobStatus
    it "can spin two separate jobs and track their status separately" $
      testFetchJobStatusNoContention
    it "marking stuff behaves as expected" $
      testMarkProgress