[workers] more progress towards the end goal, this is still WIP

parent eba70196
Pipeline #6911 failed with stages
in 19 minutes and 30 seconds
......@@ -196,7 +196,7 @@ source-repository-package
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-bee
tag: ec2c3f345049f7cd0b8f4e39edf11c7e437d0cf6
tag: 307f6760383b74cddd5a586d0b5b1f1a2fc94429
source-repository-package
type: git
......
......@@ -27,7 +27,7 @@ import Gargantext.API.Routes.Named.Annuaire qualified as Named
import Gargantext.API.Routes.Named.Corpus qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Worker.Jobs qualified as Jobs
-- import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
......@@ -67,16 +67,27 @@ addCorpusWithQuery user = Named.AddWithQuery $ \cId ->
, Jobs._acq_user = user
, Jobs._acq_cid = cId }
-- addCorpusWithForm :: User -> Named.AddWithForm (AsServerT (GargM Env BackendInternalError))
-- addCorpusWithForm user = Named.AddWithForm $ \cid -> AsyncJobs $
-- serveJobsAPI AddCorpusFormJob $ \_jHandle i -> do
-- -- /NOTE(adinapoli)/ Track the initial steps outside 'addToCorpusWithForm', because it's
-- -- called in a few places, and the job status might be different between invocations.
-- -- markStarted 3 jHandle
-- -- New.addToCorpusWithForm user cid i jHandle
-- void $ Jobs.sendJob $ Jobs.AddCorpusFormAsync { Jobs._acf_args = i
-- , Jobs._acf_user = user
-- , Jobs._acf_cid = cid }
addCorpusWithForm :: User -> Named.AddWithForm (AsServerT (GargM Env BackendInternalError))
addCorpusWithForm user = Named.AddWithForm $ \cid -> AsyncJobs $
serveJobsAPI AddCorpusFormJob $ \_jHandle i -> do
addCorpusWithForm user = Named.AddWithForm $ \cId ->
serveWorkerAPI $ \p ->
-- /NOTE(adinapoli)/ Track the initial steps outside 'addToCorpusWithForm', because it's
-- called in a few places, and the job status might be different between invocations.
-- markStarted 3 jHandle
-- New.addToCorpusWithForm user cid i jHandle
void $ Jobs.sendJob $ Jobs.AddCorpusFormAsync { Jobs._acf_args = i
, Jobs._acf_user = user
, Jobs._acf_cid = cid }
Jobs.AddCorpusFormAsync { Jobs._acf_args = p
, Jobs._acf_user = user
, Jobs._acf_cid = cId }
--addCorpusWithFile :: User -> ServerT Named.AddWithFile (GargM Env BackendInternalError)
......
......@@ -97,7 +97,7 @@ data ForgotPasswordAPI mode = ForgotPasswordAPI
data ForgotPasswordAsyncAPI mode = ForgotPasswordAsyncAPI
{ forgotPasswordAsyncEp :: mode :- Summary "Forgot password asnc"
:> NamedRoutes (WorkerAPI ForgotPasswordAsyncParams)
:> NamedRoutes (WorkerAPI '[JSON] ForgotPasswordAsyncParams)
} deriving Generic
......
......@@ -9,7 +9,7 @@ module Gargantext.API.Routes.Named.Corpus (
import Data.Text (Text)
import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types
-- import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Node.Corpus.Export.Types
import Gargantext.API.Node.Types
import Gargantext.API.Worker (WorkerAPI)
......@@ -33,7 +33,8 @@ newtype AddWithForm mode = AddWithForm
:> "add"
:> "form"
:> "async"
:> NamedRoutes (AsyncJobs JobLog '[FormUrlEncoded] NewWithForm JobLog)
-- :> NamedRoutes (AsyncJobs JobLog '[FormUrlEncoded] NewWithForm JobLog)
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] NewWithForm)
} deriving Generic
newtype AddWithQuery mode = AddWithQuery
......@@ -42,5 +43,5 @@ newtype AddWithQuery mode = AddWithQuery
:> Capture "corpus_id" CorpusId
:> "query"
-- :> NamedRoutes (AsyncJobs JobLog '[JSON] WithQuery JobLog)
:> NamedRoutes (WorkerAPI WithQuery)
:> NamedRoutes (WorkerAPI '[JSON] WithQuery)
} deriving Generic
......@@ -15,35 +15,26 @@ module Gargantext.API.Worker where
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.Worker.Jobs (sendJob)
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.Types
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId)
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude
import Servant.API ((:>), (:-), JSON, Post, ReqBody)
import Servant.Server.Generic (AsServerT)
data WorkerAPI input mode = WorkerAPI
{ workerAPIPost :: mode :- ReqBody '[JSON] input
data WorkerAPI contentType input mode = WorkerAPI
{ workerAPIPost :: mode :- ReqBody contentType input
:> Post '[JSON] JobInfo }
deriving Generic
-- serveWorkerAPI :: ( HasWorkerBroker PGMQBroker Job
-- , m ~ GargM Env BackendInternalError )
-- => (input -> Job)
-- -> input
-- -> WorkerJob (AsServerT m)
-- -- -> ServerT (Post '[JSON] JobInfo) m
-- -- -> Cmd' env err JobInfo
-- serveWorkerAPI f i = do
-- mId <- sendJob $ f i
-- pure $ JobInfo { _ji_message_id = mId }
serveWorkerAPI :: IsGargServer env err m
=> (input -> Job)
-> WorkerAPI input (AsServerT m)
=> (input -> Job)
-> WorkerAPI contentType input (AsServerT m)
serveWorkerAPI f = WorkerAPI { workerAPIPost }
where
workerAPIPost i = do
mId <- sendJob $ f i
pure $ JobInfo { _ji_message_id = mId }
let job = f i
mId <- sendJob job
pure $ JobInfo { _ji_message_id = mId
, _ji_mNode_id = getWorkerMNodeId job }
......@@ -77,8 +77,8 @@ gServer (NotificationsConfig { .. }) = do
-- send the same message that we received
-- void $ sendNonblocking s_dispatcher r
void $ timeout 100_000 $ send s_dispatcher r
Just (UpdateTreeFirstLevel node_id) -> do
logMsg ioLogger DEBUG $ "[central_exchange] update tree: " <> show node_id
Just (UpdateTreeFirstLevel _node_id) -> do
-- logMsg ioLogger DEBUG $ "[central_exchange] update tree: " <> show node_id
-- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id
-- To make this more robust, use withAsync so we don't
-- block the main thread (send is blocking)
......@@ -96,11 +96,8 @@ gServer (NotificationsConfig { .. }) = do
-- send the same message that we received
-- void $ sendNonblocking s_dispatcher r
void $ timeout 100_000 $ send s_dispatcher r
Just (UpdateWorkerProgress ji jl) -> do
logMsg ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
Just (WorkerJobStarted nodeId ji) -> do
logMsg ioLogger DEBUG $ "[central_exchange] worker job started: " <> show nodeId <> ", " <> show ji
Just (UpdateWorkerProgress _ji _jl) -> do
-- logMsg ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
void $ timeout 100_000 $ send s_dispatcher r
Nothing ->
logMsg ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r
......
......@@ -47,13 +47,11 @@ data CEMessage =
| UpdateWorkerProgress JobInfo JobLog
-- | Update tree for given nodeId
| UpdateTreeFirstLevel NodeId
| WorkerJobStarted NodeId JobInfo
instance Prelude.Show CEMessage where
show (UpdateJobProgress js) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode js)
-- show (UpdateWorkerProgress ji nodeId jl) = "UpdateWorkerProgress " <> show ji <> " " <> show nodeId <> " " <> show jl
show (UpdateWorkerProgress ji jl) = "UpdateWorkerProgress " <> show ji <> " " <> show jl
show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId
show (WorkerJobStarted nodeId ji) = "WorkerJobStarted " <> show nodeId <> " " <> show ji
instance FromJSON CEMessage where
parseJSON = withObject "CEMessage" $ \o -> do
type_ <- o .: "type"
......@@ -70,10 +68,6 @@ instance FromJSON CEMessage where
"update_tree_first_level" -> do
node_id <- o .: "node_id"
pure $ UpdateTreeFirstLevel node_id
"worker_job_started" -> do
nodeId <- o .: "node_id"
ji <- o .: "ji"
pure $ WorkerJobStarted nodeId ji
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON CEMessage where
toJSON (UpdateJobProgress js) = object [
......@@ -91,11 +85,6 @@ instance ToJSON CEMessage where
"type" .= toJSON ("update_tree_first_level" :: Text)
, "node_id" .= toJSON nodeId
]
toJSON (WorkerJobStarted nodeId ji) = object [
"type" .= toJSON ("worker_job_started" :: Text)
, "node_id" .= toJSON nodeId
, "ji" .= toJSON ji
]
class HasCentralExchangeNotification env where
......
......@@ -34,6 +34,7 @@ import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes
import Gargantext.Core.Notifications.Dispatcher.Types
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
import Nanomsg (Pull(..), bind, recv, withSocket)
......@@ -113,8 +114,9 @@ dispatcherListener (NotificationsConfig { _nc_dispatcher_bind }) subscriptions =
-- putText $ "[" <> show tId <> "] received a message: " <> decodeUtf8 r
case Aeson.decode (BSL.fromStrict r) of
Nothing -> withLogger () $ \ioL ->
logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange"
Nothing ->
withLogger () $ \ioL ->
logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange"
Just ceMessage -> do
-- putText $ "[dispatcher_listener] received message: " <> show ceMessage
-- subs <- atomically $ readTVar subscriptions
......@@ -144,38 +146,43 @@ sendNotification throttleTChan ceMessage sub = do
-- message to a client
let topic = s_topic sub
let mNotification =
-- | OK so given a websocket subscription and a central
-- exchange message - decide whether to send this message via
-- that socket or not
case (topic, ceMessage) of
(UpdateJobProgress jId, CETypes.UpdateJobProgress jobStatus) -> do
if jId == jobStatus ^. job_id
then Just $ NUpdateJobProgress jId (MJobStatus jobStatus)
then Just $ NUpdateJobProgress jId jobStatus -- (MJobStatus jobStatus)
else Nothing
-- (UpdateWorkerProgress jobInfo, CETypes.UpdateWorkerProgress jobInfo' nodeId jobLog) -> do
(UpdateWorkerProgress jobInfo, CETypes.UpdateWorkerProgress jobInfo' jobLog) -> do
if jobInfo == jobInfo'
-- then Just $ NUpdateWorkerProgress jobInfo nodeId (MJobLog jobLog)
then Just $ NUpdateWorkerProgress jobInfo (MJobLog jobLog)
then Just $ NUpdateWorkerProgress jobInfo jobLog -- (MJobLog jobLog)
else Nothing
(UpdateTree nodeId, CETypes.UpdateWorkerProgress jobInfo jobLog) -> do
if Just nodeId == _ji_mNode_id jobInfo
then Just $ NUpdateWorkerProgress jobInfo jobLog -- (MJobLog jobLog)
else Nothing
(UpdateTree nodeId, CETypes.UpdateTreeFirstLevel nodeId') ->
if nodeId == nodeId'
then Just $ NUpdateTree nodeId
else Nothing
(UpdateTree nodeId, CETypes.WorkerJobStarted nodeId' ji) ->
if nodeId == nodeId'
then Just $ NWorkerJobStarted nodeId ji
else Nothing
_ -> Nothing
case mNotification of
Nothing -> pure ()
Just notification -> do
let id' = (wsKey ws, topic)
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[sendNotification] dispatching notification: " <> show notification
atomically $ do
TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing))
-- | The "true" message sending to websocket. After it was withheld
-- for a while (for throttling), it is finally sent here
sendDataMessageThrottled :: (WS.Connection, WS.DataMessage) -> IO ()
sendDataMessageThrottled (conn, msg) =
sendDataMessageThrottled (conn, msg) = do
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg
WS.sendDataMessage conn msg
......@@ -195,8 +202,6 @@ ceMessageSubPred (CETypes.UpdateJobProgress js) (Subscription { s_topic }) =
-- ceMessageSubPred (CETypes.UpdateWorkerProgress ji nodeId _jl) (Subscription { s_topic }) =
ceMessageSubPred (CETypes.UpdateWorkerProgress ji _jl) (Subscription { s_topic }) =
s_topic == UpdateWorkerProgress ji
-- || s_topic == UpdateTree nodeId
|| Just s_topic == (UpdateTree <$> _ji_mNode_id ji)
ceMessageSubPred (CETypes.UpdateTreeFirstLevel nodeId) (Subscription { s_topic }) =
s_topic == UpdateTree nodeId
ceMessageSubPred (CETypes.WorkerJobStarted nodeId _ji) (Subscription { s_topic }) =
s_topic == UpdateTree nodeId
......@@ -101,32 +101,32 @@ instance ToJSON Topic where
]
-- | A job status message
newtype MJobStatus = MJobStatus (JobStatus 'Safe JobLog)
instance Prelude.Show MJobStatus where
show (MJobStatus js) = "MJobStatus " <> show (CBUTF8.decode $ BSL.unpack $ Aeson.encode js)
instance ToJSON MJobStatus where
toJSON (MJobStatus js) = Aeson.object [
"type" .= toJSON ("MJobLog" :: Text)
, "job_status" .= toJSON js
]
instance FromJSON MJobStatus where
parseJSON = Aeson.withObject "MJobStatus" $ \o -> do
js <- o .: "job_status"
pure $ MJobStatus js
-- newtype MJobStatus = MJobStatus (JobStatus 'Safe JobLog)
-- instance Prelude.Show MJobStatus where
-- show (MJobStatus js) = "MJobStatus " <> show (CBUTF8.decode $ BSL.unpack $ Aeson.encode js)
-- instance ToJSON MJobStatus where
-- toJSON (MJobStatus js) = Aeson.object [
-- "type" .= toJSON ("MJobLog" :: Text)
-- , "job_status" .= toJSON js
-- ]
-- instance FromJSON MJobStatus where
-- parseJSON = Aeson.withObject "MJobStatus" $ \o -> do
-- js <- o .: "job_status"
-- pure $ MJobStatus js
-- | A job progress message
newtype MJobLog = MJobLog JobLog
instance Prelude.Show MJobLog where
show (MJobLog jl) = "MJobLog " <> show jl
instance ToJSON MJobLog where
toJSON (MJobLog jl) = Aeson.object [
"type" .= toJSON ("MJobLog" :: Text)
, "job_log" .= toJSON jl
]
instance FromJSON MJobLog where
parseJSON = Aeson.withObject "MJobLog" $ \o -> do
jl <- o .: "job_log"
pure $ MJobLog jl
-- newtype MJobLog = MJobLog JobLog
-- instance Prelude.Show MJobLog where
-- show (MJobLog jl) = "MJobLog " <> show jl
-- instance ToJSON MJobLog where
-- toJSON (MJobLog jl) = Aeson.object [
-- "type" .= toJSON ("MJobLog" :: Text)
-- , "job_log" .= toJSON jl
-- ]
-- instance FromJSON MJobLog where
-- parseJSON = Aeson.withObject "MJobLog" $ \o -> do
-- jl <- o .: "job_log"
-- pure $ MJobLog jl
......@@ -216,17 +216,20 @@ class HasDispatcher env dispatcher where
-- | A notification is sent to clients who subscribed to specific topics
data Notification =
NUpdateJobProgress (JobID 'Safe) MJobStatus
-- NUpdateJobProgress (JobID 'Safe) MJobStatus
NUpdateJobProgress (JobID 'Safe) (JobStatus 'Safe JobLog)
-- | NUpdateWorkerProgress JobInfo NodeId MJobLog
| NUpdateWorkerProgress JobInfo MJobLog
| NUpdateWorkerProgress JobInfo JobLog
| NUpdateTree NodeId
| NWorkerJobStarted NodeId JobInfo
| NWorkerJobFinished NodeId JobInfo
instance Prelude.Show Notification where
show (NUpdateJobProgress jId mjs) = "NUpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId) <> ", " <> show mjs
show (NUpdateJobProgress jId mjs) = "NUpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId) -- <> ", " <> show mjs
-- show (NUpdateWorkerProgress jobInfo nodeId mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show nodeId <> ", " <> show mJobLog
show (NUpdateWorkerProgress jobInfo mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show mJobLog
show (NUpdateTree nodeId) = "NUpdateTree " <> show nodeId
show (NWorkerJobStarted nodeId ji) = "NWorkerJobStarted " <> show nodeId <> ", " <> show ji
show (NWorkerJobFinished nodeId ji) = "NWorkerJobFinished " <> show nodeId <> ", " <> show ji
instance ToJSON Notification where
toJSON (NUpdateJobProgress jId mjs) = Aeson.object [
"type" .= ("update_job_progress" :: Text)
......@@ -249,6 +252,11 @@ instance ToJSON Notification where
, "node_id" .= toJSON nodeId
, "ji" .= toJSON ji
]
toJSON (NWorkerJobFinished nodeId ji) = Aeson.object [
"type" .= ("worker_job_finished" :: Text)
, "node_id" .= toJSON nodeId
, "ji" .= toJSON ji
]
-- We don't need to decode notifications, this is for tests only
instance FromJSON Notification where
parseJSON = Aeson.withObject "Notification" $ \o -> do
......@@ -271,4 +279,8 @@ instance FromJSON Notification where
nodeId <- o .: "node_id"
ji <- o .: "ji"
pure $ NWorkerJobStarted nodeId ji
"worker_job_finished" -> do
nodeId <- o .: "node_id"
ji <- o .: "ji"
pure $ NWorkerJobFinished nodeId ji
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
......@@ -27,19 +27,18 @@ import Gargantext.API.Admin.Auth (forgotUserPassword)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm, addToCorpusWithQuery)
import Gargantext.API.Node.New (postNode')
import Gargantext.API.Node.Types (WithQuery(..))
import Gargantext.Core.Config (hasConfig, gc_jobs)
import Gargantext.Core.Config.Types (jc_max_docs_scrapers)
import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CE
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId)
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Admin.Types.Node (NodeId(..))
import Gargantext.Database.Query.Table.User (getUsersWithEmail)
import Gargantext.Prelude
import Gargantext.System.Logging ( logLocM, LogLevel(..) )
import Gargantext.System.Logging ( logLocM, LogLevel(..), logMsg, withLogger )
import Gargantext.Utils.Jobs.Error (HumanFriendlyErrorText(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(markStarted, markComplete, markFailed))
......@@ -55,25 +54,73 @@ initWorkerState env (WorkerDefinition { .. }) = do
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Just $ markJobStarted env
, onJobFinish = Nothing
, onJobTimeout = Just $ \_s bm -> putStrLn ("on job timeout: " <> show (toA $ getMessage bm) :: Text)
, onJobError = Nothing
, onMessageReceived = Just $ notifyJobStarted env
, onJobFinish = Just $ notifyJobFinished env
, onJobTimeout = Just $ notifyJobTimeout env
, onJobError = Just $ notifyJobFailed env
-- TODO Implement Ctrl-C, notify job killed
, onWorkerKilledSafely = Nothing }
markJobStarted :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> IO ()
markJobStarted env (Worker.State { name }) bm = do
notifyJobStarted :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> IO ()
notifyJobStarted env (Worker.State { name }) bm = do
let j = toA $ getMessage bm
putStrLn $ "[" <> name <> "] starting job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm }
case Worker.job j of
AddCorpusWithQuery { _acq_args = WithQuery { _wq_node_id } } -> do
runWorkerMonad env $ CE.ce_notify $ CE.WorkerJobStarted (UnsafeMkNodeId _wq_node_id) ji
_ -> pure ()
let job = Worker.job j
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[notifyJobStarted] [" <> name <> "] starting job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markStarted 1 jh
notifyJobFinished :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> IO ()
notifyJobFinished env (Worker.State { name }) bm = do
let j = toA $ getMessage bm
let job = Worker.job j
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[notifyJobFinished] [" <> name <> "] finished job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markComplete jh
notifyJobTimeout :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> IO ()
notifyJobTimeout env (Worker.State { name }) bm = do
let j = toA $ getMessage bm
let job = Worker.job j
withLogger () $ \ioL ->
logMsg ioL ERROR $ "[notifyJobTimeout] [" <> name <> "] job timed out: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markFailed (Just $ UnsafeMkHumanFriendlyErrorText "Worker job timed out!") jh
notifyJobFailed :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> SomeException
-> IO ()
notifyJobFailed env (Worker.State { name }) bm exc = do
let j = toA $ getMessage bm
let job = Worker.job j
withLogger () $ \ioL ->
logMsg ioL ERROR $ "[notifyJobFailed] [" <> name <> "] failed job: " <> show j <> " --- ERROR: " <> show exc
let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markFailed (Just $ UnsafeMkHumanFriendlyErrorText "Worker job failed") jh
-- | Spawn a worker with PGMQ broker
......@@ -111,9 +158,11 @@ performAction :: (HasWorkerBroker PGMQBroker Job)
-> IO ()
performAction env _state bm = do
let job' = toA $ getMessage bm
let ji = JobInfo { _ji_message_id = messageId bm }
let job = Worker.job job'
let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji }
case Worker.job job' of
case job of
Ping -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] ping"
AddCorpusFormAsync { .. } -> runWorkerMonad env $ do
......
......@@ -27,7 +27,7 @@ import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.API.Admin.EnvTypes (GargJob, Mode(Dev), modeToLoggingLevels)
import Gargantext.API.Admin.Orchestrator.Types (JobLog, noJobLog)
-- import Gargantext.API.Admin.Settings ( newPool )
import Gargantext.API.Job (RemainingSteps(..), jobLogStart)
import Gargantext.API.Job (RemainingSteps(..), jobLogStart, jobLogProgress, jobLogFailures, jobLogComplete, addErrorEvent, jobLogFailTotal, jobLogFailTotalWithMessage, jobLogAddMore)
import Gargantext.API.Prelude (GargM)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
......@@ -138,7 +138,7 @@ instance CET.HasCentralExchangeNotification WorkerEnv where
c <- asks (view $ to _w_env_config)
liftBase $ do
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[ce_notify] informing about job start: " <> show (_gc_notifications_config c) <> " :: " <> show m
logMsg ioL DEBUG $ "[ce_notify]: " <> show (_gc_notifications_config c) <> " :: " <> show m
CE.notify (_gc_notifications_config c) m
---------
......@@ -230,15 +230,21 @@ instance MonadJobStatus WorkerMonad where
-- noJobHandle _ = WorkerNoJobHandle
-- noJobHandle _ = noJobHandle (Proxy :: Proxy (GargM WorkerEnv IOException)) -- ConcreteNullHandle
noJobHandle _ = noJobHandle (Proxy :: Proxy WorkerMonad)
noJobHandle Proxy = WorkerNoJobHandle
getLatestJobStatus _ = WorkerMonad (pure noJobLog)
withTracer _ jh n = n jh
markStarted n jh = updateJobProgress jh (const $ jobLogStart $ RemainingSteps n)
markProgress _ _ = WorkerMonad $ pure ()
markFailure _ _ _ = WorkerMonad $ pure ()
markComplete _ = WorkerMonad $ pure ()
markFailed _ _ = WorkerMonad $ pure ()
addMoreSteps _ _ = WorkerMonad $ pure ()
markProgress steps jh = updateJobProgress jh (jobLogProgress steps)
markFailure steps mb_msg jh =
updateJobProgress jh (\latest -> case mb_msg of
Nothing -> jobLogFailures steps latest
Just msg -> addErrorEvent msg (jobLogFailures steps latest))
markComplete jh = updateJobProgress jh jobLogComplete
markFailed mb_msg jh =
updateJobProgress jh (\latest -> case mb_msg of
Nothing -> jobLogFailTotal latest
Just msg -> jobLogFailTotalWithMessage msg latest)
addMoreSteps steps jh = updateJobProgress jh (jobLogAddMore steps)
updateJobProgress :: WorkerJobHandle -> (JobLog -> JobLog) -> WorkerMonad ()
......
......@@ -42,12 +42,19 @@ sendJob job = do
Just wd -> liftBase $ do
b <- initBrokerWithDBCreate gcConfig
let queueName = _wdQueue wd
Worker.sendJob' $ Worker.mkDefaultSendJob' b queueName job
Worker.sendJob' $ Worker.mkDefaultSendJob b queueName job (jobTimeout job)
-- | Some predefined job timeouts (in seconds)
jobTimeout :: Job -> Int
jobTimeout (AddCorpusFormAsync {}) = 300
jobTimeout (AddCorpusWithQuery {}) = 3000
jobTimeout _ = 10
-- | This is just a list of what's implemented and what not.
-- After we migrate to async workers, this should be removed
-- (see G.C.Worker -> performAction on what's implemented already)
handledJobs :: [ EnvTypes.GargJob ]
handledJobs = [ EnvTypes.AddCorpusQueryJob
handledJobs = [ EnvTypes.AddCorpusFormJob
, EnvTypes.AddCorpusQueryJob
, EnvTypes.ForgotPasswordJob ]
......@@ -18,9 +18,9 @@ import Data.Aeson.Types (prependFailure, typeMismatch)
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser, ForgotPasswordAsyncParams)
import Gargantext.API.Admin.EnvTypes ( GargJob )
import Gargantext.API.Node.New.Types ( PostNode(..) )
import Gargantext.API.Node.Types (NewWithForm, WithQuery)
import Gargantext.API.Node.Types (NewWithForm, WithQuery(..))
import Gargantext.Core.Types.Individu (User)
import Gargantext.Database.Admin.Types.Node (CorpusId, NodeId)
import Gargantext.Database.Admin.Types.Node (CorpusId, NodeId(UnsafeMkNodeId))
import Gargantext.Prelude
......@@ -66,25 +66,44 @@ instance FromJSON Job where
return $ GargJob { _gj_garg_job }
s -> prependFailure "parsing job type failed, " (typeMismatch "type" s)
instance ToJSON Job where
toJSON Ping = object [ ("type" .= ("Ping" :: Text)) ]
toJSON Ping = object [ "type" .= ("Ping" :: Text) ]
toJSON (AddCorpusFormAsync { .. }) =
object [ ("type" .= ("AddCorpusFormAsync" :: Text))
, ("args" .= _acf_args)
, ("user" .= _acf_user)
, ("cid" .= _acf_cid) ]
object [ "type" .= ("AddCorpusFormAsync" :: Text)
, "args" .= _acf_args
, "user" .= _acf_user
, "cid" .= _acf_cid ]
toJSON (AddCorpusWithQuery { .. }) =
object [ ("type" .= ("AddCorpusWithQuery" :: Text))
, ("args" .= _acq_args)
, ("user" .= _acq_user)
, ("cid" .= _acq_cid) ]
object [ "type" .= ("AddCorpusWithQuery" :: Text)
, "args" .= _acq_args
, "user" .= _acq_user
, "cid" .= _acq_cid ]
toJSON (ForgotPasswordAsync { .. }) =
object [ ("type" .= ("ForgotPasswordAsync" :: Text))
, ("args" .= _fpa_args) ]
object [ "type" .= ("ForgotPasswordAsync" :: Text)
, "args" .= _fpa_args ]
toJSON (NewNodeAsync { .. }) =
object [ ("type" .= ("NewNodeAsync" :: Text))
, ("node_id" .= _nna_node_id)
, ("authenticated_user" .= _nna_authenticatedUser)
, ("post_node" .= _nna_postNode) ]
object [ "type" .= ("NewNodeAsync" :: Text)
, "node_id" .= _nna_node_id
, "authenticated_user" .= _nna_authenticatedUser
, "post_node" .= _nna_postNode ]
toJSON (GargJob { .. }) =
object [ ("type" .= ("GargJob" :: Text))
, ("garg_job" .= _gj_garg_job) ]
object [ "type" .= ("GargJob" :: Text)
, "garg_job" .= _gj_garg_job ]
-- | We want to have a way to specify 'Maybe NodeId' from given worker
-- parameters. The given 'Maybe CorpusId' is an alternative, when
-- params don't have node id access.
-- class HasWorkerNodeId input where
-- getMNodeId :: job -> Maybe CorpusId -> Maybe NodeId
getWorkerMNodeId :: Job -> Maybe NodeId
getWorkerMNodeId Ping = Nothing
getWorkerMNodeId (AddCorpusFormAsync { _acf_args, _acf_cid }) = Just _acf_cid
getWorkerMNodeId (AddCorpusWithQuery { _acq_args = WithQuery { _wq_node_id }}) = Just $ UnsafeMkNodeId _wq_node_id
getWorkerMNodeId (NewNodeAsync { _nna_node_id }) = Just _nna_node_id
getWorkerMNodeId (ForgotPasswordAsync {}) = Nothing
getWorkerMNodeId (GargJob {}) = Nothing
......@@ -15,11 +15,15 @@ import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker.Broker.Types qualified as BT
import Data.Aeson ((.=), (.:), object, withObject)
import Data.Swagger (NamedSchema(..), ToSchema(..)) -- , genericDeclareNamedSchema)
import Gargantext.Core.Types (NodeId)
import Gargantext.Prelude
data JobInfo = JobInfo { _ji_message_id :: !(BT.MessageId PGMQBroker) }
data JobInfo = JobInfo { _ji_message_id :: BT.MessageId PGMQBroker
-- NOTE: Most jobs are associated with node id.
-- The 'node_id' allows the frontend to assign progress bar to a node.
, _ji_mNode_id :: Maybe NodeId }
deriving (Show, Eq, Ord, Generic)
instance ToSchema JobInfo where -- TODO
--declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_ji_")
......@@ -28,7 +32,9 @@ instance ToSchema JobInfo where -- TODO
instance FromJSON JobInfo where
parseJSON = withObject "JobInfo" $ \o -> do
_ji_message_id <- o .: "message_id"
_ji_mNode_id <- o .: "node_id"
pure $ JobInfo { .. }
instance ToJSON JobInfo where
toJSON (JobInfo { .. }) = object [ ("message_id" .= _ji_message_id )]
toJSON (JobInfo { .. }) = object [ "message_id" .= _ji_message_id
, "node_id" .= _ji_mNode_id ]
......@@ -21,7 +21,7 @@ import Gargantext.Database.Query.Table.Node ( getNodeWithType, getNodesIdWithTyp
import Gargantext.Database.Query.Table.Node.Error ( HasNodeError )
import Gargantext.Database.Schema.Node
import Gargantext.Prelude
import Gargantext.System.Logging (withLogger, logMsg, LogLevel(..))
-- import Gargantext.System.Logging (withLogger, logMsg, LogLevel(..))
import Opaleye
-- import Debug.Trace (trace)
......@@ -29,17 +29,14 @@ import Opaleye
updateHyperdata :: HyperdataC a => NodeId -> a -> DBCmd err Int64
updateHyperdata i h = do
mkCmd $ \c -> do
res <- withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG "[updateHyperdata] before runUpdate_"
liftBase $ putText "[updateHyperdata] before runUpdate_"
-- res <- withLogger () $ \ioLogger -> do
-- logMsg ioLogger DEBUG "[updateHyperdata] before runUpdate_"
res <- runUpdate_ c $ updateHyperdataQuery i h
logMsg ioLogger DEBUG $ "[updateHyperdata] after runUpdate_: " <> show res
liftBase putText $ "[updateHyperdata] after runUpdate_: " <> show res
-- logMsg ioLogger DEBUG $ "[updateHyperdata] after runUpdate_: " <> show res
pure res
withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG $ "[updateHyperdata] after mkCmd"
liftBase putText $ "[updateHyperdata] after mkCmd"
pure res
-- withLogger () $ \ioLogger -> do
-- logMsg ioLogger DEBUG $ "[updateHyperdata] after mkCmd"
-- pure res
updateHyperdataQuery :: HyperdataC a => NodeId -> a -> Update Int64
updateHyperdataQuery i h = seq h' $ {- trace "updateHyperdataQuery: encoded JSON" $ -} Update
......
{-# LANGUAGE LambdaCase #-}
{-|
Module : Gargantext.Utils.Jobs.Error
Description : Error utilities
Copyright : (c) CNRS, 2024
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.Utils.Jobs.Error
( ToHumanFriendlyError(..)
......
......@@ -39,6 +39,11 @@ import Test.Hspec
import Test.Instances ()
instance Eq DT.Notification where
-- simple
(==) n1 n2 = show n1 == show n2
tests :: NotificationsConfig -> D.Dispatcher -> Spec
tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatcher) $ do
describe "Dispatcher, Central Exchange, WebSockets" $ do
......@@ -61,15 +66,13 @@ tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatc
withAsync wsConnect $ \_a -> do
-- wait a bit to connect
threadDelay (500 * millisecond)
CE.notify nc $ CET.UpdateTreeFirstLevel 0
let nodeId = 0
CE.notify nc $ CET.UpdateTreeFirstLevel nodeId
md <- atomically $ readTChan tchan
md `shouldSatisfy` isJust
let (Just (DT.Notification topic' message')) = md
topic' `shouldBe` topic
message' `shouldBe` DT.MEmpty
md `shouldBe` Just (DT.NUpdateTree nodeId)
millisecond :: Int
......
......@@ -41,6 +41,7 @@ import Gargantext.API.Routes.Named
import Gargantext.API.Routes.Named.Corpus
import Gargantext.API.Routes.Named.Node
import Gargantext.API.Routes.Named.Private
import Gargantext.API.Worker (workerAPIPost)
import Gargantext.Core qualified as Lang
import Gargantext.Core.Text.Corpus.Query (RawQuery(..))
import Gargantext.Core.Text.List.Social
......@@ -49,6 +50,7 @@ import Gargantext.Core.Types ( CorpusId, ListId, NodeId, _NodeId)
import Gargantext.Core.Types.Individu
import Gargantext.Core.Types.Main (ListType(..))
import Gargantext.Core.Types (TableResult(..))
import Gargantext.Core.Worker.Types (JobInfo)
import Gargantext.Database.Action.User
import Gargantext.Database.Admin.Types.Hyperdata.Corpus
import Gargantext.Database.Admin.Types.Hyperdata.Folder (defaultHyperdataFolderPrivate)
......@@ -351,10 +353,13 @@ createDocsList testDataPath testEnv port clientEnv token = do
-- Import the docsList with only two documents, both containing a \"fortran\" term.
simpleDocs <- liftIO (TIO.readFile =<< getDataFileName testDataPath)
let newWithForm = mkNewWithForm simpleDocs (T.pack $ takeBaseName testDataPath)
(j :: JobPollHandle) <- checkEither $ fmap toJobPollHandle <$> liftIO (runClientM (add_file_async token corpusId newWithForm) clientEnv)
let mkPollUrl jh = "/corpus/" <> fromString (show $ _NodeId corpusId) <> "/add/form/async/" +|_jph_id jh|+ "/poll?limit=1"
j' <- pollUntilFinished token port mkPollUrl j
liftIO (_jph_status j' `shouldBe` "IsFinished")
-- (j :: JobPollHandle) <- checkEither $ fmap toJobPollHandle <$> liftIO (runClientM (add_file_async token corpusId newWithForm) clientEnv)
ji <- checkEither $ liftIO $ runClientM (add_file_async token corpusId newWithForm) clientEnv
-- let mkPollUrl jh = "/corpus/" <> fromString (show $ _NodeId corpusId) <> "/add/form/async/" +|_jph_id jh|+ "/poll?limit=1"
-- j' <- pollUntilFinished token port mkPollUrl j
-- liftIO (_jph_status j' `shouldBe` "IsFinished")
j' <- pollUntilWorkFinished token port ji
liftIO $ j' `shouldSatisfy` isRight
pure corpusId
createFortranDocsList :: TestEnv -> Int -> ClientEnv -> Token -> WaiSession () CorpusId
......@@ -388,7 +393,7 @@ mkNewWithForm content name = NewWithForm
add_file_async :: Token
-> CorpusId
-> NewWithForm
-> ClientM (JobStatus 'Safe JobLog)
-> ClientM JobInfo
add_file_async (toServantToken -> token) corpusId nwf =
clientRoutes & apiWithCustomErrorScheme
& ($ GES_new)
......@@ -402,8 +407,8 @@ add_file_async (toServantToken -> token) corpusId nwf =
& addWithFormAPI
& addWithFormEp
& ($ corpusId)
& asyncJobsAPI'
& (\(_ :<|> submitForm :<|> _) -> submitForm (JobInput nwf Nothing))
& workerAPIPost
& (\submitForm -> submitForm nwf)
-- | Utility to trash a document by performing a raw query towards GQL. Not very type safe,
......
......@@ -5,26 +5,27 @@ module Test.Utils where
import Control.Exception.Safe ()
import Control.Monad ()
import Data.Aeson.KeyMap qualified as KM
import Data.Aeson qualified as JSON
import Data.Aeson.KeyMap qualified as KM
import Data.ByteString.Char8 qualified as B
import Data.ByteString.Lazy qualified as L
import Data.Map.Strict qualified as Map
import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Data.Text.Lazy.Encoding qualified as TLE
import Data.Text.Lazy qualified as TL
import Data.Text qualified as T
import Data.Text.Lazy.Encoding qualified as TLE
import Data.TreeDiff
import Fmt (Builder)
import Gargantext.API.Admin.Auth.Types (AuthRequest(..), Token, authRes_token)
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Routes.Types (xGargErrorScheme)
import Gargantext.Core.Types.Individu (Username, GargPassword)
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude
import Network.HTTP.Client (defaultManagerSettings, newManager)
import Network.HTTP.Client qualified as HTTP
import Network.HTTP.Types.Header (hAccept, hAuthorization, hContentType)
import Network.HTTP.Types (Header, Method, status200)
import Network.HTTP.Types.Header (hAccept, hAuthorization, hContentType)
import Network.Wai.Handler.Warp (Port)
import Network.Wai.Test (SResponse(..))
import Prelude qualified
......@@ -35,8 +36,8 @@ import System.Environment (lookupEnv)
import System.Timeout qualified as Timeout
import Test.API.Routes (auth_api, mkUrl)
import Test.Hspec.Expectations
import Test.Hspec.Wai.JSON (FromValue(..))
import Test.Hspec.Wai (MatchBody(..), WaiExpectation, WaiSession, request)
import Test.Hspec.Wai.JSON (FromValue(..))
import Test.Hspec.Wai.Matcher (MatchHeader(..), ResponseMatcher(..), bodyEquals, formatHeader, match)
import Test.Tasty.HUnit (Assertion, assertBool)
import Test.Types
......@@ -236,6 +237,35 @@ pollUntilFinished tkn port mkUrlPiece = go 60
Nothing -> False
Just errs -> errs > 1
pollUntilWorkFinished :: HasCallStack
=> Token
-> Port
-> JobInfo
-> WaiSession () JobInfo
pollUntilWorkFinished tkn port = go 60
-- TODO Poll dispatcher for markJobFinished
where
go :: Int -> JobInfo -> WaiSession () JobInfo
go 0 ji = panicTrace $ "pollUntilWorkFinished exhausted attempts. Last found JobInfo: " <> TE.decodeUtf8 (L.toStrict $ JSON.encode ji)
go n ji = case _jph_status h == "IsPending" || _jph_status h == "IsRunning" of
True -> do
liftIO $ threadDelay 1_000_000
h' <- protectedJSON tkn "GET" (mkUrl port $ mkUrlPiece h) ""
go (n-1) h'
False
| _jph_status h == "IsFailure"
-> panicTrace $ "JobPollHandle contains a failure: " <> TE.decodeUtf8 (L.toStrict $ JSON.encode h)
| otherwise
-> case any hasError (_jph_log h) of
True -> panicTrace $ "JobPollHandle contains a failure: " <> TE.decodeUtf8 (L.toStrict $ JSON.encode h)
False -> pure h
-- FIXME(adn) This is wrong, errs should be >= 1.
hasError :: JobLog -> Bool
hasError JobLog{..} = case _scst_failed of
Nothing -> False
Just errs -> errs > 1
-- | Like HUnit's '@?=', but With a nicer error message in case the two entities are not equal.
(@??=) :: (HasCallStack, ToExpr a, Eq a) => a -> a -> Assertion
actual @??= expected =
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment