Verified Commit 8d5fd6e9 authored by Przemyslaw Kaminski's avatar Przemyslaw Kaminski

Merge branch '238-dev-async-job-worker' of...

Merge branch '238-dev-async-job-worker' of ssh://gitlab.iscpif.fr:20022/gargantext/haskell-gargantext into 238-dev-async-job-worker
parents f822d404 a81bb4ef
Pipeline #6816 failed with stages
in 38 minutes and 36 seconds
...@@ -81,8 +81,9 @@ data CLIRoutes ...@@ -81,8 +81,9 @@ data CLIRoutes
deriving (Show, Eq) deriving (Show, Eq)
data WorkerArgs = WorkerArgs data WorkerArgs = WorkerArgs
{ worker_toml :: !SettingsFile { worker_toml :: !SettingsFile
, worker_name :: !Text , worker_name :: !Text
, worker_run_single :: !Bool
} deriving (Show, Eq) } deriving (Show, Eq)
data CLICmd data CLICmd
......
...@@ -19,7 +19,7 @@ import Data.Text qualified as T ...@@ -19,7 +19,7 @@ import Data.Text qualified as T
import Gargantext.Core.Config (hasConfig, _gc_worker) import Gargantext.Core.Config (hasConfig, _gc_worker)
import Gargantext.Core.Config.Types (SettingsFile(..)) import Gargantext.Core.Config.Types (SettingsFile(..))
import Gargantext.Core.Config.Worker (WorkerDefinition(..), WorkerSettings(..), findDefinitionByName) import Gargantext.Core.Config.Worker (WorkerDefinition(..), WorkerSettings(..), findDefinitionByName)
import Gargantext.Core.Worker (withPGMQWorker) import Gargantext.Core.Worker (withPGMQWorker, withPGMQWorkerSingle)
import Gargantext.Core.Worker.Env (withWorkerEnv) import Gargantext.Core.Worker.Env (withWorkerEnv)
import Gargantext.Core.Worker.Jobs (sendJob) import Gargantext.Core.Worker.Jobs (sendJob)
import Gargantext.Core.Worker.Jobs.Types (Job(Ping)) import Gargantext.Core.Worker.Jobs.Types (Job(Ping))
...@@ -56,9 +56,13 @@ workerCLI (WorkerArgs { .. }) = do ...@@ -56,9 +56,13 @@ workerCLI (WorkerArgs { .. }) = do
Just wd -> do Just wd -> do
putStrLn ("Starting worker '" <> worker_name <> "'") putStrLn ("Starting worker '" <> worker_name <> "'")
putStrLn ("Worker settings: " <> show ws :: Text) putStrLn ("Worker settings: " <> show ws :: Text)
withPGMQWorker env wd $ \a _state -> do if worker_run_single then
runReaderT (sendJob Ping) env withPGMQWorkerSingle env wd $ \a _state -> do
wait a wait a
else
withPGMQWorker env wd $ \a _state -> do
runReaderT (sendJob Ping) env
wait a
workerCmd :: HasCallStack => Mod CommandFields CLI workerCmd :: HasCallStack => Mod CommandFields CLI
...@@ -70,4 +74,6 @@ worker_p = fmap CCMD_worker $ WorkerArgs ...@@ -70,4 +74,6 @@ worker_p = fmap CCMD_worker $ WorkerArgs
<*> strOption ( long "name" <*> strOption ( long "name"
<> metavar "STRING" <> metavar "STRING"
<> help "Worker name, as defined in the .toml file" ) <> help "Worker name, as defined in the .toml file" )
<*> flag False True ( long "run-single"
<> help "Whether to loop or run a single job from queue" )
...@@ -196,7 +196,7 @@ source-repository-package ...@@ -196,7 +196,7 @@ source-repository-package
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-bee location: https://gitlab.iscpif.fr/gargantext/haskell-bee
tag: 58ab07e0110281f94ecc8840b8cd0c0a9081b672 tag: d783198e1b7ca8a61e5332965db468da3faef796
source-repository-package source-repository-package
type: git type: git
......
...@@ -64,7 +64,28 @@ withPGMQWorker env (WorkerDefinition { .. }) cb = do ...@@ -64,7 +64,28 @@ withPGMQWorker env (WorkerDefinition { .. }) cb = do
withAsync (Worker.run state') (\a -> cb a state') withAsync (Worker.run state') (\a -> cb a state')
withPGMQWorkerSingle :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> Worker.State PGMQBroker Job -> IO ())
-> IO ()
withPGMQWorkerSingle env (WorkerDefinition { .. }) cb = do
let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig
let state' = Worker.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Nothing
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing }
withAsync (Worker.runSingle state') (\a -> cb a state')
-- | How the worker should process jobs
performAction :: (HasWorkerBroker b Job) performAction :: (HasWorkerBroker b Job)
=> WorkerEnv => WorkerEnv
-> Worker.State b Job -> Worker.State b Job
...@@ -91,4 +112,4 @@ performAction env _state bm = do ...@@ -91,4 +112,4 @@ performAction env _state bm = do
liftBase $ putStrLn ("new node async " :: Text) liftBase $ putStrLn ("new node async " :: Text)
void $ postNode' _nna_authenticatedUser _nna_node_id _nna_postNode void $ postNode' _nna_authenticatedUser _nna_node_id _nna_postNode
return () return ()
GargJob { _gj_garg_job } -> putStrLn ("Garg job: " <> show _gj_garg_job :: Text) GargJob { _gj_garg_job } -> putStrLn ("Garg job: " <> show _gj_garg_job <> " (handling of this job is still not implemented!)" :: Text)
...@@ -17,6 +17,7 @@ import Async.Worker.Broker.PGMQ (PGMQBroker) ...@@ -17,6 +17,7 @@ import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker qualified as Worker import Async.Worker qualified as Worker
import Async.Worker.Types (HasWorkerBroker) import Async.Worker.Types (HasWorkerBroker)
import Control.Lens (view) import Control.Lens (view)
import Gargantext.API.Admin.EnvTypes qualified as EnvTypes
import Gargantext.Core.Config (gc_worker, HasConfig(..)) import Gargantext.Core.Config (gc_worker, HasConfig(..))
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..))
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate) import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
...@@ -40,3 +41,11 @@ sendJob job = do ...@@ -40,3 +41,11 @@ sendJob job = do
b <- initBrokerWithDBCreate gcConfig b <- initBrokerWithDBCreate gcConfig
let queueName = _wdQueue wd let queueName = _wdQueue wd
void $ Worker.sendJob' $ Worker.mkDefaultSendJob' b queueName job void $ Worker.sendJob' $ Worker.mkDefaultSendJob' b queueName job
-- | This is just a list of what's implemented and what not.
-- After we migrate to async workers, this should be removed
-- (see G.C.Worker -> performAction on what's implemented already)
handledJobs :: [ EnvTypes.GargJob ]
handledJobs = [ EnvTypes.AddCorpusQueryJob
, EnvTypes.ForgotPasswordJob ]
...@@ -22,22 +22,19 @@ module Gargantext.Utils.Jobs ( ...@@ -22,22 +22,19 @@ module Gargantext.Utils.Jobs (
, markFailedNoErr , markFailedNoErr
) where ) where
import Control.Monad.Except ( runExceptT )
import Control.Monad.Reader ( MonadReader(ask), ReaderT(runReaderT) )
import Data.Aeson (ToJSON)
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.API.Admin.EnvTypes ( mkJobHandle, parseGargJob, Env, GargJob(..) ) import Gargantext.API.Admin.EnvTypes ( mkJobHandle, parseGargJob, Env, GargJob(..) )
import Gargantext.API.Errors.Types ( BackendInternalError(InternalJobError) ) import Gargantext.API.Errors.Types ( BackendInternalError(InternalJobError) )
import Gargantext.API.Prelude ( GargM ) import Gargantext.API.Prelude ( GargM )
import Gargantext.Core.Worker.Jobs qualified as Jobs import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Prelude
import Gargantext.System.Logging import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Internal qualified as Internal import Gargantext.Utils.Jobs.Internal qualified as Internal
import Gargantext.Utils.Jobs.Monad ( JobError, MonadJobStatus(..), markFailureNoErr, markFailedNoErr ) import Gargantext.Utils.Jobs.Monad ( JobError, MonadJobStatus(..), markFailureNoErr, markFailedNoErr )
import Prelude -- import Prelude
import Servant.Job.Async qualified as SJ import Servant.Job.Async qualified as SJ
import System.Directory (doesFileExist) import System.Directory (doesFileExist)
import Text.Read (readMaybe)
jobErrorToGargError jobErrorToGargError
...@@ -62,19 +59,21 @@ serveJobsAPI ...@@ -62,19 +59,21 @@ serveJobsAPI
serveJobsAPI jobType f = Internal.serveJobsAPI mkJobHandle ask jobType jobErrorToGargError $ \env jHandle i -> do serveJobsAPI jobType f = Internal.serveJobsAPI mkJobHandle ask jobType jobErrorToGargError $ \env jHandle i -> do
runExceptT $ flip runReaderT env $ do runExceptT $ flip runReaderT env $ do
$(logLocM) INFO (T.pack $ "Running job of type: " ++ show jobType) $(logLocM) INFO (T.pack $ "Running job of type: " ++ show jobType)
Jobs.sendJob $ Jobs.GargJob { Jobs._gj_garg_job = jobType } unless (jobType `elem` Jobs.handledJobs) $
Jobs.sendJob $ Jobs.GargJob { Jobs._gj_garg_job = jobType }
f jHandle i f jHandle i
getLatestJobStatus jHandle getLatestJobStatus jHandle
parsePrios :: [String] -> IO [(GargJob, Int)] parsePrios :: [Text] -> IO [(GargJob, Int)]
parsePrios [] = pure [] parsePrios [] = pure []
parsePrios (x : xs) = (:) <$> go x <*> parsePrios xs parsePrios (x : xs) = (:) <$> go (T.unpack x) <*> parsePrios xs
where go s = case break (=='=') s of where
([], _) -> error "parsePrios: empty jobname?" go s = case break (=='=') s of
([], _) -> errorTrace "parsePrios: empty jobname?"
(prop, valS) (prop, valS)
| Just val <- readMaybe (tail valS) | Just val <- readMaybe (T.tail $ T.pack valS)
, Just j <- parseGargJob (T.pack prop) -> pure (j, val) , Just j <- parseGargJob (T.pack prop) -> pure (j, val)
| otherwise -> error $ | otherwise -> errorTrace $
"parsePrios: invalid input. " ++ show (prop, valS) "parsePrios: invalid input. " ++ show (prop, valS)
readPrios :: Logger IO -> FilePath -> IO [(GargJob, Int)] readPrios :: Logger IO -> FilePath -> IO [(GargJob, Int)]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment