[jobs] implemen tests for JobInfo

Also, implement getLatestJobStatus.

Don't fail critically if notification can't be sent.
parent c0b775e0
Pipeline #7761 passed with stages
in 49 minutes and 23 seconds
......@@ -750,6 +750,7 @@ common commonTestDependencies
, generic-arbitrary >= 1.0.1 && < 2
, graphviz ^>= 2999.20.1.0
, haskell-bee
, haskell-bee-pgmq
, hspec ^>= 2.11.1
, hspec-expectations >= 0.8 && < 0.9
, hspec-expectations-lifted < 0.11
......
......@@ -9,8 +9,9 @@ Portability : POSIX
-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-} -- orphan HasNodeError BackendInternalError
......@@ -25,6 +26,7 @@ import Control.Lens.TH
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Maybe (fromJust)
import Data.Pool qualified as Pool
import Data.Text qualified as T
import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.API.Admin.Orchestrator.Types (JobLog, noJobLog)
import Gargantext.API.Errors (BackendInternalError)
......@@ -42,7 +44,7 @@ import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging (HasLogger(..), Logger, LogLevel(..), MonadLogger(..), withLogger, logMsg, withLoggerIO)
import Gargantext.System.Logging (HasLogger(..), Logger, LogLevel(..), MonadLogger(..), withLogger, logMsg, logLocM, withLoggerIO)
import Gargantext.System.Logging.Loggers
import Gargantext.Utils.Jobs.Monad ( MonadJobStatus(..), JobHandle )
import System.Log.FastLogger qualified as FL
......@@ -182,9 +184,21 @@ instance MonadJobStatus WorkerMonad where
type JobEventType WorkerMonad = JobLog
noJobHandle Proxy = WorkerNoJobHandle
getLatestJobStatus _ = WorkerMonad (pure noJobLog)
getLatestJobStatus WorkerNoJobHandle = pure noJobLog
getLatestJobStatus (WorkerJobHandle ji) = do
stateTVar <- asks _w_env_job_state
state' <- liftIO $ readTVarIO stateTVar
pure $ case state' of
Nothing -> noJobLog
Just wjs ->
if _wjs_job_info wjs == ji
then
_wjs_job_log wjs
else
noJobLog
withTracer _ jh n = n jh
markStarted n jh = updateJobProgress jh (const $ jobLogStart $ RemainingSteps n)
markStarted n jh =
updateJobProgress jh (const $ jobLogStart $ RemainingSteps n)
markProgress steps jh = updateJobProgress jh (jobLogProgress steps)
markFailure steps mb_msg jh =
updateJobProgress jh (\latest -> case mb_msg of
......@@ -208,7 +222,9 @@ updateJobProgress (WorkerJobHandle (ji@JobInfo { _ji_message_id })) f = do
case state' of
Nothing -> pure ()
Just wjs -> do
CET.ce_notify $ CET.UpdateWorkerProgress ji (_wjs_job_log wjs)
(CET.ce_notify $ CET.UpdateWorkerProgress ji (_wjs_job_log wjs))
`CES.catch` (\(e :: SomeException) ->
$(logLocM) ERROR $ T.pack $ displayException e)
where
updateState mwjs =
let initJobLog =
......
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE StandaloneDeriving #-}
{-|
Module : Gargantext.Core.Worker.Types
Description : Some useful worker types
......
......@@ -14,7 +14,16 @@ Portability : POSIX
module Test.Utils.Jobs ( test ) where
import Async.Worker.Broker.PGMQ qualified as PGMQ
import Async.Worker.Broker.Types qualified as BT
import Data.Aeson qualified as Aeson
import Data.Maybe (fromJust)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), ScraperEvent(..))
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.Types
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Test.Hspec
import Test.Instances () -- arbitrary instances
......@@ -27,7 +36,6 @@ import Test.Instances () -- arbitrary instances
-- via the dispatcher notifications mechanism.
test :: Spec
test = do
pure ()
-- describe "job queue" $ do
-- it "respects max runners limit" $
-- testMaxRunners
......@@ -37,13 +45,13 @@ test = do
-- testExceptions
-- it "fairly picks equal-priority-but-different-kind jobs" $
-- testFairness
-- describe "job status update and tracking" $ sequential $ do
-- it "can fetch the latest job status" $
-- testFetchJobStatus
describe "job status update and tracking" $ sequential $ do
-- it "can fetch the latest job status" $
-- testFetchJobStatus
-- it "can spin two separate jobs and track their status separately" $
-- testFetchJobStatusNoContention
-- it "marking stuff behaves as expected" $
-- testMarkProgress
it "marking stuff behaves as expected" $
testMarkProgress
......@@ -270,110 +278,79 @@ data Counts = Counts { countAs :: Int, countBs :: Int }
-- (map _scst_remaining evts2' == [Just 50])
-- ) 500
-- testMarkProgress :: IO ()
-- testMarkProgress = do
-- myEnv <- newTestEnv
-- -- evts <- newTBQueueIO 7
-- evts <- newTVarIO []
-- let expectedEvents = 7
-- let getStatus hdl = do
-- liftIO $ threadDelay 100_000
-- st <- getLatestJobStatus hdl
-- -- liftIO $ atomically $ writeTBQueue evts st
-- liftIO $ atomically $ modifyTVar evts (\xs -> xs ++ [st])
-- readAllEvents = do
-- -- We will get thread blocking if there is ANY error in the job
-- -- Hence we assert the `readAllEvents` test doesn't take too long
-- mRet <- timeout 5_000_000 $ atomically $ do
-- -- allEventsArrived <- isFullTBQueue evts
-- evts' <- readTVar evts
-- -- STM retry if things failed
-- -- check allEventsArrived
-- check (length evts' == expectedEvents)
-- -- flushTBQueue evts
-- pure evts'
-- case mRet of
-- Nothing -> Prelude.fail $ "testMarkProgress: timeout exceeded, but didn't receive all 7 required events."
-- Just xs
-- | length xs == expectedEvents
-- -> pure xs
-- | otherwise
-- -> Prelude.fail $ "testMarkProgress: received some events, but they were not of the expected number (" <> show expectedEvents <> "): " <> show xs
testMarkProgress :: IO ()
testMarkProgress = do
withWorkerEnv "./test-data/test_config.toml" $ \env -> runWorkerMonad env $ do
let msgId = (fromJust $ Aeson.decode "0") :: BT.MessageId PGMQ.PGMQBroker
let hdl = WorkerJobHandle { _w_job_info = JobInfo { _ji_message_id = msgId
, _ji_mNode_id = Nothing } }
markStarted 10 hdl
jl0 <- getLatestJobStatus hdl
liftBase $ jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 10
, _scst_events = Just []
}
markProgress 1 hdl
jl1 <- getLatestJobStatus hdl
liftBase $ jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 9
, _scst_events = Just []
}
markFailure 1 (Nothing :: Maybe Void) hdl
jl2 <- getLatestJobStatus hdl
liftBase $ jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 1
, _scst_remaining = Just 8
, _scst_events = Just []
}
markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl
jl3 <- getLatestJobStatus hdl
liftBase $ jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 2
, _scst_remaining = Just 7
, _scst_events = Just [
ScraperEvent { _scev_message = Just "boom"
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
]
}
markComplete hdl
jl4 <- getLatestJobStatus hdl
liftBase $ jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
, _scst_failed = Just 2
, _scst_remaining = Just 0
, _scst_events = Just [
ScraperEvent { _scev_message = Just "boom"
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
]
}
markStarted 5 hdl
markProgress 1 hdl
jl5 <- getLatestJobStatus hdl
liftBase $ jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 4
, _scst_events = Just []
}
markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl
jl6 <- getLatestJobStatus hdl
liftBase $ jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 4
, _scst_remaining = Just 0
, _scst_events = Just [
ScraperEvent { _scev_message = Just "kaboom"
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
]
}
-- withJob_ myEnv $ \hdl _input -> do
-- markStarted 10 hdl
-- getStatus hdl
-- markProgress 1 hdl
-- getStatus hdl
-- markFailureNoErr 1 hdl
-- getStatus hdl
-- markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl
-- getStatus hdl
-- markComplete hdl
-- getStatus hdl
-- markStarted 5 hdl
-- markProgress 1 hdl
-- getStatus hdl
-- markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl
-- getStatus hdl
-- evts' <- readAllEvents
-- -- This pattern match should never fail, because the precondition is
-- -- checked in 'readAllEvents'.
-- let [jl0, jl1, jl2, jl3, jl4, jl5, jl6] = evts'
-- -- Check the events are what we expect
-- jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
-- , _scst_failed = Just 0
-- , _scst_remaining = Just 10
-- , _scst_events = Just []
-- }
-- jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 0
-- , _scst_remaining = Just 9
-- , _scst_events = Just []
-- }
-- jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 1
-- , _scst_remaining = Just 8
-- , _scst_events = Just []
-- }
-- jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 2
-- , _scst_remaining = Just 7
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "boom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
-- jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
-- , _scst_failed = Just 2
-- , _scst_remaining = Just 0
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "boom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
-- jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 0
-- , _scst_remaining = Just 4
-- , _scst_events = Just []
-- }
-- jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 4
-- , _scst_remaining = Just 0
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "kaboom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment