[config] notifications: configurable CE timeout & Dispatcher throttle

parent 43e24106
Pipeline #7701 passed with stages
in 44 minutes and 16 seconds
...@@ -141,5 +141,7 @@ defaultNotificationsConfig :: CTypes.NotificationsConfig ...@@ -141,5 +141,7 @@ defaultNotificationsConfig :: CTypes.NotificationsConfig
defaultNotificationsConfig = defaultNotificationsConfig =
CTypes.NotificationsConfig { _nc_central_exchange_bind = "tcp://*:5560" CTypes.NotificationsConfig { _nc_central_exchange_bind = "tcp://*:5560"
, _nc_central_exchange_connect = "tcp://localhost:5560" , _nc_central_exchange_connect = "tcp://localhost:5560"
, _nc_ce_send_timeout_ms = 200
, _nc_dispatcher_bind = "tcp://*:5561" , _nc_dispatcher_bind = "tcp://*:5561"
, _nc_dispatcher_connect = "tcp://localhost:5561" } , _nc_dispatcher_connect = "tcp://localhost:5561"
, _nc_dispatcher_throttle_ms = 500 }
...@@ -123,9 +123,17 @@ smtp_host = "localhost" ...@@ -123,9 +123,17 @@ smtp_host = "localhost"
# HOST_password = password # HOST_password = password
[notifications] [notifications.central-exchange]
central-exchange = { bind = "tcp://*:5560", connect = "tcp://127.0.0.1:5560" } bind = "tcp://:5560"
dispatcher = { bind = "tcp://*:5561", connect = "tcp://127.0.0.1:5561" } connect = "tcp://127.0.0.1:5560"
# see https://gitlab.iscpif.fr/gargantext/haskell-gargantext/commit/77a687ea1483441675320fd2413fac52bb112a4c
send_timeout_ms = 200
[notifications.dispatcher]
bind = "tcp://:5561"
connect = "tcp://127.0.0.1:5561"
# Same dispatcher messages are throttled, this is the throttle delay
throttle_ms = 500
[nlp] [nlp]
......
...@@ -323,21 +323,25 @@ makeLenses ''APIsConfig ...@@ -323,21 +323,25 @@ makeLenses ''APIsConfig
data NotificationsConfig = data NotificationsConfig =
NotificationsConfig { _nc_central_exchange_bind :: ~T.Text NotificationsConfig { _nc_central_exchange_bind :: ~T.Text
, _nc_central_exchange_connect :: ~T.Text , _nc_central_exchange_connect :: ~T.Text
, _nc_ce_send_timeout_ms :: ~Int
, _nc_dispatcher_bind :: ~T.Text , _nc_dispatcher_bind :: ~T.Text
, _nc_dispatcher_connect :: ~T.Text } , _nc_dispatcher_connect :: ~T.Text
, _nc_dispatcher_throttle_ms :: ~Int }
deriving (Show, Eq) deriving (Show, Eq)
instance FromValue NotificationsConfig where instance FromValue NotificationsConfig where
fromValue = parseTableFromValue $ do fromValue = parseTableFromValue $ do
(_nc_central_exchange_bind, _nc_central_exchange_connect) <- (_nc_central_exchange_bind, _nc_central_exchange_connect, _nc_ce_send_timeout_ms) <-
reqKeyOf "central-exchange" $ parseTableFromValue $ do reqKeyOf "central-exchange" $ parseTableFromValue $ do
b <- reqKey "bind" b <- reqKey "bind"
c <- reqKey "connect" c <- reqKey "connect"
pure (b, c) t <- reqKey "send_timeout_ms"
(_nc_dispatcher_bind, _nc_dispatcher_connect) <- pure (b, c, t)
(_nc_dispatcher_bind, _nc_dispatcher_connect, _nc_dispatcher_throttle_ms) <-
reqKeyOf "dispatcher" $ parseTableFromValue $ do reqKeyOf "dispatcher" $ parseTableFromValue $ do
b <- reqKey "bind" b <- reqKey "bind"
c <- reqKey "connect" c <- reqKey "connect"
pure (b, c) t <- reqKey "throttle_ms"
pure (b, c, t)
return $ NotificationsConfig { .. } return $ NotificationsConfig { .. }
instance ToValue NotificationsConfig where instance ToValue NotificationsConfig where
toValue = defaultTableToValue toValue = defaultTableToValue
...@@ -345,8 +349,10 @@ instance ToTable NotificationsConfig where ...@@ -345,8 +349,10 @@ instance ToTable NotificationsConfig where
toTable (NotificationsConfig { .. }) = toTable (NotificationsConfig { .. }) =
table [ "central-exchange" .= table [ "central-exchange" .=
table [ "bind" .= _nc_central_exchange_bind table [ "bind" .= _nc_central_exchange_bind
, "connect" .= _nc_central_exchange_connect ] , "connect" .= _nc_central_exchange_connect
, "send_timeout_ms" .= _nc_ce_send_timeout_ms ]
, "dispatcher" .= , "dispatcher" .=
table [ "bind" .= _nc_dispatcher_bind table [ "bind" .= _nc_dispatcher_bind
, "connect" .= _nc_dispatcher_connect ] , "connect" .= _nc_dispatcher_connect
, "throttle" .= _nc_dispatcher_throttle_ms ]
] ]
...@@ -74,7 +74,7 @@ gServer cfg = do ...@@ -74,7 +74,7 @@ gServer cfg = do
-- C.putStrLn $ "[central_exchange] " <> r -- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
NotificationsConfig{..} = cfg ^. gc_notifications_config nc@NotificationsConfig{..} = cfg ^. gc_notifications_config
log_cfg = cfg ^. gc_logging log_cfg = cfg ^. gc_logging
worker s_dispatcher tChan = do worker s_dispatcher tChan = do
withLogger log_cfg $ \ioLogger -> do withLogger log_cfg $ \ioLogger -> do
...@@ -99,24 +99,20 @@ gServer cfg = do ...@@ -99,24 +99,20 @@ gServer cfg = do
-- process, independent of the server. -- process, independent of the server.
-- send the same message that we received -- send the same message that we received
-- void $ sendNonblocking s_dispatcher r -- void $ sendNonblocking s_dispatcher r
sendTimeout ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Just (UpdateWorkerProgress _ji _jl) -> do Just (UpdateWorkerProgress _ji _jl) -> do
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl -- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
sendTimeout ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Just Ping -> do Just Ping -> do
sendTimeout ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Nothing -> Nothing ->
$(logLoc) ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r $(logLoc) ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r
-- | A static send timeout in microseconds. -- | Sends the given payload ensure the send doesn't take more than the
send_timeout_us :: Int -- 'nc_ce_send_timeout_ms', logging a message if the timeouts kicks in.
send_timeout_us = 50_000 sendTimeout :: Sender a => NotificationsConfig -> Logger IO -> Socket a -> ByteString -> IO ()
sendTimeout (NotificationsConfig { _nc_ce_send_timeout_ms }) ioLogger sock payload = withFrozenCallStack $ do
-- | Sends the given payload ensure the send doesn't take more than the static timeoutKickedIn <- timeout (_nc_ce_send_timeout_ms * 1000) $ send sock $ payload
-- 'send_timeout_ns', logging a message if the timeouts kicks in.
sendTimeout :: Sender a => Logger IO -> Socket a -> ByteString -> IO ()
sendTimeout ioLogger sock payload = withFrozenCallStack $ do
timeoutKickedIn <- timeout send_timeout_us $ send sock $ payload
case timeoutKickedIn of case timeoutKickedIn of
Nothing -> Nothing ->
$(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion." $(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion."
...@@ -132,8 +128,8 @@ notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do ...@@ -132,8 +128,8 @@ notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do
$(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str) $(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str)
-- err <- sendNonblocking s $ BSL.toStrict str -- err <- sendNonblocking s $ BSL.toStrict str
-- putText $ "[notify] err: " <> show err -- putText $ "[notify] err: " <> show err
sendTimeout ioLogger s (BSL.toStrict str) sendTimeout nc ioLogger s (BSL.toStrict str)
do_work `finally` shutdown s connectEndpoint do_work `finally` shutdown s connectEndpoint
where where
NotificationsConfig { _nc_central_exchange_connect } = cfg ^. gc_notifications_config nc@NotificationsConfig { _nc_central_exchange_connect } = cfg ^. gc_notifications_config
log_cfg = cfg ^. gc_logging log_cfg = cfg ^. gc_logging
...@@ -87,7 +87,8 @@ dispatcherListener config subscriptions = do ...@@ -87,7 +87,8 @@ dispatcherListener config subscriptions = do
-- NOTE I'm not sure that we need more than 1 worker here, but in -- NOTE I'm not sure that we need more than 1 worker here, but in
-- theory, the worker can perform things like user authentication, -- theory, the worker can perform things like user authentication,
-- DB queries etc so it can be slow sometimes. -- DB queries etc so it can be slow sometimes.
Async.withAsync (throttle 500_000 throttleTChan (sendDataMessageThrottled log_cfg)) $ \_ -> do Async.withAsync (throttle (_nc_dispatcher_throttle_ms * 1000) throttleTChan
(sendDataMessageThrottled log_cfg)) $ \_ -> do
void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do
forever $ do forever $ do
-- putText "[dispatcher_listener] receiving" -- putText "[dispatcher_listener] receiving"
...@@ -95,7 +96,7 @@ dispatcherListener config subscriptions = do ...@@ -95,7 +96,7 @@ dispatcherListener config subscriptions = do
-- C.putStrLn $ "[dispatcher_listener] " <> r -- C.putStrLn $ "[dispatcher_listener] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
NotificationsConfig { _nc_dispatcher_bind } = config ^. gc_notifications_config NotificationsConfig { _nc_dispatcher_bind, _nc_dispatcher_throttle_ms } = config ^. gc_notifications_config
log_cfg = config ^. gc_logging log_cfg = config ^. gc_logging
worker tChan throttleTChan = withLogger log_cfg $ \ioL -> do worker tChan throttleTChan = withLogger log_cfg $ \ioL -> do
tId <- myThreadId tId <- myThreadId
......
...@@ -66,11 +66,17 @@ from = "" ...@@ -66,11 +66,17 @@ from = ""
login_type = "Normal" login_type = "Normal"
[notifications]
# We do not hardcode the bind and connect here, because the test infrastructure # We do not hardcode the bind and connect here, because the test infrastructure
# will randomize the connection endpoints via IPC. # will randomize the connection endpoints via IPC.
central-exchange = { bind = "", connect = "" } [notifications.central-exchange]
dispatcher = { bind = "", connect = "" } bind = ""
connect = ""
send_timeout_ms = 200
[notifications.dispatcher]
bind = ""
connect = ""
throttle_ms = 500
[nlp] [nlp]
......
...@@ -87,8 +87,10 @@ withTestNotificationConfig cfg action = do ...@@ -87,8 +87,10 @@ withTestNotificationConfig cfg action = do
action $ cfg & gc_notifications_config action $ cfg & gc_notifications_config
.~ NotificationsConfig { _nc_central_exchange_bind = "ipc://" <> ce_fp .~ NotificationsConfig { _nc_central_exchange_bind = "ipc://" <> ce_fp
, _nc_central_exchange_connect = "ipc://" <> ce_fp , _nc_central_exchange_connect = "ipc://" <> ce_fp
, _nc_ce_send_timeout_ms = 200
, _nc_dispatcher_bind = "ipc://" <> ds_fp , _nc_dispatcher_bind = "ipc://" <> ds_fp
, _nc_dispatcher_connect = "ipc://" <> ds_fp , _nc_dispatcher_connect = "ipc://" <> ds_fp
, _nc_dispatcher_throttle_ms = 500
} }
setup :: IO TestEnv setup :: IO TestEnv
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment