Commit cdc08517 authored by Fabien Maniere's avatar Fabien Maniere

Merge branch 'dev-worker-fixes' into 'dev'

Dev worker fixes

See merge request !447
parents 9dd68abe 58be4f8a
Pipeline #7908 canceled with stages
...@@ -79,9 +79,7 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo = ...@@ -79,9 +79,7 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
, _f_searx_url = _gc_frame_searx_url , _f_searx_url = _gc_frame_searx_url
, _f_istex_url = _gc_frame_istex_url } , _f_istex_url = _gc_frame_istex_url }
, _gc_jobs = CTypes.JobsConfig { _jc_max_docs_parsers = _gc_max_docs_parsers , _gc_jobs = CTypes.JobsConfig { _jc_max_docs_parsers = _gc_max_docs_parsers
, _jc_max_docs_scrapers = _gc_max_docs_scrapers , _jc_max_docs_scrapers = _gc_max_docs_scrapers }
, _jc_js_job_timeout = _gc_js_job_timeout
, _jc_js_id_timeout = _gc_js_id_timeout }
, _gc_apis = CTypes.APIsConfig { _ac_epo_api_url = _gc_epo_api_url , _gc_apis = CTypes.APIsConfig { _ac_epo_api_url = _gc_epo_api_url
, _ac_scrapyd_url } , _ac_scrapyd_url }
, _gc_worker = WorkerSettings { _wsDefinitions = [ wd ] , _gc_worker = WorkerSettings { _wsDefinitions = [ wd ]
...@@ -89,6 +87,7 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo = ...@@ -89,6 +87,7 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
, _wsDefaultJobTimeout = 60 , _wsDefaultJobTimeout = 60
, _wsLongJobTimeout = 3000 , _wsLongJobTimeout = 3000
, _wsDefaultDelay = 0 , _wsDefaultDelay = 0
, _wsAdditionalDelayAfterRead = 5
, _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} } , _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} }
, _gc_logging = Config.LogConfig { , _gc_logging = Config.LogConfig {
_lc_log_level = INFO _lc_log_level = INFO
......
...@@ -16,8 +16,8 @@ fi ...@@ -16,8 +16,8 @@ fi
# with the `sha256sum` result calculated on the `cabal.project` and # with the `sha256sum` result calculated on the `cabal.project` and
# `cabal.project.freeze`. This ensures the files stay deterministic so that CI # `cabal.project.freeze`. This ensures the files stay deterministic so that CI
# cache can kick in. # cache can kick in.
expected_cabal_project_hash="eb8fdb1a14aa2f7a13f565cf7fa9f6ab0e2dab9212538aed0db5691015be286b" expected_cabal_project_hash="a937358694443ac19fd1e16627f071eda308b8b7bbaa5391e657b2f4c6570a5b"
expected_cabal_project_freeze_hash="a5eb1d9a331266fef56f490712decbd3eaff1fd0daa8bc63f893238a7f47df93" expected_cabal_project_freeze_hash="91775b174f065d00f22b8265d89d6c0b501e8fb7b0fd8d1b4b2f72ee5578a9f7"
cabal --store-dir=$STORE_DIR v2-build --dry-run cabal --store-dir=$STORE_DIR v2-build --dry-run
......
...@@ -156,7 +156,7 @@ source-repository-package ...@@ -156,7 +156,7 @@ source-repository-package
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-bee location: https://gitlab.iscpif.fr/gargantext/haskell-bee
tag: 05c39e424d15149dc32097b3318cb6007e0e7052 tag: c00a600b646e10a41ef71befd98dcc578e83fd8b
subdir: haskell-bee/ subdir: haskell-bee/
haskell-bee-pgmq/ haskell-bee-pgmq/
haskell-bee-tests/ haskell-bee-tests/
......
...@@ -222,7 +222,7 @@ constraints: any.Boolean ==0.2.4, ...@@ -222,7 +222,7 @@ constraints: any.Boolean ==0.2.4,
hashable +integer-gmp -random-initial-seed, hashable +integer-gmp -random-initial-seed,
any.hashtables ==1.4.2, any.hashtables ==1.4.2,
hashtables -bounds-checking -debug -detailed-profiling -portable -sse42 +unsafe-tricks, hashtables -bounds-checking -debug -detailed-profiling -portable -sse42 +unsafe-tricks,
any.haskell-bee ==0.1.0.0, any.haskell-bee ==0.1.2.0,
any.haskell-bee-pgmq ==0.1.0.0, any.haskell-bee-pgmq ==0.1.0.0,
any.haskell-bee-tests ==0.1.0.0, any.haskell-bee-tests ==0.1.0.0,
any.haskell-igraph ==0.10.4.1, any.haskell-igraph ==0.10.4.1,
......
...@@ -89,10 +89,6 @@ istex_url = URL_TO_CHANGE ...@@ -89,10 +89,6 @@ istex_url = URL_TO_CHANGE
max_docs_parsers = 4000 max_docs_parsers = 4000
max_docs_scrapers = 4000 max_docs_scrapers = 4000
# in seconds
js_job_timeout = 6000
js_id_timeout = 6000
[database] [database]
# PostgreSQL access # PostgreSQL access
...@@ -162,6 +158,9 @@ default_visibility_timeout = 1 ...@@ -162,6 +158,9 @@ default_visibility_timeout = 1
# default delay before job is visible to the worker # default delay before job is visible to the worker
default_delay = 0 default_delay = 0
# delay after reading the job, should prevent overlaps for multiple workers
additional_delay_after_read = 15
# default timeout (in seconds) # default timeout (in seconds)
default_job_timeout = 60 default_job_timeout = 60
# default timeout for "long" jobs (in seconds) # default timeout for "long" jobs (in seconds)
......
...@@ -153,9 +153,6 @@ newEnv logger config dispatcher = do ...@@ -153,9 +153,6 @@ newEnv logger config dispatcher = do
let !nodeStory_env = mkNodeStoryEnv let !nodeStory_env = mkNodeStoryEnv
-- secret <- Jobs.genSecret -- secret <- Jobs.genSecret
-- let jobs_settings = (Jobs.defaultJobSettings 1 secret)
-- & Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_job_timeout)
-- & Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_id_timeout)
!_env_jwt_settings <- jwtSettings (_gc_secrets config) !_env_jwt_settings <- jwtSettings (_gc_secrets config)
......
...@@ -41,8 +41,6 @@ module Gargantext.Core.Config.Types ...@@ -41,8 +41,6 @@ module Gargantext.Core.Config.Types
, JobsConfig(..) , JobsConfig(..)
, jc_max_docs_parsers , jc_max_docs_parsers
, jc_max_docs_scrapers , jc_max_docs_scrapers
, jc_js_job_timeout
, jc_js_id_timeout
, MicroServicesSettings(..) , MicroServicesSettings(..)
, NotificationsConfig(..) , NotificationsConfig(..)
, JWKFile(..) , JWKFile(..)
...@@ -290,24 +288,19 @@ jwtSettings (SecretsConfig { _s_jwk_file = JWKFile jwkFile }) = do ...@@ -290,24 +288,19 @@ jwtSettings (SecretsConfig { _s_jwk_file = JWKFile jwkFile }) = do
data JobsConfig = data JobsConfig =
JobsConfig { _jc_max_docs_parsers :: !Integer JobsConfig { _jc_max_docs_parsers :: !Integer
, _jc_max_docs_scrapers :: !Integer , _jc_max_docs_scrapers :: !Integer }
, _jc_js_job_timeout :: !Integer
, _jc_js_id_timeout :: !Integer }
deriving (Generic, Show) deriving (Generic, Show)
instance FromValue JobsConfig where instance FromValue JobsConfig where
fromValue = parseTableFromValue $ do fromValue = parseTableFromValue $ do
_jc_max_docs_parsers <- reqKey "max_docs_parsers" _jc_max_docs_parsers <- reqKey "max_docs_parsers"
_jc_max_docs_scrapers <- reqKey "max_docs_scrapers" _jc_max_docs_scrapers <- reqKey "max_docs_scrapers"
_jc_js_job_timeout <- reqKey "js_job_timeout"
_jc_js_id_timeout <- reqKey "js_id_timeout"
return $ JobsConfig { .. } return $ JobsConfig { .. }
instance ToValue JobsConfig where instance ToValue JobsConfig where
toValue = defaultTableToValue toValue = defaultTableToValue
instance ToTable JobsConfig where instance ToTable JobsConfig where
toTable (JobsConfig { .. }) = table [ "max_docs_parsers" .= _jc_max_docs_parsers toTable (JobsConfig { .. }) =
, "max_docs_scrapers" .= _jc_max_docs_scrapers table [ "max_docs_parsers" .= _jc_max_docs_parsers
, "js_job_timeout" .= _jc_js_job_timeout , "max_docs_scrapers" .= _jc_max_docs_scrapers ]
, "js_id_timeout" .= _jc_js_id_timeout ]
makeLenses ''JobsConfig makeLenses ''JobsConfig
......
...@@ -51,6 +51,7 @@ data WorkerSettings = ...@@ -51,6 +51,7 @@ data WorkerSettings =
-- Default delay for jobs. This is useful in tests, so that we can -- Default delay for jobs. This is useful in tests, so that we can
-- get a chance to set up proper watchers for job, given its id -- get a chance to set up proper watchers for job, given its id
, _wsDefaultDelay :: B.TimeoutS , _wsDefaultDelay :: B.TimeoutS
, _wsAdditionalDelayAfterRead :: B.TimeoutS
, _wsDefinitions :: ![WorkerDefinition] , _wsDefinitions :: ![WorkerDefinition]
} deriving (Show, Eq) } deriving (Show, Eq)
instance FromValue WorkerSettings where instance FromValue WorkerSettings where
...@@ -61,12 +62,14 @@ instance FromValue WorkerSettings where ...@@ -61,12 +62,14 @@ instance FromValue WorkerSettings where
_wsDefaultJobTimeout <- reqKey "default_job_timeout" _wsDefaultJobTimeout <- reqKey "default_job_timeout"
_wsLongJobTimeout <- reqKey "long_job_timeout" _wsLongJobTimeout <- reqKey "long_job_timeout"
defaultDelay <- reqKey "default_delay" defaultDelay <- reqKey "default_delay"
additionalDelayAfterRead <- reqKey "additional_delay_after_read"
return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig
, _wsDefaultJobTimeout , _wsDefaultJobTimeout
, _wsLongJobTimeout , _wsLongJobTimeout
, _wsDefinitions , _wsDefinitions
, _wsDefaultVisibilityTimeout , _wsDefaultVisibilityTimeout
, _wsDefaultDelay = B.TimeoutS defaultDelay } , _wsDefaultDelay = B.TimeoutS defaultDelay
, _wsAdditionalDelayAfterRead = B.TimeoutS additionalDelayAfterRead }
instance ToValue WorkerSettings where instance ToValue WorkerSettings where
toValue = defaultTableToValue toValue = defaultTableToValue
instance ToTable WorkerSettings where instance ToTable WorkerSettings where
...@@ -76,6 +79,7 @@ instance ToTable WorkerSettings where ...@@ -76,6 +79,7 @@ instance ToTable WorkerSettings where
, "long_job_timeout" .= _wsLongJobTimeout , "long_job_timeout" .= _wsLongJobTimeout
, "default_visibility_timeout" .= _wsDefaultVisibilityTimeout , "default_visibility_timeout" .= _wsDefaultVisibilityTimeout
, "default_delay" .= B._TimeoutS _wsDefaultDelay , "default_delay" .= B._TimeoutS _wsDefaultDelay
, "additional_delay_after_read" .= B._TimeoutS _wsAdditionalDelayAfterRead
, "definitions" .= _wsDefinitions ] , "definitions" .= _wsDefinitions ]
data WorkerDefinition = data WorkerDefinition =
......
...@@ -86,11 +86,11 @@ notifyJobStarted :: HasWorkerBroker ...@@ -86,11 +86,11 @@ notifyJobStarted :: HasWorkerBroker
-> BrokerMessage -> BrokerMessage
-> IO () -> IO ()
notifyJobStarted env (W.State { name }) bm = do notifyJobStarted env (W.State { name }) bm = do
let mId = messageId bm let msgId = messageId bm
let j = toA $ getMessage bm let j = toA $ getMessage bm
let job = W.job j let job = W.job j
withLogger (env ^. w_env_config . gc_logging) $ \ioL -> withLogger (env ^. w_env_config . gc_logging) $ \ioL ->
$(logLoc) ioL DEBUG $ T.pack $ "[notifyJobStarted] [" <> name <> " :: " <> show mId <> "] starting job: " <> show j $(logLoc) ioL DEBUG $ T.pack $ "[notifyJobStarted] [" <> name <> " :: " <> show msgId <> "] starting job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job } , _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji } let jh = WorkerJobHandle { _w_job_info = ji }
...@@ -213,7 +213,7 @@ performAction :: HasWorkerBroker ...@@ -213,7 +213,7 @@ performAction :: HasWorkerBroker
-> WState -> WState
-> BrokerMessage -> BrokerMessage
-> IO () -> IO ()
performAction env _state bm = do performAction env _s bm = do
let job' = toA $ getMessage bm let job' = toA $ getMessage bm
let job = W.job job' let job = W.job job'
let ji = JobInfo { _ji_message_id = messageId bm let ji = JobInfo { _ji_message_id = messageId bm
......
...@@ -15,10 +15,11 @@ module Gargantext.Core.Worker.Jobs where ...@@ -15,10 +15,11 @@ module Gargantext.Core.Worker.Jobs where
import Async.Worker qualified as W import Async.Worker qualified as W
import Async.Worker.Broker.Types qualified as B
import Async.Worker.Types qualified as WT import Async.Worker.Types qualified as WT
import Control.Lens (view) import Control.Lens (view)
import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging) import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging)
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..), wsAdditionalDelayAfterRead)
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate) import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Jobs.Types (Job(..)) import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, MessageId, SendJob) import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, MessageId, SendJob)
...@@ -45,23 +46,23 @@ sendJobWithCfg gcConfig job = do ...@@ -45,23 +46,23 @@ sendJobWithCfg gcConfig job = do
Just wd -> do Just wd -> do
b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws
let queueName = _wdQueue wd let queueName = _wdQueue wd
let job' = (updateJobData ws job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay } let addDelayAfterRead = gcConfig ^. gc_worker . wsAdditionalDelayAfterRead
let job' = (updateJobData ws job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay
, W.addDelayAfterRead = B._TimeoutS addDelayAfterRead
, W.toStrat = WT.TSDelete }
withLogger (gcConfig ^. gc_logging) $ \ioL -> withLogger (gcConfig ^. gc_logging) $ \ioL ->
$(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")" $(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
W.sendJob' job' W.sendJob' job'
-- | We want to fine-tune job metadata parameters, for each job type -- | We want to fine-tune job metadata parameters, for each job type
updateJobData :: WorkerSettings -> Job -> SendJob -> SendJob updateJobData :: WorkerSettings -> Job -> SendJob -> SendJob
updateJobData ws (AddCorpusTempFileAsync {}) sj = withLongTimeout ws $ sj { W.toStrat = WT.TSDelete updateJobData ws (AddCorpusTempFileAsync {}) sj = withLongTimeout ws $ sj { W.resendOnKill = False }
, W.resendOnKill = False }
updateJobData ws (AddCorpusWithQuery {}) sj = withLongTimeout ws sj updateJobData ws (AddCorpusWithQuery {}) sj = withLongTimeout ws sj
updateJobData ws (AddToAnnuaireWithForm {}) sj = withLongTimeout ws sj updateJobData ws (AddToAnnuaireWithForm {}) sj = withLongTimeout ws sj
updateJobData ws (AddWithFile {}) sj = withLongTimeout ws $ sj { W.toStrat = WT.TSDelete updateJobData ws (AddWithFile {}) sj = withLongTimeout ws $ sj { W.resendOnKill = False }
, W.resendOnKill = False }
updateJobData ws (DocumentsFromWriteNodes {}) sj = withLongTimeout ws sj updateJobData ws (DocumentsFromWriteNodes {}) sj = withLongTimeout ws sj
updateJobData ws (FrameCalcUpload {}) sj = withLongTimeout ws sj updateJobData ws (FrameCalcUpload {}) sj = withLongTimeout ws sj
updateJobData ws (JSONPost {}) sj = withLongTimeout ws $ sj { W.toStrat = WT.TSDelete updateJobData ws (JSONPost {}) sj = withLongTimeout ws $ sj { W.resendOnKill = False }
, W.resendOnKill = False }
updateJobData ws (NgramsPostCharts {}) sj = withLongTimeout ws sj updateJobData ws (NgramsPostCharts {}) sj = withLongTimeout ws sj
updateJobData ws (RecomputeGraph {}) sj = withLongTimeout ws sj updateJobData ws (RecomputeGraph {}) sj = withLongTimeout ws sj
updateJobData ws (UpdateNode {}) sj = withLongTimeout ws sj updateJobData ws (UpdateNode {}) sj = withLongTimeout ws sj
......
...@@ -176,15 +176,15 @@ ...@@ -176,15 +176,15 @@
git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git" git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git"
subdirs: subdirs:
- "gargantext-graph-core" - "gargantext-graph-core"
- commit: 05c39e424d15149dc32097b3318cb6007e0e7052 - commit: c00a600b646e10a41ef71befd98dcc578e83fd8b
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee-pgmq/" - "haskell-bee-pgmq/"
- commit: 05c39e424d15149dc32097b3318cb6007e0e7052 - commit: c00a600b646e10a41ef71befd98dcc578e83fd8b
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee-tests/" - "haskell-bee-tests/"
- commit: 05c39e424d15149dc32097b3318cb6007e0e7052 - commit: c00a600b646e10a41ef71befd98dcc578e83fd8b
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee/" - "haskell-bee/"
...@@ -369,7 +369,7 @@ flags: ...@@ -369,7 +369,7 @@ flags:
gargantext: gargantext:
"enable-benchmarks": false "enable-benchmarks": false
"no-phylo-debug-logs": true "no-phylo-debug-logs": true
"test-crypto": true "test-crypto": false
graphviz: graphviz:
"test-parsing": false "test-parsing": false
hashable: hashable:
......
...@@ -42,8 +42,6 @@ istex_url = "URL_TO_CHANGE" ...@@ -42,8 +42,6 @@ istex_url = "URL_TO_CHANGE"
[jobs] [jobs]
max_docs_parsers = 1000000 max_docs_parsers = 1000000
max_docs_scrapers = 10000 max_docs_scrapers = 10000
js_job_timeout = 1800
js_id_timeout = 1800
# NOTE This is overridden by Test.Database.Setup # NOTE This is overridden by Test.Database.Setup
[database] [database]
...@@ -93,6 +91,9 @@ default_visibility_timeout = 1 ...@@ -93,6 +91,9 @@ default_visibility_timeout = 1
# default delay before job is visible to the worker # default delay before job is visible to the worker
default_delay = 1 default_delay = 1
# delay after reading the job, should prevent overlaps for multiple workers
additional_delay_after_read = 0
# default timeout (in seconds) # default timeout (in seconds)
default_job_timeout = 60 default_job_timeout = 60
# default timeout for "long" jobs (in seconds) # default timeout for "long" jobs (in seconds)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment