[worker] timeout fixed moved to upstream haskell-bee

parent d49f85f9
Pipeline #7903 failed with stages
in 14 minutes and 42 seconds
...@@ -16,8 +16,8 @@ fi ...@@ -16,8 +16,8 @@ fi
# with the `sha256sum` result calculated on the `cabal.project` and # with the `sha256sum` result calculated on the `cabal.project` and
# `cabal.project.freeze`. This ensures the files stay deterministic so that CI # `cabal.project.freeze`. This ensures the files stay deterministic so that CI
# cache can kick in. # cache can kick in.
expected_cabal_project_hash="eb8fdb1a14aa2f7a13f565cf7fa9f6ab0e2dab9212538aed0db5691015be286b" expected_cabal_project_hash="5d4b397d8782f0f0bb8422190f5c83ff247882acd9cdd4aba8b7684c01675a1b"
expected_cabal_project_freeze_hash="a5eb1d9a331266fef56f490712decbd3eaff1fd0daa8bc63f893238a7f47df93" expected_cabal_project_freeze_hash="91775b174f065d00f22b8265d89d6c0b501e8fb7b0fd8d1b4b2f72ee5578a9f7"
cabal --store-dir=$STORE_DIR v2-build --dry-run cabal --store-dir=$STORE_DIR v2-build --dry-run
......
...@@ -222,7 +222,7 @@ constraints: any.Boolean ==0.2.4, ...@@ -222,7 +222,7 @@ constraints: any.Boolean ==0.2.4,
hashable +integer-gmp -random-initial-seed, hashable +integer-gmp -random-initial-seed,
any.hashtables ==1.4.2, any.hashtables ==1.4.2,
hashtables -bounds-checking -debug -detailed-profiling -portable -sse42 +unsafe-tricks, hashtables -bounds-checking -debug -detailed-profiling -portable -sse42 +unsafe-tricks,
any.haskell-bee ==0.1.0.0, any.haskell-bee ==0.1.2.0,
any.haskell-bee-pgmq ==0.1.0.0, any.haskell-bee-pgmq ==0.1.0.0,
any.haskell-bee-tests ==0.1.0.0, any.haskell-bee-tests ==0.1.0.0,
any.haskell-igraph ==0.10.4.1, any.haskell-igraph ==0.10.4.1,
......
...@@ -19,7 +19,7 @@ module Gargantext.Core.Worker where ...@@ -19,7 +19,7 @@ module Gargantext.Core.Worker where
import Async.Worker qualified as W import Async.Worker qualified as W
import Async.Worker.Broker.Types (toA, getMessage, messageId, setMessageTimeout, TimeoutS(..), getMessageById) import Async.Worker.Broker.Types (toA, getMessage, messageId)
import Async.Worker.Types qualified as W import Async.Worker.Types qualified as W
import Control.Exception.Safe qualified as CES import Control.Exception.Safe qualified as CES
import Control.Lens (to) import Control.Lens (to)
...@@ -44,7 +44,7 @@ import Gargantext.API.Node.Update.Types (UpdateNodeParams(..), Granularity (..)) ...@@ -44,7 +44,7 @@ import Gargantext.API.Node.Update.Types (UpdateNodeParams(..), Granularity (..))
import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync) import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync)
import Gargantext.Core.Config (hasConfig, gc_database_config, gc_jobs, gc_worker, gc_logging) import Gargantext.Core.Config (hasConfig, gc_database_config, gc_jobs, gc_worker, gc_logging)
import Gargantext.Core.Config.Types (jc_max_docs_scrapers) import Gargantext.Core.Config.Types (jc_max_docs_scrapers)
import Gargantext.Core.Config.Worker (WorkerDefinition(..), wsAdditionalDelayAfterRead) import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Notifications.CentralExchange qualified as CE import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Viz.Graph.API (graphRecompute) import Gargantext.Core.Viz.Graph.API (graphRecompute)
...@@ -96,32 +96,6 @@ notifyJobStarted env (W.State { name }) bm = do ...@@ -96,32 +96,6 @@ notifyJobStarted env (W.State { name }) bm = do
let jh = WorkerJobHandle { _w_job_info = ji } let jh = WorkerJobHandle { _w_job_info = ji }
runWorkerMonad env $ markStarted 1 jh runWorkerMonad env $ markStarted 1 jh
-- | Set additional delay, according to worker TOML config. This
-- prevents overlap when there are multiple workers and a job times
-- out: current worker still needs a bit of bookkeeping to do to
-- release it, but PGMQ already exposes that job to another worker.
setAdditionalDelay :: HasWorkerBroker
=> WorkerEnv
-> WState
-> BrokerMessage
-> IO ()
setAdditionalDelay env (W.State { name, broker, queueName }) bm = do
withLogger (env ^. w_env_config . gc_logging) $ \ioL -> do
let msgId = messageId bm
let j = toA $ getMessage bm
let timeoutS = W.jobTimeout j
let additionalDelay = env ^. w_env_config . gc_worker . wsAdditionalDelayAfterRead
$(logLoc) ioL DEBUG $ T.pack $ "[sendAdditionalDelay] [" <> name <> " :: " <> show msgId <> "] Setting delay to: " <> show (TimeoutS timeoutS + additionalDelay)
setMessageTimeout broker queueName msgId (TimeoutS timeoutS + additionalDelay)
mBm' <- getMessageById broker queueName msgId
case mBm' of
Nothing ->
$(logLoc) ioL ERROR $ "[sendAdditionalDelay] no message!"
Just bm' -> do
$(logLoc) ioL DEBUG $ T.pack $ "[sendAdditionalDelay] [" <> name <> " :: " <> show msgId <> "] After setting delay: " <> show bm'
notifyJobFinished :: HasWorkerBroker notifyJobFinished :: HasWorkerBroker
=> WorkerEnv => WorkerEnv
-> WState -> WState
...@@ -239,15 +213,13 @@ performAction :: HasWorkerBroker ...@@ -239,15 +213,13 @@ performAction :: HasWorkerBroker
-> WState -> WState
-> BrokerMessage -> BrokerMessage
-> IO () -> IO ()
performAction env s bm = do performAction env _s bm = do
let job' = toA $ getMessage bm let job' = toA $ getMessage bm
let job = W.job job' let job = W.job job'
let ji = JobInfo { _ji_message_id = messageId bm let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job } , _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji } let jh = WorkerJobHandle { _w_job_info = ji }
setAdditionalDelay env s bm
case job of case job of
Ping -> runWorkerMonad env $ do Ping -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] ping" $(logLocM) DEBUG "[performAction] ping"
......
...@@ -15,10 +15,11 @@ module Gargantext.Core.Worker.Jobs where ...@@ -15,10 +15,11 @@ module Gargantext.Core.Worker.Jobs where
import Async.Worker qualified as W import Async.Worker qualified as W
import Async.Worker.Broker.Types qualified as B
import Async.Worker.Types qualified as WT import Async.Worker.Types qualified as WT
import Control.Lens (view) import Control.Lens (view)
import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging) import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging)
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..), wsAdditionalDelayAfterRead)
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate) import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Jobs.Types (Job(..)) import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, MessageId, SendJob) import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, MessageId, SendJob)
...@@ -45,7 +46,9 @@ sendJobWithCfg gcConfig job = do ...@@ -45,7 +46,9 @@ sendJobWithCfg gcConfig job = do
Just wd -> do Just wd -> do
b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws
let queueName = _wdQueue wd let queueName = _wdQueue wd
let addDelayAfterRead = gcConfig ^. gc_worker . wsAdditionalDelayAfterRead
let job' = (updateJobData ws job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay let job' = (updateJobData ws job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay
, W.addDelayAfterRead = B._TimeoutS addDelayAfterRead
, W.toStrat = WT.TSDelete } , W.toStrat = WT.TSDelete }
withLogger (gcConfig ^. gc_logging) $ \ioL -> withLogger (gcConfig ^. gc_logging) $ \ioL ->
$(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")" $(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
......
...@@ -176,15 +176,15 @@ ...@@ -176,15 +176,15 @@
git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git" git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git"
subdirs: subdirs:
- "gargantext-graph-core" - "gargantext-graph-core"
- commit: 05c39e424d15149dc32097b3318cb6007e0e7052 - commit: c00a600b646e10a41ef71befd98dcc578e83fd8b
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee-pgmq/" - "haskell-bee-pgmq/"
- commit: 05c39e424d15149dc32097b3318cb6007e0e7052 - commit: c00a600b646e10a41ef71befd98dcc578e83fd8b
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee-tests/" - "haskell-bee-tests/"
- commit: 05c39e424d15149dc32097b3318cb6007e0e7052 - commit: c00a600b646e10a41ef71befd98dcc578e83fd8b
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee/" - "haskell-bee/"
...@@ -369,7 +369,7 @@ flags: ...@@ -369,7 +369,7 @@ flags:
gargantext: gargantext:
"enable-benchmarks": false "enable-benchmarks": false
"no-phylo-debug-logs": true "no-phylo-debug-logs": true
"test-crypto": true "test-crypto": false
graphviz: graphviz:
"test-parsing": false "test-parsing": false
hashable: hashable:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment