[worker] tasty tests pass now

parent b3830a99
Pipeline #6935 canceled
......@@ -127,12 +127,16 @@ library
Gargantext.API.Ngrams.Tools
Gargantext.API.Ngrams.Types
Gargantext.API.Node
Gargantext.API.Node.Corpus.Annuaire
Gargantext.API.Node.Corpus.New
Gargantext.API.Node.Corpus.New.Types
Gargantext.API.Node.Corpus.Types
Gargantext.API.Node.Corpus.Update
Gargantext.API.Node.DocumentsFromWriteNodes.Types
Gargantext.API.Node.DocumentUpload.Types
Gargantext.API.Node.File
Gargantext.API.Node.File.Types
Gargantext.API.Node.FrameCalcUpload.Types
Gargantext.API.Node.Share
Gargantext.API.Node.Share.Types
Gargantext.API.Node.ShareURL
......@@ -247,6 +251,7 @@ library
Gargantext.Core.Worker.Env
Gargantext.Core.Worker.Jobs
Gargantext.Core.Worker.Jobs.Types
Gargantext.Core.Worker.PGMQTypes
Gargantext.Core.Worker.Types
Gargantext.Database.Action.Flow
Gargantext.Database.Action.Flow.Types
......@@ -310,7 +315,6 @@ library
Gargantext.API.Ngrams.NgramsTree
Gargantext.API.Node.Contact
Gargantext.API.Node.Contact.Types
Gargantext.API.Node.Corpus.Annuaire
Gargantext.API.Node.Corpus.Export
Gargantext.API.Node.Corpus.Export.Types
Gargantext.API.Node.Corpus.Searx
......@@ -319,11 +323,8 @@ library
Gargantext.API.Node.Phylo.Export
Gargantext.API.Node.Phylo.Export.Types
Gargantext.API.Node.DocumentUpload
Gargantext.API.Node.DocumentUpload.Types
Gargantext.API.Node.DocumentsFromWriteNodes
Gargantext.API.Node.DocumentsFromWriteNodes.Types
Gargantext.API.Node.FrameCalcUpload
Gargantext.API.Node.FrameCalcUpload.Types
Gargantext.API.Node.Get
Gargantext.API.Node.New
Gargantext.API.Node.New.Types
......@@ -853,6 +854,8 @@ test-suite garg-test-tasty
Test.Utils.Crypto
Test.Utils.Db
Test.Utils.Jobs
Test.Utils.Jobs.Types
Test.Utils.Notifications
hs-source-dirs:
test bin/gargantext-cli
ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N
......@@ -886,6 +889,7 @@ test-suite garg-test-hspec
Test.Types
Test.Utils
Test.Utils.Db
Test.Utils.Notifications
hs-source-dirs:
test
ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N
......
{-|
Module : Gargantext.API.Node.FrameCalcUpload
Module : Gargantext.API.Node.FrameCalcUpload.Types
Description : Frame calc upload types
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
......
......@@ -55,30 +55,22 @@ import StmContainers.Set as SSet
-- | A topic is sent when a client wants to subscribe to specific
-- types of notifications
data Topic =
-- | Update given Servant Job (we currently send a request every
-- | second to get job status).
UpdateJobProgress (JobID 'Safe)
-- | New, worker version for updating job state
| UpdateWorkerProgress JobInfo
UpdateWorkerProgress JobInfo
-- | Given parent node id, trigger update of the node and its
-- children (e.g. list is automatically created in a corpus)
| UpdateTree NodeId
deriving (Eq, Ord)
instance Prelude.Show Topic where
show (UpdateJobProgress jId) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId)
show (UpdateWorkerProgress ji) = "UpdateWorkerProgress " <> show ji
show (UpdateTree nodeId) = "UpdateTree " <> show nodeId
instance Hashable Topic where
hashWithSalt salt (UpdateJobProgress jId) = hashWithSalt salt ("update-job-progress" :: Text, Aeson.encode jId)
hashWithSalt salt (UpdateWorkerProgress ji) = hashWithSalt salt ("update-worker-progress" :: Text, Aeson.encode ji)
hashWithSalt salt (UpdateTree nodeId) = hashWithSalt salt ("update-tree" :: Text, nodeId)
instance FromJSON Topic where
parseJSON = Aeson.withObject "Topic" $ \o -> do
type_ <- o .: "type"
case type_ of
"update_job_progress" -> do
jId <- o .: "j_id"
pure $ UpdateJobProgress jId
"update_worker_progress" -> do
ji <- o .: "ji"
pure $ UpdateWorkerProgress ji
......@@ -87,10 +79,6 @@ instance FromJSON Topic where
pure $ UpdateTree node_id
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON Topic where
toJSON (UpdateJobProgress jId) = Aeson.object [
"type" .= toJSON ("update_job_progress" :: Text)
, "j_id" .= toJSON jId
]
toJSON (UpdateWorkerProgress ji) = Aeson.object [
"type" .= toJSON ("update_worker_progress" :: Text)
, "ji" .= toJSON ji
......
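For quick reference, the instances above imply the following wire format for the worker topic. This is a sketch only: someJobInfo is a placeholder, and the rendering of the JobInfo payload (the "ji" field) comes from its own To/FromJSON instances defined elsewhere.
-- GHCi sketch of the Topic round-trip implied by the instances above
-- (someJobInfo is a hypothetical JobInfo value):
--
-- ghci> Aeson.encode (UpdateWorkerProgress someJobInfo)
-- "{\"type\":\"update_worker_progress\",\"ji\":...}"
--
-- ghci> Aeson.decode "{\"type\":\"update_worker_progress\",\"ji\":...}" :: Maybe Topic
-- Just (UpdateWorkerProgress ...)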
......@@ -17,11 +17,9 @@ Portability : POSIX
module Gargantext.Core.Worker where
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker.Broker.Types (BrokerMessage, toA, getMessage, messageId)
import Async.Worker.Broker.Types (toA, getMessage, messageId)
import Async.Worker qualified as W
import Async.Worker.Types qualified as W
import Async.Worker.Types (HasWorkerBroker)
import Data.Text qualified as T
import Gargantext.API.Admin.Auth (forgotUserPassword)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
......@@ -42,6 +40,7 @@ import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Viz.Graph.API (graphRecompute)
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.PGMQTypes (BrokerMessage, HasWorkerBroker, WState)
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId)
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Query.Table.User (getUsersWithEmail)
......@@ -52,10 +51,10 @@ import Gargantext.Utils.Jobs.Monad (MonadJobStatus(markStarted, markComplete, ma
import System.Posix.Signals (Handler(Catch), installHandler, keyboardSignal)
initWorkerState :: (HasWorkerBroker PGMQBroker Job)
initWorkerState :: HasWorkerBroker
=> WorkerEnv
-> WorkerDefinition
-> IO (W.State PGMQBroker Job)
-> IO WState
initWorkerState env (WorkerDefinition { .. }) = do
let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig
......@@ -70,10 +69,10 @@ initWorkerState env (WorkerDefinition { .. }) = do
, onJobError = Just $ notifyJobFailed env
, onWorkerKilledSafely = Just $ notifyJobKilled env }
notifyJobStarted :: (HasWorkerBroker PGMQBroker Job)
notifyJobStarted :: HasWorkerBroker
=> WorkerEnv
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> WState
-> BrokerMessage
-> IO ()
notifyJobStarted env (W.State { name }) bm = do
let j = toA $ getMessage bm
......@@ -85,10 +84,10 @@ notifyJobStarted env (W.State { name }) bm = do
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markStarted 1 jh
notifyJobFinished :: (HasWorkerBroker PGMQBroker Job)
notifyJobFinished :: HasWorkerBroker
=> WorkerEnv
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> WState
-> BrokerMessage
-> IO ()
notifyJobFinished env (W.State { name }) bm = do
let j = toA $ getMessage bm
......@@ -100,10 +99,10 @@ notifyJobFinished env (W.State { name }) bm = do
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markComplete jh
notifyJobTimeout :: (HasWorkerBroker PGMQBroker Job)
notifyJobTimeout :: HasWorkerBroker
=> WorkerEnv
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> WState
-> BrokerMessage
-> IO ()
notifyJobTimeout env (W.State { name }) bm = do
let j = toA $ getMessage bm
......@@ -115,10 +114,10 @@ notifyJobTimeout env (W.State { name }) bm = do
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markFailed (Just $ UnsafeMkHumanFriendlyErrorText "Worker job timed out!") jh
notifyJobFailed :: (HasWorkerBroker PGMQBroker Job, HasCallStack)
notifyJobFailed :: (HasWorkerBroker, HasCallStack)
=> WorkerEnv
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> WState
-> BrokerMessage
-> SomeException
-> IO ()
notifyJobFailed env (W.State { name }) bm exc = do
......@@ -131,10 +130,10 @@ notifyJobFailed env (W.State { name }) bm exc = do
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markFailed (Just $ UnsafeMkHumanFriendlyErrorText "Worker job failed") jh
notifyJobKilled :: (HasWorkerBroker PGMQBroker Job, HasCallStack)
notifyJobKilled :: (HasWorkerBroker, HasCallStack)
=> WorkerEnv
-> W.State PGMQBroker Job
-> Maybe (BrokerMessage PGMQBroker (W.Job Job))
-> WState
-> Maybe BrokerMessage
-> IO ()
notifyJobKilled _ _ Nothing = pure ()
notifyJobKilled env (W.State { name }) (Just bm) = do
......@@ -154,20 +153,20 @@ notifyJobKilled env (W.State { name }) (Just bm) = do
-- - progress report via notifications
-- - I think there is no point in saving the job result, as usually there is none (we have side effects only)
-- - replace Servant.Job to use workers instead of garg API threads
withPGMQWorker :: (HasWorkerBroker PGMQBroker Job)
withPGMQWorker :: HasWorkerBroker
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> W.State PGMQBroker Job -> IO ())
-> (Async () -> WState -> IO ())
-> IO ()
withPGMQWorker env wd cb = do
state' <- initWorkerState env wd
withAsync (W.run state') (\a -> cb a state')
withPGMQWorkerSingle :: (HasWorkerBroker PGMQBroker Job)
withPGMQWorkerSingle :: HasWorkerBroker
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> W.State PGMQBroker Job -> IO ())
-> (Async () -> WState -> IO ())
-> IO ()
withPGMQWorkerSingle env wd cb = do
state' <- initWorkerState env wd
......@@ -175,10 +174,10 @@ withPGMQWorkerSingle env wd cb = do
withAsync (W.runSingle state') (\a -> cb a state')
withPGMQWorkerCtrlC :: (HasWorkerBroker PGMQBroker Job)
withPGMQWorkerCtrlC :: HasWorkerBroker
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> W.State PGMQBroker Job -> IO ())
-> (Async () -> WState -> IO ())
-> IO ()
withPGMQWorkerCtrlC env wd cb = do
withPGMQWorker env wd $ \a state' -> do
......@@ -186,10 +185,10 @@ withPGMQWorkerCtrlC env wd cb = do
_ <- installHandler keyboardSignal (Catch (throwTo tid W.KillWorkerSafely)) Nothing
cb a state'
withPGMQWorkerSingleCtrlC :: (HasWorkerBroker PGMQBroker Job)
withPGMQWorkerSingleCtrlC :: HasWorkerBroker
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> W.State PGMQBroker Job -> IO ())
-> (Async () -> WState -> IO ())
-> IO ()
withPGMQWorkerSingleCtrlC env wd cb = do
withPGMQWorkerSingle env wd $ \a state' -> do
......@@ -199,10 +198,10 @@ withPGMQWorkerSingleCtrlC env wd cb = do
-- | How the worker should process jobs
performAction :: (HasWorkerBroker PGMQBroker Job)
performAction :: HasWorkerBroker
=> WorkerEnv
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> WState
-> BrokerMessage
-> IO ()
performAction env _state bm = do
let job' = toA $ getMessage bm
......
......@@ -13,15 +13,14 @@ module Gargantext.Core.Worker.Broker
( initBrokerWithDBCreate )
where
import Async.Worker.Broker.PGMQ (PGMQBroker, BrokerInitParams(PGMQBrokerInitParams))
import Async.Worker.Broker.Types (Broker, initBroker)
import Async.Worker.Types qualified as W
import Async.Worker.Broker.PGMQ (BrokerInitParams(PGMQBrokerInitParams))
import Async.Worker.Broker.Types (initBroker)
import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.Core.Config (GargConfig(..), gc_worker)
import Gargantext.Core.Config.Worker (WorkerSettings(..))
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, Broker)
import Gargantext.Database.Prelude (createDBIfNotExists)
import Gargantext.Prelude
......@@ -29,9 +28,9 @@ import Gargantext.Prelude
-- | Create the DB if it doesn't exist, then run 'initBroker' (which, in
-- particular, creates the pgmq extension, if needed)
initBrokerWithDBCreate :: (W.HasWorkerBroker PGMQBroker Job)
initBrokerWithDBCreate :: HasWorkerBroker
=> GargConfig
-> IO (Broker PGMQBroker (W.Job Job))
-> IO Broker
initBrokerWithDBCreate gc@(GargConfig { _gc_database_config }) = do
-- By using gargantext db credentials, we create pgmq db (if needed)
let WorkerSettings { .. } = gc ^. gc_worker
......
......@@ -221,7 +221,6 @@ data WorkerJobHandle =
-- | The worker handles one job at a time, hence it's enough to provide
-- simple progress tracking
instance MonadJobStatus WorkerMonad where
-- type JobHandle WorkerMonad = WorkerJobHandle
type JobHandle WorkerMonad = WorkerJobHandle
type JobOutputType WorkerMonad = JobLog
type JobEventType WorkerMonad = JobLog
......
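For context, here is a minimal sketch of how a job running inside the worker can report progress through the instance above. The function name is hypothetical; markStarted, markProgress and markComplete are the MonadJobStatus methods used throughout this diff.
-- Hypothetical illustration, not part of this commit:
reportThreeSteps :: WorkerJobHandle -> WorkerMonad ()
reportThreeSteps jh = do
  markStarted 3 jh   -- announce the total number of steps
  markProgress 1 jh  -- first step done
  markProgress 1 jh  -- second step done
  markComplete jh    -- mark the job (and any remaining steps) as done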
{-|
Module : Gargantext.Core.Worker.PGMQTypes
Description : Worker type aliases for PGMQ
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE ConstraintKinds #-}
module Gargantext.Core.Worker.PGMQTypes where
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker.Broker.Types qualified as BT
import Async.Worker.Types qualified as W
import Gargantext.Core.Worker.Jobs.Types (Job)
type HasWorkerBroker = W.HasWorkerBroker PGMQBroker Job
type Broker = BT.Broker PGMQBroker (W.Job Job)
type BrokerMessage = BT.BrokerMessage PGMQBroker (W.Job Job)
type WState = W.State PGMQBroker Job
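These aliases fix the broker to PGMQ and the payload to Job in one place, which is what lets the signatures elsewhere in this diff shrink. For instance, from Gargantext.Core.Worker above:
-- Before:
-- notifyJobStarted :: (HasWorkerBroker PGMQBroker Job)
--                  => WorkerEnv -> W.State PGMQBroker Job
--                  -> BrokerMessage PGMQBroker (W.Job Job) -> IO ()
--
-- After:
-- notifyJobStarted :: HasWorkerBroker
--                  => WorkerEnv -> WState -> BrokerMessage -> IO ()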
......@@ -19,7 +19,6 @@ import Gargantext.Core.Types (NodeId)
import Gargantext.Prelude
data JobInfo = JobInfo { _ji_message_id :: BT.MessageId PGMQBroker
-- NOTE: Most jobs are associated with node id.
-- The 'node_id' allows the frontend to assign progress bar to a node.
......
......@@ -37,12 +37,9 @@ import Prelude
import Test.API.Setup (withTestDBAndNotifications)
import Test.Hspec
import Test.Instances ()
import Test.Utils.Notifications
instance Eq DT.Notification where
-- simple
(==) n1 n2 = show n1 == show n2
tests :: NotificationsConfig -> D.Dispatcher -> Spec
tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatcher) $ do
......@@ -52,7 +49,7 @@ tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatc
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection
let wsConnect =
withWSConnection ("127.0.0.1", port, "/ws") $ \conn -> do
withWSConnection ("127.0.0.1", port) $ \conn -> do
-- We wait a bit for the server to settle
threadDelay (100 * millisecond)
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
......@@ -73,27 +70,3 @@ tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatc
md <- atomically $ readTChan tchan
md `shouldBe` Just (DT.NUpdateTree nodeId)
millisecond :: Int
millisecond = 1000
-- | Wrap the logic of asynchronous connection closing
-- https://hackage.haskell.org/package/websockets-0.13.0.0/docs/Network-WebSockets-Connection.html#v:sendClose
withWSConnection :: (String, Int, String) -> WS.ClientApp () -> IO ()
withWSConnection (host, port, path) cb =
WS.runClient host port path $ \conn -> do
cb conn
-- shut down gracefully, otherwise a 'ConnectionException' is thrown
WS.sendClose conn ("" :: BS.ByteString)
-- wait for close response, should throw a 'CloseRequest' exception
Exc.catches (void $ WS.receiveDataMessage conn)
[ Exc.Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.CloseRequest _ _ -> putStrLn "[withWSConnection] closeRequest caught"
_ -> Exc.throw err
-- re-throw any other exceptions
, Exc.Handler $ \(err :: Exc.SomeException) -> Exc.throw err ]
......@@ -17,10 +17,12 @@ import Gargantext.API.Routes.Named.Node
import Gargantext.API.Routes.Named.Private hiding (tableNgramsAPI)
import Gargantext.API.Routes.Named.Table
import Gargantext.API.Types () -- MimeUnrender instances
import Gargantext.API.Worker (workerAPIPost)
import Gargantext.Core.Text.Corpus.Query (RawQuery)
import Gargantext.Core.Types (ListId, NodeId, NodeType, NodeTableResult)
import Gargantext.Core.Types.Main (ListType)
import Gargantext.Core.Types.Query (Limit, MaxSize, MinSize, Offset)
import Gargantext.Core.Worker.Types (JobInfo)
import Gargantext.Database.Admin.Types.Hyperdata
import Gargantext.Database.Query.Facet qualified as Facet
import Gargantext.Prelude
......@@ -80,7 +82,7 @@ toServantToken = S.Token . TE.encodeUtf8
update_node :: Token
-> NodeId
-> UpdateNodeParams
-> ClientM (JobStatus 'Safe JobLog)
-> ClientM JobInfo
update_node (toServantToken -> token) nodeId params =
clientRoutes & apiWithCustomErrorScheme
& ($ GES_new)
......@@ -96,8 +98,8 @@ update_node (toServantToken -> token) nodeId params =
& ($ nodeId)
& updateAPI
& updateNodeEp
& asyncJobsAPI'
& (\(_ :<|> submitForm :<|> _) -> submitForm (JobInput params Nothing))
& workerAPIPost
& (\submitForm -> submitForm params)
get_table_ngrams :: Token
-> NodeId
......
......@@ -23,7 +23,7 @@ import Gargantext.API.Admin.EnvTypes (Mode(Mock), Env (..), env_dispatcher)
import Gargantext.API.Errors.Types
import Gargantext.API.Prelude
import Gargantext.Core.Notifications.Dispatcher qualified as D
import Gargantext.Core.Config (_gc_secrets, gc_frontend_config, gc_jobs, hasConfig)
import Gargantext.Core.Config (_gc_secrets, gc_frontend_config, hasConfig)
import Gargantext.Core.Config.Types (SettingsFile(..), jc_js_job_timeout, jc_js_id_timeout, fc_appPort, jwtSettings)
import Gargantext.Core.Types.Individu
import Gargantext.Database.Action.Flow
......@@ -36,10 +36,7 @@ import Gargantext.Database.Query.Table.Node (getOrMkList)
import Gargantext.Database.Query.Tree.Root (MkCorpusUser(..))
import Gargantext.MicroServices.ReverseProxy (microServicesProxyApp)
import Gargantext.System.Logging
import Gargantext.Utils.Jobs qualified as Jobs
import Gargantext.Utils.Jobs.Monad qualified as Jobs
import Gargantext.Utils.Jobs.Queue qualified as Jobs
import Gargantext.Utils.Jobs.Settings qualified as Jobs
import Network.HTTP.Client.TLS (newTlsManager)
import Network.HTTP.Types
import Network.Wai (Application, responseLBS)
......@@ -78,8 +75,6 @@ newTestEnv testEnv logger port = do
!manager_env <- newTlsManager
let config_env = test_config testEnv & (gc_frontend_config . fc_appPort) .~ port
prios <- withLogger () $ \ioLogger -> Jobs.readPrios ioLogger (sf <> ".jobs")
let prios' = Jobs.applyPrios prios Jobs.defaultPrios
!self_url_env <- parseBaseUrl $ "http://0.0.0.0:" <> show port
-- dbParam <- pure $ testEnvToPgConnectionInfo testEnv
-- !pool <- newPool dbParam
......@@ -87,11 +82,6 @@ newTestEnv testEnv logger port = do
-- !nodeStory_env <- fromDBNodeStoryEnv pool
!scrapers_env <- ServantAsync.newJobEnv ServantAsync.defaultSettings manager_env
secret <- Jobs.genSecret
let jobs_settings = (Jobs.defaultJobSettings 1 secret)
& Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_job_timeout)
& Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_id_timeout)
!jobs_env <- Jobs.newJobEnv jobs_settings prios' manager_env
!_env_jwt_settings <- jwtSettings (_gc_secrets config_env)
-- !central_exchange <- forkIO CE.gServer
......@@ -107,7 +97,6 @@ newTestEnv testEnv logger port = do
, _env_nodeStory = test_nodeStory testEnv
, _env_manager = manager_env
, _env_scrapers = scrapers_env
, _env_jobs = jobs_env
, _env_self_url = self_url_env
, _env_config = config_env
, _env_central_exchange = Prelude.error "[Test.API.Setup.Env] central exchange not needed, but forced somewhere (check StrictData)"
......
......@@ -73,7 +73,7 @@ import Test.Hspec.Wai.Internal (withApplication, WaiSession)
import Test.Hspec.Wai.JSON (json)
import Test.Hspec.Wai (shouldRespondWith)
import Test.Types (JobPollHandle(..))
import Test.Utils (getJSON, pollUntilFinished, postJSONUrlEncoded, protectedJSON, withValidLogin)
import Test.Utils (getJSON, pollUntilFinished, pollUntilWorkFinished, postJSONUrlEncoded, protectedJSON, withValidLogin)
import Text.Printf (printf)
import Web.FormUrlEncoded
......@@ -358,8 +358,8 @@ createDocsList testDataPath testEnv port clientEnv token = do
-- let mkPollUrl jh = "/corpus/" <> fromString (show $ _NodeId corpusId) <> "/add/form/async/" +|_jph_id jh|+ "/poll?limit=1"
-- j' <- pollUntilFinished token port mkPollUrl j
-- liftIO (_jph_status j' `shouldBe` "IsFinished")
j' <- pollUntilWorkFinished token port ji
liftIO $ j' `shouldSatisfy` isRight
ji' <- pollUntilWorkFinished token port ji
liftIO $ ji' `shouldBe` ji
pure corpusId
createFortranDocsList :: TestEnv -> Int -> ClientEnv -> Token -> WaiSession () CorpusId
......@@ -369,10 +369,13 @@ createFortranDocsList testEnv port =
updateNode :: Int -> ClientEnv -> Token -> NodeId -> WaiSession () ()
updateNode port clientEnv token nodeId = do
let params = UpdateNodeParamsTexts Both
(j :: JobPollHandle) <- checkEither $ fmap toJobPollHandle <$> liftIO (runClientM (update_node token nodeId params) clientEnv)
let mkPollUrl jh = "/node/" <> fromString (show $ _NodeId nodeId) <> "/update/" +|_jph_id jh|+ "/poll?limit=1"
j' <- pollUntilFinished token port mkPollUrl j
liftIO (_jph_status j' `shouldBe` "IsFinished")
-- (j :: JobPollHandle) <- checkEither $ fmap toJobPollHandle <$> liftIO (runClientM (update_node token nodeId params) clientEnv)
ji <- checkEither $ liftIO $ runClientM (update_node token nodeId params) clientEnv
-- let mkPollUrl jh = "/node/" <> fromString (show $ _NodeId nodeId) <> "/update/" +|_jph_id jh|+ "/poll?limit=1"
-- j' <- pollUntilFinished token port mkPollUrl j
-- liftIO (_jph_status j' `shouldBe` "IsFinished")
ji' <- pollUntilWorkFinished token port ji
liftIO $ ji' `shouldBe` ji
toJobPollHandle :: JobStatus 'Safe JobLog -> JobPollHandle
toJobPollHandle = either (\x -> panicTrace $ "toJobPollHandle:" <> T.pack x) identity . JSON.eitherDecode . JSON.encode
......
......@@ -37,7 +37,7 @@ import Gargantext.Core.NLP (HasNLPServer(..))
import Gargantext.Core.NodeStory
import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.System.Logging (HasLogger(..), Logger, MonadLogger(..))
import Gargantext.Utils.Jobs
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Network.URI (parseURI)
import Prelude qualified
import System.Log.FastLogger qualified as FL
......@@ -77,7 +77,6 @@ data TestJobHandle = TestNoJobHandle
instance MonadJobStatus TestMonad where
type JobHandle TestMonad = TestJobHandle
type JobType TestMonad = GargJob
type JobOutputType TestMonad = JobLog
type JobEventType TestMonad = JobLog
......
......@@ -16,36 +16,33 @@ module Test.Instances
where
import Data.List.NonEmpty qualified as NE
import Data.Map.Strict qualified as Map
import Data.Map.Strict.Patch qualified as PM
import Data.Patch.Class (Replace(Keep), replace)
import Data.Patch.Class (Replace, replace)
import Data.Text qualified as T
import Data.Validity (Validation(..), ValidationChain (..), prettyValidation)
import EPO.API.Client.Types qualified as EPO
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(..), ForgotPasswordAsyncParams(..))
import Gargantext.API.Admin.EnvTypes as EnvTypes
import Gargantext.API.Errors.Types qualified as Errors
import Gargantext.API.Ngrams.Types qualified as Ngrams
import Gargantext.API.Node.Corpus.Annuaire (AnnuaireWithForm(..))
import Gargantext.API.Node.Corpus.New (ApiInfo(..))
import Gargantext.API.Node.DocumentsFromWriteNodes.Types qualified as DFWN
import Gargantext.API.Node.DocumentUpload.Types (DocumentUpload(..))
import Gargantext.API.Node.FrameCalcUpload.Types qualified as FCU
import Gargantext.API.Node.Types (NewWithForm(..), RenameNode(..), WithQuery(..))
import Gargantext.API.Node.Types (RenameNode(..), WithQuery(..))
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DET
import Gargantext.Core.NodeStory.Types qualified as NS
import Gargantext.Core.Text.Ngrams (NgramsType(..))
import Gargantext.Core.Types.Individu qualified as Individu
import Gargantext.Core.Types.Main (ListType(CandidateTerm, StopTerm, MapTerm))
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Admin.Types.Node (UserId(UnsafeMkUserId))
import Gargantext.Database.Admin.Types.Hyperdata qualified as Hyperdata
import Gargantext.Prelude hiding (replace, Location)
import Servant.Job.Core qualified as SJ
import Servant.Job.Types qualified as SJ
import Test.QuickCheck
import Test.Tasty.QuickCheck hiding (Positive, Negative)
import Text.Parsec.Error (ParseError, Message(..), newErrorMessage)
import Text.Parsec.Pos
import Test.QuickCheck
instance Arbitrary AuthenticatedUser where
......@@ -53,52 +50,6 @@ instance Arbitrary AuthenticatedUser where
<*> arbitrary -- _auth_user_id
instance Arbitrary EnvTypes.GargJob where
arbitrary = do
oneof [ pure AddAnnuaireFormJob
, pure AddContactJob
, pure AddCorpusFileJob
, pure AddCorpusFormJob
, pure AddCorpusQueryJob
, pure AddFileJob
, pure DocumentFromWriteNodeJob
, pure ForgotPasswordJob
, pure NewNodeJob
, pure RecomputeGraphJob
, pure TableNgramsJob
, pure UpdateNgramsListJobJSON
, pure UpdateNgramsListJobTSV
, pure UpdateNodeJob
, pure UploadDocumentJob
, pure UploadFrameCalcJob
]
instance Arbitrary Job where
arbitrary = oneof [ pure Ping
, addCorpusFormAsyncGen
, forgotPasswordAsyncGen
, newNodeAsyncGen
, gargJobGen ]
where
forgotPasswordAsyncGen = do
email <- arbitrary
return $ ForgotPasswordAsync (ForgotPasswordAsyncParams { email })
addCorpusFormAsyncGen = do
_acf_args <- arbitrary
_acf_user <- arbitrary
_acf_cid <- arbitrary
return $ AddCorpusFormAsync { .. }
newNodeAsyncGen = do
_nna_node_id <- arbitrary
_nna_authenticatedUser <- arbitrary
_nna_postNode <- arbitrary
return $ NewNodeAsync { .. }
gargJobGen = do
_gj_garg_job <- arbitrary
return $ GargJob { _gj_garg_job }
instance Arbitrary Message where
arbitrary = do
msgContent <- arbitrary
......@@ -177,6 +128,38 @@ instance Arbitrary WithQuery where
pure $ WithQuery { .. }
-- The endpoint currently does nothing, but if it ever does, we'll need to provide some valid _wf_data
instance Arbitrary AnnuaireWithForm where
arbitrary = AnnuaireWithForm <$> arbitrary -- _wf_filetype
<*> arbitrary -- _wf_data
<*> arbitrary -- _wf_lang
instance Arbitrary DFWN.Params where
arbitrary = DFWN.Params <$> arbitrary -- id
<*> arbitrary -- paragraphs
<*> arbitrary -- lang
<*> arbitrary -- selection
instance Arbitrary ForgotPasswordAsyncParams where
arbitrary = ForgotPasswordAsyncParams <$> arbitrary -- TODO fix proper email
instance Arbitrary FCU.FrameCalcUpload where
arbitrary = FCU.FrameCalcUpload <$> arbitrary -- _wf_lang
<*> arbitrary -- _wf_selection
instance Arbitrary Ngrams.UpdateTableNgramsCharts where
arbitrary = Ngrams.UpdateTableNgramsCharts <$> arbitrary -- _utn_tab_type
<*> arbitrary -- _utn_list_id
instance Arbitrary DocumentUpload where
arbitrary = DocumentUpload <$> arbitrary -- _du_abstract
<*> arbitrary -- _du_authors
<*> arbitrary -- _du_sources
<*> arbitrary -- _du_title
<*> arbitrary -- _du_date -- TODO This isn't arbitrary
<*> arbitrary -- _du_language
-- Hyperdata
instance Arbitrary Hyperdata.HyperdataUser where
arbitrary = Hyperdata.HyperdataUser <$> arbitrary
......@@ -197,6 +180,11 @@ instance Arbitrary Hyperdata.HyperdataPublic where
instance Arbitrary a => Arbitrary (SJ.JobOutput a) where
arbitrary = SJ.JobOutput <$> arbitrary
-- instance Arbitrary NewWithFile where
-- arbitrary = NewWithFile <$> arbitrary -- _wfi_b64_data
-- <*> arbitrary -- _wf_lang
-- <*> arbitrary -- _wf_name
instance Arbitrary NewWithForm where
arbitrary = NewWithForm <$> arbitrary -- _wf_filetype
<*> arbitrary -- _wf_fileformat
......@@ -263,16 +251,18 @@ instance Arbitrary DET.WSRequest where
-- Ngrams
instance Arbitrary a => Arbitrary (Ngrams.MSet a)
instance Arbitrary Ngrams.NgramsTerm
instance Arbitrary Ngrams.NgramsTerm where
arbitrary = Ngrams.NgramsTerm <$>
-- we take into account the fact that toJSONKey strips the text
(arbitrary `suchThat` (\t -> t == T.strip t))
instance Arbitrary Ngrams.TabType where
arbitrary = elements [minBound .. maxBound]
instance Arbitrary Ngrams.NgramsElement where
arbitrary = elements [Ngrams.newNgramsElement Nothing "sport"]
instance Arbitrary Ngrams.NgramsTable where
arbitrary = pure ngramsMockTable
instance Arbitrary Ngrams.OrderBy
where
arbitrary = elements [minBound..maxBound]
instance Arbitrary Ngrams.OrderBy where
arbitrary = elements [minBound..maxBound]
instance (Ord a, Arbitrary a) => Arbitrary (Ngrams.PatchMSet a) where
arbitrary = (Ngrams.PatchMSet . PM.fromMap) <$> arbitrary
instance (Eq a, Arbitrary a) => Arbitrary (Replace a) where
......@@ -440,3 +430,37 @@ genFrontendErr be = do
Errors.EC_500__job_generic_exception
-> do err <- arbitrary
pure $ Errors.mkFrontendErr' txt $ Errors.FE_job_generic_exception err
instance Arbitrary Job where
arbitrary = oneof [ pure Ping
, addContactGen
, addCorpusFormAsyncGen
, addCorpusWithQueryGen
-- , addWithFileGen
, addToAnnuaireWithFormGen
, documentsFromWriteNodesGen
, forgotPasswordAsyncGen
, frameCalcUploadGen
, jsonPostGen
, ngramsPostChartsGen
, postNodeAsyncGen
, recomputeGraphGen
, updateNodeGen
, uploadDocumentGen ]
where
addContactGen = AddContact <$> arbitrary <*> arbitrary <*> arbitrary
addCorpusFormAsyncGen = AddCorpusFormAsync <$> arbitrary <*> arbitrary <*> arbitrary
addCorpusWithQueryGen = AddCorpusWithQuery <$> arbitrary <*> arbitrary <*> arbitrary
-- addWithFileGen = AddWithFile <$> arbitrary <*> arbitrary <*> arbitrary
addToAnnuaireWithFormGen = AddToAnnuaireWithForm <$> arbitrary <*> arbitrary
documentsFromWriteNodesGen = DocumentsFromWriteNodes <$> arbitrary <*> arbitrary <*> arbitrary
forgotPasswordAsyncGen = ForgotPasswordAsync <$> arbitrary
frameCalcUploadGen = FrameCalcUpload <$> arbitrary <*> arbitrary <*> arbitrary
jsonPostGen = JSONPost <$> arbitrary <*> arbitrary
ngramsPostChartsGen = NgramsPostCharts <$> arbitrary <*> arbitrary
postNodeAsyncGen = PostNodeAsync <$> arbitrary <*> arbitrary <*> arbitrary
recomputeGraphGen = RecomputeGraph <$> arbitrary
updateNodeGen = UpdateNode <$> arbitrary <*> arbitrary
uploadDocumentGen = UploadDocument <$> arbitrary <*> arbitrary
......@@ -3,6 +3,7 @@
module Test.Utils where
import Control.Concurrent.STM.TVar (newTVarIO, writeTVar, readTVarIO)
import Control.Exception.Safe ()
import Control.Monad ()
import Data.Aeson qualified as JSON
......@@ -19,6 +20,7 @@ import Fmt (Builder)
import Gargantext.API.Admin.Auth.Types (AuthRequest(..), Token, authRes_token)
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Routes.Types (xGargErrorScheme)
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.Core.Types.Individu (Username, GargPassword)
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude
......@@ -28,6 +30,7 @@ import Network.HTTP.Types (Header, Method, status200)
import Network.HTTP.Types.Header (hAccept, hAuthorization, hContentType)
import Network.Wai.Handler.Warp (Port)
import Network.Wai.Test (SResponse(..))
import Network.WebSockets qualified as WS
import Prelude qualified
import Servant.Client (ClientEnv, baseUrlPort, mkClientEnv, parseBaseUrl, runClientM, makeClientRequest, defaultMakeClientRequest)
import Servant.Client.Core (BaseUrl)
......@@ -41,6 +44,7 @@ import Test.Hspec.Wai.JSON (FromValue(..))
import Test.Hspec.Wai.Matcher (MatchHeader(..), ResponseMatcher(..), bodyEquals, formatHeader, match)
import Test.Tasty.HUnit (Assertion, assertBool)
import Test.Types
import Test.Utils.Notifications (withWSConnection, millisecond)
-- | Marks the input 'Assertion' as pending, by ignoring any exception
......@@ -242,29 +246,45 @@ pollUntilWorkFinished :: HasCallStack
-> Port
-> JobInfo
-> WaiSession () JobInfo
pollUntilWorkFinished tkn port = go 60
-- TODO Poll dispatcher for markJobFinished
where
go :: Int -> JobInfo -> WaiSession () JobInfo
go 0 ji = panicTrace $ "pollUntilWorkFinished exhausted attempts. Last found JobInfo: " <> TE.decodeUtf8 (L.toStrict $ JSON.encode ji)
go n ji = case _jph_status h == "IsPending" || _jph_status h == "IsRunning" of
True -> do
liftIO $ threadDelay 1_000_000
h' <- protectedJSON tkn "GET" (mkUrl port $ mkUrlPiece h) ""
go (n-1) h'
False
| _jph_status h == "IsFailure"
-> panicTrace $ "JobPollHandle contains a failure: " <> TE.decodeUtf8 (L.toStrict $ JSON.encode h)
| otherwise
-> case any hasError (_jph_log h) of
True -> panicTrace $ "JobPollHandle contains a failure: " <> TE.decodeUtf8 (L.toStrict $ JSON.encode h)
False -> pure h
pollUntilWorkFinished tkn port ji = do
isFinishedTVar <- liftIO $ newTVarIO False
let wsConnect =
withWSConnection ("localhost", port) $ \conn -> do
-- We wait a bit for the server to settle
threadDelay (100 * millisecond)
-- subscribe to notifications about this job
let topic = DT.UpdateWorkerProgress ji
WS.sendTextData conn $ JSON.encode (DT.WSSubscribe topic)
forever $ do
d <- WS.receiveData conn
let dec = JSON.decode d :: Maybe DT.Notification
case dec of
Nothing -> pure ()
Just (DT.NUpdateWorkerProgress ji' jl) ->
if ji' == ji && isFinished jl
then do
atomically $ writeTVar isFinishedTVar True
else
pure ()
_ -> pure ()
liftIO $ withAsync wsConnect $ \a -> do
mRet <- Timeout.timeout (60 * 1000 * millisecond) $ do
let go = do
isFinished <- readTVarIO isFinishedTVar
if isFinished
then return True
else do
threadDelay (1000 * millisecond)
go
go
case mRet of
Nothing -> panicTrace $ "[pollUntilWorkFinished] timed out while waiting to finish job " <> show ji
Just _ -> return ji
-- FIXME(adn) This is wrong, errs should be >= 1.
hasError :: JobLog -> Bool
hasError JobLog{..} = case _scst_failed of
Nothing -> False
Just errs -> errs > 1
where
isFinished (JobLog { .. }) = _scst_remaining == Just 0
-- | Like HUnit's '@?=', but With a nicer error message in case the two entities are not equal.
(@??=) :: (HasCallStack, ToExpr a, Eq a) => a -> a -> Assertion
......
......@@ -12,8 +12,9 @@ Portability : POSIX
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Test.Utils.Jobs ( test, qcTests ) where
module Test.Utils.Jobs ( test ) where
import Async.Worker.Types qualified as WT
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Async qualified as Async
......@@ -28,12 +29,11 @@ import Gargantext.API.Errors.Types
import Gargantext.API.Prelude
import Gargantext.Core.Config (GargConfig(..))
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, BrokerMessage, WState)
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad hiding (withJob)
import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
import Gargantext.Utils.Jobs.State
import Network.HTTP.Client (Manager)
import Network.HTTP.Client.TLS (newTlsManager)
import Prelude qualified
......@@ -47,6 +47,39 @@ import Test.Instances () -- arbitrary instances
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.QuickCheck hiding (Positive, Negative)
import Test.Utils (waitUntil)
import Test.Utils.Jobs.Types
-- | TODO This suite did test some old-style worker internals. We
-- moved functionality to new-style workers. Some of these tests are
-- obsolete now. We could implement tests only to check that we wired
-- things properly.
-- E.g. we could check that the job progress is reflected correctly
-- via the dispatcher notifications mechanism (see the sketch after the
-- commented-out suite below).
test :: Spec
test = do
pure ()
-- describe "job queue" $ do
-- it "respects max runners limit" $
-- testMaxRunners
-- it "respects priorities" $
-- testPrios
-- it "can handle exceptions" $
-- testExceptions
-- it "fairly picks equal-priority-but-different-kind jobs" $
-- testFairness
-- describe "job status update and tracking" $ sequential $ do
-- it "can fetch the latest job status" $
-- testFetchJobStatus
-- it "can spin two separate jobs and track their status separately" $
-- testFetchJobStatusNoContention
-- it "marking stuff behaves as expected" $
-- testMarkProgress
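Below is a sketch of the wiring test suggested above: subscribe to UpdateWorkerProgress over a websocket and assert that a progress notification for the job arrives. It assumes the same imports as the notification tests elsewhere in this diff (DT, WS, Aeson, withWSConnection, millisecond), plus a test setup that provides the dispatcher port and the JobInfo of a freshly enqueued job.
-- Hypothetical wiring test, not part of this commit:
-- progressWiringSpec :: Int -> JobInfo -> Spec
-- progressWiringSpec port ji =
--   it "reflects worker job progress via notifications" $ do
--     tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
--     let wsConnect = withWSConnection ("127.0.0.1", port) $ \conn -> do
--           threadDelay (100 * millisecond)
--           WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe (DT.UpdateWorkerProgress ji))
--           d <- WS.receiveData conn
--           atomically $ writeTChan tchan (Aeson.decode d)
--     withAsync wsConnect $ \_ -> do
--       -- the job itself is assumed to be enqueued by the test setup
--       md <- atomically $ readTChan tchan
--       md `shouldSatisfy` isJust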
data JobT = A
......@@ -91,407 +124,289 @@ sampleRunningJobs timer runningJs samples = do
[] -> pure () -- ignore empty runs, when the system is kickstarting.
xs -> writeTQueue samples xs
-- | The aim of this test is to ensure that the \"max runners\" setting is
-- respected, i.e. we have no more than \"N\" jobs running at the same time.
testMaxRunners :: IO ()
testMaxRunners = do
-- max runners = 2 with default settings
let num_jobs = 4
k <- genSecret
let settings = defaultJobSettings 2 k
st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
now <- getCurrentTime
runningJs <- newTVarIO []
samples <- newTQueueIO
remainingJs <- newTVarIO num_jobs
-- Not the most elegant solution, but in order to test the \"max runners\"
-- parameter we start an asynchronous computation that continuously reads the content
-- of 'runningJs' and at the end ensures that this value was
-- always <= \"max_runners" (but crucially not 0).
asyncReader <- async $ forever $ do
samplingFrequency <- registerDelay 100_000
atomically $ sampleRunningJobs samplingFrequency runningJs samples
let duration = 1_000_000
j num _jHandle _inp _l = do
durationTimer <- registerDelay duration
-- NOTE: We do the modification of the 'runningJs' and the rest
-- in two transactions on purpose, to give a chance to the async
-- sampler to sample the status of the world.
atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
atomically $ do
waitTimerSTM durationTimer
modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
modifyTVar remainingJs pred
jobs = [ (A, j n) | n <- [1..num_jobs::Int] ]
atomically $ forM_ jobs $ \(t, f) -> void $
pushJobWithTime now t () f settings st
let waitFinished = atomically $ do
x <- readTVar remainingJs
check (x == 0)
-- Wait for the jobs to finish, then stop the sampler.
waitFinished
cancel asyncReader
-- Check that we got /some/ samples and for each of them,
-- let's check only two runners at max were alive.
allSamples <- atomically $ flushTQueue samples
length allSamples `shouldSatisfy` (> 0)
forM_ allSamples $ \runLog -> do
annotate "predicate to satisfy: (x `isInfixOf` [\"Job #1\", \"Job #2\"] || x `isInfixOf` [\"Job #3\", \"Job #4\"]" $
shouldSatisfy (sort runLog)
(\x -> x `isInfixOf` ["Job #1", "Job #2"]
|| x `isInfixOf` ["Job #3", "Job #4"])
testPrios :: IO ()
testPrios = do
k <- genSecret
-- Use a single runner, so that we can check the order of execution
-- without worrying about the runners competing with each other.
let settings = defaultJobSettings 1 k
prios = [(B, 10), (C, 1), (D, 5)]
st :: JobsState JobT [Prelude.String] () <- newJobsState settings $
applyPrios prios defaultPrios -- B has the highest priority
pickedSchedule <- newMVar (JobSchedule mempty)
let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
jobs = [ (A, j A)
, (C, j C)
, (B, j B)
, (D, j D)
]
-- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
-- the time 'popQueue' gets called.
now <- getCurrentTime
atomically $ forM_ jobs $ \(t, f) -> void $ pushJobWithTime now t () f settings st
-- wait for the jobs to finish, waiting for more than the total duration,
-- so that we are sure that all jobs have finished, then check the schedule.
-- threadDelay jobDuration
waitUntil (do
finalSchedule <- readMVar pickedSchedule
pure $ finalSchedule == JobSchedule (fromList [B, D, C, A])) jobDuration
testExceptions :: IO ()
testExceptions = do
k <- genSecret
let settings = defaultJobSettings 1 k
st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
jid <- pushJob A ()
(\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
settings st
-- Wait 1 second to make sure the job is finished.
threadDelay $ 1_000_000
mjob <- lookupJob jid (jobsData st)
case mjob of
Nothing -> Prelude.fail "lookupJob failed, job not found!"
Just je -> case jTask je of
DoneJ _ r -> isLeft r `shouldBe` True
unexpected -> Prelude.fail $ "Expected job to be done, but got: " <> anythingToString unexpected
return ()
testFairness :: IO ()
testFairness = do
k <- genSecret
let settings = defaultJobSettings 1 k
st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
pickedSchedule <- newMVar (JobSchedule mempty)
let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
jobs = [ (A, j A)
, (A, j A)
, (B, j B)
, (A, j A)
, (A, j A)
]
time <- getCurrentTime
-- in this scenario we simulate two types of jobs all with
-- all the same level of priority: our queue implementation
-- will behave as a classic FIFO, keeping into account the
-- time of arrival.
atomically $ forM_ (zip [0,2 ..] jobs) $ \(timeDelta, (t, f)) -> void $
pushJobWithTime (addUTCTime (fromInteger timeDelta) time) t () f settings st
-- threadDelay jobDuration
waitUntil (do
finalSchedule <- readMVar pickedSchedule
pure $ finalSchedule == JobSchedule (fromList [A, A, B, A, A])) jobDuration
newtype MyDummyMonad a =
MyDummyMonad { _MyDummyMonad :: GargM Env BackendInternalError a }
deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
instance MonadJob MyDummyMonad GargJob (Seq JobLog) JobLog where
getJobEnv = MyDummyMonad getJobEnv
instance MonadJobStatus MyDummyMonad where
type JobHandle MyDummyMonad = EnvTypes.ConcreteJobHandle BackendInternalError
type JobType MyDummyMonad = GargJob
type JobOutputType MyDummyMonad = JobLog
type JobEventType MyDummyMonad = JobLog
noJobHandle _ = noJobHandle (Proxy :: Proxy (GargM Env BackendInternalError))
getLatestJobStatus jId = MyDummyMonad (getLatestJobStatus jId)
withTracer _ jh n = n jh
markStarted n jh = MyDummyMonad (markStarted n jh)
markProgress steps jh = MyDummyMonad (markProgress steps jh)
markFailure steps mb_msg jh = MyDummyMonad (markFailure steps mb_msg jh)
markComplete jh = MyDummyMonad (markComplete jh)
markFailed mb_msg jh = MyDummyMonad (markFailed mb_msg jh)
addMoreSteps steps jh = MyDummyMonad (addMoreSteps steps jh)
runMyDummyMonad :: Env -> MyDummyMonad a -> IO a
runMyDummyMonad env m = do
res <- runExceptT . flip runReaderT env $ _MyDummyMonad m
case res of
Left e -> throwIO e
Right x -> pure x
-- testPrios :: IO ()
-- testPrios = do
-- k <- genSecret
-- -- Use a single runner, so that we can check the order of execution
-- -- without worrying about the runners competing with each other.
-- let settings = defaultJobSettings 1 k
-- prios = [(B, 10), (C, 1), (D, 5)]
-- st :: JobsState JobT [Prelude.String] () <- newJobsState settings $
-- applyPrios prios defaultPrios -- B has the highest priority
-- pickedSchedule <- newMVar (JobSchedule mempty)
-- let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
-- jobs = [ (A, j A)
-- , (C, j C)
-- , (B, j B)
-- , (D, j D)
-- ]
-- -- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
-- -- the time 'popQueue' gets called.
-- now <- getCurrentTime
-- atomically $ forM_ jobs $ \(t, f) -> void $ pushJobWithTime now t () f settings st
-- -- wait for the jobs to finish, waiting for more than the total duration,
-- -- so that we are sure that all jobs have finished, then check the schedule.
-- -- threadDelay jobDuration
-- waitUntil (do
-- finalSchedule <- readMVar pickedSchedule
-- pure $ finalSchedule == JobSchedule (fromList [B, D, C, A])) jobDuration
-- testExceptions :: IO ()
-- testExceptions = do
-- k <- genSecret
-- let settings = defaultJobSettings 1 k
-- st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
-- jid <- pushJob A ()
-- (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
-- settings st
-- -- Wait 1 second to make sure the job is finished.
-- threadDelay $ 1_000_000
-- mjob <- lookupJob jid (jobsData st)
-- case mjob of
-- Nothing -> Prelude.fail "lookupJob failed, job not found!"
-- Just je -> case jTask je of
-- DoneJ _ r -> isLeft r `shouldBe` True
-- unexpected -> Prelude.fail $ "Expected job to be done, but got: " <> anythingToString unexpected
-- return ()
-- testFairness :: IO ()
-- testFairness = do
-- k <- genSecret
-- let settings = defaultJobSettings 1 k
-- st :: JobsState JobT [Prelude.String] () <- newJobsState settings defaultPrios
-- pickedSchedule <- newMVar (JobSchedule mempty)
-- let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
-- jobs = [ (A, j A)
-- , (A, j A)
-- , (B, j B)
-- , (A, j A)
-- , (A, j A)
-- ]
-- time <- getCurrentTime
-- -- in this scenario we simulate two types of jobs all with
-- -- all the same level of priority: our queue implementation
-- -- will behave as a classic FIFO, keeping into account the
-- -- time of arrival.
-- atomically $ forM_ (zip [0,2 ..] jobs) $ \(timeDelta, (t, f)) -> void $
-- pushJobWithTime (addUTCTime (fromInteger timeDelta) time) t () f settings st
-- -- threadDelay jobDuration
-- waitUntil (do
-- finalSchedule <- readMVar pickedSchedule
-- pure $ finalSchedule == JobSchedule (fromList [A, A, B, A, A])) jobDuration
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
{-# NOINLINE testTlsManager #-}
withJob :: Env
-> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
-> IO (SJ.JobStatus 'SJ.Safe JobLog)
withJob env f = runMyDummyMonad env $ MyDummyMonad $
-- the job type doesn't matter in our tests, we use a random one, as long as it's of type 'GargJob'.
newJob @_ mkJobHandle (pure env) RecomputeGraphJob (\_ hdl input ->
runMyDummyMonad env $ (Right <$> (f hdl input >> getLatestJobStatus hdl))) (SJ.JobInput () Nothing)
withJob_ :: Env
-> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
-> IO ()
withJob_ env f = void (withJob env f)
newTestEnv :: IO Env
newTestEnv = do
k <- genSecret
let settings = defaultJobSettings 1 k
myEnv <- newJobEnv settings defaultPrios testTlsManager
let fmt_error v = Prelude.error $ "[Test.Utils.Jobs.Env] " <> v <> " not needed, but forced somewhere (check StrictData)"
let _gc_notifications_config =
NotificationsConfig { _nc_central_exchange_bind = fmt_error "nc_central_exchange_bind"
, _nc_central_exchange_connect = "tcp://localhost:15510"
, _nc_dispatcher_bind = fmt_error "nc_dispatcher_bind"
, _nc_dispatcher_connect = fmt_error "nc_dispatcher_connect" }
let _env_config =
GargConfig { _gc_datafilepath = fmt_error "gc_datafilepath"
, _gc_frontend_config = fmt_error "gc_frontend_config"
, _gc_mail_config = fmt_error "gc_mail_config"
, _gc_database_config = fmt_error "gc_database_config"
, _gc_nlp_config = fmt_error "gc_nlp_config"
, _gc_notifications_config
, _gc_frames = fmt_error "gc_frames not needed"
, _gc_jobs = fmt_error "gc_jobs not needed"
, _gc_secrets = fmt_error "gc_secrets"
, _gc_apis = fmt_error "gc_apis"
, _gc_log_level = fmt_error "gc_log_level"
}
pure $ Env
{ _env_logger = fmt_error "env_logger"
, _env_pool = fmt_error "env_pool"
, _env_nodeStory = fmt_error "env_nodeStory"
, _env_manager = testTlsManager
, _env_self_url = fmt_error "self_url"
, _env_scrapers = fmt_error "scrapers"
, _env_jobs = myEnv
, _env_config
, _env_central_exchange = fmt_error "central exchange"
, _env_dispatcher = fmt_error "dispatcher"
, _env_jwt_settings = fmt_error "jwt_settings"
}
testFetchJobStatus :: IO ()
testFetchJobStatus = do
myEnv <- newTestEnv
evts <- newMVar []
withJob_ myEnv $ \hdl _input -> do
mb_status <- getLatestJobStatus hdl
-- now let's log something
markStarted 10 hdl
mb_status' <- getLatestJobStatus hdl
markProgress 5 hdl
mb_status'' <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
pure ()
-- threadDelay 500_000
-- Check the events
-- readMVar evts >>= \expected -> map _scst_remaining expected `shouldBe` [Nothing, Just 10, Just 5]
waitUntil (do
evts' <- readMVar evts
pure $ map _scst_remaining evts' == [Nothing, Just 10, Just 5]
) 1000
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
myEnv <- newTestEnv
evts1 <- newMVar []
evts2 <- newMVar []
let job1 = \() -> withJob_ myEnv $ \hdl _input -> do
markStarted 100 hdl
mb_status <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
pure ()
let job2 = \() -> withJob_ myEnv $ \hdl _input -> do
markStarted 50 hdl
mb_status <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
pure ()
Async.forConcurrently_ [job1, job2] ($ ())
-- threadDelay 500_000
-- Check the events
waitUntil (do
evts1' <- readMVar evts1
evts2' <- readMVar evts2
pure $ (map _scst_remaining evts1' == [Just 100]) &&
(map _scst_remaining evts2' == [Just 50])
) 500
testMarkProgress :: IO ()
testMarkProgress = do
myEnv <- newTestEnv
-- evts <- newTBQueueIO 7
evts <- newTVarIO []
let expectedEvents = 7
let getStatus hdl = do
liftIO $ threadDelay 100_000
st <- getLatestJobStatus hdl
-- liftIO $ atomically $ writeTBQueue evts st
liftIO $ atomically $ modifyTVar evts (\xs -> xs ++ [st])
readAllEvents = do
-- We will get thread blocking if there is ANY error in the job
-- Hence we assert the `readAllEvents` test doesn't take too long
mRet <- timeout 5_000_000 $ atomically $ do
-- allEventsArrived <- isFullTBQueue evts
evts' <- readTVar evts
-- STM retry if things failed
-- check allEventsArrived
check (length evts' == expectedEvents)
-- flushTBQueue evts
pure evts'
case mRet of
Nothing -> Prelude.fail $ "testMarkProgress: timeout exceeded, but didn't receive all 7 required events."
Just xs
| length xs == expectedEvents
-> pure xs
| otherwise
-> Prelude.fail $ "testMarkProgress: received some events, but they were not of the expected number (" <> show expectedEvents <> "): " <> show xs
withJob_ myEnv $ \hdl _input -> do
markStarted 10 hdl
getStatus hdl
markProgress 1 hdl
getStatus hdl
markFailureNoErr 1 hdl
getStatus hdl
markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl
getStatus hdl
markComplete hdl
getStatus hdl
markStarted 5 hdl
markProgress 1 hdl
getStatus hdl
markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl
getStatus hdl
evts' <- readAllEvents
-- This pattern match should never fail, because the precondition is
-- checked in 'readAllEvents'.
let [jl0, jl1, jl2, jl3, jl4, jl5, jl6] = evts'
-- Check the events are what we expect
jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 10
, _scst_events = Just []
}
jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 9
, _scst_events = Just []
}
jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 1
, _scst_remaining = Just 8
, _scst_events = Just []
}
jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 2
, _scst_remaining = Just 7
, _scst_events = Just [
ScraperEvent { _scev_message = Just "boom"
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
]
}
jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
, _scst_failed = Just 2
, _scst_remaining = Just 0
, _scst_events = Just [
ScraperEvent { _scev_message = Just "boom"
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
]
}
jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 4
, _scst_events = Just []
}
jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 4
, _scst_remaining = Just 0
, _scst_events = Just [
ScraperEvent { _scev_message = Just "kaboom"
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
]
}
test :: Spec
test = do
describe "job queue" $ do
it "respects max runners limit" $
testMaxRunners
it "respects priorities" $
testPrios
it "can handle exceptions" $
testExceptions
it "fairly picks equal-priority-but-different-kind jobs" $
testFairness
describe "job status update and tracking" $ sequential $ do
it "can fetch the latest job status" $
testFetchJobStatus
it "can spin two separate jobs and track their status separately" $
testFetchJobStatusNoContention
it "marking stuff behaves as expected" $
testMarkProgress
qcTests :: TestTree
qcTests = testGroup "jobs qc tests" [
testProperty "GargJob to/from JSON serialization is correct" $
\job -> Aeson.decode (Aeson.encode (job :: EnvTypes.GargJob)) == Just job
]
-- withJob :: Env
-- -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
-- -> IO (SJ.JobStatus 'SJ.Safe JobLog)
-- withJob env f = runMyDummyMonad env $ MyDummyMonad $
-- -- the job type doesn't matter in our tests, we use a random one, as long as it's of type 'GargJob'.
-- newJob @_ mkJobHandle (pure env) RecomputeGraphJob (\_ hdl input ->
-- runMyDummyMonad env $ (Right <$> (f hdl input >> getLatestJobStatus hdl))) (SJ.JobInput () Nothing)
-- withJob_ :: Env
-- -> (JobHandle MyDummyMonad -> () -> MyDummyMonad ())
-- -> IO ()
-- withJob_ env f = void (withJob env f)
-- newTestEnv :: IO Env
-- newTestEnv = do
-- let fmt_error v = Prelude.error $ "[Test.Utils.Jobs.Env] " <> v <> " not needed, but forced somewhere (check StrictData)"
-- let _gc_notifications_config =
-- NotificationsConfig { _nc_central_exchange_bind = fmt_error "nc_central_exchange_bind"
-- , _nc_central_exchange_connect = "tcp://localhost:15510"
-- , _nc_dispatcher_bind = fmt_error "nc_dispatcher_bind"
-- , _nc_dispatcher_connect = fmt_error "nc_dispatcher_connect" }
-- let _env_config =
-- GargConfig { _gc_datafilepath = fmt_error "gc_datafilepath"
-- , _gc_frontend_config = fmt_error "gc_frontend_config"
-- , _gc_mail_config = fmt_error "gc_mail_config"
-- , _gc_database_config = fmt_error "gc_database_config"
-- , _gc_nlp_config = fmt_error "gc_nlp_config"
-- , _gc_notifications_config
-- , _gc_frames = fmt_error "gc_frames not needed"
-- , _gc_jobs = fmt_error "gc_jobs not needed"
-- , _gc_secrets = fmt_error "gc_secrets"
-- , _gc_apis = fmt_error "gc_apis"
-- , _gc_log_level = fmt_error "gc_log_level"
-- }
-- pure $ Env
-- { _env_logger = fmt_error "env_logger"
-- , _env_pool = fmt_error "env_pool"
-- , _env_nodeStory = fmt_error "env_nodeStory"
-- , _env_manager = testTlsManager
-- , _env_self_url = fmt_error "self_url"
-- , _env_scrapers = fmt_error "scrapers"
-- , _env_config
-- , _env_central_exchange = fmt_error "central exchange"
-- , _env_dispatcher = fmt_error "dispatcher"
-- , _env_jwt_settings = fmt_error "jwt_settings"
-- }
-- testFetchJobStatus :: IO ()
-- testFetchJobStatus = do
-- myEnv <- newTestEnv
-- evts <- newMVar []
-- withJob_ myEnv $ \hdl _input -> do
-- mb_status <- getLatestJobStatus hdl
-- -- now let's log something
-- markStarted 10 hdl
-- mb_status' <- getLatestJobStatus hdl
-- markProgress 5 hdl
-- mb_status'' <- getLatestJobStatus hdl
-- liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
-- pure ()
-- -- threadDelay 500_000
-- -- Check the events
-- -- readMVar evts >>= \expected -> map _scst_remaining expected `shouldBe` [Nothing, Just 10, Just 5]
-- waitUntil (do
-- evts' <- readMVar evts
-- pure $ map _scst_remaining evts' == [Nothing, Just 10, Just 5]
-- ) 1000
-- testFetchJobStatusNoContention :: IO ()
-- testFetchJobStatusNoContention = do
-- myEnv <- newTestEnv
-- evts1 <- newMVar []
-- evts2 <- newMVar []
-- let job1 = \() -> withJob_ myEnv $ \hdl _input -> do
-- markStarted 100 hdl
-- mb_status <- getLatestJobStatus hdl
-- liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
-- pure ()
-- let job2 = \() -> withJob_ myEnv $ \hdl _input -> do
-- markStarted 50 hdl
-- mb_status <- getLatestJobStatus hdl
-- liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
-- pure ()
-- Async.forConcurrently_ [job1, job2] ($ ())
-- -- threadDelay 500_000
-- -- Check the events
-- waitUntil (do
-- evts1' <- readMVar evts1
-- evts2' <- readMVar evts2
-- pure $ (map _scst_remaining evts1' == [Just 100]) &&
-- (map _scst_remaining evts2' == [Just 50])
-- ) 500
-- testMarkProgress :: IO ()
-- testMarkProgress = do
-- myEnv <- newTestEnv
-- -- evts <- newTBQueueIO 7
-- evts <- newTVarIO []
-- let expectedEvents = 7
-- let getStatus hdl = do
-- liftIO $ threadDelay 100_000
-- st <- getLatestJobStatus hdl
-- -- liftIO $ atomically $ writeTBQueue evts st
-- liftIO $ atomically $ modifyTVar evts (\xs -> xs ++ [st])
-- readAllEvents = do
-- -- We will get thread blocking if there is ANY error in the job
-- -- Hence we assert the `readAllEvents` test doesn't take too long
-- mRet <- timeout 5_000_000 $ atomically $ do
-- -- allEventsArrived <- isFullTBQueue evts
-- evts' <- readTVar evts
-- -- STM retry if things failed
-- -- check allEventsArrived
-- check (length evts' == expectedEvents)
-- -- flushTBQueue evts
-- pure evts'
-- case mRet of
-- Nothing -> Prelude.fail $ "testMarkProgress: timeout exceeded, but didn't receive all 7 required events."
-- Just xs
-- | length xs == expectedEvents
-- -> pure xs
-- | otherwise
-- -> Prelude.fail $ "testMarkProgress: received some events, but they were not of the expected number (" <> show expectedEvents <> "): " <> show xs
-- withJob_ myEnv $ \hdl _input -> do
-- markStarted 10 hdl
-- getStatus hdl
-- markProgress 1 hdl
-- getStatus hdl
-- markFailureNoErr 1 hdl
-- getStatus hdl
-- markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl
-- getStatus hdl
-- markComplete hdl
-- getStatus hdl
-- markStarted 5 hdl
-- markProgress 1 hdl
-- getStatus hdl
-- markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl
-- getStatus hdl
-- evts' <- readAllEvents
-- -- This pattern match should never fail, because the precondition is
-- -- checked in 'readAllEvents'.
-- let [jl0, jl1, jl2, jl3, jl4, jl5, jl6] = evts'
-- -- Check the events are what we expect
-- jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
-- , _scst_failed = Just 0
-- , _scst_remaining = Just 10
-- , _scst_events = Just []
-- }
-- jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 0
-- , _scst_remaining = Just 9
-- , _scst_events = Just []
-- }
-- jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 1
-- , _scst_remaining = Just 8
-- , _scst_events = Just []
-- }
-- jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 2
-- , _scst_remaining = Just 7
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "boom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
-- jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
-- , _scst_failed = Just 2
-- , _scst_remaining = Just 0
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "boom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
-- jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 0
-- , _scst_remaining = Just 4
-- , _scst_events = Just []
-- }
-- jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 4
-- , _scst_remaining = Just 0
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "kaboom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
{-|
Module : Test.Utils.Jobs.Types
Description : Types and helpers for worker job tests
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Test.Utils.Jobs.Types
( TestJobEnv(..)
, initTestJobEnv
, initTestWorkerState
)
where
import Async.Worker.Broker.Types (toA, getMessage)
import Async.Worker.Types qualified as WT
import Control.Concurrent.STM
import Data.Text qualified as T
import Gargantext.Core.Config (hasConfig)
import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Worker (performAction)
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env (WorkerEnv(..))
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, BrokerMessage, WState)
import Gargantext.Prelude
data TestJobEnv =
TestJobEnv { inProgress :: Maybe Job
, done :: [Job]
, failed :: [(Job, SomeException)]
, killed :: [Job]
, timedOut :: [Job]
}
initTestJobEnv :: TestJobEnv
initTestJobEnv =
TestJobEnv { inProgress = Nothing
, done = []
, failed = []
, killed = []
, timedOut = [] }
-- | Test worker state. Normally, the message notifications go through
-- the dispatcher system. Here we take a shortcut and just use a
-- TVar to store the processed worker jobs.
-- Job progress, however, is sent via the notifications mechanism,
-- because the worker itself doesn't implement it.
initTestWorkerState :: HasWorkerBroker
=> TVar TestJobEnv
-> WorkerEnv
-> WorkerDefinition
-> IO WState
initTestWorkerState jobTVar env (WorkerDefinition { .. }) = do
let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig
pure $ WT.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Just $ onJobStarted jobTVar env
, onJobFinish = Just $ onJobFinished jobTVar env
, onJobTimeout = Just $ onJobTimeout jobTVar env
, onJobError = Just $ onJobError jobTVar env
, onWorkerKilledSafely = Just $ onWorkerKilled jobTVar env }
onJobStarted :: HasWorkerBroker
=> TVar TestJobEnv
-> WorkerEnv
-> WState
-> BrokerMessage
-> IO ()
onJobStarted jobTVar _env _state bm = do
let j = toA $ getMessage bm
let job = WT.job j
atomically $ modifyTVar jobTVar $ \testJobEnv -> do
testJobEnv { inProgress = Just job }
onJobFinished :: HasWorkerBroker
=> TVar TestJobEnv
-> WorkerEnv
-> WState
-> BrokerMessage
-> IO ()
onJobFinished jobTVar _env _state bm = do
let j = toA $ getMessage bm
let job = WT.job j
atomically $ modifyTVar jobTVar $ \testJobEnv -> do
testJobEnv { inProgress = Nothing
, done = done testJobEnv ++ [job] }
onJobTimeout :: HasWorkerBroker
=> TVar TestJobEnv
-> WorkerEnv
-> WState
-> BrokerMessage
-> IO ()
onJobTimeout jobTVar _env _state bm = do
let j = toA $ getMessage bm
let job = WT.job j
atomically $ modifyTVar jobTVar $ \testJobEnv -> do
testJobEnv { inProgress = Nothing
, timedOut = timedOut testJobEnv ++ [job] }
onJobError :: (HasWorkerBroker, HasCallStack)
=> TVar TestJobEnv
-> WorkerEnv
-> WState
-> BrokerMessage
-> SomeException
-> IO ()
onJobError jobTVar _env _state bm exc = do
let j = toA $ getMessage bm
let job = WT.job j
atomically $ modifyTVar jobTVar $ \testJobEnv -> do
testJobEnv { inProgress = Nothing
, failed = failed testJobEnv ++ [(job, exc)] }
onWorkerKilled :: (HasWorkerBroker, HasCallStack)
=> TVar TestJobEnv
-> WorkerEnv
-> WState
-> Maybe BrokerMessage
-> IO ()
onWorkerKilled _jobTVar _env _state Nothing = pure ()
onWorkerKilled jobTVar _env _state (Just bm) = do
let j = toA $ getMessage bm
let job = WT.job j
atomically $ modifyTVar jobTVar $ \testJobEnv -> do
testJobEnv { inProgress = Nothing
, killed = killed testJobEnv ++ [job] }
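A sketch of how a test could consume this module; the worker run and job enqueueing are elided, and only names defined above are used (plus newTVarIO/readTVarIO from Control.Concurrent.STM).
-- Hypothetical usage, not part of this commit:
-- inspectJobs :: HasWorkerBroker => WorkerEnv -> WorkerDefinition -> IO ()
-- inspectJobs env wd = do
--   jobTVar <- newTVarIO initTestJobEnv
--   state' <- initTestWorkerState jobTVar env wd
--   -- ... run a worker on 'state'' and enqueue some jobs here ...
--   testJobEnv <- readTVarIO jobTVar
--   print (length (done testJobEnv), length (failed testJobEnv))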
{-# OPTIONS_GHC -Wno-orphans #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.Utils.Notifications where
import Control.Exception.Safe qualified as Exc
import Control.Monad (void)
import Data.ByteString qualified as BS
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Network.WebSockets qualified as WS
import Prelude
import Test.Instances ()
instance Eq DT.Notification where
-- simple
(==) n1 n2 = show n1 == show n2
millisecond :: Int
millisecond = 1000
withWSConnection :: (String, Int) -> WS.ClientApp () -> IO ()
withWSConnection (host, port) = withWSConnection' (host, port, "/ws")
-- | Wrap the logic of asynchronous connection closing
-- https://hackage.haskell.org/package/websockets-0.13.0.0/docs/Network-WebSockets-Connection.html#v:sendClose
withWSConnection' :: (String, Int, String) -> WS.ClientApp () -> IO ()
withWSConnection' (host, port, path) cb =
WS.runClient host port path $ \conn -> do
cb conn
-- shut down gracefully, otherwise a 'ConnectionException' is thrown
WS.sendClose conn ("" :: BS.ByteString)
-- wait for close response, should throw a 'CloseRequest' exception
Exc.catches (void $ WS.receiveDataMessage conn)
[ Exc.Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.CloseRequest _ _ -> putStrLn "[withWSConnection] closeRequest caught"
_ -> Exc.throw err
-- re-throw any other exceptions
, Exc.Handler $ \(err :: Exc.SomeException) -> Exc.throw err ]
......@@ -59,7 +59,6 @@ main = do
, Phylo.tests
, testGroup "Stemming" [ Lancaster.tests ]
, Worker.tests
, Jobs.qcTests
, asyncUpdatesSpec
, Notifications.qcTests
]