{-| Module : Gargantext.Core.Notifications.CentralExchange Description : Central exchange (asynchronous notifications) Copyright : (c) CNRS, 2017-Present License : AGPL + CECILL v3 Maintainer : team@gargantext.org Stability : experimental Portability : POSIX https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341 Docs: https://dev.sub.gargantext.org/#/share/Notes/187918 -} {-# LANGUAGE TemplateHaskell #-} module Gargantext.Core.Notifications.CentralExchange ( gServer , notify ) where import Control.Concurrent.Async qualified as Async import Control.Concurrent.STM.TChan qualified as TChan import Data.Aeson qualified as Aeson import Data.ByteString.Lazy qualified as BSL import Data.Text.Encoding qualified as TE import Data.Text qualified as T import Gargantext.Core.Config (GargConfig, gc_notifications_config, gc_logging) import Gargantext.Core.Config.Types (NotificationsConfig(..)) import Gargantext.Core.Notifications.CentralExchange.Types import Gargantext.Prelude import Gargantext.System.Logging (LogLevel(..), withLogger, logLoc, Logger) import Nanomsg (Pull(..), Push(..), bind, connect, recv, send, withSocket, shutdown, Socket, Sender) import System.Timeout (timeout) {- Central exchange is a service, which gathers messages from various places and informs the Dispatcher (which will then inform users about various events). The primary goal is to be able to read as many messages as possible and then send them to the Dispatcher. Although nanomsg does some message buffering, we don't want these messages to pile up, especially with many users having updates. -} gServer :: HasCallStack => GargConfig -> IO () gServer cfg = do withLogger log_cfg $ \ioLogger -> do withSocket Pull $ \s -> do withSocket Push $ \s_dispatcher -> do $(logLoc) ioLogger DEBUG $ "[central_exchange] binding to " <> _nc_central_exchange_bind bindEndpoint <- bind s $ T.unpack _nc_central_exchange_bind $(logLoc) ioLogger DEBUG $ "[central_exchange] bound to " <> show bindEndpoint $(logLoc) ioLogger DEBUG $ "[central_exchange] connecting to " <> _nc_dispatcher_bind dispatchEndpoint <- connect s_dispatcher $ T.unpack _nc_dispatcher_connect $(logLoc) ioLogger DEBUG $ "[central_exchange] connected to " <> show dispatchEndpoint tChan <- TChan.newTChanIO -- | We have 2 threads: one that listens for nanomsg messages -- | and puts them on the 'tChan' and the second one that reads -- | the 'tChan' and calls Dispatcher accordingly. This is to -- | make reading nanomsg as fast as possible. void $ Async.concurrently (worker s_dispatcher tChan) $ do forever $ do $(logLoc) ioLogger DEBUG $ "[central_exchange] receiving" r <- recv s $(logLoc) ioLogger DEBUG $ "[central_exchange] received: " <> show r -- C.putStrLn $ "[central_exchange] " <> r atomically $ TChan.writeTChan tChan r where NotificationsConfig{..} = cfg ^. gc_notifications_config log_cfg = cfg ^. gc_logging worker s_dispatcher tChan = do withLogger log_cfg $ \ioLogger -> do forever $ do r <- atomically $ TChan.readTChan tChan case Aeson.decode (BSL.fromStrict r) of Just (UpdateTreeFirstLevel _node_id) -> do -- $(logLoc) ioLogger DEBUG $ "[central_exchange] update tree: " <> show node_id -- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id -- To make this more robust, use withAsync so we don't -- block the main thread (send is blocking) -- NOTE: If we're flooded with messages, and send is -- slow, we might be spawning many threads... -- NOTE: Currently we just forward the message that we -- got. So in theory central exchange isn't needed (we -- could ping dispatcher directly). However, I think -- it's better to have this as a separate -- component. Currently I built this inside -- gargantext-server but maybe it can be a separate -- process, independent of the server. -- send the same message that we received -- void $ sendNonblocking s_dispatcher r sendTimeout ioLogger s_dispatcher r Just (UpdateWorkerProgress _ji _jl) -> do -- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl sendTimeout ioLogger s_dispatcher r Just Ping -> do sendTimeout ioLogger s_dispatcher r Nothing -> $(logLoc) ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r -- | A static send timeout in microseconds. send_timeout_us :: Int send_timeout_us = 50_000 -- | Sends the given payload ensure the send doesn't take more than the static -- 'send_timeout_ns', logging a message if the timeouts kicks in. sendTimeout :: Sender a => Logger IO -> Socket a -> ByteString -> IO () sendTimeout ioLogger sock payload = withFrozenCallStack $ do timeoutKickedIn <- timeout send_timeout_us $ send sock $ payload case timeoutKickedIn of Nothing -> $(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion." Just () -> $(logLoc) ioLogger DEBUG $ "[central_exchange] message sent." notify :: HasCallStack => GargConfig -> CEMessage -> IO () notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do Async.withAsync (pure ()) $ \_ -> do withSocket Push $ \s -> do connectEndpoint <- connect s $ T.unpack _nc_central_exchange_connect let do_work = do let str = Aeson.encode ceMessage $(logLoc) ioLogger DEBUG $ "[central_exchange] sending to " <> _nc_central_exchange_connect $(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str) -- err <- sendNonblocking s $ BSL.toStrict str -- putText $ "[notify] err: " <> show err sendTimeout ioLogger s (BSL.toStrict str) do_work `finally` shutdown s connectEndpoint where NotificationsConfig { _nc_central_exchange_connect } = cfg ^. gc_notifications_config log_cfg = cfg ^. gc_logging