[worker] this works now, creates db & schema automatically

parent b4efbb6d
Pipeline #6752 failed with stages
in 62 minutes and 29 seconds
......@@ -80,7 +80,9 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
, _gc_apis = CTypes.APIsConfig { _ac_pubmed_api_key = _gc_pubmed_api_key
, _ac_epo_api_url = _gc_epo_api_url
, _ac_scrapyd_url }
, _gc_worker = WorkerSettings { _wsDefinitions = [] }
, _gc_worker = WorkerSettings { _wsDefinitions = []
, _wsDefaultVisibilityTimeout = 1
, _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} }
, _gc_log_level = LevelDebug
}
where
......
......@@ -18,7 +18,7 @@ fi
# with the `sha256sum` result calculated on the `cabal.project` and
# `cabal.project.freeze`. This ensures the files stay deterministic so that CI
# cache can kick in.
expected_cabal_project_hash="9714d7562e90ad8438108ac87d47c9afdb741b67e34349efb9f65ead0f94cfd8"
expected_cabal_project_hash="eb28225cf0ebf07cc233223d3bbed085ea6ed19e05912c06ecda92a89f8132cb"
expected_cabal_project_freeze_hash="30dd1cf2cb2015351dd0576391d22b187443b1935c2be23599b821ad1ab95f23"
......
......@@ -191,12 +191,12 @@ source-repository-package
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-pgmq
tag: fcb7d4fb811e5b7239078b48268c469c8d28fdf9
tag: 0591a643d8ba1776af4fac56c1e4ff5fc3e98bb3
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-bee
tag: d159ed580acde0bbdbd7b3b1c33fe1a7d5cff34f
tag: 58ab07e0110281f94ecc8840b8cd0c0a9081b672
source-repository-package
type: git
......
......@@ -240,6 +240,7 @@ library
Gargantext.Core.Viz.Phylo.SynchronicClustering
Gargantext.Core.Viz.Types
Gargantext.Core.Worker
Gargantext.Core.Worker.Broker
Gargantext.Core.Worker.Env
Gargantext.Core.Worker.Jobs
Gargantext.Core.Worker.Jobs.Types
......@@ -524,8 +525,8 @@ library
, graphviz ^>= 2999.20.1.0
, haskell-bee
, haskell-igraph ^>= 0.10.4
, haskell-pgmq >= 0.1.0.0 && < 0.2
, haskell-throttle
, hedis >= 0.15.2 && < 0.16
, hlcm ^>= 0.2.2
, hsinfomap ^>= 0.1
, hsparql ^>= 0.3.8
......@@ -591,6 +592,7 @@ library
, servant-swagger-ui-core >= 0.3.5
, servant-websockets >= 2.0.0 && < 2.1
, servant-xml-conduit ^>= 0.1.0.4
, shelly >= 1.9 && < 2
, singletons ^>= 3.0.2
, singletons-th >= 3.1 && < 3.2
, smtp-mail >= 0.3.0.0
......
{-|
Module : Gargantext.Core.Config.Worker
Description : Worker TOML file config
Copyright : (c) CNRS, 2024
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
Although Async.Worker.Broker supports various Broker types, in
Gargantext we will only use PGMQ. This makes for easier config,
simpler design. Also, the DevOps stuff is simpler (providing multiple
brokers at the same time could lead to complexities in analyzing
what's going on).
If need arises, we could switch to a different broker by rewriting its
initialization. At the same time, sending and executing jobs should be
broker-agnostic.
-}
module Gargantext.Core.Config.Worker where
import Async.Worker.Broker.Types qualified as Broker
import Database.PostgreSQL.Simple qualified as PGS
import Gargantext.Core.Config.Types (unTOMLConnectInfo)
import Database.PGMQ.Types qualified as PGMQ
import Gargantext.Prelude
import Toml.Schema
type WorkerName = Text
data WorkerSettings =
WorkerSettings {
_wsDatabase :: !PGS.ConnectInfo
-- After this number of seconds, the job will be available again.
-- You can set timeout for each job individually and this is the
-- preferred method over using defaultVt.
, _wsDefaultVisibilityTimeout :: PGMQ.VisibilityTimeout
, _wsDefinitions :: ![WorkerDefinition]
} deriving (Show, Eq)
instance FromValue WorkerSettings where
fromValue = parseTableFromValue $ do
dbConfig <- reqKey "database"
_wsDefinitions <- reqKey "definitions"
let _wsDefaultVisibilityTimeout = 1
return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig
, _wsDefinitions
, _wsDefaultVisibilityTimeout }
data WorkerDefinition =
WorkerDefinition {
_wdName :: !WorkerName
, _wdQueue :: !Broker.Queue
} deriving (Show, Eq)
instance FromValue WorkerDefinition where
fromValue = parseTableFromValue $ do
_wdName <- reqKey "name"
queue <- reqKey "queue"
return $ WorkerDefinition { _wdQueue = Broker.Queue queue, .. }
findDefinitionByName :: WorkerSettings -> WorkerName -> Maybe WorkerDefinition
findDefinitionByName (WorkerSettings { _wsDefinitions }) workerName =
head $ filter (\wd -> _wdName wd == workerName) _wsDefinitions
-- wdToRedisBrokerInitParams :: WorkerDefinition -> Maybe BRedis.RedisBrokerInitParams
-- wdToRedisBrokerInitParams wd = BRedis.RedisBrokerInitParams <$> (wdToRedisConnectInfo wd)
......@@ -9,7 +9,6 @@ Portability : POSIX
-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-} -- orphan HasNodeError IOException
......@@ -17,7 +16,7 @@ Portability : POSIX
module Gargantext.Core.Worker where
import Async.Worker.Broker.Redis (RedisBroker)
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker.Broker.Types (BrokerMessage, toA, getMessage)
import Async.Worker qualified as Worker
import Async.Worker.Types qualified as Worker
......@@ -27,43 +26,42 @@ import Gargantext.API.Admin.Auth (forgotUserPassword)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm)
import Gargantext.API.Node.New (postNode')
import Gargantext.Core.Config.Worker (WorkerDefinition(..), wdToRedisConnectInfo)
import Gargantext.Core.Config (hasConfig)
import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.Jobs
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Database.Query.Table.User (getUsersWithEmail)
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Monad ( MonadJobStatus(noJobHandle) )
-- | Spawn a worker with Redis broker
-- | Spawn a worker with PGMQ broker
-- TODO:
-- - reduce size of DB pool
-- - progress report via notifications
-- - I think there is no point to save job result, as usually there is none (we have side-effects only)
-- - replace Servant.Job to use workers instead of garg API threads
withRedisWorker :: (HasWorkerBroker RedisBroker Job)
withPGMQWorker :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> Worker.State RedisBroker Job -> IO ())
-> (Async () -> Worker.State PGMQBroker Job -> IO ())
-> IO ()
withRedisWorker env wd@(WorkerDefinition { .. }) cb = do
case wdToRedisConnectInfo wd of
Nothing -> panicTrace $ "worker definition: could not create redis conn info"
Just connInfo -> do
broker <- initializeRedisBroker connInfo
let state' = Worker.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Nothing
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing }
withAsync (Worker.run state') (\a -> cb a state')
withPGMQWorker env (WorkerDefinition { .. }) cb = do
let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig
let state' = Worker.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Nothing
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing }
withAsync (Worker.run state') (\a -> cb a state')
performAction :: (HasWorkerBroker b Job)
......
{-# LANGUAGE TupleSections #-}
module Gargantext.Core.Worker.Broker
( initBrokerWithDBCreate )
where
import Async.Worker.Broker.PGMQ (PGMQBroker, BrokerInitParams(PGMQBrokerInitParams))
import Async.Worker.Broker.Types (Broker, initBroker)
import Async.Worker.Types qualified as WorkerT
import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.Core.Config (GargConfig(..), gc_worker)
import Gargantext.Core.Config.Worker (WorkerSettings(..))
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Prelude
import Shelly qualified as SH
-- | Create DB if not exists, then run 'initBroker' (which, in
-- particular, creates the pgmq extension, if needed)
initBrokerWithDBCreate :: (WorkerT.HasWorkerBroker PGMQBroker Job)
=> GargConfig
-> IO (Broker PGMQBroker (WorkerT.Job Job))
initBrokerWithDBCreate gc@(GargConfig { _gc_database_config }) = do
-- By using gargantext db credentials, we create pgmq db (if needed)
let WorkerSettings { .. } = gc ^. gc_worker
let psqlDB = TE.decodeUtf8 $ PSQL.postgreSQLConnectionString _gc_database_config
-- For the \gexec trick, see:
-- https://stackoverflow.com/questions/18389124/simulate-create-database-if-not-exists-for-postgresql
(_res, _ec) <- SH.shelly $ SH.silently $ SH.escaping False $ do
let sql = "\"SELECT 'CREATE DATABASE " <> (T.pack $ PSQL.connectDatabase _wsDatabase) <> "' WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '" <> (T.pack $ PSQL.connectDatabase _wsDatabase) <> "')\\gexec\""
result <- SH.run "echo" [sql, "|", "psql", "-d", "\"" <> psqlDB <> "\""]
(result,) <$> SH.lastExitCode
initBroker $ PGMQBrokerInitParams _wsDatabase _wsDefaultVisibilityTimeout
......@@ -13,43 +13,30 @@ Portability : POSIX
module Gargantext.Core.Worker.Jobs where
import Async.Worker.Broker.Redis (RedisBroker, BrokerInitParams(RedisBrokerInitParams))
import Async.Worker.Broker.Types (Broker, initBroker)
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker qualified as Worker
import Async.Worker.Types qualified as Worker
import Async.Worker.Types (HasWorkerBroker)
import Control.Lens (view)
import Database.Redis qualified as Redis
import Gargantext.Core.Config (gc_worker, HasConfig(..))
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..), wdToRedisConnectInfo)
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..))
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Database.Prelude (Cmd')
import Gargantext.Prelude
initializeRedisBroker :: (HasWorkerBroker RedisBroker Job)
=> Redis.ConnectInfo
-> IO (Broker RedisBroker (Worker.Job Job))
initializeRedisBroker connInfo = do
let initParams = RedisBrokerInitParams connInfo
initBroker initParams
sendJob :: (HasWorkerBroker RedisBroker Job, HasConfig env)
sendJob :: (HasWorkerBroker PGMQBroker Job, HasConfig env)
=> Job
-> Cmd' env err ()
sendJob job = do
ws <- view $ hasConfig . gc_worker
gcConfig <- view $ hasConfig
let WorkerSettings { _wsDefinitions } = gcConfig ^. gc_worker
-- TODO Try to guess which worker should get this job
-- let mWd = findDefinitionByName ws workerName
let mWd = head $ _wsDefinitions ws
let mWd = head _wsDefinitions
case mWd of
Nothing -> panicTrace $ "worker definition not found"
Nothing -> panicTrace "No worker definitions available"
Just wd -> liftBase $ do
case wdToRedisConnectInfo wd of
Nothing -> panicTrace $ "worker definition: could not create redis conn info"
Just connInfo -> do
b <- initializeRedisBroker connInfo
let queueName = _wdQueue wd
void $ Worker.sendJob' $ Worker.mkDefaultSendJob' b queueName job
b <- initBrokerWithDBCreate gcConfig
let queueName = _wdQueue wd
void $ Worker.sendJob' $ Worker.mkDefaultSendJob' b queueName job
......@@ -260,7 +260,7 @@
git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git"
subdirs:
- .
- commit: d159ed580acde0bbdbd7b3b1c33fe1a7d5cff34f
- commit: 58ab07e0110281f94ecc8840b8cd0c0a9081b672
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs:
- .
......@@ -276,7 +276,7 @@
git: "https://gitlab.iscpif.fr/gargantext/haskell-infomap.git"
subdirs:
- .
- commit: fcb7d4fb811e5b7239078b48268c469c8d28fdf9
- commit: 0591a643d8ba1776af4fac56c1e4ff5fc3e98bb3
git: "https://gitlab.iscpif.fr/gargantext/haskell-pgmq"
subdirs:
- .
......
[cors]
allowed-origins = [
"https://demo.gargantext.org"
, "https://formation.gargantext.org"
, "https://academia.sub.gargantext.org"
, "https://cnrs.gargantext.org"
, "https://imt.sub.gargantext.org"
, "https://helloword.gargantext.org"
, "https://complexsystems.gargantext.org"
, "https://europa.gargantext.org"
, "https://earth.sub.gargantext.org"
, "https://health.sub.gargantext.org"
, "https://msh.sub.gargantext.org"
, "https://dev.sub.gargantext.org"
, "http://localhost:8008"
]
use-origins-for-hosts = true
[microservices.proxy]
port = 8009
enabled = false
[worker]
\ No newline at end of file
......@@ -74,4 +74,13 @@ FR = "spacy://localhost:8001"
All = "corenlp://localhost:9000"
[worker]
definitions = []
[worker.database]
host = "127.0.0.1"
port = 5432
name = "pgmq_test"
user = "gargantua"
pass = "gargantua_test"
[[worker.definitions]]
name = "default"
queue = "default"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment