Module : Main.hs
Description : Gargantext central exchange for async notifications
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer :
Stability : experimental
Portability : POSIX
{-# LANGUAGE Strict #-}
module Main where
import Control.Concurrent (threadDelay)
import Control.Monad (join, mapM_)
import Data.ByteString.Char8 qualified as C
import Data.Text qualified as T
import Gargantext.Core.AsyncUpdates.CentralExchange (gServer)
import Gargantext.Core.AsyncUpdates.Constants (ceBind, ceConnect)
import Gargantext.Prelude
import Nanomsg
import Options.Applicative
data Command =
| SimpleServer
| WSServer
| Client
parser :: Parser (IO ())
parser = subparser
( command "ce-server" (info (pure gServer) idm)
<> command "simple-server" (info (pure simpleServer) idm)
<> command "ws-server" (info (pure wsServer) idm)
<> command "client" (info (pure gClient) idm) )
main :: IO ()
main = join $ execParser (info parser idm)
simpleServer :: IO ()
simpleServer = do
withSocket Pull $ \s -> do
_ <- bind s ceBind
putText "[simpleServer] receiving"
forever $ do
mr <- recv s
C.putStrLn mr
-- case mr of
-- Nothing -> pure ()
-- Just r -> C.putStrLn r
-- threadDelay 10000
wsServer :: IO ()
wsServer = do
withSocket Pull $ \ws -> do
_ <- bind ws "ws://*:5560"
forever $ do
putText "[wsServer] receiving"
r <- recv ws
C.putStrLn r
gClient :: IO ()
gClient = do
withSocket Push $ \s -> do
_ <- connect s ceConnect
-- let str = C.unwords (take 10 $ repeat "hello")
let str = "{\"type\": \"update_tree_first_level\", \"node_id\": -1}"
C.putStrLn $ C.pack "sending: " <> str
send s str
withSocket Push $ \s -> do
_ <- connect s ceConnect
let str2 = "{\"type\": \"update_tree_first_level\", \"node_id\": -2}"
C.putStrLn $ C.pack "sending: " <> str2
send s str2
......@@ -165,10 +165,11 @@ source-repository-package
tag: 4fd2edf30c141600ffad6d730cc4c1c08a6dbce4
-- FIXME(adn) Compat-shim while we wait for upstream to catch-up
type: git
tag: 23be4130804d86979eaee5caffe323a1c7f2b0d6
tag: f54fe61f06685c5af95ddff782e139d8d4e26186
-- source-repository-package
-- type: git
......@@ -40,7 +40,8 @@ import Gargantext.API.Job
import Gargantext.API.Prelude (GargM, IsGargServer)
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CET
import Gargantext.Core.AsyncUpdates.Dispatcher.Types (Dispatcher, HasDispatcher(..))
import Gargantext.Core.AsyncUpdates.Dispatcher (Dispatcher)
import Gargantext.Core.AsyncUpdates.Dispatcher.Types (HasDispatcher(..))
import Gargantext.Core.Mail.Types (HasMail, mailSettings)
import Gargantext.Core.NLP (NLPServerMap, HasNLPServer(..))
import Gargantext.Core.NodeStory
......@@ -161,7 +162,7 @@ instance HasMail Env where
instance HasNLPServer Env where
nlpServer = env_nlp
instance HasDispatcher Env where
instance HasDispatcher Env Dispatcher where
hasDispatcher = env_dispatcher
instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where
......@@ -208,7 +208,7 @@ newEnv logger port settingsFile@(SettingsFile sf) = do
!jobs_env <- Jobs.newJobEnv jobs_settings prios' manager_env
!central_exchange <- forkIO $ CE.gServer (_gc_notifications_config config_env)
!dispatcher <- D.dispatcher (_gc_notifications_config config_env)
!dispatcher <- D.newDispatcher (_gc_notifications_config config_env)
{- An 'Env' by default doesn't have strict fields, but when constructing one in production
we want to force them to WHNF to avoid accumulating unnecessary thunks.
......@@ -14,7 +14,10 @@
module Gargantext.Core.AsyncUpdates.CentralExchange where
module Gargantext.Core.AsyncUpdates.CentralExchange (
, notify
) where
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM.TChan qualified as TChan
......@@ -26,7 +29,7 @@ import Gargantext.Core.AsyncUpdates.CentralExchange.Types
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(..), withLogger, logMsg)
import Nanomsg (Pull(..), Push(..), bind, connect, recvMalloc, send, withSocket)
import Nanomsg (Pull(..), Push(..), bind, connect, recv, send, withSocket)
......@@ -58,7 +61,7 @@ gServer (NotificationsConfig { .. }) = do
withLogger () $ \ioLogger -> do
forever $ do
-- putText "[central_exchange] receiving"
r <- recvMalloc s 4096
r <- recv s
logMsg ioLogger INFO $ "[central_exchange] received: " <> show r
-- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r
......@@ -16,7 +16,14 @@
{-# LANGUAGE TypeOperators #-}
module Gargantext.Core.AsyncUpdates.Dispatcher where
module Gargantext.Core.AsyncUpdates.Dispatcher (
Dispatcher -- opaque
, newDispatcher
, terminateDispatcher
-- * Querying a dispatcher
, dispatcherSubscriptions
) where
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM.TChan qualified as TChan
......@@ -30,7 +37,7 @@ import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
import Nanomsg (Pull(..), bind, recvMalloc, withSocket)
import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS
import Servant.Job.Types (JobStatus(_job_id))
import StmContainers.Set qualified as SSet
......@@ -43,10 +50,21 @@ Dispatcher is a service, which provides couple of functionalities:
- dispatches these messages to connected users
dispatcher :: NotificationsConfig -> IO Dispatcher
dispatcher nc = do
data Dispatcher =
Dispatcher { d_subscriptions :: SSet.Set Subscription
-- , d_ws_server :: WSAPI AsServer
, d_ce_listener :: ThreadId
terminateDispatcher :: Dispatcher -> IO ()
terminateDispatcher = killThread . d_ce_listener
dispatcherSubscriptions :: Dispatcher -> SSet.Set Subscription
dispatcherSubscriptions = d_subscriptions
newDispatcher :: NotificationsConfig -> IO Dispatcher
newDispatcher nc = do
subscriptions <- SSet.newIO
-- let server = wsServer authSettings subscriptions
......@@ -78,7 +96,7 @@ dispatcherListener (NotificationsConfig { _nc_dispatcher_bind }) subscriptions =
void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do
forever $ do
-- putText "[dispatcher_listener] receiving"
r <- recvMalloc s 1024
r <- recv s
-- C.putStrLn $ "[dispatcher_listener] " <> r
atomically $ TChan.writeTChan tChan r
......@@ -136,8 +154,8 @@ sendDataMessageThrottled (conn, msg) =
-- CETypes.CEMessage.
-- For example, we can add CEMessage.Broadcast to propagate a
-- notification to all connections.
filterCEMessageSubs :: CETypes.CEMessage -> [Subscription] -> [Subscription]
filterCEMessageSubs ceMessage subscriptions = filter (ceMessageSubPred ceMessage) subscriptions
_filterCEMessageSubs :: CETypes.CEMessage -> [Subscription] -> [Subscription]
_filterCEMessageSubs ceMessage subscriptions = filter (ceMessageSubPred ceMessage) subscriptions
ceMessageSubPred :: CETypes.CEMessage -> Subscription -> Bool
ceMessageSubPred (CETypes.UpdateJobProgress js) (Subscription { s_topic }) =
......@@ -200,15 +200,8 @@ instance ToJSON WSRequest where
, "token" .= token ]
toJSON WSDeauthorize = Aeson.object [ "request" .= ( "deauthorize" :: Text ) ]
data Dispatcher =
Dispatcher { d_subscriptions :: SSet.Set Subscription
-- , d_ws_server :: WSAPI AsServer
, d_ce_listener :: ThreadId
class HasDispatcher env where
hasDispatcher :: Getter env Dispatcher
class HasDispatcher env dispatcher where
hasDispatcher :: Getter env dispatcher
-- | A notification is sent to clients who subscribed to specific topics
......@@ -27,6 +27,7 @@ import Gargantext.API.Admin.Types (HasSettings(settings), Settings, jwtSettings)
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.AsyncUpdates.Dispatcher.Subscriptions
import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Core.AsyncUpdates.Dispatcher (Dispatcher, dispatcherSubscriptions)
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), logMsg, withLogger)
import Network.WebSockets qualified as WS
......@@ -42,15 +43,15 @@ newtype WSAPI mode = WSAPI {
} deriving Generic
wsServer :: ( IsGargServer env err m, HasDispatcher env, HasSettings env ) => WSAPI (AsServerT m)
wsServer :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasSettings env ) => WSAPI (AsServerT m)
wsServer = WSAPI { wsAPIServer = streamData }
streamData :: ( IsGargServer env err m, HasDispatcher env, HasSettings env )
streamData :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasSettings env )
=> WS.PendingConnection -> m ()
streamData pc = do
authSettings <- view settings
d <- view hasDispatcher
let subscriptions = d_subscriptions d
let subscriptions = dispatcherSubscriptions d
key <- getWSKey pc
c <- liftBase $ WS.acceptRequest pc
let ws = WSKeyConnection (key, c)
......@@ -76,6 +76,10 @@
git: ""
- "llvm-hs-pure"
- commit: f54fe61f06685c5af95ddff782e139d8d4e26186
git: ""
- .
- commit: 74a3296dfe1f0c4a3ade91336dcc689330e84156
git: ""
......@@ -4,7 +4,6 @@ module Main where
import Gargantext.Prelude hiding (isInfixOf)
import Control.Concurrent (forkIO, killThread)
import Control.Monad
import Data.Text (isInfixOf)
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
......@@ -42,6 +41,18 @@ startCoreNLPServer = do
stopCoreNLPServer :: ProcessHandle -> IO ()
stopCoreNLPServer = interruptProcessGroupOf
withNotifications :: ((NotificationsConfig, ThreadId, D.Dispatcher) -> IO a) -> IO a
withNotifications = bracket startNotifications stopNotifications
startNotifications :: IO (NotificationsConfig, ThreadId, D.Dispatcher)
startNotifications = do
central_exchange <- forkIO $ CE.gServer nc
dispatcher <- D.newDispatcher nc
pure (nc, central_exchange, dispatcher)
stopNotifications :: (NotificationsConfig, ThreadId, D.Dispatcher) -> IO ()
stopNotifications (_nc, central_exchange, dispatcher) = do
killThread central_exchange
D.terminateDispatcher dispatcher
nc :: NotificationsConfig
nc = NotificationsConfig { _nc_central_exchange_bind = "tcp://*:15560"
......@@ -49,18 +60,6 @@ nc = NotificationsConfig { _nc_central_exchange_bind = "tcp://*:15560"
, _nc_dispatcher_bind = "tcp://*:15561"
, _nc_dispatcher_connect = "tcp://localhost:15561" }
startNotifications :: IO (NotificationsConfig, ThreadId, DT.Dispatcher)
startNotifications = do
central_exchange <- forkIO $ CE.gServer nc
dispatcher <- D.dispatcher nc
pure (nc, central_exchange, dispatcher)
stopNotifications :: (NotificationsConfig, ThreadId, DT.Dispatcher) -> IO ()
stopNotifications (_nc, central_exchange, dispatcher) = do
killThread central_exchange
killThread $ DT.d_ce_listener dispatcher
-- It's especially important to use Hspec for DB tests, because,
-- unlike 'tasty', 'Hspec' has explicit control over parallelism,
-- and it's important that DB tests are run according to a very
......@@ -77,7 +76,7 @@ main = do
hSetBuffering stdout NoBuffering
-- TODO Ideally remove start/stop notifications and use
-- Test/API/Setup to initialize this in env
bracket startNotifications stopNotifications $ \(nc', _, _) -> do
withNotifications $ \(nc', _, _) -> do
bracket startCoreNLPServer stopCoreNLPServer $ \_ -> hspec $ do
API.tests nc'
