[dispatcher] add timeout for dispatch notifications

I think that sometimes, when user refreshed page, the websocket
connection could hang and the user stopped receiving new
notifications. This is modeled after CentralExchange timeout.
parent cd51d109
Pipeline #7662 canceled with stages
...@@ -31,16 +31,19 @@ import Data.Aeson qualified as Aeson ...@@ -31,16 +31,19 @@ import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Data.Text qualified as T import Data.Text qualified as T
import DeferredFolds.UnfoldlM qualified as UnfoldlM import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Gargantext.Core.Config
( GargConfig, LogConfig, gc_logging, gc_notifications_config )
import Gargantext.Core.Config.Types (NotificationsConfig(..)) import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes
import Gargantext.Core.Notifications.Dispatcher.Types import Gargantext.Core.Notifications.Dispatcher.Types
import Gargantext.Core.Worker.Types (JobInfo(..)) import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging
( HasLogger(logMsg), LogLevel(..), withLogger, logLoc )
import Nanomsg (Pull(..), bind, recv, withSocket) import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import StmContainers.Set qualified as SSet import StmContainers.Set qualified as SSet
import Gargantext.Core.Config import System.Timeout (timeout)
import Gargantext.System.Logging
{- {-
...@@ -162,13 +165,21 @@ sendNotification throttleTChan ceMessage sub = do ...@@ -162,13 +165,21 @@ sendNotification throttleTChan ceMessage sub = do
atomically $ do atomically $ do
TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing)) TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing))
-- | Static send timeout, in microseconds
sendTimeoutUs :: Int
sendTimeoutUs = 50_000
-- | The "true" message sending to websocket. After it was withheld -- | The "true" message sending to websocket. After it was withheld
-- for a while (for throttling), it is finally sent here -- for a while (for throttling), it is finally sent here
sendDataMessageThrottled :: LogConfig -> (WS.Connection, WS.DataMessage) -> IO () sendDataMessageThrottled :: LogConfig -> (WS.Connection, WS.DataMessage) -> IO ()
sendDataMessageThrottled log_cfg (conn, msg) = do sendDataMessageThrottled log_cfg (conn, msg) = do
withLogger log_cfg $ \ioL -> withLogger log_cfg $ \ioL -> do
logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg
WS.sendDataMessage conn msg timeoutKickedIn <- timeout sendTimeoutUs $ WS.sendDataMessage conn msg
case timeoutKickedIn of
Nothing ->
$(logLoc) ioL ERROR $ "[sendMessageThrottled] couldn't send msg in timely fashion."
Just _ -> pure ()
-- | Custom filtering of list of Subscriptions based on -- | Custom filtering of list of Subscriptions based on
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment