{-|
Module      : CLI.Worker
Description : Gargantext Job Worker
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX
-}

module CLI.Worker where

import Async.Worker.Broker.Types qualified as BT
import Async.Worker.Types qualified as W
import CLI.Types
import CLI.Parsers
import Control.Concurrent.Async (forConcurrently_)
import Data.List qualified as List (cycle, concat, take)
import Data.Text qualified as T
import Gargantext.Core.Config (hasConfig, gc_worker, gc_logging)
import Gargantext.Core.Config.Types (SettingsFile(..))
import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.Config.Worker (WorkerDefinition(..), WorkerSettings(..), findDefinitionByName)
import Gargantext.Core.Worker (withPGMQWorkerCtrlC, withPGMQWorkerSingleCtrlC, initWorkerState)
import Gargantext.Core.Worker.Env (withWorkerEnv)
-- import Gargantext.Core.Worker.Jobs (sendJob)
-- import Gargantext.Core.Worker.Jobs.Types (Job(Ping))
import Gargantext.Prelude
import Gargantext.System.Logging (withLogger, logMsg, LogLevel(..), Logger)
import Options.Applicative
import Prelude qualified
import System.Exit (exitFailure)


-- TODO Command to monitor queues

-- | Entry point for the @worker@ sub-commands.
--
-- * 'CLIW_run': look up the worker definition called @worker_name@ in the
--   settings file and run it, either for a single job (@worker_run_single@)
--   or in a loop. If no definition has that name, the available names are
--   printed and the process exits with a failure status.
-- * 'CLIW_runAll': run every worker definition via 'runAllWorkers'.
-- * 'CLIW_stats': for every worker definition, print the queue name, the
--   queue size and the pending messages.
workerCLI :: CLIWorker -> IO ()
workerCLI (CLIW_run (WorkerArgs { .. })) = do
  -- Prints a 72 character wide horizontal rule, used to frame the banner.
  let printRule = putStrLn ((List.concat $ List.take 72 $ List.cycle ["_"]) :: Prelude.String)

  withWorkerEnv worker_toml $ \env -> do
    let log_cfg = env ^. hasConfig . gc_logging
    withLogger log_cfg $ \ioLogger -> do
      printRule
      logMsg ioLogger INFO "GarganText worker"
      logMsg ioLogger INFO $ "worker_name: " <> T.unpack worker_name
      logMsg ioLogger INFO $ "worker toml: " <> _SettingsFile worker_toml
      printRule

      let ws = env ^. hasConfig . gc_worker
      case findDefinitionByName ws worker_name of
        Nothing -> do
          let workerNames = _wdName <$> (_wsDefinitions ws)
          let availableWorkers = T.intercalate ", " workerNames
          putText $ "Worker definition not found! Available workers: " <> availableWorkers
          -- An unknown worker name is a usage error: exit non-zero so that
          -- shells and process supervisors do not mistake it for a
          -- successful run.
          exitFailure
        Just wd -> do
          logMsg ioLogger INFO $ "Starting worker '" <> T.unpack worker_name <> "'"
          logMsg ioLogger DEBUG $ "gc config: " <> show (env ^. hasConfig)
          logMsg ioLogger DEBUG $ "Worker settings: " <> show ws
          printRule
          if worker_run_single
            then
              withPGMQWorkerSingleCtrlC env wd $ \a _state -> do
                wait a
            else
              withPGMQWorkerCtrlC env wd $ \a _state -> do
                -- _ <- runReaderT (sendJob Ping) env
                wait a
workerCLI (CLIW_runAll (WorkerAllArgs { .. })) =
  -- The environment is opened here only to obtain the logging
  -- configuration; 'runAllWorkers' opens its own environment per worker.
  withWorkerEnv worker_toml $ \env -> do
    let log_cfg = env ^. hasConfig . gc_logging
    withLogger log_cfg $ \ioLogger -> runAllWorkers ioLogger worker_toml
workerCLI (CLIW_stats (WorkerStatsArgs { .. })) = do
  putStrLn ("worker toml: " <> _SettingsFile ws_toml)

  withWorkerEnv ws_toml $ \env -> do
    let ws = env ^. hasConfig . gc_worker
    mapM_ (\wd -> do
              state' <- initWorkerState env wd
              let b = W.broker state'
              let q = W.queueName state'
              qs <- BT.getQueueSize b q
              msgIds <- BT.listPendingMessageIds b q
              putStrLn ("Queue: " <> show q <> ", size: " <> show qs :: Text)
              putStrLn (" Messages: " :: Text)
              -- A pending id may no longer resolve to a message by the time
              -- it is fetched; report that case instead of skipping it.
              mapM_ (\msgId -> do
                        mm <- BT.getMessageById b q msgId
                        case mm of
                          Nothing -> putStrLn (" - " <> show msgId <> " :: NOTHING!" :: Text)
                          Just m -> putStrLn (" - " <> show m :: Text)
                    ) msgIds
          ) (_wsDefinitions ws)

-- | The top-level @worker@ command, wrapping 'workerParser' into the
-- application-wide 'CLI' type.
workerCmd :: HasCallStack => Mod CommandFields CLI
workerCmd = command "worker" (info (helper <*> (fmap CLISub $ fmap CCMD_worker workerParser))
                               (progDesc "Gargantext worker."))

-- | Parser for the @run@, @run-all@ and @stats@ sub-commands.
workerParser :: Parser CLIWorker
workerParser = hsubparser
  (  command "run" (info (helper <*> worker_p)
                     (progDesc "Run a single worker"))
  <> command "run-all" (info (helper <*> worker_all_p)
                         (progDesc "Run all worker definitions"))
  <> command "stats" (info (helper <*> stats_p)
                       (progDesc "Print queue stats"))
  )

-- | Arguments of @worker run@: the settings parsed by @settings_p@, the
-- mandatory @--name@ of the worker and the optional @--run-single@ switch
-- (off by default).
worker_p :: Parser CLIWorker
worker_p = fmap CLIW_run $ WorkerArgs
  <$> settings_p
  <*> strOption ( long "name"
               <> metavar "STRING"
               <> help "Worker name, as defined in the .toml file" )
  <*> flag False True ( long "run-single"
                     <> help "Whether to loop or run a single job from queue" )

-- | Arguments of @worker run-all@: only the settings parsed by @settings_p@.
worker_all_p :: Parser CLIWorker
worker_all_p = fmap CLIW_runAll $ WorkerAllArgs
  <$> settings_p

-- | Arguments of @worker stats@: only the settings parsed by @settings_p@.
stats_p :: Parser CLIWorker
stats_p = fmap CLIW_stats $ WorkerStatsArgs
  <$> settings_p

-- | Runs all the workers concurrently.
-- /NOTE/: Be very careful, this IS a BLOCKING operation, despite its usage
-- of 'forConcurrently_' under the hood. In particular 'forConcurrently_' will
-- execute the inner action in parallel discarding the results, but the inner
-- action still has to terminate!
-- That is /NOT/ the case for this function, which is meant to start the infinite
-- loop for the workers, so beware when using this, and make sure that the calling
-- code is using this properly (for example along with 'race' or a similar
-- function from async).
runAllWorkers :: Logger IO -> SettingsFile -> IO ()
runAllWorkers ioLogger worker_toml = do
  cfg <- readConfig worker_toml
  let ws = cfg ^. gc_worker
  forConcurrently_ (_wsDefinitions ws) $ \wd -> do
    -- Each worker gets its own environment, opened from the same settings file.
    withWorkerEnv worker_toml $ \env -> do
      logMsg ioLogger INFO $ "Starting worker '" <> T.unpack (_wdName wd) <> "' (queue " <> show (_wdQueue wd) <> ")"
      withPGMQWorkerCtrlC env wd $ \a _state -> do
        wait a