[worker] implement AddCorpusWithQuery job

parent a3f4230d
Pipeline #6757 failed with stages
in 27 minutes and 6 seconds
...@@ -418,8 +418,40 @@ https://haskell-language-server.readthedocs.io/en/latest/installation.html ...@@ -418,8 +418,40 @@ https://haskell-language-server.readthedocs.io/en/latest/installation.html
# Async workers # Async workers
Async workers allow us to accept long-running jobs and execute them
asynchronously. Workers can be spawned on multiple machines, which
allows for scaling.
To run the worker, follow these steps: To run the worker, follow these steps:
- run Redis with: `podman run --rm -it -p 6379:6379 redis:latest` - start a PostgreSQL DB, usually the one with Gargantext DB is enough
- `"simple"` worker definition is in `gargantext-settings.toml` - `"simple"` worker definition is in `gargantext-settings.toml`
- run worker: `cabal v2-run gargantext-cli -- worker --name simple` - run worker: `cabal v2-run gargantext-cli -- worker --name simple`
When running the worker for the first time (or sending a job), a
best-effort attempt is made to ensure the DB exists (if not, we will
try to create it) and that the `pgmq` schema is initialized. This allows
for hassle-free maintenance and easier onboarding.
The project on which we base our worker is
[haskell-bee](https://gitlab.iscpif.fr/gargantext/haskell-bee/). It's
a more generic framework for managing asynchronous workers, supporting
different brokers. Here, we decided to use `pgmq` because we already
have PostgreSQL deployed.
## Design
Thanks to the fact that we already use `Servant.Jobs` (which executes
the jobs in the gargantext-API process), we can migrate our jobs more
easily to a different backend.
All job definitions are in `G.A.A.EnvTypes` under the `GargJob`
datatype. However, because of a slightly different design, the
constructors for this datatype are empty, without their respective
arguments.
Hence, I created `G.C.W.J.Types` with the `Job` datatype. Hopefully,
in the future, we can replace `GargJob` with this datatype.
If you want to add a new job, just add a new constructor to `Job`
(with all the arguments this job needs), implement to/from JSON
serialization (so we can send and read that job via the broker) and
implement the appropriate handler in `G.C.Worker` -> `performAction`.
...@@ -152,7 +152,7 @@ default_visibility_timeout = 1 ...@@ -152,7 +152,7 @@ default_visibility_timeout = 1
[worker.database] [worker.database]
host = "127.0.0.1" host = "127.0.0.1"
port = 5432 port = 5432
name = "pgmq" name = "gargantext_pgmq"
user = "gargantua" user = "gargantua"
pass = "C8kdcUrAQy66U" pass = "C8kdcUrAQy66U"
......
...@@ -10,28 +10,21 @@ Portability : POSIX ...@@ -10,28 +10,21 @@ Portability : POSIX
-} -}
{-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE UndecidableInstances #-}
module Gargantext.API.Routes module Gargantext.API.Routes
where where
import Control.Lens (view)
import Data.Validity import Data.Validity
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..)) import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..)) import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..))
import Gargantext.API.Errors.Types import Gargantext.API.Errors.Types
import Gargantext.API.Node.Corpus.Annuaire qualified as Annuaire import Gargantext.API.Node.Corpus.Annuaire qualified as Annuaire
import Gargantext.API.Node.Corpus.New qualified as New
import Gargantext.API.Prelude import Gargantext.API.Prelude
import Gargantext.API.Routes.Named.Annuaire qualified as Named import Gargantext.API.Routes.Named.Annuaire qualified as Named
import Gargantext.API.Routes.Named.Corpus qualified as Named import Gargantext.API.Routes.Named.Corpus qualified as Named
import Gargantext.Core.Config (gc_jobs, HasConfig(..))
import Gargantext.Core.Config.Types (jc_max_docs_scrapers)
import Gargantext.Core.Types.Individu (User(..)) import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Worker.Jobs qualified as Jobs import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
...@@ -55,9 +48,12 @@ waitAPI n = do ...@@ -55,9 +48,12 @@ waitAPI n = do
addCorpusWithQuery :: User -> Named.AddWithQuery (AsServerT (GargM Env BackendInternalError)) addCorpusWithQuery :: User -> Named.AddWithQuery (AsServerT (GargM Env BackendInternalError))
addCorpusWithQuery user = Named.AddWithQuery $ \cid -> AsyncJobs $ addCorpusWithQuery user = Named.AddWithQuery $ \cid -> AsyncJobs $
serveJobsAPI AddCorpusQueryJob $ \jHandle q -> do serveJobsAPI AddCorpusQueryJob $ \_jHandle q -> do
limit <- view $ hasConfig . gc_jobs . jc_max_docs_scrapers -- limit <- view $ hasConfig . gc_jobs . jc_max_docs_scrapers
New.addToCorpusWithQuery user cid q (Just $ fromIntegral limit) jHandle -- New.addToCorpusWithQuery user cid q (Just $ fromIntegral limit) jHandle
Jobs.sendJob $ Jobs.AddCorpusWithQuery { Jobs._acq_args = q
, Jobs._acq_user = user
, Jobs._acq_cid = cid }
{- let log' x = do {- let log' x = do
printDebug "addToCorpusWithQuery" x printDebug "addToCorpusWithQuery" x
liftBase $ log x liftBase $ log x
......
...@@ -24,9 +24,10 @@ import Async.Worker.Types (HasWorkerBroker) ...@@ -24,9 +24,10 @@ import Async.Worker.Types (HasWorkerBroker)
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.API.Admin.Auth (forgotUserPassword) import Gargantext.API.Admin.Auth (forgotUserPassword)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..)) import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm) import Gargantext.API.Node.Corpus.New (addToCorpusWithForm, addToCorpusWithQuery)
import Gargantext.API.Node.New (postNode') import Gargantext.API.Node.New (postNode')
import Gargantext.Core.Config (hasConfig) import Gargantext.Core.Config (hasConfig, gc_jobs)
import Gargantext.Core.Config.Types (jc_max_docs_scrapers)
import Gargantext.Core.Config.Worker (WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate) import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env import Gargantext.Core.Worker.Env
...@@ -76,6 +77,10 @@ performAction env _state bm = do ...@@ -76,6 +77,10 @@ performAction env _state bm = do
AddCorpusFormAsync { .. } -> runWorkerMonad env $ do AddCorpusFormAsync { .. } -> runWorkerMonad env $ do
liftBase $ putStrLn ("add corpus form" :: Text) liftBase $ putStrLn ("add corpus form" :: Text)
addToCorpusWithForm _acf_user _acf_cid _acf_args (noJobHandle (Proxy :: Proxy WorkerMonad)) addToCorpusWithForm _acf_user _acf_cid _acf_args (noJobHandle (Proxy :: Proxy WorkerMonad))
AddCorpusWithQuery { .. } -> runWorkerMonad env $ do
liftBase $ putStrLn ("add corpus with query" :: Text)
let limit = Just $ fromIntegral $ env ^. hasConfig . gc_jobs . jc_max_docs_scrapers
addToCorpusWithQuery _acq_user _acq_cid _acq_args limit (noJobHandle (Proxy :: Proxy WorkerMonad))
ForgotPasswordAsync { _fpa_args = ForgotPasswordAsyncParams { email } } -> runWorkerMonad env $ do ForgotPasswordAsync { _fpa_args = ForgotPasswordAsyncParams { email } } -> runWorkerMonad env $ do
liftBase $ putStrLn ("forgot password: " <> email) liftBase $ putStrLn ("forgot password: " <> email)
us <- getUsersWithEmail (T.toLower email) us <- getUsersWithEmail (T.toLower email)
......
...@@ -18,7 +18,7 @@ import Data.Aeson.Types (prependFailure, typeMismatch) ...@@ -18,7 +18,7 @@ import Data.Aeson.Types (prependFailure, typeMismatch)
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser, ForgotPasswordAsyncParams) import Gargantext.API.Admin.Auth.Types (AuthenticatedUser, ForgotPasswordAsyncParams)
import Gargantext.API.Admin.EnvTypes ( GargJob ) import Gargantext.API.Admin.EnvTypes ( GargJob )
import Gargantext.API.Node.New.Types ( PostNode(..) ) import Gargantext.API.Node.New.Types ( PostNode(..) )
import Gargantext.API.Node.Types (NewWithForm) import Gargantext.API.Node.Types (NewWithForm, WithQuery)
import Gargantext.Core.Types.Individu (User) import Gargantext.Core.Types.Individu (User)
import Gargantext.Database.Admin.Types.Node (CorpusId, NodeId) import Gargantext.Database.Admin.Types.Node (CorpusId, NodeId)
import Gargantext.Prelude import Gargantext.Prelude
...@@ -29,6 +29,9 @@ data Job = ...@@ -29,6 +29,9 @@ data Job =
| AddCorpusFormAsync { _acf_args :: NewWithForm | AddCorpusFormAsync { _acf_args :: NewWithForm
, _acf_user :: User , _acf_user :: User
, _acf_cid :: CorpusId } , _acf_cid :: CorpusId }
| AddCorpusWithQuery { _acq_args :: WithQuery
, _acq_user :: User
, _acq_cid :: CorpusId }
| ForgotPasswordAsync { _fpa_args :: ForgotPasswordAsyncParams } | ForgotPasswordAsync { _fpa_args :: ForgotPasswordAsyncParams }
| NewNodeAsync { _nna_node_id :: NodeId | NewNodeAsync { _nna_node_id :: NodeId
, _nna_authenticatedUser :: AuthenticatedUser , _nna_authenticatedUser :: AuthenticatedUser
...@@ -45,6 +48,11 @@ instance FromJSON Job where ...@@ -45,6 +48,11 @@ instance FromJSON Job where
_acf_user <- o .: "user" _acf_user <- o .: "user"
_acf_cid <- o .: "cid" _acf_cid <- o .: "cid"
return $ AddCorpusFormAsync { .. } return $ AddCorpusFormAsync { .. }
"AddCorpusWithQuery" -> do
_acq_args <- o .: "args"
_acq_user <- o .: "user"
_acq_cid <- o .: "cid"
return $ AddCorpusWithQuery { .. }
"ForgotPasswordAsync" -> do "ForgotPasswordAsync" -> do
_fpa_args <- o .: "args" _fpa_args <- o .: "args"
return $ ForgotPasswordAsync { _fpa_args } return $ ForgotPasswordAsync { _fpa_args }
...@@ -59,15 +67,24 @@ instance FromJSON Job where ...@@ -59,15 +67,24 @@ instance FromJSON Job where
s -> prependFailure "parsing job type failed, " (typeMismatch "type" s) s -> prependFailure "parsing job type failed, " (typeMismatch "type" s)
instance ToJSON Job where instance ToJSON Job where
toJSON Ping = object [ ("type" .= ("Ping" :: Text)) ] toJSON Ping = object [ ("type" .= ("Ping" :: Text)) ]
toJSON (AddCorpusFormAsync { .. }) = object [ ("type" .= ("AddCorpusFormJob" :: Text)) toJSON (AddCorpusFormAsync { .. }) =
, ("args" .= _acf_args) object [ ("type" .= ("AddCorpusFormJob" :: Text))
, ("user" .= _acf_user) , ("args" .= _acf_args)
, ("cid" .= _acf_cid) ] , ("user" .= _acf_user)
toJSON (ForgotPasswordAsync { .. }) = object [ ("type" .= ("ForgotPasswordAsync" :: Text)) , ("cid" .= _acf_cid) ]
, ("args" .= _fpa_args) ] toJSON (AddCorpusWithQuery { .. }) =
toJSON (NewNodeAsync { .. }) = object [ ("type" .= ("NewNodeAsync" :: Text)) object [ ("type" .= ("AddCorpusWithQuery" :: Text))
, ("node_id" .= _nna_node_id) , ("args" .= _acq_args)
, ("authenticated_user" .= _nna_authenticatedUser) , ("user" .= _acq_user)
, ("post_node" .= _nna_postNode) ] , ("cid" .= _acq_cid) ]
toJSON (GargJob { .. }) = object [ ("type" .= ("GargJob" :: Text)) toJSON (ForgotPasswordAsync { .. }) =
, ("garg_job" .= _gj_garg_job) ] object [ ("type" .= ("ForgotPasswordAsync" :: Text))
, ("args" .= _fpa_args) ]
toJSON (NewNodeAsync { .. }) =
object [ ("type" .= ("NewNodeAsync" :: Text))
, ("node_id" .= _nna_node_id)
, ("authenticated_user" .= _nna_authenticatedUser)
, ("post_node" .= _nna_postNode) ]
toJSON (GargJob { .. }) =
object [ ("type" .= ("GargJob" :: Text))
, ("garg_job" .= _gj_garg_job) ]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment