Commit 397b19a5 authored by Fabien Maniere's avatar Fabien Maniere

Merge branch '477-dev-flow-zip-file-upload-2' into 'dev'

Resolve "Error uploading zip file on dev"

See merge request !417
parents eb29c06d b60e61f0
Pipeline #7719 passed with stages
in 37 minutes and 21 seconds
...@@ -85,6 +85,8 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo = ...@@ -85,6 +85,8 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
, _ac_scrapyd_url } , _ac_scrapyd_url }
, _gc_worker = WorkerSettings { _wsDefinitions = [ wd ] , _gc_worker = WorkerSettings { _wsDefinitions = [ wd ]
, _wsDefaultVisibilityTimeout = 1 , _wsDefaultVisibilityTimeout = 1
, _wsDefaultJobTimeout = 60
, _wsLongJobTimeout = 3000
, _wsDefaultDelay = 0 , _wsDefaultDelay = 0
, _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} } , _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} }
, _gc_logging = Config.LogConfig { , _gc_logging = Config.LogConfig {
...@@ -141,5 +143,8 @@ defaultNotificationsConfig :: CTypes.NotificationsConfig ...@@ -141,5 +143,8 @@ defaultNotificationsConfig :: CTypes.NotificationsConfig
defaultNotificationsConfig = defaultNotificationsConfig =
CTypes.NotificationsConfig { _nc_central_exchange_bind = "tcp://*:5560" CTypes.NotificationsConfig { _nc_central_exchange_bind = "tcp://*:5560"
, _nc_central_exchange_connect = "tcp://localhost:5560" , _nc_central_exchange_connect = "tcp://localhost:5560"
, _nc_ce_send_timeout_ms = 200
, _nc_dispatcher_bind = "tcp://*:5561" , _nc_dispatcher_bind = "tcp://*:5561"
, _nc_dispatcher_connect = "tcp://localhost:5561" } , _nc_dispatcher_connect = "tcp://localhost:5561"
, _nc_dispatcher_send_timeout_ms = 500
, _nc_dispatcher_throttle_ms = 500 }
...@@ -16,8 +16,8 @@ fi ...@@ -16,8 +16,8 @@ fi
# with the `sha256sum` result calculated on the `cabal.project` and # with the `sha256sum` result calculated on the `cabal.project` and
# `cabal.project.freeze`. This ensures the files stay deterministic so that CI # `cabal.project.freeze`. This ensures the files stay deterministic so that CI
# cache can kick in. # cache can kick in.
expected_cabal_project_hash="c7e0466c8d4c1ca88b4f3d62d022bd29329d44afc48fffbcfacf0f65293acba8" expected_cabal_project_hash="eb8fdb1a14aa2f7a13f565cf7fa9f6ab0e2dab9212538aed0db5691015be286b"
expected_cabal_project_freeze_hash="553b98aadb35506a305bd740cdd71f5fadc1e6d55d10f91cf39daa6735a63d78" expected_cabal_project_freeze_hash="553b98aadb35506a305bd740cdd71f5fadc1e6d55d10f91cf39daa6735a63d78"
cabal --store-dir=$STORE_DIR v2-build --dry-run cabal --store-dir=$STORE_DIR v2-build --dry-run
......
...@@ -151,12 +151,12 @@ source-repository-package ...@@ -151,12 +151,12 @@ source-repository-package
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-pgmq location: https://gitlab.iscpif.fr/gargantext/haskell-pgmq
tag: 1dd92f0aa8e9f8096064e5656c336e562680f4e3 tag: 9a869df2842eccc86a0f31a69fb8dc5e5ca218a8
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-bee location: https://gitlab.iscpif.fr/gargantext/haskell-bee
tag: 4a9c709613554eed0189b486de2126c18797088c tag: 05c39e424d15149dc32097b3318cb6007e0e7052
subdir: haskell-bee/ subdir: haskell-bee/
haskell-bee-pgmq/ haskell-bee-pgmq/
haskell-bee-tests/ haskell-bee-tests/
......
...@@ -123,9 +123,19 @@ smtp_host = "localhost" ...@@ -123,9 +123,19 @@ smtp_host = "localhost"
# HOST_password = password # HOST_password = password
[notifications] [notifications.central-exchange]
central-exchange = { bind = "tcp://*:5560", connect = "tcp://127.0.0.1:5560" } bind = "tcp://:5560"
dispatcher = { bind = "tcp://*:5561", connect = "tcp://127.0.0.1:5561" } connect = "tcp://127.0.0.1:5560"
# see https://gitlab.iscpif.fr/gargantext/haskell-gargantext/commit/77a687ea1483441675320fd2413fac52bb112a4c
send_timeout_ms = 200
[notifications.dispatcher]
bind = "tcp://:5561"
connect = "tcp://127.0.0.1:5561"
# see https://gitlab.iscpif.fr/gargantext/haskell-gargantext/commit/77a687ea1483441675320fd2413fac52bb112a4c
send_timeout_ms = 500
# Same dispatcher messages are throttled, this is the throttle delay
throttle_ms = 500
[nlp] [nlp]
...@@ -149,6 +159,11 @@ default_visibility_timeout = 1 ...@@ -149,6 +159,11 @@ default_visibility_timeout = 1
# default delay before job is visible to the worker # default delay before job is visible to the worker
default_delay = 0 default_delay = 0
# default timeout (in seconds)
default_job_timeout = 60
# default timeout for "long" jobs (in seconds)
long_job_timeout = 3000
# if you leave the same credentials as in [database] section above, # if you leave the same credentials as in [database] section above,
# workers will try to set up the `gargantext_pgmq` database # workers will try to set up the `gargantext_pgmq` database
# automatically # automatically
......
...@@ -19,6 +19,7 @@ rec { ...@@ -19,6 +19,7 @@ rec {
ghc966 ghc966
cabal_install cabal_install
pkgs.haskellPackages.alex pkgs.haskellPackages.alex
pkgs.haskellPackages.ghcid
pkgs.haskellPackages.happy pkgs.haskellPackages.happy
pkgs.haskellPackages.pretty-show pkgs.haskellPackages.pretty-show
]; ];
......
...@@ -257,6 +257,8 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do ...@@ -257,6 +257,8 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
-- TODO Add progress (jobStatus) update for docs - this is a -- TODO Add progress (jobStatus) update for docs - this is a
-- long action -- long action
markStarted (fromIntegral count) jobHandle
let docsC' = zipSources (yieldMany [1..]) (transPipe liftBase docsC) let docsC' = zipSources (yieldMany [1..]) (transPipe liftBase docsC)
.| mapMC (\(idx, doc) -> .| mapMC (\(idx, doc) ->
if idx > limit then do if idx > limit then do
......
...@@ -321,23 +321,29 @@ makeLenses ''APIsConfig ...@@ -321,23 +321,29 @@ makeLenses ''APIsConfig
data NotificationsConfig = data NotificationsConfig =
NotificationsConfig { _nc_central_exchange_bind :: ~T.Text NotificationsConfig { _nc_central_exchange_bind :: ~T.Text
, _nc_central_exchange_connect :: ~T.Text , _nc_central_exchange_connect :: ~T.Text
, _nc_dispatcher_bind :: ~T.Text , _nc_ce_send_timeout_ms :: ~Int
, _nc_dispatcher_connect :: ~T.Text } , _nc_dispatcher_bind :: ~T.Text
, _nc_dispatcher_connect :: ~T.Text
, _nc_dispatcher_send_timeout_ms :: ~Int
, _nc_dispatcher_throttle_ms :: ~Int }
deriving (Show, Eq) deriving (Show, Eq)
instance FromValue NotificationsConfig where instance FromValue NotificationsConfig where
fromValue = parseTableFromValue $ do fromValue = parseTableFromValue $ do
(_nc_central_exchange_bind, _nc_central_exchange_connect) <- (_nc_central_exchange_bind, _nc_central_exchange_connect, _nc_ce_send_timeout_ms) <-
reqKeyOf "central-exchange" $ parseTableFromValue $ do reqKeyOf "central-exchange" $ parseTableFromValue $ do
b <- reqKey "bind" b <- reqKey "bind"
c <- reqKey "connect" c <- reqKey "connect"
pure (b, c) t <- reqKey "send_timeout_ms"
(_nc_dispatcher_bind, _nc_dispatcher_connect) <- pure (b, c, t)
(_nc_dispatcher_bind, _nc_dispatcher_connect, _nc_dispatcher_send_timeout_ms, _nc_dispatcher_throttle_ms) <-
reqKeyOf "dispatcher" $ parseTableFromValue $ do reqKeyOf "dispatcher" $ parseTableFromValue $ do
b <- reqKey "bind" b <- reqKey "bind"
c <- reqKey "connect" c <- reqKey "connect"
pure (b, c) t <- reqKey "send_timeout_ms"
tt <- reqKey "throttle_ms"
pure (b, c, t, tt)
return $ NotificationsConfig { .. } return $ NotificationsConfig { .. }
instance ToValue NotificationsConfig where instance ToValue NotificationsConfig where
toValue = defaultTableToValue toValue = defaultTableToValue
...@@ -345,8 +351,11 @@ instance ToTable NotificationsConfig where ...@@ -345,8 +351,11 @@ instance ToTable NotificationsConfig where
toTable (NotificationsConfig { .. }) = toTable (NotificationsConfig { .. }) =
table [ "central-exchange" .= table [ "central-exchange" .=
table [ "bind" .= _nc_central_exchange_bind table [ "bind" .= _nc_central_exchange_bind
, "connect" .= _nc_central_exchange_connect ] , "connect" .= _nc_central_exchange_connect
, "send_timeout_ms" .= _nc_ce_send_timeout_ms ]
, "dispatcher" .= , "dispatcher" .=
table [ "bind" .= _nc_dispatcher_bind table [ "bind" .= _nc_dispatcher_bind
, "connect" .= _nc_dispatcher_connect ] , "connect" .= _nc_dispatcher_connect
, "send_timeout_ms" .= _nc_dispatcher_send_timeout_ms
, "throttle" .= _nc_dispatcher_throttle_ms ]
] ]
...@@ -38,8 +38,13 @@ type WorkerName = Text ...@@ -38,8 +38,13 @@ type WorkerName = Text
data WorkerSettings = data WorkerSettings =
WorkerSettings { WorkerSettings {
_wsDatabase :: !PGS.ConnectInfo _wsDatabase :: !PGS.ConnectInfo
-- After this number of seconds, the job will be available again.
-- | default job timeout, in seconds
, _wsDefaultJobTimeout :: ~Int
-- | default "long" job timeout, in seconds
, _wsLongJobTimeout :: ~Int
-- After this number of seconds, the job will be available again.
-- You can set timeout for each job individually and this is the -- You can set timeout for each job individually and this is the
-- preferred method over using defaultVt. -- preferred method over using defaultVt.
, _wsDefaultVisibilityTimeout :: PGMQ.VisibilityTimeout , _wsDefaultVisibilityTimeout :: PGMQ.VisibilityTimeout
...@@ -53,8 +58,12 @@ instance FromValue WorkerSettings where ...@@ -53,8 +58,12 @@ instance FromValue WorkerSettings where
dbConfig <- reqKey "database" dbConfig <- reqKey "database"
_wsDefinitions <- reqKey "definitions" _wsDefinitions <- reqKey "definitions"
_wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout" _wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout"
_wsDefaultJobTimeout <- reqKey "default_job_timeout"
_wsLongJobTimeout <- reqKey "long_job_timeout"
defaultDelay <- reqKey "default_delay" defaultDelay <- reqKey "default_delay"
return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig
, _wsDefaultJobTimeout
, _wsLongJobTimeout
, _wsDefinitions , _wsDefinitions
, _wsDefaultVisibilityTimeout , _wsDefaultVisibilityTimeout
, _wsDefaultDelay = B.TimeoutS defaultDelay } , _wsDefaultDelay = B.TimeoutS defaultDelay }
...@@ -63,6 +72,8 @@ instance ToValue WorkerSettings where ...@@ -63,6 +72,8 @@ instance ToValue WorkerSettings where
instance ToTable WorkerSettings where instance ToTable WorkerSettings where
toTable (WorkerSettings { .. }) = toTable (WorkerSettings { .. }) =
table [ "database" .= TOMLConnectInfo _wsDatabase table [ "database" .= TOMLConnectInfo _wsDatabase
, "default_job_timeout" .= _wsDefaultJobTimeout
, "long_job_timeout" .= _wsLongJobTimeout
, "default_visibility_timeout" .= _wsDefaultVisibilityTimeout , "default_visibility_timeout" .= _wsDefaultVisibilityTimeout
, "default_delay" .= B._TimeoutS _wsDefaultDelay , "default_delay" .= B._TimeoutS _wsDefaultDelay
, "definitions" .= _wsDefinitions ] , "definitions" .= _wsDefinitions ]
......
...@@ -74,7 +74,7 @@ gServer cfg = do ...@@ -74,7 +74,7 @@ gServer cfg = do
-- C.putStrLn $ "[central_exchange] " <> r -- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
NotificationsConfig{..} = cfg ^. gc_notifications_config nc@NotificationsConfig{..} = cfg ^. gc_notifications_config
log_cfg = cfg ^. gc_logging log_cfg = cfg ^. gc_logging
worker s_dispatcher tChan = do worker s_dispatcher tChan = do
withLogger log_cfg $ \ioLogger -> do withLogger log_cfg $ \ioLogger -> do
...@@ -99,29 +99,24 @@ gServer cfg = do ...@@ -99,29 +99,24 @@ gServer cfg = do
-- process, independent of the server. -- process, independent of the server.
-- send the same message that we received -- send the same message that we received
-- void $ sendNonblocking s_dispatcher r -- void $ sendNonblocking s_dispatcher r
sendTimeout ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Just (UpdateWorkerProgress _ji _jl) -> do Just (UpdateWorkerProgress _ji _jl) -> do
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl -- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
sendTimeout ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Just Ping -> do Just Ping -> do
sendTimeout ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Nothing -> Nothing ->
$(logLoc) ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r $(logLoc) ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r
-- | A static send timeout in microseconds. -- | Sends the given payload ensure the send doesn't take more than the
send_timeout_us :: Int -- 'nc_ce_send_timeout_ms', logging a message if the timeouts kicks in.
send_timeout_us = 50_000 sendTimeout :: Sender a => NotificationsConfig -> Logger IO -> Socket a -> ByteString -> IO ()
sendTimeout (NotificationsConfig { _nc_ce_send_timeout_ms }) ioLogger sock payload = withFrozenCallStack $ do
-- | Sends the given payload ensure the send doesn't take more than the static timeoutKickedIn <- timeout (_nc_ce_send_timeout_ms * 1000) $ send sock $ payload
-- 'send_timeout_ns', logging a message if the timeouts kicks in.
sendTimeout :: Sender a => Logger IO -> Socket a -> ByteString -> IO ()
sendTimeout ioLogger sock payload = withFrozenCallStack $ do
timeoutKickedIn <- timeout send_timeout_us $ send sock $ payload
case timeoutKickedIn of case timeoutKickedIn of
Nothing -> Nothing ->
$(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion." $(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion."
Just () -> Just () -> pure ()
$(logLoc) ioLogger DEBUG $ "[central_exchange] message sent."
notify :: HasCallStack => GargConfig -> CEMessage -> IO () notify :: HasCallStack => GargConfig -> CEMessage -> IO ()
notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do
...@@ -130,12 +125,11 @@ notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do ...@@ -130,12 +125,11 @@ notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do
connectEndpoint <- connect s $ T.unpack _nc_central_exchange_connect connectEndpoint <- connect s $ T.unpack _nc_central_exchange_connect
let do_work = do let do_work = do
let str = Aeson.encode ceMessage let str = Aeson.encode ceMessage
$(logLoc) ioLogger DEBUG $ "[central_exchange] sending to " <> _nc_central_exchange_connect
$(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str) $(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str)
-- err <- sendNonblocking s $ BSL.toStrict str -- err <- sendNonblocking s $ BSL.toStrict str
-- putText $ "[notify] err: " <> show err -- putText $ "[notify] err: " <> show err
sendTimeout ioLogger s (BSL.toStrict str) sendTimeout nc ioLogger s (BSL.toStrict str)
do_work `finally` shutdown s connectEndpoint do_work `finally` shutdown s connectEndpoint
where where
NotificationsConfig { _nc_central_exchange_connect } = cfg ^. gc_notifications_config nc@NotificationsConfig { _nc_central_exchange_connect } = cfg ^. gc_notifications_config
log_cfg = cfg ^. gc_logging log_cfg = cfg ^. gc_logging
...@@ -31,16 +31,19 @@ import Data.Aeson qualified as Aeson ...@@ -31,16 +31,19 @@ import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Data.Text qualified as T import Data.Text qualified as T
import DeferredFolds.UnfoldlM qualified as UnfoldlM import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Gargantext.Core.Config
( GargConfig, LogConfig, gc_logging, gc_notifications_config )
import Gargantext.Core.Config.Types (NotificationsConfig(..)) import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes
import Gargantext.Core.Notifications.Dispatcher.Types import Gargantext.Core.Notifications.Dispatcher.Types
import Gargantext.Core.Worker.Types (JobInfo(..)) import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging
( HasLogger(logMsg), LogLevel(..), withLogger, logLoc )
import Nanomsg (Pull(..), bind, recv, withSocket) import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import StmContainers.Set qualified as SSet import StmContainers.Set qualified as SSet
import Gargantext.Core.Config import System.Timeout (timeout)
import Gargantext.System.Logging
{- {-
...@@ -84,7 +87,8 @@ dispatcherListener config subscriptions = do ...@@ -84,7 +87,8 @@ dispatcherListener config subscriptions = do
-- NOTE I'm not sure that we need more than 1 worker here, but in -- NOTE I'm not sure that we need more than 1 worker here, but in
-- theory, the worker can perform things like user authentication, -- theory, the worker can perform things like user authentication,
-- DB queries etc so it can be slow sometimes. -- DB queries etc so it can be slow sometimes.
Async.withAsync (throttle 500_000 throttleTChan (sendDataMessageThrottled log_cfg)) $ \_ -> do Async.withAsync (throttle (_nc_dispatcher_throttle_ms * 1000) throttleTChan
(sendDataMessageThrottled nc log_cfg)) $ \_ -> do
void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do
forever $ do forever $ do
-- putText "[dispatcher_listener] receiving" -- putText "[dispatcher_listener] receiving"
...@@ -92,7 +96,7 @@ dispatcherListener config subscriptions = do ...@@ -92,7 +96,7 @@ dispatcherListener config subscriptions = do
-- C.putStrLn $ "[dispatcher_listener] " <> r -- C.putStrLn $ "[dispatcher_listener] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
NotificationsConfig { _nc_dispatcher_bind } = config ^. gc_notifications_config nc@NotificationsConfig { _nc_dispatcher_bind, _nc_dispatcher_throttle_ms } = config ^. gc_notifications_config
log_cfg = config ^. gc_logging log_cfg = config ^. gc_logging
worker tChan throttleTChan = withLogger log_cfg $ \ioL -> do worker tChan throttleTChan = withLogger log_cfg $ \ioL -> do
tId <- myThreadId tId <- myThreadId
...@@ -164,11 +168,19 @@ sendNotification throttleTChan ceMessage sub = do ...@@ -164,11 +168,19 @@ sendNotification throttleTChan ceMessage sub = do
-- | The "true" message sending to websocket. After it was withheld -- | The "true" message sending to websocket. After it was withheld
-- for a while (for throttling), it is finally sent here -- for a while (for throttling), it is finally sent here
sendDataMessageThrottled :: LogConfig -> (WS.Connection, WS.DataMessage) -> IO () sendDataMessageThrottled :: NotificationsConfig -> LogConfig -> (WS.Connection, WS.DataMessage) -> IO ()
sendDataMessageThrottled log_cfg (conn, msg) = do sendDataMessageThrottled (NotificationsConfig { _nc_dispatcher_send_timeout_ms }) log_cfg (conn, msg) = do
withLogger log_cfg $ \ioL -> withLogger log_cfg $ \ioL -> do
logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg
WS.sendDataMessage conn msg -- | We need a timeout here for the following reason:
-- when a message is sent and the user disconnects the WS
-- connection (e.g. refreshes the page), it seems that this message sending hangs.
-- We don't want to block the thread indefinitely.
timeoutKickedIn <- timeout (_nc_dispatcher_send_timeout_ms * 1000) $ WS.sendDataMessage conn msg
case timeoutKickedIn of
Nothing ->
$(logLoc) ioL ERROR $ "[sendMessageThrottled] couldn't send msg in timely fashion."
Just _ -> pure ()
-- | Custom filtering of list of Subscriptions based on -- | Custom filtering of list of Subscriptions based on
......
...@@ -232,7 +232,6 @@ performAction env _state bm = do ...@@ -232,7 +232,6 @@ performAction env _state bm = do
-- | Uses temporary file to add documents into corpus -- | Uses temporary file to add documents into corpus
AddCorpusTempFileAsync { .. } -> runWorkerMonad env $ do AddCorpusTempFileAsync { .. } -> runWorkerMonad env $ do
-- TODO CES.filnally
$(logLocM) DEBUG "[performAction] add to corpus with temporary file" $(logLocM) DEBUG "[performAction] add to corpus with temporary file"
CES.finally (addToCorpusWithTempFile _actf_user _actf_cid _actf_args jh) CES.finally (addToCorpusWithTempFile _actf_user _actf_cid _actf_args jh)
(removeLargeObject $ _wtf_file_oid _actf_args) (removeLargeObject $ _wtf_file_oid _actf_args)
......
...@@ -15,6 +15,7 @@ module Gargantext.Core.Worker.Jobs where ...@@ -15,6 +15,7 @@ module Gargantext.Core.Worker.Jobs where
import Async.Worker qualified as W import Async.Worker qualified as W
import Async.Worker.Types qualified as WT
import Control.Lens (view) import Control.Lens (view)
import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging) import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging)
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..))
...@@ -44,25 +45,34 @@ sendJobWithCfg gcConfig job = do ...@@ -44,25 +45,34 @@ sendJobWithCfg gcConfig job = do
Just wd -> do Just wd -> do
b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws
let queueName = _wdQueue wd let queueName = _wdQueue wd
let job' = (updateJobData job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay } let job' = (updateJobData ws job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay }
withLogger (gcConfig ^. gc_logging) $ \ioL -> withLogger (gcConfig ^. gc_logging) $ \ioL ->
$(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")" $(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
W.sendJob' job' W.sendJob' job'
-- | We want to fine-tune job metadata parameters, for each job type -- | We want to fine-tune job metadata parameters, for each job type
updateJobData :: Job -> SendJob -> SendJob updateJobData :: WorkerSettings -> Job -> SendJob -> SendJob
updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 } updateJobData ws (AddCorpusTempFileAsync {}) sj = withLongTimeout ws $ sj { W.toStrat = WT.TSDelete
updateJobData (AddToAnnuaireWithForm {}) sj = sj { W.timeout = 3000 } , W.resendOnKill = False }
updateJobData (AddWithFile {}) sj = sj { W.timeout = 3000 } updateJobData ws (AddCorpusWithQuery {}) sj = withLongTimeout ws sj
updateJobData (DocumentsFromWriteNodes {}) sj = sj { W.timeout = 3000 } updateJobData ws (AddToAnnuaireWithForm {}) sj = withLongTimeout ws sj
updateJobData (FrameCalcUpload {}) sj = sj { W.timeout = 3000 } updateJobData ws (AddWithFile {}) sj = withLongTimeout ws $ sj { W.toStrat = WT.TSDelete
updateJobData (JSONPost {}) sj = sj { W.timeout = 3000 } , W.resendOnKill = False }
updateJobData (NgramsPostCharts {}) sj = sj { W.timeout = 3000 } updateJobData ws (DocumentsFromWriteNodes {}) sj = withLongTimeout ws sj
updateJobData (RecomputeGraph {}) sj = sj { W.timeout = 3000 } updateJobData ws (FrameCalcUpload {}) sj = withLongTimeout ws sj
updateJobData (UpdateNode {}) sj = sj { W.timeout = 3000 } updateJobData ws (JSONPost {}) sj = withLongTimeout ws $ sj { W.toStrat = WT.TSDelete
updateJobData (UploadDocument {}) sj = sj { W.timeout = 3000 } , W.resendOnKill = False }
updateJobData (ImportRemoteDocuments {}) sj = sj { W.timeout = 3000 } updateJobData ws (NgramsPostCharts {}) sj = withLongTimeout ws sj
updateJobData (ImportRemoteTerms {}) sj = sj { W.timeout = 3000 } updateJobData ws (RecomputeGraph {}) sj = withLongTimeout ws sj
updateJobData ws (UpdateNode {}) sj = withLongTimeout ws sj
updateJobData ws (UploadDocument {}) sj = withLongTimeout ws sj
updateJobData ws (ImportRemoteDocuments {}) sj = withLongTimeout ws sj
updateJobData ws (ImportRemoteTerms {}) sj = withLongTimeout ws sj
-- | ForgotPasswordAsync, PostNodeAsync -- | ForgotPasswordAsync, PostNodeAsync
updateJobData _ sj = sj { W.resendOnKill = False updateJobData ws _ sj = withDefaultTimeout ws $ sj { W.resendOnKill = False }
, W.timeout = 60 }
withDefaultTimeout :: WorkerSettings -> SendJob -> SendJob
withDefaultTimeout (WorkerSettings { _wsDefaultJobTimeout }) sj = sj { W.timeout = _wsDefaultJobTimeout }
withLongTimeout :: WorkerSettings -> SendJob -> SendJob
withLongTimeout (WorkerSettings { _wsLongJobTimeout }) sj = sj { W.timeout = _wsLongJobTimeout }
...@@ -55,6 +55,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list) ...@@ -55,6 +55,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
where where
import Conduit import Conduit
import Control.Exception.Safe qualified as CES
import Control.Lens ( to, view ) import Control.Lens ( to, view )
import Control.Exception.Safe (catch, MonadCatch) import Control.Exception.Safe (catch, MonadCatch)
import Data.Conduit qualified as C import Data.Conduit qualified as C
...@@ -173,7 +174,7 @@ flowDataText :: forall env err m. ...@@ -173,7 +174,7 @@ flowDataText :: forall env err m.
, HasTreeError err , HasTreeError err
, HasValidationError err , HasValidationError err
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env , HasCentralExchangeNotification env
) )
=> User => User
...@@ -208,7 +209,7 @@ flowAnnuaire :: ( IsDBCmd env err m ...@@ -208,7 +209,7 @@ flowAnnuaire :: ( IsDBCmd env err m
, HasTreeError err , HasTreeError err
, HasValidationError err , HasValidationError err
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env ) , HasCentralExchangeNotification env )
=> MkCorpusUser => MkCorpusUser
-> TermType Lang -> TermType Lang
...@@ -228,7 +229,7 @@ flowCorpusFile :: ( IsDBCmd env err m ...@@ -228,7 +229,7 @@ flowCorpusFile :: ( IsDBCmd env err m
, HasTreeError err , HasTreeError err
, HasValidationError err , HasValidationError err
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env ) , HasCentralExchangeNotification env )
=> MkCorpusUser => MkCorpusUser
-> TermType Lang -> TermType Lang
...@@ -479,7 +480,7 @@ extractNgramsFromDocument :: ( UniqParameters doc ...@@ -479,7 +480,7 @@ extractNgramsFromDocument :: ( UniqParameters doc
, ExtractNgrams m doc , ExtractNgrams m doc
, IsDBCmd err env m , IsDBCmd err env m
, MonadLogger m , MonadLogger m
, MonadCatch m , CES.MonadCatch m
) )
=> NLPServerConfig => NLPServerConfig
-> TermType Lang -> TermType Lang
...@@ -525,7 +526,7 @@ extractNgramsFromDocuments :: forall doc env err m. ...@@ -525,7 +526,7 @@ extractNgramsFromDocuments :: forall doc env err m.
, ExtractNgrams m doc , ExtractNgrams m doc
, IsDBCmd env err m , IsDBCmd env err m
, MonadLogger m , MonadLogger m
, MonadCatch m , CES.MonadCatch m
) )
=> NLPServerConfig => NLPServerConfig
-> TermType Lang -> TermType Lang
......
...@@ -174,15 +174,15 @@ ...@@ -174,15 +174,15 @@
git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git" git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git"
subdirs: subdirs:
- "gargantext-graph-core" - "gargantext-graph-core"
- commit: 4a9c709613554eed0189b486de2126c18797088c - commit: 05c39e424d15149dc32097b3318cb6007e0e7052
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee-pgmq/" - "haskell-bee-pgmq/"
- commit: 4a9c709613554eed0189b486de2126c18797088c - commit: 05c39e424d15149dc32097b3318cb6007e0e7052
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee-tests/" - "haskell-bee-tests/"
- commit: 4a9c709613554eed0189b486de2126c18797088c - commit: 05c39e424d15149dc32097b3318cb6007e0e7052
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee/" - "haskell-bee/"
...@@ -198,7 +198,7 @@ ...@@ -198,7 +198,7 @@
git: "https://gitlab.iscpif.fr/gargantext/haskell-infomap.git" git: "https://gitlab.iscpif.fr/gargantext/haskell-infomap.git"
subdirs: subdirs:
- . - .
- commit: 1dd92f0aa8e9f8096064e5656c336e562680f4e3 - commit: 9a869df2842eccc86a0f31a69fb8dc5e5ca218a8
git: "https://gitlab.iscpif.fr/gargantext/haskell-pgmq" git: "https://gitlab.iscpif.fr/gargantext/haskell-pgmq"
subdirs: subdirs:
- . - .
......
...@@ -66,11 +66,18 @@ from = "" ...@@ -66,11 +66,18 @@ from = ""
login_type = "Normal" login_type = "Normal"
[notifications]
# We do not hardcode the bind and connect here, because the test infrastructure # We do not hardcode the bind and connect here, because the test infrastructure
# will randomize the connection endpoints via IPC. # will randomize the connection endpoints via IPC.
central-exchange = { bind = "", connect = "" } [notifications.central-exchange]
dispatcher = { bind = "", connect = "" } bind = ""
connect = ""
send_timeout_ms = 200
[notifications.dispatcher]
bind = ""
connect = ""
send_timeout_ms = 500
throttle_ms = 500
[nlp] [nlp]
...@@ -85,6 +92,11 @@ default_visibility_timeout = 1 ...@@ -85,6 +92,11 @@ default_visibility_timeout = 1
# default delay before job is visible to the worker # default delay before job is visible to the worker
default_delay = 1 default_delay = 1
# default timeout (in seconds)
default_job_timeout = 60
# default timeout for "long" jobs (in seconds)
long_job_timeout = 3000
# NOTE This is overridden by Test.Database.Setup # NOTE This is overridden by Test.Database.Setup
[worker.database] [worker.database]
host = "127.0.0.1" host = "127.0.0.1"
......
...@@ -87,8 +87,11 @@ withTestNotificationConfig cfg action = do ...@@ -87,8 +87,11 @@ withTestNotificationConfig cfg action = do
action $ cfg & gc_notifications_config action $ cfg & gc_notifications_config
.~ NotificationsConfig { _nc_central_exchange_bind = "ipc://" <> ce_fp .~ NotificationsConfig { _nc_central_exchange_bind = "ipc://" <> ce_fp
, _nc_central_exchange_connect = "ipc://" <> ce_fp , _nc_central_exchange_connect = "ipc://" <> ce_fp
, _nc_ce_send_timeout_ms = 200
, _nc_dispatcher_bind = "ipc://" <> ds_fp , _nc_dispatcher_bind = "ipc://" <> ds_fp
, _nc_dispatcher_connect = "ipc://" <> ds_fp , _nc_dispatcher_connect = "ipc://" <> ds_fp
, _nc_dispatcher_send_timeout_ms = 500
, _nc_dispatcher_throttle_ms = 500
} }
setup :: IO TestEnv setup :: IO TestEnv
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment