[config] dispatcher.send_timeout_ms in config

parent b1cec20f
Pipeline #7711 passed with stages
in 47 minutes and 3 seconds
...@@ -144,4 +144,5 @@ defaultNotificationsConfig = ...@@ -144,4 +144,5 @@ defaultNotificationsConfig =
, _nc_ce_send_timeout_ms = 200 , _nc_ce_send_timeout_ms = 200
, _nc_dispatcher_bind = "tcp://*:5561" , _nc_dispatcher_bind = "tcp://*:5561"
, _nc_dispatcher_connect = "tcp://localhost:5561" , _nc_dispatcher_connect = "tcp://localhost:5561"
, _nc_dispatcher_send_timeout_ms = 500
, _nc_dispatcher_throttle_ms = 500 } , _nc_dispatcher_throttle_ms = 500 }
...@@ -132,6 +132,8 @@ send_timeout_ms = 200 ...@@ -132,6 +132,8 @@ send_timeout_ms = 200
[notifications.dispatcher] [notifications.dispatcher]
bind = "tcp://:5561" bind = "tcp://:5561"
connect = "tcp://127.0.0.1:5561" connect = "tcp://127.0.0.1:5561"
# see https://gitlab.iscpif.fr/gargantext/haskell-gargantext/commit/77a687ea1483441675320fd2413fac52bb112a4c
send_timeout_ms = 500
# Same dispatcher messages are throttled, this is the throttle delay # Same dispatcher messages are throttled, this is the throttle delay
throttle_ms = 500 throttle_ms = 500
......
...@@ -321,12 +321,13 @@ makeLenses ''APIsConfig ...@@ -321,12 +321,13 @@ makeLenses ''APIsConfig
data NotificationsConfig = data NotificationsConfig =
NotificationsConfig { _nc_central_exchange_bind :: ~T.Text NotificationsConfig { _nc_central_exchange_bind :: ~T.Text
, _nc_central_exchange_connect :: ~T.Text , _nc_central_exchange_connect :: ~T.Text
, _nc_ce_send_timeout_ms :: ~Int , _nc_ce_send_timeout_ms :: ~Int
, _nc_dispatcher_bind :: ~T.Text , _nc_dispatcher_bind :: ~T.Text
, _nc_dispatcher_connect :: ~T.Text , _nc_dispatcher_connect :: ~T.Text
, _nc_dispatcher_throttle_ms :: ~Int } , _nc_dispatcher_send_timeout_ms :: ~Int
, _nc_dispatcher_throttle_ms :: ~Int }
deriving (Show, Eq) deriving (Show, Eq)
instance FromValue NotificationsConfig where instance FromValue NotificationsConfig where
fromValue = parseTableFromValue $ do fromValue = parseTableFromValue $ do
...@@ -336,12 +337,13 @@ instance FromValue NotificationsConfig where ...@@ -336,12 +337,13 @@ instance FromValue NotificationsConfig where
c <- reqKey "connect" c <- reqKey "connect"
t <- reqKey "send_timeout_ms" t <- reqKey "send_timeout_ms"
pure (b, c, t) pure (b, c, t)
(_nc_dispatcher_bind, _nc_dispatcher_connect, _nc_dispatcher_throttle_ms) <- (_nc_dispatcher_bind, _nc_dispatcher_connect, _nc_dispatcher_send_timeout_ms, _nc_dispatcher_throttle_ms) <-
reqKeyOf "dispatcher" $ parseTableFromValue $ do reqKeyOf "dispatcher" $ parseTableFromValue $ do
b <- reqKey "bind" b <- reqKey "bind"
c <- reqKey "connect" c <- reqKey "connect"
t <- reqKey "throttle_ms" t <- reqKey "send_timeout_ms"
pure (b, c, t) tt <- reqKey "throttle_ms"
pure (b, c, t, tt)
return $ NotificationsConfig { .. } return $ NotificationsConfig { .. }
instance ToValue NotificationsConfig where instance ToValue NotificationsConfig where
toValue = defaultTableToValue toValue = defaultTableToValue
...@@ -354,5 +356,6 @@ instance ToTable NotificationsConfig where ...@@ -354,5 +356,6 @@ instance ToTable NotificationsConfig where
, "dispatcher" .= , "dispatcher" .=
table [ "bind" .= _nc_dispatcher_bind table [ "bind" .= _nc_dispatcher_bind
, "connect" .= _nc_dispatcher_connect , "connect" .= _nc_dispatcher_connect
, "send_timeout_ms" .= _nc_dispatcher_send_timeout_ms
, "throttle" .= _nc_dispatcher_throttle_ms ] , "throttle" .= _nc_dispatcher_throttle_ms ]
] ]
...@@ -88,7 +88,7 @@ dispatcherListener config subscriptions = do ...@@ -88,7 +88,7 @@ dispatcherListener config subscriptions = do
-- theory, the worker can perform things like user authentication, -- theory, the worker can perform things like user authentication,
-- DB queries etc so it can be slow sometimes. -- DB queries etc so it can be slow sometimes.
Async.withAsync (throttle (_nc_dispatcher_throttle_ms * 1000) throttleTChan Async.withAsync (throttle (_nc_dispatcher_throttle_ms * 1000) throttleTChan
(sendDataMessageThrottled log_cfg)) $ \_ -> do (sendDataMessageThrottled nc log_cfg)) $ \_ -> do
void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do
forever $ do forever $ do
-- putText "[dispatcher_listener] receiving" -- putText "[dispatcher_listener] receiving"
...@@ -96,7 +96,7 @@ dispatcherListener config subscriptions = do ...@@ -96,7 +96,7 @@ dispatcherListener config subscriptions = do
-- C.putStrLn $ "[dispatcher_listener] " <> r -- C.putStrLn $ "[dispatcher_listener] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
NotificationsConfig { _nc_dispatcher_bind, _nc_dispatcher_throttle_ms } = config ^. gc_notifications_config nc@NotificationsConfig { _nc_dispatcher_bind, _nc_dispatcher_throttle_ms } = config ^. gc_notifications_config
log_cfg = config ^. gc_logging log_cfg = config ^. gc_logging
worker tChan throttleTChan = withLogger log_cfg $ \ioL -> do worker tChan throttleTChan = withLogger log_cfg $ \ioL -> do
tId <- myThreadId tId <- myThreadId
...@@ -166,21 +166,17 @@ sendNotification throttleTChan ceMessage sub = do ...@@ -166,21 +166,17 @@ sendNotification throttleTChan ceMessage sub = do
atomically $ do atomically $ do
TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing)) TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing))
-- | Static send timeout, in microseconds
sendTimeoutUs :: Int
sendTimeoutUs = 500_000
-- | The "true" message sending to websocket. After it was withheld -- | The "true" message sending to websocket. After it was withheld
-- for a while (for throttling), it is finally sent here -- for a while (for throttling), it is finally sent here
sendDataMessageThrottled :: LogConfig -> (WS.Connection, WS.DataMessage) -> IO () sendDataMessageThrottled :: NotificationsConfig -> LogConfig -> (WS.Connection, WS.DataMessage) -> IO ()
sendDataMessageThrottled log_cfg (conn, msg) = do sendDataMessageThrottled (NotificationsConfig { _nc_dispatcher_send_timeout_ms }) log_cfg (conn, msg) = do
withLogger log_cfg $ \ioL -> do withLogger log_cfg $ \ioL -> do
logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg
-- | We need a timeout here for the following reason: -- | We need a timeout here for the following reason:
-- when a message is sent and the user disconnects the WS -- when a message is sent and the user disconnects the WS
-- connection (e.g. refreshes the page), it seems that this message sending hangs. -- connection (e.g. refreshes the page), it seems that this message sending hangs.
-- We don't want to block the thread indefinitely. -- We don't want to block the thread indefinitely.
timeoutKickedIn <- timeout sendTimeoutUs $ WS.sendDataMessage conn msg timeoutKickedIn <- timeout (_nc_dispatcher_send_timeout_ms * 1000) $ WS.sendDataMessage conn msg
case timeoutKickedIn of case timeoutKickedIn of
Nothing -> Nothing ->
$(logLoc) ioL ERROR $ "[sendMessageThrottled] couldn't send msg in timely fashion." $(logLoc) ioL ERROR $ "[sendMessageThrottled] couldn't send msg in timely fashion."
......
...@@ -76,6 +76,7 @@ send_timeout_ms = 200 ...@@ -76,6 +76,7 @@ send_timeout_ms = 200
[notifications.dispatcher] [notifications.dispatcher]
bind = "" bind = ""
connect = "" connect = ""
send_timeout_ms = 500
throttle_ms = 500 throttle_ms = 500
......
...@@ -90,6 +90,7 @@ withTestNotificationConfig cfg action = do ...@@ -90,6 +90,7 @@ withTestNotificationConfig cfg action = do
, _nc_ce_send_timeout_ms = 200 , _nc_ce_send_timeout_ms = 200
, _nc_dispatcher_bind = "ipc://" <> ds_fp , _nc_dispatcher_bind = "ipc://" <> ds_fp
, _nc_dispatcher_connect = "ipc://" <> ds_fp , _nc_dispatcher_connect = "ipc://" <> ds_fp
, _nc_dispatcher_send_timeout_ms = 500
, _nc_dispatcher_throttle_ms = 500 , _nc_dispatcher_throttle_ms = 500
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment