[worker] implement safe kill, more improvements

parent 49a90269
Pipeline #6914 failed with stages
in 14 minutes and 47 seconds
......@@ -21,7 +21,7 @@ import Data.Text qualified as T
import Gargantext.Core.Config (hasConfig, gc_worker)
import Gargantext.Core.Config.Types (SettingsFile(..))
import Gargantext.Core.Config.Worker (WorkerDefinition(..), WorkerSettings(..), findDefinitionByName)
import Gargantext.Core.Worker (withPGMQWorker, withPGMQWorkerSingle, initWorkerState)
import Gargantext.Core.Worker (withPGMQWorkerCtrlC, withPGMQWorkerSingleCtrlC, initWorkerState)
import Gargantext.Core.Worker.Env (withWorkerEnv)
import Gargantext.Core.Worker.Jobs (sendJob)
import Gargantext.Core.Worker.Jobs.Types (Job(Ping))
......@@ -59,10 +59,10 @@ workerCLI (CLIW_run (WorkerArgs { .. })) = do
putText $ "Worker settings: " <> show ws
___
if worker_run_single then
withPGMQWorkerSingle env wd $ \a _state -> do
withPGMQWorkerSingleCtrlC env wd $ \a _state -> do
wait a
else
withPGMQWorker env wd $ \a _state -> do
withPGMQWorkerCtrlC env wd $ \a _state -> do
_ <- runReaderT (sendJob Ping) env
wait a
workerCLI (CLIW_stats (WorkerStatsArgs { .. })) = do
......
......@@ -110,7 +110,7 @@ source-repository-package
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/crawlers/openalex.git
tag: 4eec15855207dc74afc75b94c3764eede4de7b55
tag: 8249a40ff1ba885af45d3958f113af5b8a64c4ac
source-repository-package
type: git
......
......@@ -617,6 +617,8 @@ library
, tuple ^>= 0.3.0.2
, unordered-containers ^>= 0.2.16.0
, unicode-collation >= 0.1.3.5
-- needed for Worker / System.Posix.Signals
, unix >= 2.7.3 && < 2.9
, uri-encode ^>= 1.5.0.7
, utf8-string ^>= 1.0.2
, uuid ^>= 1.3.15
......
......@@ -207,6 +207,7 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
$(logLocM) DEBUG $ "[addToCorpusWithQuery] corpus id " <> show corpusId
_ <- commitCorpus cid user
$(logLocM) DEBUG $ "[addToCorpusWithQuery] corpus comitted"
-- printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
sendMail user
......
......@@ -24,7 +24,7 @@ broker-agnostic.
module Gargantext.Core.Config.Worker where
import Async.Worker.Broker.Types qualified as Broker
import Async.Worker.Broker.Types qualified as B
import Database.PGMQ.Types qualified as PGMQ
import Database.PostgreSQL.Simple qualified as PGS
import Gargantext.Core.Config.Types (unTOMLConnectInfo, TOMLConnectInfo(..))
......@@ -64,19 +64,19 @@ instance ToTable WorkerSettings where
data WorkerDefinition =
WorkerDefinition {
_wdName :: !WorkerName
, _wdQueue :: !Broker.Queue
, _wdQueue :: !B.Queue
} deriving (Show, Eq)
instance FromValue WorkerDefinition where
fromValue = parseTableFromValue $ do
_wdName <- reqKey "name"
queue <- reqKey "queue"
return $ WorkerDefinition { _wdQueue = Broker.Queue queue, .. }
return $ WorkerDefinition { _wdQueue = B.Queue queue, .. }
instance ToValue WorkerDefinition where
toValue = defaultTableToValue
instance ToTable WorkerDefinition where
toTable (WorkerDefinition { .. }) =
table [ "name" .= _wdName
, "queue" .= Broker._Queue _wdQueue ]
, "queue" .= B._Queue _wdQueue ]
findDefinitionByName :: WorkerSettings -> WorkerName -> Maybe WorkerDefinition
findDefinitionByName (WorkerSettings { _wsDefinitions }) workerName =
......
......@@ -63,7 +63,7 @@ gServer (NotificationsConfig { .. }) = do
forever $ do
-- putText "[central_exchange] receiving"
r <- recv s
logMsg ioLogger DEBUG $ "[central_exchange] received: " <> show r
logMsg ioLogger DDEBUG $ "[central_exchange] received: " <> show r
-- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r
where
......@@ -110,7 +110,7 @@ notify (NotificationsConfig { _nc_central_exchange_connect }) ceMessage = do
_ <- connect s $ T.unpack _nc_central_exchange_connect
let str = Aeson.encode ceMessage
withLogger () $ \ioLogger ->
logMsg ioLogger DEBUG $ "[central_exchange] sending: " <> (T.unpack $ TE.decodeUtf8 $ BSL.toStrict str)
logMsg ioLogger DDEBUG $ "[central_exchange] sending: " <> (T.unpack $ TE.decodeUtf8 $ BSL.toStrict str)
-- err <- sendNonblocking s $ BSL.toStrict str
-- putText $ "[notify] err: " <> show err
void $ timeout 100_000 $ send s $ BSL.toStrict str
......@@ -36,7 +36,7 @@ import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes
import Gargantext.Core.Notifications.Dispatcher.Types
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
import Gargantext.System.Logging (LogLevel(..), withLogger, logMsg)
import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS
import Servant.Job.Types (job_id)
......@@ -182,7 +182,7 @@ sendNotification throttleTChan ceMessage sub = do
sendDataMessageThrottled :: (WS.Connection, WS.DataMessage) -> IO ()
sendDataMessageThrottled (conn, msg) = do
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg
logMsg ioL DDEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg
WS.sendDataMessage conn msg
......
......@@ -19,8 +19,8 @@ module Gargantext.Core.Worker where
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker.Broker.Types (BrokerMessage, toA, getMessage, messageId)
import Async.Worker qualified as Worker
import Async.Worker.Types qualified as Worker
import Async.Worker qualified as W
import Async.Worker.Types qualified as W
import Async.Worker.Types (HasWorkerBroker)
import Data.Text qualified as T
import Gargantext.API.Admin.Auth (forgotUserPassword)
......@@ -39,36 +39,35 @@ import Gargantext.Prelude
import Gargantext.System.Logging ( logLocM, LogLevel(..), logMsg, withLogger )
import Gargantext.Utils.Jobs.Error (HumanFriendlyErrorText(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(markStarted, markComplete, markFailed))
import System.Posix.Signals (Handler(Catch), installHandler, keyboardSignal)
initWorkerState :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> WorkerDefinition
-> IO (Worker.State PGMQBroker Job)
-> IO (W.State PGMQBroker Job)
initWorkerState env (WorkerDefinition { .. }) = do
let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig
pure $ Worker.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Just $ notifyJobStarted env
, onJobFinish = Just $ notifyJobFinished env
, onJobTimeout = Just $ notifyJobTimeout env
, onJobError = Just $ notifyJobFailed env
-- TODO Implement Ctrl-C, notify job killed
, onWorkerKilledSafely = Nothing }
pure $ W.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Just $ notifyJobStarted env
, onJobFinish = Just $ notifyJobFinished env
, onJobTimeout = Just $ notifyJobTimeout env
, onJobError = Just $ notifyJobFailed env
, onWorkerKilledSafely = Just $ notifyJobKilled env }
notifyJobStarted :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> IO ()
notifyJobStarted env (Worker.State { name }) bm = do
notifyJobStarted env (W.State { name }) bm = do
let j = toA $ getMessage bm
let job = Worker.job j
let job = W.job j
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[notifyJobStarted] [" <> name <> "] starting job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm
......@@ -78,12 +77,12 @@ notifyJobStarted env (Worker.State { name }) bm = do
notifyJobFinished :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> IO ()
notifyJobFinished env (Worker.State { name }) bm = do
notifyJobFinished env (W.State { name }) bm = do
let j = toA $ getMessage bm
let job = Worker.job j
let job = W.job j
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[notifyJobFinished] [" <> name <> "] finished job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm
......@@ -93,12 +92,12 @@ notifyJobFinished env (Worker.State { name }) bm = do
notifyJobTimeout :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> IO ()
notifyJobTimeout env (Worker.State { name }) bm = do
notifyJobTimeout env (W.State { name }) bm = do
let j = toA $ getMessage bm
let job = Worker.job j
let job = W.job j
withLogger () $ \ioL ->
logMsg ioL ERROR $ "[notifyJobTimeout] [" <> name <> "] job timed out: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm
......@@ -106,15 +105,15 @@ notifyJobTimeout env (Worker.State { name }) bm = do
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markFailed (Just $ UnsafeMkHumanFriendlyErrorText "Worker job timed out!") jh
notifyJobFailed :: (HasWorkerBroker PGMQBroker Job)
notifyJobFailed :: (HasWorkerBroker PGMQBroker Job, HasCallStack)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> SomeException
-> IO ()
notifyJobFailed env (Worker.State { name }) bm exc = do
notifyJobFailed env (W.State { name }) bm exc = do
let j = toA $ getMessage bm
let job = Worker.job j
let job = W.job j
withLogger () $ \ioL ->
logMsg ioL ERROR $ "[notifyJobFailed] [" <> name <> "] failed job: " <> show j <> " --- ERROR: " <> show exc
let ji = JobInfo { _ji_message_id = messageId bm
......@@ -122,6 +121,22 @@ notifyJobFailed env (Worker.State { name }) bm exc = do
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markFailed (Just $ UnsafeMkHumanFriendlyErrorText "Worker job failed") jh
notifyJobKilled :: (HasWorkerBroker PGMQBroker Job, HasCallStack)
=> WorkerEnv
-> W.State PGMQBroker Job
-> Maybe (BrokerMessage PGMQBroker (W.Job Job))
-> IO ()
notifyJobKilled _ _ Nothing = pure ()
notifyJobKilled env (W.State { name }) (Just bm) = do
let j = toA $ getMessage bm
let job = W.job j
withLogger () $ \ioL ->
logMsg ioL ERROR $ "[notifyJobKilled] [" <> name <> "] failed job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markFailed (Just $ UnsafeMkHumanFriendlyErrorText $ "Worker '" <> T.pack name <> "' was killed") jh
-- | Spawn a worker with PGMQ broker
-- TODO:
......@@ -132,33 +147,56 @@ notifyJobFailed env (Worker.State { name }) bm exc = do
withPGMQWorker :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> Worker.State PGMQBroker Job -> IO ())
-> (Async () -> W.State PGMQBroker Job -> IO ())
-> IO ()
withPGMQWorker env wd cb = do
state' <- initWorkerState env wd
withAsync (Worker.run state') (\a -> cb a state')
withAsync (W.run state') (\a -> cb a state')
withPGMQWorkerSingle :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> Worker.State PGMQBroker Job -> IO ())
-> (Async () -> W.State PGMQBroker Job -> IO ())
-> IO ()
withPGMQWorkerSingle env wd cb = do
state' <- initWorkerState env wd
withAsync (Worker.runSingle state') (\a -> cb a state')
withAsync (W.runSingle state') (\a -> cb a state')
withPGMQWorkerCtrlC :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> W.State PGMQBroker Job -> IO ())
-> IO ()
withPGMQWorkerCtrlC env wd cb = do
withPGMQWorker env wd $ \a state' -> do
let tid = asyncThreadId a
_ <- installHandler keyboardSignal (Catch (throwTo tid W.KillWorkerSafely)) Nothing
cb a state'
withPGMQWorkerSingleCtrlC :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> W.State PGMQBroker Job -> IO ())
-> IO ()
withPGMQWorkerSingleCtrlC env wd cb = do
withPGMQWorkerSingle env wd $ \a state' -> do
let tid = asyncThreadId a
_ <- installHandler keyboardSignal (Catch (throwTo tid W.KillWorkerSafely)) Nothing
cb a state'
-- | How the worker should process jobs
performAction :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> Worker.State PGMQBroker Job
-> BrokerMessage PGMQBroker (Worker.Job Job)
-> W.State PGMQBroker Job
-> BrokerMessage PGMQBroker (W.Job Job)
-> IO ()
performAction env _state bm = do
let job' = toA $ getMessage bm
let job = Worker.job job'
let job = W.job job'
let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji }
......
......@@ -15,7 +15,7 @@ where
import Async.Worker.Broker.PGMQ (PGMQBroker, BrokerInitParams(PGMQBrokerInitParams))
import Async.Worker.Broker.Types (Broker, initBroker)
import Async.Worker.Types qualified as WorkerT
import Async.Worker.Types qualified as W
import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Database.PostgreSQL.Simple qualified as PSQL
......@@ -29,9 +29,9 @@ import Gargantext.Prelude
-- | Create DB if not exists, then run 'initBroker' (which, in
-- particular, creates the pgmq extension, if needed)
initBrokerWithDBCreate :: (WorkerT.HasWorkerBroker PGMQBroker Job)
initBrokerWithDBCreate :: (W.HasWorkerBroker PGMQBroker Job)
=> GargConfig
-> IO (Broker PGMQBroker (WorkerT.Job Job))
-> IO (Broker PGMQBroker (W.Job Job))
initBrokerWithDBCreate gc@(GargConfig { _gc_database_config }) = do
-- By using gargantext db credentials, we create pgmq db (if needed)
let WorkerSettings { .. } = gc ^. gc_worker
......
......@@ -138,7 +138,7 @@ instance CET.HasCentralExchangeNotification WorkerEnv where
c <- asks (view $ to _w_env_config)
liftBase $ do
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[ce_notify]: " <> show (_gc_notifications_config c) <> " :: " <> show m
logMsg ioL DDEBUG $ "[ce_notify]: " <> show (_gc_notifications_config c) <> " :: " <> show m
CE.notify (_gc_notifications_config c) m
---------
......
......@@ -15,7 +15,7 @@ module Gargantext.Core.Worker.Jobs where
import Async.Worker.Broker.Types (MessageId)
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker qualified as Worker
import Async.Worker qualified as W
import Async.Worker.Types (HasWorkerBroker)
import Control.Lens (view)
import Gargantext.API.Admin.EnvTypes qualified as EnvTypes
......@@ -42,13 +42,13 @@ sendJob job = do
Just wd -> liftBase $ do
b <- initBrokerWithDBCreate gcConfig
let queueName = _wdQueue wd
Worker.sendJob' $ Worker.mkDefaultSendJob b queueName job (jobTimeout job)
W.sendJob' $ updateJobData job $ W.mkDefaultSendJob' b queueName job
-- | Some predefined job timeouts (in seconds)
jobTimeout :: Job -> Int
jobTimeout (AddCorpusFormAsync {}) = 300
jobTimeout (AddCorpusWithQuery {}) = 3000
jobTimeout _ = 10
-- | We want to fine-tune job metadata parameters, for each job type
updateJobData :: Job -> W.SendJob PGMQBroker Job -> W.SendJob PGMQBroker Job
updateJobData (AddCorpusFormAsync {}) sj = sj { W.timeout = 300 }
updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 }
updateJobData _ sj = sj
-- | This is just a list of what's implemented and what not.
......
......@@ -10,6 +10,8 @@ Portability : POSIX
-}
{-# LANGUAGE TemplateHaskell #-}
module Gargantext.Database.Action.Mail
where
......@@ -22,13 +24,15 @@ import Gargantext.Database.Prelude (CmdM)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Schema.User
import Gargantext.Prelude
import Gargantext.System.Logging (MonadLogger, LogLevel(..), logLocM)
------------------------------------------------------------------------
sendMail :: (HasNodeError err, CmdM env err m) => User -> m ()
sendMail :: (HasNodeError err, CmdM env err m, MonadLogger m) => User -> m ()
sendMail u = do
cfg <- view $ mailSettings
userLight <- getUserLightDB u
$(logLocM) DEBUG $ "[sendMail] sending mail to user " <> show userLight
mail cfg (MailInfo { mailInfo_username = userLight_username userLight
, mailInfo_address = userLight_email userLight
}
......
......@@ -122,12 +122,12 @@ class HasNodeError e where
_NodeError :: Prism' e NodeError
errorWith :: ( MonadError e m
, HasNodeError e)
, HasNodeError e)
=> Text -> m a
errorWith x = nodeError (NodeError $ toException $ userError $ T.unpack x)
nodeError :: ( MonadError e m
, HasNodeError e)
, HasNodeError e )
=> NodeError -> m a
nodeError ne = throwError $ _NodeError # ne
......
......@@ -26,8 +26,10 @@ import Text.Read (readMaybe)
data LogLevel =
-- | Detailed debug messages
DDEBUG
-- | Debug messages
DEBUG
| DEBUG
-- | Information
| INFO
-- | Normal runtime conditions
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment