Commit 7c0d6ba0 authored by Alfredo Di Napoli's avatar Alfredo Di Napoli

ws: make Dispatcher abstract

The `Dispatcher` type is now opaque and not exported by
`.AsyncUpdates.Dispatcher`, and it has been moved out of the `.Types`.

This ensures that we can make the internal record fields private, and
offer accessors for things like `terminateDispatcher`. This preserve
information hiding and allows us to change the internal way of
terminating a dispatcher (for example switching away from normal
`forkIO` in favour of `async` & co) while not breaking client's code.
parent 65053486
Pipeline #6567 passed with stages
in 51 minutes and 45 seconds
...@@ -40,7 +40,8 @@ import Gargantext.API.Job ...@@ -40,7 +40,8 @@ import Gargantext.API.Job
import Gargantext.API.Prelude (GargM, IsGargServer) import Gargantext.API.Prelude (GargM, IsGargServer)
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CET import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CET
import Gargantext.Core.AsyncUpdates.Dispatcher.Types (Dispatcher, HasDispatcher(..)) import Gargantext.Core.AsyncUpdates.Dispatcher (Dispatcher)
import Gargantext.Core.AsyncUpdates.Dispatcher.Types (HasDispatcher(..))
import Gargantext.Core.Mail.Types (HasMail, mailSettings) import Gargantext.Core.Mail.Types (HasMail, mailSettings)
import Gargantext.Core.NLP (NLPServerMap, HasNLPServer(..)) import Gargantext.Core.NLP (NLPServerMap, HasNLPServer(..))
import Gargantext.Core.NodeStory import Gargantext.Core.NodeStory
...@@ -161,7 +162,7 @@ instance HasMail Env where ...@@ -161,7 +162,7 @@ instance HasMail Env where
instance HasNLPServer Env where instance HasNLPServer Env where
nlpServer = env_nlp nlpServer = env_nlp
instance HasDispatcher Env where instance HasDispatcher Env Dispatcher where
hasDispatcher = env_dispatcher hasDispatcher = env_dispatcher
instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where
......
...@@ -213,7 +213,7 @@ newEnv logger port (IniFile file) settingsFile = do ...@@ -213,7 +213,7 @@ newEnv logger port (IniFile file) settingsFile = do
!nlp_env <- nlpServerMap <$> NLP.readConfig file !nlp_env <- nlpServerMap <$> NLP.readConfig file
!central_exchange <- forkIO CE.gServer !central_exchange <- forkIO CE.gServer
!dispatcher <- D.dispatcher !dispatcher <- D.newDispatcher
{- An 'Env' by default doesn't have strict fields, but when constructing one in production {- An 'Env' by default doesn't have strict fields, but when constructing one in production
we want to force them to WHNF to avoid accumulating unnecessary thunks. we want to force them to WHNF to avoid accumulating unnecessary thunks.
......
...@@ -17,7 +17,12 @@ https://dev.sub.gargantext.org/#/share/Notes/187918 ...@@ -17,7 +17,12 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
{-# LANGUAGE TypeOperators #-} {-# LANGUAGE TypeOperators #-}
module Gargantext.Core.AsyncUpdates.Dispatcher ( module Gargantext.Core.AsyncUpdates.Dispatcher (
dispatcher Dispatcher -- opaque
, newDispatcher
, terminateDispatcher
-- * Querying a dispatcher
, dispatcherSubscriptions
) where ) where
import Control.Concurrent.Async qualified as Async import Control.Concurrent.Async qualified as Async
...@@ -44,10 +49,21 @@ Dispatcher is a service, which provides couple of functionalities: ...@@ -44,10 +49,21 @@ Dispatcher is a service, which provides couple of functionalities:
- dispatches these messages to connected users - dispatches these messages to connected users
-} -}
dispatcher :: IO Dispatcher data Dispatcher =
dispatcher = do Dispatcher { d_subscriptions :: SSet.Set Subscription
-- , d_ws_server :: WSAPI AsServer
, d_ce_listener :: ThreadId
}
terminateDispatcher :: Dispatcher -> IO ()
terminateDispatcher = killThread . d_ce_listener
dispatcherSubscriptions :: Dispatcher -> SSet.Set Subscription
dispatcherSubscriptions = d_subscriptions
newDispatcher :: IO Dispatcher
newDispatcher = do
subscriptions <- SSet.newIO subscriptions <- SSet.newIO
-- let server = wsServer authSettings subscriptions -- let server = wsServer authSettings subscriptions
......
...@@ -201,15 +201,8 @@ instance ToJSON WSRequest where ...@@ -201,15 +201,8 @@ instance ToJSON WSRequest where
, "token" .= token ] , "token" .= token ]
toJSON WSDeauthorize = Aeson.object [ "request" .= ( "deauthorize" :: Text ) ] toJSON WSDeauthorize = Aeson.object [ "request" .= ( "deauthorize" :: Text ) ]
data Dispatcher = class HasDispatcher env dispatcher where
Dispatcher { d_subscriptions :: SSet.Set Subscription hasDispatcher :: Getter env dispatcher
-- , d_ws_server :: WSAPI AsServer
, d_ce_listener :: ThreadId
}
class HasDispatcher env where
hasDispatcher :: Getter env Dispatcher
-- | A notification is sent to clients who subscribed to specific topics -- | A notification is sent to clients who subscribed to specific topics
......
...@@ -27,6 +27,7 @@ import Gargantext.API.Admin.Types (HasSettings(settings), Settings, jwtSettings) ...@@ -27,6 +27,7 @@ import Gargantext.API.Admin.Types (HasSettings(settings), Settings, jwtSettings)
import Gargantext.API.Prelude (IsGargServer) import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.AsyncUpdates.Dispatcher.Subscriptions import Gargantext.Core.AsyncUpdates.Dispatcher.Subscriptions
import Gargantext.Core.AsyncUpdates.Dispatcher.Types import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Core.AsyncUpdates.Dispatcher (Dispatcher, dispatcherSubscriptions)
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), logMsg, withLogger) import Gargantext.System.Logging (LogLevel(DEBUG), logMsg, withLogger)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
...@@ -42,15 +43,15 @@ newtype WSAPI mode = WSAPI { ...@@ -42,15 +43,15 @@ newtype WSAPI mode = WSAPI {
} deriving Generic } deriving Generic
wsServer :: ( IsGargServer env err m, HasDispatcher env, HasSettings env ) => WSAPI (AsServerT m) wsServer :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasSettings env ) => WSAPI (AsServerT m)
wsServer = WSAPI { wsAPIServer = streamData } wsServer = WSAPI { wsAPIServer = streamData }
where where
streamData :: ( IsGargServer env err m, HasDispatcher env, HasSettings env ) streamData :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasSettings env )
=> WS.PendingConnection -> m () => WS.PendingConnection -> m ()
streamData pc = do streamData pc = do
authSettings <- view settings authSettings <- view settings
d <- view hasDispatcher d <- view hasDispatcher
let subscriptions = d_subscriptions d let subscriptions = dispatcherSubscriptions d
key <- getWSKey pc key <- getWSKey pc
c <- liftBase $ WS.acceptRequest pc c <- liftBase $ WS.acceptRequest pc
let ws = WSKeyConnection (key, c) let ws = WSKeyConnection (key, c)
......
...@@ -8,7 +8,6 @@ import Control.Monad ...@@ -8,7 +8,6 @@ import Control.Monad
import Data.Text (isInfixOf) import Data.Text (isInfixOf)
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.Dispatcher qualified as D import Gargantext.Core.AsyncUpdates.Dispatcher qualified as D
import Gargantext.Core.AsyncUpdates.Dispatcher.Types qualified as DT
import Shelly hiding (FilePath) import Shelly hiding (FilePath)
import System.IO import System.IO
import System.Process import System.Process
...@@ -40,19 +39,19 @@ startCoreNLPServer = do ...@@ -40,19 +39,19 @@ startCoreNLPServer = do
stopCoreNLPServer :: ProcessHandle -> IO () stopCoreNLPServer :: ProcessHandle -> IO ()
stopCoreNLPServer = interruptProcessGroupOf stopCoreNLPServer = interruptProcessGroupOf
withNotifications :: ((ThreadId, DT.Dispatcher) -> IO a) -> IO a withNotifications :: ((ThreadId, D.Dispatcher) -> IO a) -> IO a
withNotifications = bracket startNotifications stopNotifications withNotifications = bracket startNotifications stopNotifications
where where
startNotifications :: IO (ThreadId, DT.Dispatcher) startNotifications :: IO (ThreadId, D.Dispatcher)
startNotifications = do startNotifications = do
central_exchange <- forkIO CE.gServer central_exchange <- forkIO CE.gServer
dispatcher <- D.dispatcher dispatcher <- D.newDispatcher
pure (central_exchange, dispatcher) pure (central_exchange, dispatcher)
stopNotifications :: (ThreadId, DT.Dispatcher) -> IO () stopNotifications :: (ThreadId, D.Dispatcher) -> IO ()
stopNotifications (central_exchange, dispatcher) = do stopNotifications (central_exchange, dispatcher) = do
killThread central_exchange killThread central_exchange
killThread $ DT.d_ce_listener dispatcher D.terminateDispatcher dispatcher
-- It's especially important to use Hspec for DB tests, because, -- It's especially important to use Hspec for DB tests, because,
-- unlike 'tasty', 'Hspec' has explicit control over parallelism, -- unlike 'tasty', 'Hspec' has explicit control over parallelism,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment