[worker] first draft of worker CLI, with TOML config and a simple Ping job

parent d6c03dc3
...@@ -79,6 +79,12 @@ data CLIRoutes ...@@ -79,6 +79,12 @@ data CLIRoutes
| CLIR_export FilePath | CLIR_export FilePath
deriving (Show, Eq) deriving (Show, Eq)
data WorkerArgs = WorkerArgs
{ worker_ini :: !IniFile
, worker_settings :: !SettingsFile
, worker_name :: !Text
} deriving (Show, Eq)
data CLICmd data CLICmd
= CCMD_clean_csv_corpus = CCMD_clean_csv_corpus
| CCMD_filter_terms_and_cooc !CorpusFile !TermListFile !OutputFile | CCMD_filter_terms_and_cooc !CorpusFile !TermListFile !OutputFile
...@@ -92,6 +98,7 @@ data CLICmd ...@@ -92,6 +98,7 @@ data CLICmd
| CCMD_upgrade !UpgradeArgs | CCMD_upgrade !UpgradeArgs
| CCMD_golden_file_diff !GoldenFileDiffArgs | CCMD_golden_file_diff !GoldenFileDiffArgs
| CCMD_routes !CLIRoutes | CCMD_routes !CLIRoutes
| CCMD_worker !WorkerArgs
deriving (Show, Eq) deriving (Show, Eq)
data CLI = data CLI =
......
...@@ -32,6 +32,7 @@ import CLI.Phylo (phyloCLI, phyloCmd) ...@@ -32,6 +32,7 @@ import CLI.Phylo (phyloCLI, phyloCmd)
import CLI.Phylo.Profile (phyloProfileCLI, phyloProfileCmd) import CLI.Phylo.Profile (phyloProfileCLI, phyloProfileCmd)
import CLI.Server.Routes (routesCLI, routesCmd) import CLI.Server.Routes (routesCLI, routesCmd)
import CLI.Upgrade (upgradeCLI, upgradeCmd) import CLI.Upgrade (upgradeCLI, upgradeCmd)
import CLI.Worker (workerCLI, workerCmd)
runCLI :: CLI -> IO () runCLI :: CLI -> IO ()
runCLI = \case runCLI = \case
...@@ -59,6 +60,8 @@ runCLI = \case ...@@ -59,6 +60,8 @@ runCLI = \case
-> fileDiffCLI args -> fileDiffCLI args
CLISub (CCMD_routes args) CLISub (CCMD_routes args)
-> routesCLI args -> routesCLI args
CLISub (CCMD_worker args)
-> workerCLI args
main :: IO () main :: IO ()
...@@ -81,5 +84,6 @@ allOptions = subparser ( ...@@ -81,5 +84,6 @@ allOptions = subparser (
phyloProfileCmd <> phyloProfileCmd <>
upgradeCmd <> upgradeCmd <>
fileDiffCmd <> fileDiffCmd <>
routesCmd routesCmd <>
workerCmd
) )
...@@ -175,6 +175,16 @@ source-repository-package ...@@ -175,6 +175,16 @@ source-repository-package
location: https://github.com/fpringle/servant-routes.git location: https://github.com/fpringle/servant-routes.git
tag: 7694f62af6bc1596d754b42af16da131ac403b3a tag: 7694f62af6bc1596d754b42af16da131ac403b3a
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-pgmq
tag: fcb7d4fb811e5b7239078b48268c469c8d28fdf9
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-bee
tag: da1a7aaadddb5cfc940243c787ddb2575869f6c9
allow-older: * allow-older: *
allow-newer: * allow-newer: *
......
...@@ -23,3 +23,21 @@ use-origins-for-hosts = true ...@@ -23,3 +23,21 @@ use-origins-for-hosts = true
[microservices.proxy] [microservices.proxy]
port = 8009 port = 8009
enabled = false enabled = false
[worker]
# [worker.pgmq]
# podman run --rm -it -p 5433:5432 -e POSTGRES_PASSWORD=postgres cgenie/pgmq:16-1.3.3.1
# dbHost = localhost
# dbPort = 5433
# dbName = pgmq
# dbUser = postgres
# You can have multiple workers, each one under worker.definitions
[[worker.definitions]]
name = "simple"
queue = "simple"
# podman run --rm -it -p 6379:6379 redis:latest
broker.redis = { host = "localhost", port = 6379 }
...@@ -230,6 +230,9 @@ library ...@@ -230,6 +230,9 @@ library
Gargantext.Core.Viz.Phylo.PhyloTools Gargantext.Core.Viz.Phylo.PhyloTools
Gargantext.Core.Viz.Phylo.SynchronicClustering Gargantext.Core.Viz.Phylo.SynchronicClustering
Gargantext.Core.Viz.Types Gargantext.Core.Viz.Types
Gargantext.Core.Worker
Gargantext.Core.Worker.Jobs
Gargantext.Core.Worker.TOML
Gargantext.Database.Action.Flow Gargantext.Database.Action.Flow
Gargantext.Database.Action.Flow.Types Gargantext.Database.Action.Flow.Types
Gargantext.Database.Action.Metrics.TFICF Gargantext.Database.Action.Metrics.TFICF
...@@ -559,7 +562,9 @@ library ...@@ -559,7 +562,9 @@ library
, gargantext-prelude , gargantext-prelude
, graphviz ^>= 2999.20.1.0 , graphviz ^>= 2999.20.1.0
, hashable ^>= 1.3.0.0 , hashable ^>= 1.3.0.0
, haskell-bee
, haskell-igraph ^>= 0.10.4 , haskell-igraph ^>= 0.10.4
, hedis >= 0.15.2 && < 0.16
, hlcm ^>= 0.2.2 , hlcm ^>= 0.2.2
, hsinfomap ^>= 0.1 , hsinfomap ^>= 0.1
, hsparql ^>= 0.3.8 , hsparql ^>= 0.3.8
...@@ -580,7 +585,7 @@ library ...@@ -580,7 +585,7 @@ library
, iso639 , iso639
, jose ^>= 0.8.4 , jose ^>= 0.8.4
, json-stream ^>= 0.4.2.4 , json-stream ^>= 0.4.2.4
, lens ^>= 4.19.2 , lens >= 5.2.2 && < 5.3
, lens-aeson < 1.3 , lens-aeson < 1.3
, lifted-base ^>= 0.2.3.12 , lifted-base ^>= 0.2.3.12
, listsafe ^>= 0.1.0.1 , listsafe ^>= 0.1.0.1
...@@ -726,6 +731,7 @@ executable gargantext-cli ...@@ -726,6 +731,7 @@ executable gargantext-cli
CLI.Types CLI.Types
CLI.Upgrade CLI.Upgrade
CLI.Utils CLI.Utils
CLI.Worker
Paths_gargantext Paths_gargantext
hs-source-dirs: hs-source-dirs:
bin/gargantext-cli bin/gargantext-cli
...@@ -742,7 +748,9 @@ executable gargantext-cli ...@@ -742,7 +748,9 @@ executable gargantext-cli
, full-text-search ^>= 0.2.1.4 , full-text-search ^>= 0.2.1.4
, gargantext , gargantext
, gargantext-prelude , gargantext-prelude
, haskell-bee
, ini ^>= 0.4.1 , ini ^>= 0.4.1
, lens >= 5.2.2 && < 5.3
, optparse-applicative , optparse-applicative
, optparse-generic ^>= 1.4.7 , optparse-generic ^>= 1.4.7
, parallel ^>= 3.2.2.0 , parallel ^>= 3.2.2.0
......
...@@ -77,6 +77,7 @@ devSettings (JwkFile jwkFile) (SettingsFile settingsFile) = do ...@@ -77,6 +77,7 @@ devSettings (JwkFile jwkFile) (SettingsFile settingsFile) = do
, _scrapydUrl = fromMaybe (panicTrace "Invalid scrapy URL") $ parseBaseUrl "http://localhost:6800" , _scrapydUrl = fromMaybe (panicTrace "Invalid scrapy URL") $ parseBaseUrl "http://localhost:6800"
, _cookieSettings = defaultCookieSettings { cookieXsrfSetting = Just xsrfCookieSetting } -- TODO-SECURITY tune , _cookieSettings = defaultCookieSettings { cookieXsrfSetting = Just xsrfCookieSetting } -- TODO-SECURITY tune
, _jwtSettings = defaultJWTSettings jwk -- TODO-SECURITY tune , _jwtSettings = defaultJWTSettings jwk -- TODO-SECURITY tune
, _workerSettings = _gargWorkerSettings
} }
where where
xsrfCookieSetting = defaultXsrfCookieSettings { xsrfExcludeGet = True } xsrfCookieSetting = defaultXsrfCookieSettings { xsrfExcludeGet = True }
......
...@@ -5,6 +5,7 @@ import Control.Lens hiding ((.=)) ...@@ -5,6 +5,7 @@ import Control.Lens hiding ((.=))
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.API.Admin.Settings.CORS import Gargantext.API.Admin.Settings.CORS
import Gargantext.API.Admin.Settings.MicroServices import Gargantext.API.Admin.Settings.MicroServices
import Gargantext.Core.Worker.TOML
import Gargantext.Prelude (panicTrace) import Gargantext.Prelude (panicTrace)
import Gargantext.System.Logging import Gargantext.System.Logging
import Prelude import Prelude
...@@ -15,6 +16,7 @@ import Servant.Client.Core.BaseUrl ...@@ -15,6 +16,7 @@ import Servant.Client.Core.BaseUrl
data GargTomlSettings = GargTomlSettings data GargTomlSettings = GargTomlSettings
{ _gargCorsSettings :: !CORSSettings { _gargCorsSettings :: !CORSSettings
, _gargMicroServicesSettings :: !MicroServicesSettings , _gargMicroServicesSettings :: !MicroServicesSettings
, _gargWorkerSettings :: !WorkerSettings
} }
makeLenses ''GargTomlSettings makeLenses ''GargTomlSettings
...@@ -23,6 +25,7 @@ settingsCodec :: TomlCodec GargTomlSettings ...@@ -23,6 +25,7 @@ settingsCodec :: TomlCodec GargTomlSettings
settingsCodec = GargTomlSettings settingsCodec = GargTomlSettings
<$> (Toml.table corsSettingsCodec "cors" .= _gargCorsSettings) <$> (Toml.table corsSettingsCodec "cors" .= _gargCorsSettings)
<*> (Toml.table microServicesSettingsCodec "microservices.proxy" .= _gargMicroServicesSettings) <*> (Toml.table microServicesSettingsCodec "microservices.proxy" .= _gargMicroServicesSettings)
<*> (Toml.table workerSettingsCodec "worker" .= _gargWorkerSettings)
-- | Extends the 'allowed-origins' in the CORSettings with the URLs embellished -- | Extends the 'allowed-origins' in the CORSettings with the URLs embellished
-- with the proxy port. -- with the proxy port.
......
...@@ -6,10 +6,11 @@ import Control.Lens ...@@ -6,10 +6,11 @@ import Control.Lens
import Control.Monad.Logger (LogLevel) import Control.Monad.Logger (LogLevel)
import GHC.Enum import GHC.Enum
import Gargantext.API.Admin.Settings.CORS import Gargantext.API.Admin.Settings.CORS
import Gargantext.API.Admin.Settings.MicroServices
import Gargantext.Core.Worker.TOML
import Gargantext.Prelude import Gargantext.Prelude
import Servant.Auth.Server (JWTSettings, CookieSettings(..)) import Servant.Auth.Server (JWTSettings, CookieSettings(..))
import Servant.Client (BaseUrl) import Servant.Client (BaseUrl)
import Gargantext.API.Admin.Settings.MicroServices
type PortNumber = Int type PortNumber = Int
...@@ -30,6 +31,7 @@ data Settings = Settings ...@@ -30,6 +31,7 @@ data Settings = Settings
, _cookieSettings :: !CookieSettings , _cookieSettings :: !CookieSettings
, _sendLoginEmails :: !SendEmailType , _sendLoginEmails :: !SendEmailType
, _scrapydUrl :: !BaseUrl , _scrapydUrl :: !BaseUrl
, _workerSettings :: !WorkerSettings
} }
makeLenses ''Settings makeLenses ''Settings
......
{-|
Module : Gargantext.Core.Worker
Description : Asynchronous worker logic
Copyright : (c) CNRS, 2024
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.Core.Worker where
import Async.Worker.Broker.Redis (RedisBroker)
import Async.Worker.Broker.Types (BrokerMessage, toA, getMessage)
import Async.Worker qualified as Worker
import Async.Worker.Types qualified as Worker
import Async.Worker.Types (HasWorkerBroker)
import Data.Text qualified as T
import Database.Redis qualified as Redis
import Gargantext.Core.Worker.Jobs
import Gargantext.Core.Worker.TOML (WorkerDefinition(..))
import Gargantext.Prelude
withRedisWorker :: (HasWorkerBroker RedisBroker Job)
=> Redis.ConnectInfo
-> WorkerDefinition
-> (Async () -> Worker.State RedisBroker Job -> IO ())
-> IO ()
withRedisWorker connInfo (WorkerDefinition { .. }) cb = do
broker <- initializeRedisBroker connInfo
let state' = Worker.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction
, onMessageReceived = Nothing
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing }
withAsync (Worker.run state') (\a -> cb a state')
performAction :: (HasWorkerBroker b Job)
=> Worker.State b Job
-> BrokerMessage b (Worker.Job Job)
-> IO ()
performAction _state bm = do
let job' = toA $ getMessage bm
case Worker.job job' of
Ping -> putStrLn ("ping" :: Text)
{-|
Module : Gargantext.Core.Worker.Jobs
Description : Worker job definitions
Copyright : (c) CNRS, 2024
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.Core.Worker.Jobs where
import Async.Worker.Broker.Redis (RedisBroker, BrokerInitParams(RedisBrokerInitParams))
import Async.Worker.Broker.Types (Broker, initBroker)
import Async.Worker qualified as Worker
import Async.Worker.Types qualified as Worker
import Async.Worker.Types (HasWorkerBroker)
import Control.Lens (view)
import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson.Types (prependFailure, typeMismatch)
import Database.Redis qualified as Redis
import Gargantext.API.Admin.Types (HasSettings, settings, workerSettings)
import Gargantext.Core.Worker.TOML (findDefinitionByName, WorkerDefinition(..))
import Gargantext.Database.Prelude (Cmd')
import Gargantext.Prelude
data Job =
Ping
deriving (Show, Eq)
instance FromJSON Job where
parseJSON = withObject "Job" $ \o -> do
type_ <- o .: "type"
case type_ of
"Ping" -> return Ping
s -> prependFailure "parsing job type failed, " (typeMismatch "type" s)
instance ToJSON Job where
toJSON Ping = object [("type" .= ("Ping" :: Text))]
initializeRedisBroker :: (HasWorkerBroker RedisBroker Job)
=> Redis.ConnectInfo
-> IO (Broker RedisBroker (Worker.Job Job))
initializeRedisBroker connInfo = do
let initParams = RedisBrokerInitParams connInfo
initBroker initParams
sendJob :: (HasWorkerBroker RedisBroker Job, HasSettings env)
=> Redis.ConnectInfo
-> Text
-> Job
-> Cmd' env err ()
sendJob connInfo workerName job = do
ws <- view $ settings . workerSettings
let mWd = findDefinitionByName ws workerName
case mWd of
Nothing -> panicTrace $ "worker definition not found for " <> workerName
Just wd -> liftBase $ do
b <- initializeRedisBroker connInfo
let queueName = _wdQueue wd
_ <- Worker.sendJob' $ Worker.mkDefaultSendJob' b queueName job
pure ()
{-|
Module : Gargantext.Core.Worker.TOML
Description : Worker TOML file config
Copyright : (c) CNRS, 2024
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.Core.Worker.TOML where
import Async.Worker.Broker.Types qualified as Broker
import Data.Text qualified as T
import Database.Redis qualified as Redis
import Gargantext.Prelude
import Toml
type WorkerName = Text
data WorkerSettings =
WorkerSettings {
_wsDefinitions :: ![WorkerDefinition]
} deriving (Show, Eq)
data WorkerDefinition =
WorkerDefinition {
_wdName :: !WorkerName
, _wdQueue :: !Broker.Queue
, _wdBroker :: !WorkerBroker
} deriving (Show, Eq)
data WorkerBroker =
WorkerBrokerRedis WorkerRedis
-- TODO Add WorkerBrokerPGMQ
deriving (Show, Eq)
data WorkerRedis =
WorkerRedis {
_wrHost :: !Text
, _wrPort :: !Int
} deriving (Show, Eq)
workerSettingsCodec :: TomlCodec WorkerSettings
workerSettingsCodec = WorkerSettings
<$> Toml.list workerDefinitionCodec "definitions" .= _wsDefinitions
workerDefinitionCodec :: TomlCodec WorkerDefinition
workerDefinitionCodec = WorkerDefinition
<$> Toml.text "name" .= _wdName
<*> Toml.string "queue" .= _wdQueue
<*> Toml.table workerBrokerCodec "broker.redis" .= _wdBroker
workerBrokerCodec :: TomlCodec WorkerBroker
workerBrokerCodec =
Toml.dimatch matchWorkerBrokerRedis WorkerBrokerRedis workerRedisCodec
matchWorkerBrokerRedis :: WorkerBroker -> Maybe WorkerRedis
matchWorkerBrokerRedis (WorkerBrokerRedis wr) = Just wr
workerRedisCodec :: TomlCodec WorkerRedis
workerRedisCodec = WorkerRedis
<$> Toml.text "host" .= _wrHost
<*> Toml.int "port" .= _wrPort
wdToRedisConnectInfo :: WorkerDefinition -> Maybe Redis.ConnectInfo
wdToRedisConnectInfo (WorkerDefinition { _wdBroker = WorkerBrokerRedis (WorkerRedis { .. }) }) =
Just $ Redis.defaultConnectInfo { Redis.connectHost = T.unpack _wrHost
, Redis.connectPort = Redis.PortNumber $ fromIntegral _wrPort }
findDefinitionByName :: WorkerSettings -> WorkerName -> Maybe WorkerDefinition
findDefinitionByName (WorkerSettings { _wsDefinitions }) workerName =
head $ filter (\wd -> _wdName wd == workerName) _wsDefinitions
-- wdToRedisBrokerInitParams :: WorkerDefinition -> Maybe BRedis.RedisBrokerInitParams
-- wdToRedisBrokerInitParams wd = BRedis.RedisBrokerInitParams <$> (wdToRedisConnectInfo wd)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment