[notifications / jobs] fixes according to Alfredo's remarks

parent b3224bee
Pipeline #7681 passed with stages
in 93 minutes and 46 seconds
...@@ -121,7 +121,6 @@ sendTimeout ioLogger sock payload = withFrozenCallStack $ do ...@@ -121,7 +121,6 @@ sendTimeout ioLogger sock payload = withFrozenCallStack $ do
Nothing -> Nothing ->
$(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion." $(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion."
Just () -> pure () Just () -> pure ()
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] message sent."
notify :: HasCallStack => GargConfig -> CEMessage -> IO () notify :: HasCallStack => GargConfig -> CEMessage -> IO ()
notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do
...@@ -130,7 +129,6 @@ notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do ...@@ -130,7 +129,6 @@ notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do
connectEndpoint <- connect s $ T.unpack _nc_central_exchange_connect connectEndpoint <- connect s $ T.unpack _nc_central_exchange_connect
let do_work = do let do_work = do
let str = Aeson.encode ceMessage let str = Aeson.encode ceMessage
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] sending to " <> _nc_central_exchange_connect
$(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str) $(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str)
-- err <- sendNonblocking s $ BSL.toStrict str -- err <- sendNonblocking s $ BSL.toStrict str
-- putText $ "[notify] err: " <> show err -- putText $ "[notify] err: " <> show err
......
...@@ -167,7 +167,7 @@ sendNotification throttleTChan ceMessage sub = do ...@@ -167,7 +167,7 @@ sendNotification throttleTChan ceMessage sub = do
-- | Static send timeout, in microseconds -- | Static send timeout, in microseconds
sendTimeoutUs :: Int sendTimeoutUs :: Int
sendTimeoutUs = 50_000 sendTimeoutUs = 500_000
-- | The "true" message sending to websocket. After it was withheld -- | The "true" message sending to websocket. After it was withheld
-- for a while (for throttling), it is finally sent here -- for a while (for throttling), it is finally sent here
...@@ -175,6 +175,10 @@ sendDataMessageThrottled :: LogConfig -> (WS.Connection, WS.DataMessage) -> IO ( ...@@ -175,6 +175,10 @@ sendDataMessageThrottled :: LogConfig -> (WS.Connection, WS.DataMessage) -> IO (
sendDataMessageThrottled log_cfg (conn, msg) = do sendDataMessageThrottled log_cfg (conn, msg) = do
withLogger log_cfg $ \ioL -> do withLogger log_cfg $ \ioL -> do
logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg
-- | We need a timeout here for the following reason:
-- when a message is sent and the user disconnects the WS
-- connection (e.g. refreshes the page), it seems that this message sending hangs.
-- We don't want to block the thread indefinitely.
timeoutKickedIn <- timeout sendTimeoutUs $ WS.sendDataMessage conn msg timeoutKickedIn <- timeout sendTimeoutUs $ WS.sendDataMessage conn msg
case timeoutKickedIn of case timeoutKickedIn of
Nothing -> Nothing ->
......
...@@ -50,27 +50,35 @@ sendJobWithCfg gcConfig job = do ...@@ -50,27 +50,35 @@ sendJobWithCfg gcConfig job = do
$(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")" $(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
W.sendJob' job' W.sendJob' job'
-- | In seconds
longJobTimeout :: Int
longJobTimeout = 3000
-- | In seconds
defaultJobTimeout :: Int
defaultJobTimeout = 60
-- | We want to fine-tune job metadata parameters, for each job type -- | We want to fine-tune job metadata parameters, for each job type
updateJobData :: Job -> SendJob -> SendJob updateJobData :: Job -> SendJob -> SendJob
updateJobData (AddCorpusTempFileAsync {}) sj = sj { W.timeout = 3000 updateJobData (AddCorpusTempFileAsync {}) sj = sj { W.timeout = longJobTimeout
, W.toStrat = WT.TSDelete , W.toStrat = WT.TSDelete
, W.resendOnKill = False } , W.resendOnKill = False }
updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 } updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = longJobTimeout }
updateJobData (AddToAnnuaireWithForm {}) sj = sj { W.timeout = 3000 } updateJobData (AddToAnnuaireWithForm {}) sj = sj { W.timeout = longJobTimeout }
updateJobData (AddWithFile {}) sj = sj { W.timeout = 3000 updateJobData (AddWithFile {}) sj = sj { W.timeout = longJobTimeout
, W.toStrat = WT.TSDelete , W.toStrat = WT.TSDelete
, W.resendOnKill = False } , W.resendOnKill = False }
updateJobData (DocumentsFromWriteNodes {}) sj = sj { W.timeout = 3000 } updateJobData (DocumentsFromWriteNodes {}) sj = sj { W.timeout = longJobTimeout }
updateJobData (FrameCalcUpload {}) sj = sj { W.timeout = 3000 } updateJobData (FrameCalcUpload {}) sj = sj { W.timeout = longJobTimeout }
updateJobData (JSONPost {}) sj = sj { W.timeout = 3000 updateJobData (JSONPost {}) sj = sj { W.timeout = longJobTimeout
, W.toStrat = WT.TSDelete , W.toStrat = WT.TSDelete
, W.resendOnKill = False } , W.resendOnKill = False }
updateJobData (NgramsPostCharts {}) sj = sj { W.timeout = 3000 } updateJobData (NgramsPostCharts {}) sj = sj { W.timeout = longJobTimeout }
updateJobData (RecomputeGraph {}) sj = sj { W.timeout = 3000 } updateJobData (RecomputeGraph {}) sj = sj { W.timeout = longJobTimeout }
updateJobData (UpdateNode {}) sj = sj { W.timeout = 3000 } updateJobData (UpdateNode {}) sj = sj { W.timeout = longJobTimeout }
updateJobData (UploadDocument {}) sj = sj { W.timeout = 3000 } updateJobData (UploadDocument {}) sj = sj { W.timeout = longJobTimeout }
updateJobData (ImportRemoteDocuments {}) sj = sj { W.timeout = 3000 } updateJobData (ImportRemoteDocuments {}) sj = sj { W.timeout = longJobTimeout }
updateJobData (ImportRemoteTerms {}) sj = sj { W.timeout = 3000 } updateJobData (ImportRemoteTerms {}) sj = sj { W.timeout = longJobTimeout }
-- | ForgotPasswordAsync, PostNodeAsync -- | ForgotPasswordAsync, PostNodeAsync
updateJobData _ sj = sj { W.resendOnKill = False updateJobData _ sj = sj { W.resendOnKill = False
, W.timeout = 60 } , W.timeout = defaultJobTimeout }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment