Commit a617a5a0 authored by Fabien Maniere

Merge branch '495-dev-job-progress-fix-mark-started' into 'dev'

Resolve "[Server slowness] With the dev branch on the dev instance, we're experiencing a real slowness" (JobInfo changes)

See merge request !429
parents 150350f9 8806d152
Pipeline #7934 passed with stages in 65 minutes and 14 seconds
...@@ -753,6 +753,7 @@ common commonTestDependencies ...@@ -753,6 +753,7 @@ common commonTestDependencies
, generic-arbitrary >= 1.0.1 && < 2 , generic-arbitrary >= 1.0.1 && < 2
, graphviz ^>= 2999.20.1.0 , graphviz ^>= 2999.20.1.0
, haskell-bee , haskell-bee
, haskell-bee-pgmq
, hspec ^>= 2.11.1 , hspec ^>= 2.11.1
, hspec-expectations >= 0.8 && < 0.9 , hspec-expectations >= 0.8 && < 0.9
, hspec-expectations-lifted < 0.11 , hspec-expectations-lifted < 0.11
......
...@@ -23,7 +23,7 @@ module Gargantext.API.Job ( ...@@ -23,7 +23,7 @@ module Gargantext.API.Job (
, addWarningEvent , addWarningEvent
) where ) where
import Control.Lens (over, _Just) import Control.Lens ((%~), over, _Just)
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.API.Admin.Orchestrator.Types import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.Prelude import Gargantext.Prelude
...@@ -66,7 +66,12 @@ jobLogComplete jl = ...@@ -66,7 +66,12 @@ jobLogComplete jl =
& over scst_remaining (const (Just 0)) & over scst_remaining (const (Just 0))
jobLogAddMore :: Int -> JobLog -> JobLog jobLogAddMore :: Int -> JobLog -> JobLog
jobLogAddMore moreSteps jl = jl & over (scst_remaining . _Just) (+ moreSteps) jobLogAddMore moreSteps jl =
jl & scst_remaining %~ (maybe (Just 0) Just)
& scst_succeeded %~ (maybe (Just 0) Just)
& scst_failed %~ (maybe (Just 0) Just)
& scst_events %~ (maybe (Just []) Just)
& (scst_remaining . _Just) %~ (+ moreSteps)
jobLogFailures :: Int -> JobLog -> JobLog jobLogFailures :: Int -> JobLog -> JobLog
jobLogFailures n jl = over (scst_failed . _Just) (+ n) $ jobLogFailures n jl = over (scst_failed . _Just) (+ n) $
......
...@@ -9,6 +9,7 @@ Portability : POSIX ...@@ -9,6 +9,7 @@ Portability : POSIX
-} -}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE UndecidableInstances #-}
...@@ -25,6 +26,7 @@ import Control.Lens.TH ...@@ -25,6 +26,7 @@ import Control.Lens.TH
import Control.Monad.Trans.Control (MonadBaseControl) import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Maybe (fromJust) import Data.Maybe (fromJust)
import Data.Pool qualified as Pool import Data.Pool qualified as Pool
import Data.Text qualified as T
import Database.PostgreSQL.Simple qualified as PSQL import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.API.Admin.Orchestrator.Types (JobLog, noJobLog) import Gargantext.API.Admin.Orchestrator.Types (JobLog, noJobLog)
import Gargantext.API.Errors (BackendInternalError) import Gargantext.API.Errors (BackendInternalError)
...@@ -42,7 +44,7 @@ import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET ...@@ -42,7 +44,7 @@ import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Worker.Types (JobInfo(..)) import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Prelude (HasConnectionPool(..)) import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.Prelude hiding (to) import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging (HasLogger(..), Logger, LogLevel(..), MonadLogger(..), withLogger, logMsg, withLoggerIO) import Gargantext.System.Logging (HasLogger(..), Logger, LogLevel(..), MonadLogger(..), withLogger, logMsg, logLocM, withLoggerIO)
import Gargantext.System.Logging.Loggers import Gargantext.System.Logging.Loggers
import Gargantext.Utils.Jobs.Monad ( MonadJobStatus(..), JobHandle ) import Gargantext.Utils.Jobs.Monad ( MonadJobStatus(..), JobHandle )
import System.Log.FastLogger qualified as FL import System.Log.FastLogger qualified as FL
...@@ -182,9 +184,21 @@ instance MonadJobStatus WorkerMonad where ...@@ -182,9 +184,21 @@ instance MonadJobStatus WorkerMonad where
type JobEventType WorkerMonad = JobLog type JobEventType WorkerMonad = JobLog
noJobHandle Proxy = WorkerNoJobHandle noJobHandle Proxy = WorkerNoJobHandle
getLatestJobStatus _ = WorkerMonad (pure noJobLog) getLatestJobStatus WorkerNoJobHandle = pure noJobLog
getLatestJobStatus (WorkerJobHandle ji) = do
stateTVar <- asks _w_env_job_state
state' <- liftIO $ readTVarIO stateTVar
pure $ case state' of
Nothing -> noJobLog
Just wjs ->
if _wjs_job_info wjs == ji
then
_wjs_job_log wjs
else
noJobLog
withTracer _ jh n = n jh withTracer _ jh n = n jh
markStarted n jh = updateJobProgress jh (const $ jobLogStart $ RemainingSteps n) markStarted n jh =
updateJobProgress jh (const $ jobLogStart $ RemainingSteps n)
markProgress steps jh = updateJobProgress jh (jobLogProgress steps) markProgress steps jh = updateJobProgress jh (jobLogProgress steps)
markFailure steps mb_msg jh = markFailure steps mb_msg jh =
updateJobProgress jh (\latest -> case mb_msg of updateJobProgress jh (\latest -> case mb_msg of
...@@ -208,7 +222,9 @@ updateJobProgress (WorkerJobHandle (ji@JobInfo { _ji_message_id })) f = do ...@@ -208,7 +222,9 @@ updateJobProgress (WorkerJobHandle (ji@JobInfo { _ji_message_id })) f = do
case state' of case state' of
Nothing -> pure () Nothing -> pure ()
Just wjs -> do Just wjs -> do
CET.ce_notify $ CET.UpdateWorkerProgress ji (_wjs_job_log wjs) (CET.ce_notify $ CET.UpdateWorkerProgress ji (_wjs_job_log wjs))
`CES.catch` (\(e :: SomeException) ->
$(logLocM) WARNING $ T.pack $ displayException e)
where where
updateState mwjs = updateState mwjs =
let initJobLog = let initJobLog =
......
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE StandaloneDeriving #-}
{-| {-|
Module : Gargantext.Core.Worker.Types Module : Gargantext.Core.Worker.Types
Description : Some useful worker types Description : Some useful worker types
......
...@@ -14,7 +14,16 @@ Portability : POSIX ...@@ -14,7 +14,16 @@ Portability : POSIX
module Test.Utils.Jobs ( test ) where module Test.Utils.Jobs ( test ) where
import Async.Worker.Broker.PGMQ qualified as PGMQ
import Async.Worker.Broker.Types qualified as BT
import Data.Aeson qualified as Aeson
import Data.Maybe (fromJust)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), ScraperEvent(..))
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.Types
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Test.Hspec import Test.Hspec
import Test.Instances () -- arbitrary instances import Test.Instances () -- arbitrary instances
...@@ -27,7 +36,6 @@ import Test.Instances () -- arbitrary instances ...@@ -27,7 +36,6 @@ import Test.Instances () -- arbitrary instances
-- via the dispatcher notifications mechanism. -- via the dispatcher notifications mechanism.
test :: Spec test :: Spec
test = do test = do
pure ()
-- describe "job queue" $ do -- describe "job queue" $ do
-- it "respects max runners limit" $ -- it "respects max runners limit" $
-- testMaxRunners -- testMaxRunners
...@@ -37,13 +45,13 @@ test = do ...@@ -37,13 +45,13 @@ test = do
-- testExceptions -- testExceptions
-- it "fairly picks equal-priority-but-different-kind jobs" $ -- it "fairly picks equal-priority-but-different-kind jobs" $
-- testFairness -- testFairness
-- describe "job status update and tracking" $ sequential $ do describe "job status update and tracking" $ sequential $ do
-- it "can fetch the latest job status" $ -- it "can fetch the latest job status" $
-- testFetchJobStatus -- testFetchJobStatus
-- it "can spin two separate jobs and track their status separately" $ -- it "can spin two separate jobs and track their status separately" $
-- testFetchJobStatusNoContention -- testFetchJobStatusNoContention
-- it "marking stuff behaves as expected" $ it "marking stuff behaves as expected" $
-- testMarkProgress testMarkProgress
...@@ -270,110 +278,104 @@ data Counts = Counts { countAs :: Int, countBs :: Int } ...@@ -270,110 +278,104 @@ data Counts = Counts { countAs :: Int, countBs :: Int }
-- (map _scst_remaining evts2' == [Just 50]) -- (map _scst_remaining evts2' == [Just 50])
-- ) 500 -- ) 500
-- testMarkProgress :: IO () testMarkProgress :: IO ()
-- testMarkProgress = do testMarkProgress = do
-- myEnv <- newTestEnv withWorkerEnv "./test-data/test_config.toml" $ \env -> runWorkerMonad env $ do
-- -- evts <- newTBQueueIO 7 let msgId = (fromJust $ Aeson.decode "0") :: BT.MessageId PGMQ.PGMQBroker
-- evts <- newTVarIO [] let hdl = WorkerJobHandle { _w_job_info = JobInfo { _ji_message_id = msgId
-- let expectedEvents = 7 , _ji_mNode_id = Nothing } }
-- let getStatus hdl = do
-- liftIO $ threadDelay 100_000 markStarted 10 hdl
-- st <- getLatestJobStatus hdl jl0 <- getLatestJobStatus hdl
-- -- liftIO $ atomically $ writeTBQueue evts st liftBase $ jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
-- liftIO $ atomically $ modifyTVar evts (\xs -> xs ++ [st]) , _scst_failed = Just 0
-- readAllEvents = do , _scst_remaining = Just 10
-- -- We will get thread blocking if there is ANY error in the job , _scst_events = Just []
-- -- Hence we assert the `readAllEvents` test doesn't take too long }
-- mRet <- timeout 5_000_000 $ atomically $ do
-- -- allEventsArrived <- isFullTBQueue evts markProgress 1 hdl
-- evts' <- readTVar evts jl1 <- getLatestJobStatus hdl
-- -- STM retry if things failed liftBase $ jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
-- -- check allEventsArrived , _scst_failed = Just 0
-- check (length evts' == expectedEvents) , _scst_remaining = Just 9
-- -- flushTBQueue evts , _scst_events = Just []
-- pure evts' }
-- case mRet of
-- Nothing -> Prelude.fail $ "testMarkProgress: timeout exceeded, but didn't receive all 7 required events." markFailure 1 (Nothing :: Maybe Void) hdl
-- Just xs jl2 <- getLatestJobStatus hdl
-- | length xs == expectedEvents liftBase $ jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
-- -> pure xs , _scst_failed = Just 1
-- | otherwise , _scst_remaining = Just 8
-- -> Prelude.fail $ "testMarkProgress: received some events, but they were not of the expected number (" <> show expectedEvents <> "): " <> show xs , _scst_events = Just []
}
-- withJob_ myEnv $ \hdl _input -> do
-- markStarted 10 hdl markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl
-- getStatus hdl jl3 <- getLatestJobStatus hdl
liftBase $ jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
-- markProgress 1 hdl , _scst_failed = Just 2
-- getStatus hdl , _scst_remaining = Just 7
, _scst_events = Just [
-- markFailureNoErr 1 hdl ScraperEvent { _scev_message = Just "boom"
-- getStatus hdl , _scev_level = Just "ERROR"
, _scev_date = Nothing }
-- markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl ]
}
-- getStatus hdl
-- markComplete hdl markComplete hdl
jl4 <- getLatestJobStatus hdl
-- getStatus hdl liftBase $ jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
-- markStarted 5 hdl , _scst_failed = Just 2
-- markProgress 1 hdl , _scst_remaining = Just 0
, _scst_events = Just [
-- getStatus hdl ScraperEvent { _scev_message = Just "boom"
-- markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl , _scev_level = Just "ERROR"
, _scev_date = Nothing }
-- getStatus hdl ]
}
-- evts' <- readAllEvents
-- -- This pattern match should never fail, because the precondition is markStarted 5 hdl
-- -- checked in 'readAllEvents'. markProgress 1 hdl
-- let [jl0, jl1, jl2, jl3, jl4, jl5, jl6] = evts' jl5 <- getLatestJobStatus hdl
liftBase $ jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
-- -- Check the events are what we expect , _scst_failed = Just 0
-- jl0 `shouldBe` JobLog { _scst_succeeded = Just 0 , _scst_remaining = Just 4
-- , _scst_failed = Just 0 , _scst_events = Just []
-- , _scst_remaining = Just 10 }
-- , _scst_events = Just []
-- } markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl
-- jl1 `shouldBe` JobLog { _scst_succeeded = Just 1 jl6 <- getLatestJobStatus hdl
-- , _scst_failed = Just 0 liftBase $ jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_remaining = Just 9 , _scst_failed = Just 4
-- , _scst_events = Just [] , _scst_remaining = Just 0
-- } , _scst_events = Just [
-- jl2 `shouldBe` JobLog { _scst_succeeded = Just 1 ScraperEvent { _scev_message = Just "kaboom"
-- , _scst_failed = Just 1 , _scev_level = Just "ERROR"
-- , _scst_remaining = Just 8 , _scev_date = Nothing }
-- , _scst_events = Just [] ]
-- } }
-- jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 2 let msgId2 = (fromJust $ Aeson.decode "2") :: BT.MessageId PGMQ.PGMQBroker
-- , _scst_remaining = Just 7 let hdl2 = WorkerJobHandle { _w_job_info = JobInfo { _ji_message_id = msgId2
-- , _scst_events = Just [ , _ji_mNode_id = Nothing } }
-- ScraperEvent { _scev_message = Just "boom" addMoreSteps 11 hdl2
-- , _scev_level = Just "ERROR" jl7 <- getLatestJobStatus hdl2
-- , _scev_date = Nothing } liftBase $ jl7 `shouldBe` JobLog { _scst_succeeded = Just 0
-- ] , _scst_failed = Just 0
-- } , _scst_remaining = Just 11
-- jl4 `shouldBe` JobLog { _scst_succeeded = Just 8 , _scst_events = Just []
-- , _scst_failed = Just 2 }
-- , _scst_remaining = Just 0
-- , _scst_events = Just [ markStarted 1 hdl2
-- ScraperEvent { _scev_message = Just "boom" jl8 <- getLatestJobStatus hdl2
-- , _scev_level = Just "ERROR" liftBase $ jl8 `shouldBe` JobLog { _scst_succeeded = Just 0
-- , _scev_date = Nothing } , _scst_failed = Just 0
-- ] , _scst_remaining = Just 1
-- } , _scst_events = Just []
-- jl5 `shouldBe` JobLog { _scst_succeeded = Just 1 }
-- , _scst_failed = Just 0 addMoreSteps 11 hdl2
-- , _scst_remaining = Just 4 jl9 <- getLatestJobStatus hdl2
-- , _scst_events = Just [] liftBase $ jl9 `shouldBe` JobLog { _scst_succeeded = Just 0
-- } , _scst_failed = Just 0
-- jl6 `shouldBe` JobLog { _scst_succeeded = Just 1 , _scst_remaining = Just 12
-- , _scst_failed = Just 4 , _scst_events = Just []
-- , _scst_remaining = Just 0 }
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "kaboom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment