......@@ -7,7 +7,7 @@ Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
module CLI.Worker where
......@@ -41,9 +41,9 @@ workerCLI (CLIW_run (WorkerArgs { .. })) = do
$ List.cycle ["_"]) :: Prelude.String)
putStrLn ("GarganText worker" :: Text)
putStrLn ("worker_name: " <> worker_name)
putStrLn ("worker toml: " <> _SettingsFile worker_toml)
putText "GarganText worker"
putText $ "worker_name: " <> worker_name
putText $ "worker toml: " <> T.pack (_SettingsFile worker_toml)
withWorkerEnv worker_toml $ \env -> do
......@@ -52,10 +52,12 @@ workerCLI (CLIW_run (WorkerArgs { .. })) = do
Nothing -> do
let workerNames = _wdName <$> (_wsDefinitions ws)
let availableWorkers = T.intercalate ", " workerNames
putStrLn ("Worker definition not found! Available workers: " <> availableWorkers)
putText $ "Worker definition not found! Available workers: " <> availableWorkers
Just wd -> do
putStrLn ("Starting worker '" <> worker_name <> "'")
putStrLn ("Worker settings: " <> show ws :: Text)
putText $ "Starting worker '" <> worker_name <> "'"
putText $ "gc config: " <> show (env ^. hasConfig)
putText $ "Worker settings: " <> show ws
if worker_run_single then
withPGMQWorkerSingle env wd $ \a _state -> do
wait a
......@@ -164,6 +164,7 @@ library
......@@ -246,6 +247,7 @@ library
......@@ -853,6 +855,7 @@ test-suite garg-test-tasty
test bin/gargantext-cli
......@@ -26,7 +26,6 @@ And you have the main viz
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE TypeApplications #-}
module Gargantext.API.Admin.Auth
......@@ -50,20 +49,18 @@ import Data.Text.Lazy.Encoding qualified as LE
import Data.UUID (UUID, fromText, toText)
import Data.UUID.V4 (nextRandom)
import Gargantext.API.Admin.Auth.Types
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..))
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Auth.PolicyCheck
import Gargantext.API.Errors
import Gargantext.API.Prelude (authenticationError, HasServerError, GargServerC, _ServerError, GargM, IsGargServer)
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.Config (HasJWTSettings(..))
import Gargantext.Core.Mail (MailModel(..), mail)
import Gargantext.Core.Mail.Types (mailSettings)
import Gargantext.Core.Types.Individu (User(..), Username, GargPassword(..))
import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.User.New (guessUserName)
import Gargantext.Database.Admin.Types.Node (NodeId(..))
import Gargantext.Database.Admin.Types.Node (UserId)
import Gargantext.Database.Admin.Types.Node (NodeId(..), UserId)
import Gargantext.Database.Prelude (Cmd', CmdCommon, DbCmd')
import Gargantext.Database.Query.Table.User
import Gargantext.Database.Query.Tree (isDescendantOf, isIn)
......@@ -72,7 +69,6 @@ import Gargantext.Database.Schema.Node (NodePoly(_node_id))
import Gargantext.Prelude hiding (Handler, reverse, to)
import Gargantext.Prelude.Crypto.Auth qualified as Auth
import Gargantext.Prelude.Crypto.Pass.User (gargPass)
import Gargantext.Utils.Jobs (serveJobsAPI)
import Servant
import Servant.API.Generic ()
import Servant.Auth.Server
......@@ -321,6 +317,9 @@ generateForgotPasswordUUID = do
-- request, because the delay in email sending etc won't reveal to
-- malicious users emails of our users in the db
forgotPasswordAsync :: Named.ForgotPasswordAsyncAPI (AsServerT (GargM Env BackendInternalError))
forgotPasswordAsync = Named.ForgotPasswordAsyncAPI $ AsyncJobs $
serveJobsAPI ForgotPasswordJob $ \_jHandle p -> do
Jobs.sendJob $ Jobs.ForgotPasswordAsync { Jobs._fpa_args = p }
forgotPasswordAsync = Named.ForgotPasswordAsyncAPI $
serveWorkerAPI $ \p ->
Jobs.ForgotPasswordAsync { Jobs._fpa_args = p }
-- forgotPasswordAsync = Named.ForgotPasswordAsyncAPI $ AsyncJobs $
-- serveJobsAPI ForgotPasswordJob $ \_jHandle p -> do
-- Jobs.sendJob $ Jobs.ForgotPasswordAsync { Jobs._fpa_args = p }
......@@ -93,11 +93,11 @@ instance HasLogger (GargM Env BackendInternalError) where
type instance LogInitParams (GargM Env BackendInternalError) = Mode
type instance LogPayload (GargM Env BackendInternalError) = FL.LogStr
initLogger = \mode -> do
initLogger mode = do
logger_set <- liftIO $ FL.newStderrLoggerSet FL.defaultBufSize
pure $ GargLogger mode logger_set
destroyLogger = \GargLogger{..} -> liftIO $ FL.rmLoggerSet logger_set
logMsg = \(GargLogger mode logger_set) lvl msg -> do
destroyLogger (GargLogger{..}) = liftIO $ FL.rmLoggerSet logger_set
logMsg (GargLogger mode logger_set) lvl msg = do
let pfx = "[" <> show lvl <> "] " :: Text
when (lvl `elem` (modeToLoggingLevels mode)) $
liftIO $ FL.pushLogStrLn logger_set $ FL.toLogStr pfx <> msg
......@@ -195,7 +195,8 @@ newEnv logger port settingsFile@(SettingsFile sf) = do
newPool :: ConnectInfo -> IO (Pool Connection)
newPool param = Pool.newPool $ Pool.setNumStripes (Just 1) $ Pool.defaultPoolConfig (connect param) close (60*60) 8
newPool param =
Pool.newPool $ Pool.setNumStripes (Just 1) $ Pool.defaultPoolConfig (connect param) close (60*60) 8
cleanEnv :: (HasConfig env, HasRepo env) => env -> IO ()
......@@ -162,18 +162,20 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
, _wq_pubmedAPIKey = mPubmedAPIKey
, .. }) maybeLimit jobHandle = do
-- TODO ...
$(logLocM) DEBUG $ T.pack $ "(cid, dbs) " <> show (cid, dbs)
$(logLocM) DEBUG $ T.pack $ "datafield " <> show datafield
$(logLocM) DEBUG $ T.pack $ "flowListWith " <> show flw
$(logLocM) DEBUG $ "[addToCorpusWithQuery] (cid, dbs) " <> show (cid, dbs)
$(logLocM) DEBUG $ "[addToCorpusWithQuery] datafield " <> show datafield
$(logLocM) DEBUG $ "[addToCorpusWithQuery] flowListWith " <> show flw
let mEPOAuthKey = EPO.AuthKey <$> (EPO.User <$> _wq_epoAPIUser)
<*> (EPO.Token <$> _wq_epoAPIToken)
$(logLocM) DEBUG $ "[addToCorpusWithQuery] addLanguageToCorpus " <> show cid <> ", " <> show l
addLanguageToCorpus cid l
$(logLocM) DEBUG "[addToCorpusWithQuery] after addLanguageToCorpus"
case datafield of
Just Web -> do
$(logLocM) DEBUG $ T.pack $ "processing web request " <> show datafield
$(logLocM) DEBUG $ "[addToCorpusWithQuery] processing web request " <> show datafield
markStarted 1 jobHandle
......@@ -188,7 +190,7 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
-- TODO if cid is folder -> create Corpus
-- if cid is corpus -> add to corpus
-- if cid is root -> create corpus in Private
$(logLocM) DEBUG $ T.pack $ "getDataText with query: " <> show q
$(logLocM) DEBUG $ "[addToCorpusWithQuery] getDataText with query: " <> show q
let db = database2origin dbs
-- mPubmedAPIKey <- getUserPubmedAPIKey user
-- printDebug "[addToCorpusWithQuery] mPubmedAPIKey" mPubmedAPIKey
......@@ -198,11 +200,11 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
case eTxt of
Right txt -> do
-- TODO Sum lenghts of each txt elements
$(logLocM) DEBUG "Processing dataText results"
$(logLocM) DEBUG "[addToCorpusWithQuery] Processing dataText results"
markProgress 1 jobHandle
corpusId <- flowDataText user txt (Multi l) cid (Just flw) jobHandle
$(logLocM) DEBUG $ T.pack $ "corpus id " <> show corpusId
$(logLocM) DEBUG $ "[addToCorpusWithQuery] corpus id " <> show corpusId
_ <- commitCorpus cid user
......@@ -213,7 +215,7 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
Left err -> do
-- printDebug "Error: " err
$(logLocM) ERROR (T.pack $ show err) -- log the full error
$(logLocM) ERROR $ "[addToCorpusWithQuery] error: " <> show err -- log the full error
markFailed (Just err) jobHandle
addToCorpusWithForm :: ( FlowCmdM env err m
......@@ -297,7 +299,7 @@ addToCorpusWithForm user cid nwf jobHandle = do
markComplete jobHandle
Left parseErr -> do
$(logLocM) ERROR $ "parse error: " <> (Parser._ParseFormatError parseErr)
$(logLocM) ERROR $ "[addToCorpusWithForm] parse error: " <> (Parser._ParseFormatError parseErr)
markFailed (Just parseErr) jobHandle
......@@ -333,11 +335,11 @@ addToCorpusWithFile user cid nwf@(NewWithFile _d (withDefaultLanguage -> l) fNam
addLanguageToCorpus cid l
printDebug "[addToCorpusWithFile] Uploading file to corpus: " cid
$(logLocM) DEBUG $ "[addToCorpusWithFile] Uploading file to corpus: " <> show cid
markStarted 1 jobHandle
fPath <- GargDB.writeFile nwf
printDebug "[addToCorpusWithFile] File saved as: " fPath
$(logLocM) DEBUG $ "[addToCorpusWithFile] File saved as: " <> show fPath
uId <- getUserId user
nIds <- mkNodeWithParent NodeFile (Just cid) uId fName
......@@ -349,12 +351,12 @@ addToCorpusWithFile user cid nwf@(NewWithFile _d (withDefaultLanguage -> l) fNam
_ <- updateHyperdata nId $ hl { _hff_name = fName
, _hff_path = T.pack fPath }
printDebug "[addToCorpusWithFile] Created node with id: " nId
$(logLocM) DEBUG $ "[addToCorpusWithFile] Created node with id: " <> show nId
_ -> pure ()
printDebug "[addToCorpusWithFile] File upload to corpus finished: " cid
$(logLocM) DEBUG $ "[addToCorpusWithFile] File upload to corpus finished: " <> show cid
printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
$(logLocM) DEBUG $ "[addToCorpusWithFile] sending email: " <> ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
sendMail user
markComplete jobHandle
Module : Gargantext.API.Node.Corpus.Update
Description : API Node corpus update
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
{-# LANGUAGE TypeApplications #-}
module Gargantext.API.Node.Corpus.Update
( addLanguageToCorpus )
import Control.Lens
import Control.Lens (over)
import Control.Monad
import Data.Proxy
import Gargantext.Core
import Gargantext.Database.Admin.Types.Hyperdata.Corpus
import Gargantext.Database.Admin.Types.Node
......@@ -17,6 +28,7 @@ import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs
-- | Updates the 'HyperdataCorpus' with the input 'Lang'.
addLanguageToCorpus :: (HasNodeError err, DbCmd' env err m, MonadJobStatus m)
=> CorpusId
......@@ -11,7 +11,6 @@ Portability : POSIX
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Node.FrameCalcUpload where
......@@ -61,9 +61,9 @@ postNodeAsyncAPI
-> Named.PostNodeAsyncAPI (AsServerT (GargM Env BackendInternalError))
postNodeAsyncAPI authenticatedUser nId = Named.PostNodeAsyncAPI $ AsyncJobs $
serveJobsAPI NewNodeJob $ \_jHandle p -> do
Jobs.sendJob $ Jobs.NewNodeAsync { Jobs._nna_node_id = nId
, Jobs._nna_authenticatedUser = authenticatedUser
, Jobs._nna_postNode = p }
void $ Jobs.sendJob $ Jobs.NewNodeAsync { Jobs._nna_node_id = nId
, Jobs._nna_authenticatedUser = authenticatedUser
, Jobs._nna_postNode = p }
-- postNodeAsync authenticatedUser nId p jHandle
......@@ -24,8 +24,7 @@ import Gargantext.Core (Lang(..))
import Gargantext.Core.Text.Corpus.Query qualified as API
import Gargantext.Core.Text.List.Social (FlowSocialListWith)
import Gargantext.Core.Types (NodeId)
import Gargantext.Core.Utils.Prefix (unPrefix)
import Gargantext.Core.Utils.Prefix (unPrefixSwagger)
import Gargantext.Core.Utils.Prefix (unPrefix, unPrefixSwagger)
import Gargantext.Database.GargDB qualified as GargDB
import Gargantext.Prelude
import Servant.Job.Utils (jsonOptions)
......@@ -25,6 +25,7 @@ import Gargantext.API.Node.Corpus.Annuaire qualified as Annuaire
import Gargantext.API.Prelude
import Gargantext.API.Routes.Named.Annuaire qualified as Named
import Gargantext.API.Routes.Named.Corpus qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
......@@ -46,19 +47,26 @@ waitAPI n = do
pure $ "Waited: " <> show n
addCorpusWithQuery :: User -> Named.AddWithQuery (AsServerT (GargM Env BackendInternalError))
addCorpusWithQuery user = Named.AddWithQuery $ \cid -> AsyncJobs $
serveJobsAPI AddCorpusQueryJob $ \_jHandle q -> do
-- limit <- view $ hasConfig . gc_jobs . jc_max_docs_scrapers
-- New.addToCorpusWithQuery user cid q (Just $ fromIntegral limit) jHandle
Jobs.sendJob $ Jobs.AddCorpusWithQuery { Jobs._acq_args = q
, Jobs._acq_user = user
, Jobs._acq_cid = cid }
-- addCorpusWithQuery :: User -> Named.AddWithQuery (AsServerT (GargM Env BackendInternalError))
-- addCorpusWithQuery user = Named.AddWithQuery $ \cid -> AsyncJobs $
-- serveJobsAPI AddCorpusQueryJob $ \_jHandle q -> do
-- -- limit <- view $ hasConfig . gc_jobs . jc_max_docs_scrapers
-- -- New.addToCorpusWithQuery user cid q (Just $ fromIntegral limit) jHandle
-- void $ Jobs.sendJob $ Jobs.AddCorpusWithQuery { Jobs._acq_args = q
-- , Jobs._acq_user = user
-- , Jobs._acq_cid = cid }
{- let log' x = do
printDebug "addToCorpusWithQuery" x
liftBase $ log x
addCorpusWithQuery :: User -> Named.AddWithQuery (AsServerT (GargM Env BackendInternalError))
addCorpusWithQuery user = Named.AddWithQuery $ \cId ->
serveWorkerAPI $ \p ->
Jobs.AddCorpusWithQuery { Jobs._acq_args = p
, Jobs._acq_user = user
, Jobs._acq_cid = cId }
addCorpusWithForm :: User -> Named.AddWithForm (AsServerT (GargM Env BackendInternalError))
addCorpusWithForm user = Named.AddWithForm $ \cid -> AsyncJobs $
serveJobsAPI AddCorpusFormJob $ \_jHandle i -> do
......@@ -66,9 +74,9 @@ addCorpusWithForm user = Named.AddWithForm $ \cid -> AsyncJobs $
-- called in a few places, and the job status might be different between invocations.
-- markStarted 3 jHandle
-- New.addToCorpusWithForm user cid i jHandle
Jobs.sendJob $ Jobs.AddCorpusFormAsync { Jobs._acf_args = i
, Jobs._acf_user = user
, Jobs._acf_cid = cid }
void $ Jobs.sendJob $ Jobs.AddCorpusFormAsync { Jobs._acf_args = i
, Jobs._acf_user = user
, Jobs._acf_cid = cid }
--addCorpusWithFile :: User -> ServerT Named.AddWithFile (GargM Env BackendInternalError)
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE TypeApplications #-}
module Gargantext.API.Routes.Named (
-- * Routes types
......@@ -22,11 +21,11 @@ import Data.Text (Text)
import GHC.Generics
import Gargantext.API.Admin.Auth.Types
import Gargantext.API.Admin.FrontEnd (FrontEndAPI)
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.GraphQL
import Gargantext.API.Routes.Named.Private
import Gargantext.API.Routes.Named.Public
import Gargantext.API.Routes.Types
import Gargantext.API.Worker (WorkerAPI)
import Gargantext.Core.Notifications.Dispatcher.WebSocket qualified as Dispatcher
import Servant.API ((:>), (:-), JSON, ReqBody, Post, Get, QueryParam)
import Servant.API.Description (Summary)
......@@ -98,7 +97,7 @@ data ForgotPasswordAPI mode = ForgotPasswordAPI
data ForgotPasswordAsyncAPI mode = ForgotPasswordAsyncAPI
{ forgotPasswordAsyncEp :: mode :- Summary "Forgot password asnc"
:> NamedRoutes (AsyncJobs JobLog '[JSON] ForgotPasswordAsyncParams JobLog)
:> NamedRoutes (WorkerAPI ForgotPasswordAsyncParams)
} deriving Generic
......@@ -12,6 +12,7 @@ import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Node.Corpus.Export.Types
import Gargantext.API.Node.Types
import Gargantext.API.Worker (WorkerAPI)
import Gargantext.Core.Text.Ngrams (NgramsType(..))
import Gargantext.Database.Admin.Types.Node
import Servant
......@@ -40,5 +41,6 @@ newtype AddWithQuery mode = AddWithQuery
:> "corpus"
:> Capture "corpus_id" CorpusId
:> "query"
:> NamedRoutes (AsyncJobs JobLog '[JSON] WithQuery JobLog)
-- :> NamedRoutes (AsyncJobs JobLog '[JSON] WithQuery JobLog)
:> NamedRoutes (WorkerAPI WithQuery)
} deriving Generic
Module : Gargantext.API.Worker
Description : New-style Worker API (no more servant-job)
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Worker where
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.Worker.Jobs (sendJob)
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.Types
import Gargantext.Prelude
import Servant.API ((:>), (:-), JSON, Post, ReqBody)
import Servant.Server.Generic (AsServerT)
data WorkerAPI input mode = WorkerAPI
{ workerAPIPost :: mode :- ReqBody '[JSON] input
:> Post '[JSON] JobInfo }
deriving Generic
-- serveWorkerAPI :: ( HasWorkerBroker PGMQBroker Job
-- , m ~ GargM Env BackendInternalError )
-- => (input -> Job)
-- -> input
-- -> WorkerJob (AsServerT m)
-- -- -> ServerT (Post '[JSON] JobInfo) m
-- -- -> Cmd' env err JobInfo
-- serveWorkerAPI f i = do
-- mId <- sendJob $ f i
-- pure $ JobInfo { _ji_message_id = mId }
serveWorkerAPI :: IsGargServer env err m
=> (input -> Job)
-> WorkerAPI input (AsServerT m)
serveWorkerAPI f = WorkerAPI { workerAPIPost }
workerAPIPost i = do
mId <- sendJob $ f i
pure $ JobInfo { _ji_message_id = mId }
......@@ -96,7 +96,14 @@ gServer (NotificationsConfig { .. }) = do
-- send the same message that we received
-- void $ sendNonblocking s_dispatcher r
void $ timeout 100_000 $ send s_dispatcher r
_ -> logMsg ioLogger DEBUG $ "[central_exchange] unknown message"
Just (UpdateWorkerProgress ji jl) -> do
logMsg ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
Just (WorkerJobStarted nodeId ji) -> do
logMsg ioLogger DEBUG $ "[central_exchange] worker job started: " <> show nodeId <> ", " <> show ji
void $ timeout 100_000 $ send s_dispatcher r
Nothing ->
logMsg ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r
notify :: NotificationsConfig -> CEMessage -> IO ()
......@@ -22,6 +22,7 @@ import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.ByteString.Lazy qualified as BSL
import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.Core.Types (NodeId)
import Gargantext.Core.Worker.Types (JobInfo)
import Gargantext.Prelude
import Prelude qualified
import Servant.Job.Core (Safety(Safe))
......@@ -35,13 +36,24 @@ various events).
data CEMessage =
-- | Old-style jobs, update progress
UpdateJobProgress (JobStatus 'Safe JobLog)
-- | New-style jobs (async worker).
-- Please note that (I think) all jobs are associated with some NodeId
-- (providing a nodeId allows us to discover new jobs on the frontend).
-- | UpdateWorkerProgress JobInfo NodeId JobLog
| UpdateWorkerProgress JobInfo JobLog
-- | Update tree for given nodeId
| UpdateTreeFirstLevel NodeId
| WorkerJobStarted NodeId JobInfo
instance Prelude.Show CEMessage where
show (UpdateJobProgress js) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode js)
-- show (UpdateWorkerProgress ji nodeId jl) = "UpdateWorkerProgress " <> show ji <> " " <> show nodeId <> " " <> show jl
show (UpdateWorkerProgress ji jl) = "UpdateWorkerProgress " <> show ji <> " " <> show jl
show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId
show (WorkerJobStarted nodeId ji) = "WorkerJobStarted " <> show nodeId <> " " <> show ji
instance FromJSON CEMessage where
parseJSON = withObject "CEMessage" $ \o -> do
type_ <- o .: "type"
......@@ -49,18 +61,40 @@ instance FromJSON CEMessage where
"update_job_progress" -> do
js <- o .: "js"
pure $ UpdateJobProgress js
"update_worker_progress" -> do
ji <- o .: "ji"
jl <- o .: "jl"
-- nodeId <- o .: "node_id"
-- pure $ UpdateWorkerProgress ji nodeId jl
pure $ UpdateWorkerProgress ji jl
"update_tree_first_level" -> do
node_id <- o .: "node_id"
pure $ UpdateTreeFirstLevel node_id
"worker_job_started" -> do
nodeId <- o .: "node_id"
ji <- o .: "ji"
pure $ WorkerJobStarted nodeId ji
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON CEMessage where
toJSON (UpdateJobProgress js) = object [
"type" .= toJSON ("update_job_progress" :: Text)
, "js" .= toJSON js
toJSON (UpdateTreeFirstLevel node_id) = object [
-- toJSON (UpdateWorkerProgress ji nodeId jl) = object [
toJSON (UpdateWorkerProgress ji jl) = object [
"type" .= toJSON ("update_worker_progress" :: Text)
, "ji" .= toJSON ji
, "jl" .= toJSON jl
-- , "node_id" .= toJSON nodeId
toJSON (UpdateTreeFirstLevel nodeId) = object [
"type" .= toJSON ("update_tree_first_level" :: Text)
, "node_id" .= toJSON node_id
, "node_id" .= toJSON nodeId
toJSON (WorkerJobStarted nodeId ji) = object [
"type" .= toJSON ("worker_job_started" :: Text)
, "node_id" .= toJSON nodeId
, "ji" .= toJSON ji
......@@ -38,7 +38,7 @@ import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS
import Servant.Job.Types (JobStatus(_job_id))
import Servant.Job.Types (job_id)
import StmContainers.Set qualified as SSet
......@@ -140,30 +140,63 @@ sendNotification :: TChan.TChan ((ByteString, Topic), (WS.Connection, WS.DataMes
-> IO ()
sendNotification throttleTChan ceMessage sub = do
let ws = s_ws_key_connection sub
-- 'topic' is where the client subscribed, ceMessage is server's
-- message to a client
let topic = s_topic sub
notification <-
case ceMessage of
CETypes.UpdateJobProgress jobStatus -> do
pure $ Notification topic (MJobProgress jobStatus)
CETypes.UpdateTreeFirstLevel _nodeId -> pure $ Notification topic MEmpty
let id' = (wsKey ws, topic)
atomically $ TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing))
let mNotification =
case (topic, ceMessage) of
(UpdateJobProgress jId, CETypes.UpdateJobProgress jobStatus) -> do
if jId == jobStatus ^. job_id
then Just $ NUpdateJobProgress jId (MJobStatus jobStatus)
else Nothing
-- (UpdateWorkerProgress jobInfo, CETypes.UpdateWorkerProgress jobInfo' nodeId jobLog) -> do
(UpdateWorkerProgress jobInfo, CETypes.UpdateWorkerProgress jobInfo' jobLog) -> do
if jobInfo == jobInfo'
-- then Just $ NUpdateWorkerProgress jobInfo nodeId (MJobLog jobLog)
then Just $ NUpdateWorkerProgress jobInfo (MJobLog jobLog)
else Nothing
(UpdateTree nodeId, CETypes.UpdateTreeFirstLevel nodeId') ->
if nodeId == nodeId'
then Just $ NUpdateTree nodeId
else Nothing
(UpdateTree nodeId, CETypes.WorkerJobStarted nodeId' ji) ->
if nodeId == nodeId'
then Just $ NWorkerJobStarted nodeId ji
else Nothing
_ -> Nothing
case mNotification of
Nothing -> pure ()
Just notification -> do
let id' = (wsKey ws, topic)
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[sendNotification] dispatching notification: " <> show notification
atomically $ do
TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing))
sendDataMessageThrottled :: (WS.Connection, WS.DataMessage) -> IO ()
sendDataMessageThrottled (conn, msg) =
WS.sendDataMessage conn msg
-- Custom filtering of list of Subscriptions based on
-- CETypes.CEMessage.
-- | Custom filtering of list of Subscriptions based on
-- 'CETypes.CEMessage'.
-- For example, we can add CEMessage.Broadcast to propagate a
-- notification to all connections.
_filterCEMessageSubs :: CETypes.CEMessage -> [Subscription] -> [Subscription]
_filterCEMessageSubs ceMessage subscriptions = filter (ceMessageSubPred ceMessage) subscriptions
-- | Predicate, whether 'Subscription' matches given
-- 'CETypes.CEMessage' (i.e. should given 'Subscription' be informed
-- of this message).
ceMessageSubPred :: CETypes.CEMessage -> Subscription -> Bool
ceMessageSubPred (CETypes.UpdateJobProgress js) (Subscription { s_topic }) =
s_topic == (UpdateJobProgress $ _job_id js)
ceMessageSubPred (CETypes.UpdateTreeFirstLevel node_id) (Subscription { s_topic }) =
s_topic == UpdateTree node_id
s_topic == UpdateJobProgress (js ^. job_id)
-- ceMessageSubPred (CETypes.UpdateWorkerProgress ji nodeId _jl) (Subscription { s_topic }) =
ceMessageSubPred (CETypes.UpdateWorkerProgress ji _jl) (Subscription { s_topic }) =
s_topic == UpdateWorkerProgress ji
-- || s_topic == UpdateTree nodeId
ceMessageSubPred (CETypes.UpdateTreeFirstLevel nodeId) (Subscription { s_topic }) =
s_topic == UpdateTree nodeId
ceMessageSubPred (CETypes.WorkerJobStarted nodeId _ji) (Subscription { s_topic }) =
s_topic == UpdateTree nodeId
......@@ -34,6 +34,7 @@ import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes
import Gargantext.Core.Types (NodeId, UserId)
import Gargantext.Core.Worker.Types (JobInfo)
import Gargantext.Prelude
import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar)
import Nanomsg
......@@ -57,15 +58,19 @@ data Topic =
-- | Update given Servant Job (we currently send a request every
-- | second to get job status).
UpdateJobProgress (JobID 'Safe)
-- | New, worker version for updating job state
| UpdateWorkerProgress JobInfo
-- | Given parent node id, trigger update of the node and its
-- children (e.g. list is automatically created in a corpus)
| UpdateTree NodeId
deriving (Eq, Ord)
instance Prelude.Show Topic where
show (UpdateJobProgress jId) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId)
show (UpdateWorkerProgress ji) = "UpdateWorkerProgress " <> show ji
show (UpdateTree nodeId) = "UpdateTree " <> show nodeId
instance Hashable Topic where
hashWithSalt salt (UpdateJobProgress jId) = hashWithSalt salt ("update-job-progress" :: Text, Aeson.encode jId)
hashWithSalt salt (UpdateWorkerProgress ji) = hashWithSalt salt ("update-worker-progress" :: Text, Aeson.encode ji)
hashWithSalt salt (UpdateTree nodeId) = hashWithSalt salt ("update-tree" :: Text, nodeId)
instance FromJSON Topic where
parseJSON = Aeson.withObject "Topic" $ \o -> do
......@@ -74,6 +79,9 @@ instance FromJSON Topic where
"update_job_progress" -> do
jId <- o .: "j_id"
pure $ UpdateJobProgress jId
"update_worker_progress" -> do
ji <- o .: "ji"
pure $ UpdateWorkerProgress ji
"update_tree" -> do
node_id <- o .: "node_id"
pure $ UpdateTree node_id
......@@ -83,40 +91,43 @@ instance ToJSON Topic where
"type" .= toJSON ("update_job_progress" :: Text)
, "j_id" .= toJSON jId
toJSON (UpdateWorkerProgress ji) = Aeson.object [
"type" .= toJSON ("update_worker_progress" :: Text)
, "ji" .= toJSON ji
toJSON (UpdateTree node_id) = Aeson.object [
"type" .= toJSON ("update_tree" :: Text)
, "node_id" .= toJSON node_id
-- | A message to be sent inside a Notification
data Message =
MJobProgress (JobStatus 'Safe JobLog)
| MEmpty
-- | For tests
instance Eq Message where
(==) (MJobProgress js1) (MJobProgress js2) = _job_id js1 == _job_id js2
(==) MEmpty MEmpty = True
(==) _ _ = False
instance Prelude.Show Message where
show (MJobProgress jobStatus) = "MJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jobStatus)
show MEmpty = "MEmpty"
instance ToJSON Message where
toJSON (MJobProgress jobStatus) = Aeson.object [
"type" .= toJSON ("MJobProgress" :: Text)
, "job_status" .= toJSON jobStatus
-- | A job status message
newtype MJobStatus = MJobStatus (JobStatus 'Safe JobLog)
instance Prelude.Show MJobStatus where
show (MJobStatus js) = "MJobStatus " <> show (CBUTF8.decode $ BSL.unpack $ Aeson.encode js)
instance ToJSON MJobStatus where
toJSON (MJobStatus js) = Aeson.object [
"type" .= toJSON ("MJobLog" :: Text)
, "job_status" .= toJSON js
toJSON MEmpty = Aeson.object [
"type" .= toJSON ("MEmpty" :: Text)
instance FromJSON MJobStatus where
parseJSON = Aeson.withObject "MJobStatus" $ \o -> do
js <- o .: "job_status"
pure $ MJobStatus js
-- | A job progress message
newtype MJobLog = MJobLog JobLog
instance Prelude.Show MJobLog where
show (MJobLog jl) = "MJobLog " <> show jl
instance ToJSON MJobLog where
toJSON (MJobLog jl) = Aeson.object [
"type" .= toJSON ("MJobLog" :: Text)
, "job_log" .= toJSON jl
instance FromJSON Message where
parseJSON = Aeson.withObject "Message" $ \o -> do
type_ <- o .: "type"
case type_ of
"MJobProgress" -> do
job_status <- o .: "job_status"
pure $ MJobProgress job_status
"MEmpty" -> pure MEmpty
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance FromJSON MJobLog where
parseJSON = Aeson.withObject "MJobLog" $ \o -> do
jl <- o .: "job_log"
pure $ MJobLog jl
data ConnectedUser =
......@@ -205,20 +216,59 @@ class HasDispatcher env dispatcher where
-- | A notification is sent to clients who subscribed to specific topics
data Notification =
Notification Topic Message
deriving (Show)
NUpdateJobProgress (JobID 'Safe) MJobStatus
-- | NUpdateWorkerProgress JobInfo NodeId MJobLog
| NUpdateWorkerProgress JobInfo MJobLog
| NUpdateTree NodeId
| NWorkerJobStarted NodeId JobInfo
instance Prelude.Show Notification where
show (NUpdateJobProgress jId mjs) = "NUpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId) <> ", " <> show mjs
-- show (NUpdateWorkerProgress jobInfo nodeId mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show nodeId <> ", " <> show mJobLog
show (NUpdateWorkerProgress jobInfo mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show mJobLog
show (NUpdateTree nodeId) = "NUpdateTree " <> show nodeId
show (NWorkerJobStarted nodeId ji) = "NWorkerJobStarted " <> show nodeId <> ", " <> show ji
instance ToJSON Notification where
toJSON (Notification topic message) = Aeson.object [
"notification" .= toJSON (Aeson.object [
"topic" .= toJSON topic
, "message" .= toJSON message
toJSON (NUpdateJobProgress jId mjs) = Aeson.object [
"type" .= ("update_job_progress" :: Text)
, "j_id" .= toJSON jId
, "job_status" .= toJSON mjs
-- toJSON (NUpdateWorkerProgress jobInfo nodeId mJobLog) = Aeson.object [
toJSON (NUpdateWorkerProgress jobInfo mJobLog) = Aeson.object [
"type" .= ("update_worker_progress" :: Text)
, "job_info" .= toJSON jobInfo
, "job_log" .= toJSON mJobLog
-- , "node_id" .= toJSON nodeId
toJSON (NUpdateTree nodeId) = Aeson.object [
"type" .= ("update_tree" :: Text)
, "node_id" .= toJSON nodeId
toJSON (NWorkerJobStarted nodeId ji) = Aeson.object [
"type" .= ("worker_job_started" :: Text)
, "node_id" .= toJSON nodeId
, "ji" .= toJSON ji
-- We don't need to decode notifications, this is for tests only
instance FromJSON Notification where
parseJSON = Aeson.withObject "Notification" $ \o -> do
n <- o .: "notification"
topic <- n .: "topic"
message <- n .: "message"
pure $ Notification topic message
t <- o .: "type"
case t of
"update_job_progress" -> do
jId <- o .: "j_id"
mjs <- o .: "job_status"
pure $ NUpdateJobProgress jId mjs
"update_worker_progress" -> do
jobInfo <- o .: "job_info"
mJobLog <- o .: "job_log"
-- nodeId <- o .: "node_id"
-- pure $ NUpdateWorkerProgress jobInfo nodeId mJobLog
pure $ NUpdateWorkerProgress jobInfo mJobLog
"update_tree" -> do
nodeId <- o .: "node_id"
pure $ NUpdateTree nodeId
"worker_job_started" -> do
nodeId <- o .: "node_id"
ji <- o .: "ji"
pure $ NWorkerJobStarted nodeId ji
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
......@@ -9,6 +9,7 @@ Portability : POSIX
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-} -- orphan HasNodeError IOException
......@@ -17,7 +18,7 @@ module Gargantext.Core.Worker where
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker.Broker.Types (BrokerMessage, toA, getMessage)
import Async.Worker.Broker.Types (BrokerMessage, toA, getMessage, messageId)
import Async.Worker qualified as Worker
import Async.Worker.Types qualified as Worker
import Async.Worker.Types (HasWorkerBroker)
......@@ -26,15 +27,19 @@ import Gargantext.API.Admin.Auth (forgotUserPassword)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm, addToCorpusWithQuery)
import Gargantext.API.Node.New (postNode')
import Gargantext.API.Node.Types (WithQuery(..))
import Gargantext.Core.Config (hasConfig, gc_jobs)
import Gargantext.Core.Config.Types (jc_max_docs_scrapers)
import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CE
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Admin.Types.Node (NodeId(..))
import Gargantext.Database.Query.Table.User (getUsersWithEmail)
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Monad ( MonadJobStatus(noJobHandle) )
import Gargantext.System.Logging ( logLocM, LogLevel(..) )
......@@ -50,10 +55,25 @@ initWorkerState env (WorkerDefinition { .. }) = do
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Nothing
, onMessageReceived = Just $ markJobStarted env
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing }
, onJobTimeout = Just $ \_s bm -> putStrLn ("on job timeout: " <> show (toA $ getMessage bm) :: Text)
, onJobError = Nothing
, onWorkerKilledSafely = Nothing }
markJobStarted :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> IO ()
markJobStarted env (Worker.State { name }) bm = do
let j = toA $ getMessage bm
putStrLn $ "[" <> name <> "] starting job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm }
case Worker.job j of
AddCorpusWithQuery { _acq_args = WithQuery { _wq_node_id } } -> do
runWorkerMonad env $ CE.ce_notify $ CE.WorkerJobStarted (UnsafeMkNodeId _wq_node_id) ji
_ -> pure ()
-- | Spawn a worker with PGMQ broker
......@@ -84,30 +104,34 @@ withPGMQWorkerSingle env wd cb = do
-- | How the worker should process jobs
performAction :: (HasWorkerBroker b Job)
performAction :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State b Job
-> BrokerMessage b (Worker.Job Job)
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> IO ()
performAction env _state bm = do
let job' = toA $ getMessage bm
let ji = JobInfo { _ji_message_id = messageId bm }
let jh = WorkerJobHandle { _w_job_info = ji }
case Worker.job job' of
Ping -> putStrLn ("ping" :: Text)
Ping -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] ping"
AddCorpusFormAsync { .. } -> runWorkerMonad env $ do
liftBase $ putStrLn ("add corpus form" :: Text)
addToCorpusWithForm _acf_user _acf_cid _acf_args (noJobHandle (Proxy :: Proxy WorkerMonad))
$(logLocM) DEBUG $ "[performAction] add corpus form"
addToCorpusWithForm _acf_user _acf_cid _acf_args jh
AddCorpusWithQuery { .. } -> runWorkerMonad env $ do
liftBase $ putStrLn ("add corpus with query" :: Text)
$(logLocM) DEBUG "[performAction] add corpus with query"
let limit = Just $ fromIntegral $ env ^. hasConfig . gc_jobs . jc_max_docs_scrapers
addToCorpusWithQuery _acq_user _acq_cid _acq_args limit (noJobHandle (Proxy :: Proxy WorkerMonad))
addToCorpusWithQuery _acq_user _acq_cid _acq_args limit jh
ForgotPasswordAsync { _fpa_args = ForgotPasswordAsyncParams { email } } -> runWorkerMonad env $ do
liftBase $ putStrLn ("forgot password: " <> email)
$(logLocM) DEBUG $ "[performAction] forgot password: " <> email
us <- getUsersWithEmail (T.toLower email)
case us of
[u] -> forgotUserPassword u
_ -> pure ()
NewNodeAsync { .. } -> runWorkerMonad env $ do
liftBase $ putStrLn ("new node async " :: Text)
$(logLocM) DEBUG $ "[performAction] new node async "
void $ postNode' _nna_authenticatedUser _nna_node_id _nna_postNode
GargJob { _gj_garg_job } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "Garg job: " <> show _gj_garg_job <> " (handling of this job is still not implemented!)"
return ()
GargJob { _gj_garg_job } -> putStrLn ("Garg job: " <> show _gj_garg_job <> " (handling of this job is still not implemented!)" :: Text)
......@@ -17,14 +17,17 @@ Portability : POSIX
module Gargantext.Core.Worker.Env where
import Control.Concurrent.STM.TVar (TVar, modifyTVar, newTVarIO, readTVarIO)
import Control.Lens (prism', to, view)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Pool (Pool)
import Data.Maybe (fromJust)
import Data.Pool qualified as Pool
import Data.Text qualified as T
import Database.PostgreSQL.Simple (Connection)
import Gargantext.API.Admin.EnvTypes (ConcreteJobHandle, GargJob, Mode(Dev), modeToLoggingLevels)
import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.API.Admin.EnvTypes (GargJob, Mode(Dev), modeToLoggingLevels)
import Gargantext.API.Admin.Orchestrator.Types (JobLog, noJobLog)
import Gargantext.API.Admin.Settings ( newPool )
-- import Gargantext.API.Admin.Settings ( newPool )
import Gargantext.API.Job (RemainingSteps(..), jobLogStart)
import Gargantext.API.Prelude (GargM)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
......@@ -36,11 +39,12 @@ import Gargantext.Core.Mail.Types (HasMail(..))
import Gargantext.Core.NLP (HasNLPServer(..), NLPServerMap, nlpServerMap)
import Gargantext.Core.NodeStory (HasNodeStoryEnv(..), HasNodeStoryImmediateSaver(..), HasNodeArchiveStoryImmediateSaver(..), NodeStoryEnv, fromDBNodeStoryEnv, nse_saver_immediate, nse_archive_saver_immediate)
import Gargantext.Core.Types (HasValidationError(..))
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Tree.Error (HasTreeError(..))
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging (HasLogger(..), Logger, MonadLogger(..), withLoggerHoisted)
import Gargantext.System.Logging (HasLogger(..), Logger, LogLevel(..), MonadLogger(..), withLogger, logMsg, withLoggerHoisted)
import Gargantext.Utils.Jobs.Monad ( MonadJobStatus(..), JobHandle )
import GHC.IO.Exception (IOException(..), IOErrorType(OtherError))
import Prelude qualified
......@@ -50,12 +54,18 @@ import System.Log.FastLogger qualified as FL
data WorkerEnv = WorkerEnv
{ _w_env_config :: ~GargConfig
, _w_env_logger :: ~(Logger (GargM WorkerEnv IOException))
, _w_env_pool :: ~(Pool Connection)
, _w_env_pool :: ~(Pool.Pool PSQL.Connection)
, _w_env_nodeStory :: ~NodeStoryEnv
, _w_env_mail :: ~Mail.MailConfig
, _w_env_nlp :: ~NLPServerMap
, _w_env_job_state :: ~(TVar (Maybe WorkerJobState))
data WorkerJobState = WorkerJobState
{ _wjs_job_info :: JobInfo
, _wjs_job_log :: JobLog }
deriving (Show, Eq)
withWorkerEnv :: SettingsFile -> (WorkerEnv -> IO a) -> IO a
withWorkerEnv settingsFile k = withLoggerHoisted Dev $ \logger -> do
......@@ -66,8 +76,11 @@ withWorkerEnv settingsFile k = withLoggerHoisted Dev $ \logger -> do
newWorkerEnv logger = do
cfg <- readConfig settingsFile
--nodeStory_env <- fromDBNodeStoryEnv (_gc_repofilepath cfg)
pool <- newPool $ _gc_database_config cfg
-- pool <- newPool $ _gc_database_config cfg
let dbConfig = _gc_database_config cfg
pool <- Pool.newPool $ Pool.setNumStripes (Just 1) $ Pool.defaultPoolConfig (PSQL.connect dbConfig) PSQL.close 60 4
nodeStory_env <- fromDBNodeStoryEnv pool
_w_env_job_state <- newTVarIO Nothing
pure $ WorkerEnv
{ _w_env_pool = pool
, _w_env_logger = logger
......@@ -75,6 +88,7 @@ withWorkerEnv settingsFile k = withLoggerHoisted Dev $ \logger -> do
, _w_env_config = cfg
, _w_env_mail = _gc_mail_config cfg
, _w_env_nlp = nlpServerMap $ _gc_nlp_config cfg
, _w_env_job_state
instance HasConfig WorkerEnv where
......@@ -88,11 +102,11 @@ instance HasLogger (GargM WorkerEnv IOException) where
type instance LogInitParams (GargM WorkerEnv IOException) = Mode
type instance LogPayload (GargM WorkerEnv IOException) = FL.LogStr
initLogger = \mode -> do
initLogger mode = do
w_logger_set <- liftIO $ FL.newStderrLoggerSet FL.defaultBufSize
pure $ GargWorkerLogger mode w_logger_set
destroyLogger = \GargWorkerLogger{..} -> liftIO $ FL.rmLoggerSet w_logger_set
logMsg = \(GargWorkerLogger mode logger_set) lvl msg -> do
destroyLogger (GargWorkerLogger{..}) = liftIO $ FL.rmLoggerSet w_logger_set
logMsg (GargWorkerLogger mode logger_set) lvl msg = do
let pfx = "[" <> show lvl <> "] " :: Text
when (lvl `elem` (modeToLoggingLevels mode)) $
liftIO $ FL.pushLogStrLn logger_set $ FL.toLogStr pfx <> msg
......@@ -122,7 +136,10 @@ instance MonadLogger (GargM WorkerEnv IOException) where
instance CET.HasCentralExchangeNotification WorkerEnv where
ce_notify m = do
c <- asks (view $ to _w_env_config)
liftBase $ CE.notify (_gc_notifications_config c) m
liftBase $ do
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[ce_notify] informing about job start: " <> show (_gc_notifications_config c) <> " :: " <> show m
CE.notify (_gc_notifications_config c) m
instance HasValidationError IOException where
......@@ -170,11 +187,11 @@ instance HasLogger WorkerMonad where
type instance LogInitParams WorkerMonad = Mode
type instance LogPayload WorkerMonad = FL.LogStr
initLogger = \mode -> do
initLogger mode = do
wm_logger_set <- liftIO $ FL.newStderrLoggerSet FL.defaultBufSize
pure $ WorkerMonadLogger mode wm_logger_set
destroyLogger = \WorkerMonadLogger{..} -> liftIO $ FL.rmLoggerSet wm_logger_set
logMsg = \(WorkerMonadLogger mode logger_set) lvl msg -> do
destroyLogger (WorkerMonadLogger{..}) = liftIO $ FL.rmLoggerSet wm_logger_set
logMsg (WorkerMonadLogger mode logger_set) lvl msg = do
let pfx = "[" <> show lvl <> "] " :: Text
when (lvl `elem` (modeToLoggingLevels mode)) $
liftIO $ FL.pushLogStrLn logger_set $ FL.toLogStr pfx <> msg
......@@ -196,11 +213,17 @@ runWorkerMonad env m = do
data WorkerJobHandle = WorkerNoJobHandle
data WorkerJobHandle =
| WorkerJobHandle { _w_job_info :: !JobInfo }
deriving (Show, Eq)
-- | Worker handles 1 job at a time, hence it's enough to provide
-- simple progress tracking
instance MonadJobStatus WorkerMonad where
-- type JobHandle WorkerMonad = WorkerJobHandle
type JobHandle WorkerMonad = ConcreteJobHandle IOException
type JobHandle WorkerMonad = WorkerJobHandle
type JobType WorkerMonad = GargJob
type JobOutputType WorkerMonad = JobLog
type JobEventType WorkerMonad = JobLog
......@@ -210,9 +233,33 @@ instance MonadJobStatus WorkerMonad where
noJobHandle _ = noJobHandle (Proxy :: Proxy WorkerMonad)
getLatestJobStatus _ = WorkerMonad (pure noJobLog)
withTracer _ jh n = n jh
markStarted _ _ = WorkerMonad $ pure ()
markStarted n jh = updateJobProgress jh (const $ jobLogStart $ RemainingSteps n)
markProgress _ _ = WorkerMonad $ pure ()
markFailure _ _ _ = WorkerMonad $ pure ()
markComplete _ = WorkerMonad $ pure ()
markFailed _ _ = WorkerMonad $ pure ()
addMoreSteps _ _ = WorkerMonad $ pure ()
updateJobProgress :: WorkerJobHandle -> (JobLog -> JobLog) -> WorkerMonad ()
updateJobProgress WorkerNoJobHandle _ = pure ()
updateJobProgress (WorkerJobHandle (ji@JobInfo { _ji_message_id })) f = do
stateTVar <- asks _w_env_job_state
liftIO $ atomically $ modifyTVar stateTVar updateState
state' <- liftIO $ readTVarIO stateTVar
case state' of
Nothing -> pure ()
Just wjs -> do
CET.ce_notify $ CET.UpdateWorkerProgress ji (_wjs_job_log wjs)
updateState mwjs =
let initJobLog =
if (_wjs_job_info <$> mwjs) == Just ji
_wjs_job_log (fromJust mwjs)
Just (WorkerJobState { _wjs_job_info = ji
, _wjs_job_log = f initJobLog })
......@@ -13,6 +13,7 @@ Portability : POSIX
module Gargantext.Core.Worker.Jobs where
import Async.Worker.Broker.Types (MessageId)
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker qualified as Worker
import Async.Worker.Types (HasWorkerBroker)
......@@ -28,7 +29,8 @@ import Gargantext.Prelude
sendJob :: (HasWorkerBroker PGMQBroker Job, HasConfig env)
=> Job
-> Cmd' env err ()
-> Cmd' env err (MessageId PGMQBroker)
-- -> Cmd' env err ()
sendJob job = do
gcConfig <- view $ hasConfig
let WorkerSettings { _wsDefinitions } = gcConfig ^. gc_worker
......@@ -40,7 +42,7 @@ sendJob job = do
Just wd -> liftBase $ do
b <- initBrokerWithDBCreate gcConfig
let queueName = _wdQueue wd
void $ Worker.sendJob' $ Worker.mkDefaultSendJob' b queueName job
Worker.sendJob' $ Worker.mkDefaultSendJob' b queueName job
-- | This is just a list of what's implemented and what not.
......@@ -68,7 +68,7 @@ instance FromJSON Job where
instance ToJSON Job where
toJSON Ping = object [ ("type" .= ("Ping" :: Text)) ]
toJSON (AddCorpusFormAsync { .. }) =
object [ ("type" .= ("AddCorpusFormJob" :: Text))
object [ ("type" .= ("AddCorpusFormAsync" :: Text))
, ("args" .= _acf_args)
, ("user" .= _acf_user)
, ("cid" .= _acf_cid) ]
Module : Gargantext.Core.Worker.Types
Description : Some useful worker types
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
module Gargantext.Core.Worker.Types where
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker.Broker.Types qualified as BT
import Data.Aeson ((.=), (.:), object, withObject)
import Data.Swagger (NamedSchema(..), ToSchema(..)) -- , genericDeclareNamedSchema)
import Gargantext.Prelude
data JobInfo = JobInfo { _ji_message_id :: !(BT.MessageId PGMQBroker) }
deriving (Show, Eq, Ord, Generic)
instance ToSchema JobInfo where -- TODO
--declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_ji_")
declareNamedSchema _ = do
return $ NamedSchema (Just "JobInfo") $ mempty
instance FromJSON JobInfo where
parseJSON = withObject "JobInfo" $ \o -> do
_ji_message_id <- o .: "message_id"
pure $ JobInfo { .. }
instance ToJSON JobInfo where
toJSON (JobInfo { .. }) = object [ ("message_id" .= _ji_message_id )]
......@@ -46,5 +46,5 @@ insertContextNodeNgramsW nnnw =
insertNothing = Insert { iTable = contextNodeNgramsTable
, iRows = nnnw
, iReturning = rCount
, iOnConflict = (Just DoNothing)
, iOnConflict = Just DoNothing
......@@ -21,14 +21,25 @@ import Gargantext.Database.Query.Table.Node ( getNodeWithType, getNodesIdWithTyp
import Gargantext.Database.Query.Table.Node.Error ( HasNodeError )
import Gargantext.Database.Schema.Node
import Gargantext.Prelude
import Gargantext.System.Logging (withLogger, logMsg, LogLevel(..))
import Opaleye
-- import Debug.Trace (trace)
updateHyperdata :: HyperdataC a => NodeId -> a -> DBCmd err Int64
updateHyperdata i h = mkCmd $ \c -> putStrLn ("before runUpdate_" :: Text) >>
runUpdate_ c (updateHyperdataQuery i h) >>= \res ->
putStrLn ("after runUpdate_" :: Text) >> pure res
updateHyperdata i h = do
mkCmd $ \c -> do
res <- withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG "[updateHyperdata] before runUpdate_"
liftBase $ putText "[updateHyperdata] before runUpdate_"
res <- runUpdate_ c $ updateHyperdataQuery i h
logMsg ioLogger DEBUG $ "[updateHyperdata] after runUpdate_: " <> show res
liftBase putText $ "[updateHyperdata] after runUpdate_: " <> show res
pure res
withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG $ "[updateHyperdata] after mkCmd"
liftBase putText $ "[updateHyperdata] after mkCmd"
pure res
updateHyperdataQuery :: HyperdataC a => NodeId -> a -> Update Int64
updateHyperdataQuery i h = seq h' $ {- trace "updateHyperdataQuery: encoded JSON" $ -} Update
......@@ -127,7 +127,7 @@ instance HasLogger IO where
data instance Logger IO = IOLogger LogLevel
type instance LogInitParams IO = ()
type instance LogPayload IO = String
initLogger = \() -> do
initLogger () = do
mLvl <- liftIO $ lookupEnv "LOG_LEVEL"
let lvl = case mLvl of
Nothing -> INFO
......@@ -136,8 +136,8 @@ instance HasLogger IO where
Nothing -> error $ "unknown log level " <> s
Just lvl' -> lvl'
pure $ IOLogger lvl
destroyLogger = \_ -> pure ()
logMsg = \(IOLogger minLvl) lvl msg -> do
destroyLogger _ = pure ()
logMsg (IOLogger minLvl) lvl msg = do
if lvl < minLvl
then pure ()
else do
......@@ -11,6 +11,7 @@ Portability : POSIX
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.Utils.Jobs (
-- * Serving the JOBS API
......@@ -27,7 +28,7 @@ import Gargantext.API.Admin.EnvTypes ( mkJobHandle, parseGargJob, Env, GargJob(.
import Gargantext.API.Errors.Types ( BackendInternalError(InternalJobError) )
import Gargantext.API.Prelude ( GargM )
import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
-- import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Prelude
import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Internal qualified as Internal
......@@ -58,9 +59,10 @@ serveJobsAPI
-> SJ.AsyncJobsServerT' ctI ctO callbacks (JobEventType m) input (JobOutputType m) m
serveJobsAPI jobType f = Internal.serveJobsAPI mkJobHandle ask jobType jobErrorToGargError $ \env jHandle i -> do
runExceptT $ flip runReaderT env $ do
$(logLocM) INFO (T.pack $ "Running job of type: " ++ show jobType)
unless (jobType `elem` Jobs.handledJobs) $
Jobs.sendJob $ Jobs.GargJob { Jobs._gj_garg_job = jobType }
$(logLocM) DEBUG (T.pack $ "Running job of type: " ++ show jobType)
when (jobType `elem` Jobs.handledJobs) $
panicTrace "[serveJobsAPI] WRONG! Use Garagntext.API.Worker.serveWorkerAPI instead!"
-- void $ Jobs.sendJob $ Jobs.GargJob { Jobs._gj_garg_job = jobType }
f jHandle i
getLatestJobStatus jHandle
......@@ -11,7 +11,6 @@ Portability : POSIX
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}
module Gargantext.Utils.Jobs.Internal (
......@@ -36,5 +36,4 @@ qcTests =
testGroup "Notifications QuickCheck tests" $ do
[ QC.testProperty "CEMessage aeson encoding" $ \m -> A.decode (A.encode (m :: CEMessage)) == Just m
, QC.testProperty "Topic aeson encoding" $ \t -> A.decode (A.encode (t :: Topic)) == Just t
, QC.testProperty "Message aeson encoding" $ \m -> A.decode (A.encode (m :: Message)) == Just m
, QC.testProperty "WSRequest aeson encoding" $ \ws -> A.decode (A.encode (ws :: WSRequest)) == Just ws ]
......@@ -14,6 +14,7 @@ module Test.Core.Worker where
import Data.Aeson qualified as Aeson
import Gargantext.Core.Methods.Similarities.Conditional
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude
import Test.Instances ()
import Test.Tasty
......@@ -24,6 +25,9 @@ import Test.Tasty.QuickCheck hiding (Positive, Negative)
tests :: TestTree
tests = testGroup "worker unit tests" [
testProperty "Worker Job to/from JSON serialization is correct" $
\job -> Aeson.decode (Aeson.encode (job :: Job)) == Just job
testProperty "Worker Job to/from JSON serialization is correct" $
\job -> Aeson.decode (Aeson.encode (job :: Job)) == Just job
-- , testProperty "JobInfo to/from JSON serialization is correct" $
-- \ji -> Aeson.decode (Aeson.encode (ji :: JobInfo)) == Just ji
......@@ -141,11 +141,11 @@ instance HasLogger (GargM TestEnv BackendInternalError) where
type instance LogInitParams (GargM TestEnv BackendInternalError) = Mode
type instance LogPayload (GargM TestEnv BackendInternalError) = FL.LogStr
initLogger = \mode -> do
initLogger mode = do
test_logger_set <- liftIO $ FL.newStderrLoggerSet FL.defaultBufSize
pure $ GargTestLogger mode test_logger_set
destroyLogger = \GargTestLogger{..} -> liftIO $ FL.rmLoggerSet test_logger_set
logMsg = \(GargTestLogger mode logger_set) lvl msg -> do
destroyLogger GargTestLogger{..} = liftIO $ FL.rmLoggerSet test_logger_set
logMsg (GargTestLogger mode logger_set) lvl msg = do
let pfx = "[" <> show lvl <> "] " :: Text
when (lvl `elem` (modeToLoggingLevels mode)) $
liftIO $ FL.pushLogStrLn logger_set $ FL.toLogStr pfx <> msg
......@@ -36,6 +36,7 @@ import Gargantext.Core.Text.Ngrams (NgramsType(..))
import Gargantext.Core.Types.Individu qualified as Individu
import Gargantext.Core.Types.Main (ListType(CandidateTerm, StopTerm, MapTerm))
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Admin.Types.Node (UserId(UnsafeMkUserId))
import Gargantext.Database.Admin.Types.Hyperdata qualified as Hyperdata
import Gargantext.Prelude hiding (replace, Location)
......@@ -98,8 +99,6 @@ instance Arbitrary Job where
return $ GargJob { _gj_garg_job }
instance Arbitrary Message where
arbitrary = do
msgContent <- arbitrary
......@@ -242,7 +241,8 @@ instance Arbitrary CET.CEMessage where
arbitrary = oneof [
-- | JobStatus to/from json doesn't work
-- CET.UpdateJobProgress <$> arbitrary -
CET.UpdateTreeFirstLevel <$> arbitrary
-- CET.UpdateWorkerProgress <$> arbitrary <*> arbitrary
CET.UpdateTreeFirstLevel <$> arbitrary
deriving instance Eq CET.CEMessage
......@@ -253,12 +253,6 @@ instance Arbitrary DET.Topic where
DET.UpdateTree <$> arbitrary
instance Arbitrary DET.Message where
arbitrary = oneof [
-- | JobStatus to/from json doesn't work
-- DET.MJobProgress <$> arbitrary
pure DET.MEmpty
instance Arbitrary DET.WSRequest where
arbitrary = oneof [ DET.WSSubscribe <$> arbitrary
