[worker] add additional_delay_after_read setting

This prevents multiple workers getting the same job at the moment it
times out.
parent 062f0c68
...@@ -89,6 +89,7 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo = ...@@ -89,6 +89,7 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
, _wsDefaultJobTimeout = 60 , _wsDefaultJobTimeout = 60
, _wsLongJobTimeout = 3000 , _wsLongJobTimeout = 3000
, _wsDefaultDelay = 0 , _wsDefaultDelay = 0
, _wsAdditionalDelayAfterRead = 5
, _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} } , _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} }
, _gc_logging = Config.LogConfig { , _gc_logging = Config.LogConfig {
_lc_log_level = INFO _lc_log_level = INFO
......
...@@ -162,6 +162,9 @@ default_visibility_timeout = 1 ...@@ -162,6 +162,9 @@ default_visibility_timeout = 1
# default delay before job is visible to the worker # default delay before job is visible to the worker
default_delay = 0 default_delay = 0
# delay after reading the job, should prevent overlaps for multiple workers
additional_delay_after_read = 15
# default timeout (in seconds) # default timeout (in seconds)
default_job_timeout = 60 default_job_timeout = 60
# default timeout for "long" jobs (in seconds) # default timeout for "long" jobs (in seconds)
......
...@@ -51,6 +51,7 @@ data WorkerSettings = ...@@ -51,6 +51,7 @@ data WorkerSettings =
-- Default delay for jobs. This is useful in tests, so that we can -- Default delay for jobs. This is useful in tests, so that we can
-- get a chance to set up proper watchers for job, given its id -- get a chance to set up proper watchers for job, given its id
, _wsDefaultDelay :: B.TimeoutS , _wsDefaultDelay :: B.TimeoutS
, _wsAdditionalDelayAfterRead :: B.TimeoutS
, _wsDefinitions :: ![WorkerDefinition] , _wsDefinitions :: ![WorkerDefinition]
} deriving (Show, Eq) } deriving (Show, Eq)
instance FromValue WorkerSettings where instance FromValue WorkerSettings where
...@@ -61,12 +62,14 @@ instance FromValue WorkerSettings where ...@@ -61,12 +62,14 @@ instance FromValue WorkerSettings where
_wsDefaultJobTimeout <- reqKey "default_job_timeout" _wsDefaultJobTimeout <- reqKey "default_job_timeout"
_wsLongJobTimeout <- reqKey "long_job_timeout" _wsLongJobTimeout <- reqKey "long_job_timeout"
defaultDelay <- reqKey "default_delay" defaultDelay <- reqKey "default_delay"
additionalDelayAfterRead <- reqKey "additional_delay_after_read"
return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig
, _wsDefaultJobTimeout , _wsDefaultJobTimeout
, _wsLongJobTimeout , _wsLongJobTimeout
, _wsDefinitions , _wsDefinitions
, _wsDefaultVisibilityTimeout , _wsDefaultVisibilityTimeout
, _wsDefaultDelay = B.TimeoutS defaultDelay } , _wsDefaultDelay = B.TimeoutS defaultDelay
, _wsAdditionalDelayAfterRead = B.TimeoutS additionalDelayAfterRead }
instance ToValue WorkerSettings where instance ToValue WorkerSettings where
toValue = defaultTableToValue toValue = defaultTableToValue
instance ToTable WorkerSettings where instance ToTable WorkerSettings where
...@@ -76,6 +79,7 @@ instance ToTable WorkerSettings where ...@@ -76,6 +79,7 @@ instance ToTable WorkerSettings where
, "long_job_timeout" .= _wsLongJobTimeout , "long_job_timeout" .= _wsLongJobTimeout
, "default_visibility_timeout" .= _wsDefaultVisibilityTimeout , "default_visibility_timeout" .= _wsDefaultVisibilityTimeout
, "default_delay" .= B._TimeoutS _wsDefaultDelay , "default_delay" .= B._TimeoutS _wsDefaultDelay
, "additional_delay_after_read" .= B._TimeoutS _wsAdditionalDelayAfterRead
, "definitions" .= _wsDefinitions ] , "definitions" .= _wsDefinitions ]
data WorkerDefinition = data WorkerDefinition =
......
...@@ -19,7 +19,7 @@ module Gargantext.Core.Worker where ...@@ -19,7 +19,7 @@ module Gargantext.Core.Worker where
import Async.Worker qualified as W import Async.Worker qualified as W
import Async.Worker.Broker.Types (toA, getMessage, messageId) import Async.Worker.Broker.Types (toA, getMessage, messageId, setMessageTimeout, TimeoutS(..), getMessageById)
import Async.Worker.Types qualified as W import Async.Worker.Types qualified as W
import Control.Exception.Safe qualified as CES import Control.Exception.Safe qualified as CES
import Control.Lens (to) import Control.Lens (to)
...@@ -44,7 +44,7 @@ import Gargantext.API.Node.Update.Types (UpdateNodeParams(..), Granularity (..)) ...@@ -44,7 +44,7 @@ import Gargantext.API.Node.Update.Types (UpdateNodeParams(..), Granularity (..))
import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync) import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync)
import Gargantext.Core.Config (hasConfig, gc_database_config, gc_jobs, gc_worker, gc_logging) import Gargantext.Core.Config (hasConfig, gc_database_config, gc_jobs, gc_worker, gc_logging)
import Gargantext.Core.Config.Types (jc_max_docs_scrapers) import Gargantext.Core.Config.Types (jc_max_docs_scrapers)
import Gargantext.Core.Config.Worker (WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerDefinition(..), wsAdditionalDelayAfterRead)
import Gargantext.Core.Notifications.CentralExchange qualified as CE import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Viz.Graph.API (graphRecompute) import Gargantext.Core.Viz.Graph.API (graphRecompute)
...@@ -86,16 +86,42 @@ notifyJobStarted :: HasWorkerBroker ...@@ -86,16 +86,42 @@ notifyJobStarted :: HasWorkerBroker
-> BrokerMessage -> BrokerMessage
-> IO () -> IO ()
notifyJobStarted env (W.State { name }) bm = do notifyJobStarted env (W.State { name }) bm = do
let mId = messageId bm let msgId = messageId bm
let j = toA $ getMessage bm let j = toA $ getMessage bm
let job = W.job j let job = W.job j
withLogger (env ^. w_env_config . gc_logging) $ \ioL -> withLogger (env ^. w_env_config . gc_logging) $ \ioL ->
$(logLoc) ioL DEBUG $ T.pack $ "[notifyJobStarted] [" <> name <> " :: " <> show mId <> "] starting job: " <> show j $(logLoc) ioL DEBUG $ T.pack $ "[notifyJobStarted] [" <> name <> " :: " <> show msgId <> "] starting job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job } , _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji } let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markStarted 1 jh runWorkerMonad env $ markStarted 1 jh
-- | Set additional delay, according to worker TOML config. This
-- prevents overlap when there are multiple workers and a job times
-- out: current worker still needs a bit of bookkeeping to do to
-- release it, but PGMQ already exposes that job to another worker.
setAdditionalDelay :: HasWorkerBroker
=> WorkerEnv
-> WState
-> BrokerMessage
-> IO ()
setAdditionalDelay env (W.State { name, broker, queueName }) bm = do
withLogger (env ^. w_env_config . gc_logging) $ \ioL -> do
let msgId = messageId bm
let j = toA $ getMessage bm
let timeoutS = W.jobTimeout j
let additionalDelay = env ^. w_env_config . gc_worker . wsAdditionalDelayAfterRead
$(logLoc) ioL DEBUG $ T.pack $ "[sendAdditionalDelay] [" <> name <> " :: " <> show msgId <> "] Setting delay to: " <> show (TimeoutS timeoutS + additionalDelay)
setMessageTimeout broker queueName msgId (TimeoutS timeoutS + additionalDelay)
mBm' <- getMessageById broker queueName msgId
case mBm' of
Nothing ->
$(logLoc) ioL ERROR $ "[sendAdditionalDelay] no message!"
Just bm' -> do
$(logLoc) ioL DEBUG $ T.pack $ "[sendAdditionalDelay] [" <> name <> " :: " <> show msgId <> "] After setting delay: " <> show bm'
notifyJobFinished :: HasWorkerBroker notifyJobFinished :: HasWorkerBroker
=> WorkerEnv => WorkerEnv
-> WState -> WState
...@@ -213,13 +239,15 @@ performAction :: HasWorkerBroker ...@@ -213,13 +239,15 @@ performAction :: HasWorkerBroker
-> WState -> WState
-> BrokerMessage -> BrokerMessage
-> IO () -> IO ()
performAction env _state bm = do performAction env s bm = do
let job' = toA $ getMessage bm let job' = toA $ getMessage bm
let job = W.job job' let job = W.job job'
let ji = JobInfo { _ji_message_id = messageId bm let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job } , _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji } let jh = WorkerJobHandle { _w_job_info = ji }
setAdditionalDelay env s bm
case job of case job of
Ping -> runWorkerMonad env $ do Ping -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] ping" $(logLocM) DEBUG "[performAction] ping"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment