[websockets] initial implementation of async notifications

I'm currently able to do the following:
- start gargantext-server (as it hosts central exchange and
dispatcher, currently)
- start a websocket connection:
  websocat ws://localhost:8008/ws
- subscibe to a topic (in websocat):
  {"request": "subscribe", "topic": {"type": "update_tree", "node_id":
  15}}
- optionally subscibe to other node_ids or start other websocat's with
  different subscriptions (can be multiple)
- fire up
  cabal v2-run gargantext-central-exchange -- client
  This triggers a node_id: 15 and node_id: 16 notification to be sent

You can send your own notifications, e.g. with Python:
import json
import nanomsg as n
s = n.Socket(n.PUSH)
s.connect('tcp://localhost:5560')
s.send(json.dumps({'type': 'update_tree_first_level', 'node_id': 15}))
parent 9ea7d5f3
...@@ -17,6 +17,7 @@ import Control.Concurrent (threadDelay) ...@@ -17,6 +17,7 @@ import Control.Concurrent (threadDelay)
import Control.Monad (join, mapM_) import Control.Monad (join, mapM_)
import Data.ByteString.Char8 qualified as C import Data.ByteString.Char8 qualified as C
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.Core.AsyncUpdates.CentralExchange (gServer)
import Gargantext.Prelude import Gargantext.Prelude
import Nanomsg import Nanomsg
import Options.Applicative import Options.Applicative
...@@ -26,12 +27,13 @@ data Command = ...@@ -26,12 +27,13 @@ data Command =
Server Server
| Client | Client
parser :: Parser (IO ()) parser :: Parser (IO ())
parser = subparser parser = subparser
( command "server" (info (pure gServer) idm) ( command "server" (info (pure gServer) idm)
<> command "client" (info (pure gClient) idm) ) <> command "client" (info (pure gClient) idm) )
main :: IO () main :: IO ()
main = join $ execParser (info parser idm) main = join $ execParser (info parser idm)
...@@ -39,32 +41,13 @@ gClient :: IO () ...@@ -39,32 +41,13 @@ gClient :: IO ()
gClient = do gClient = do
withSocket Push $ \s -> do withSocket Push $ \s -> do
_ <- connect s "tcp://localhost:5560" _ <- connect s "tcp://localhost:5560"
let str = C.unwords (take 1000 $ repeat "hello") -- let str = C.unwords (take 10 $ repeat "hello")
C.putStrLn $ C.pack "sending: " <> (C.take 100 str) let str = "{\"type\": \"update_tree_first_level\", \"node_id\": 15}"
replicateM_ 100 $ C.putStrLn $ C.pack "sending: " <> str
send s str send s str
gServer :: IO ()
gServer = do
withSocket Pull $ \s -> do
_ <- bind s "tcp://*:5560"
forever $ do
putText "receiving"
r <- recv s
C.putStrLn $ C.take 100 r
-- r <- recv' s
-- -- putText $ T.pack $ C.unpack r
-- putText "..."
-- case r of
-- Nothing -> pure ()
-- Just r' -> C.putStrLn $ C.take 100 r'
-- threadDelay 1000000
-- withSocket Pub $ \s -> do
-- _ <- bind s "tcp://*:5560"
-- mapM_ (\num -> sendNumber s num) (cycle [1..1000000 :: Int])
-- where
-- sendNumber s number = do
-- threadDelay 1000 -- let's conserve some cycles
-- let numAsString = show number
-- send s (C.pack numAsString)
withSocket Push $ \s -> do
_ <- connect s "tcp://localhost:5560"
let str2 = "{\"type\": \"update_tree_first_level\", \"node_id\": 16}"
C.putStrLn $ C.pack "sending: " <> str2
send s str2
...@@ -140,6 +140,9 @@ library ...@@ -140,6 +140,9 @@ library
Gargantext.API.WebSockets Gargantext.API.WebSockets
Gargantext.Core Gargantext.Core
Gargantext.Core.AsyncUpdates Gargantext.Core.AsyncUpdates
Gargantext.Core.AsyncUpdates.CentralExchange
Gargantext.Core.AsyncUpdates.CentralExchange.Types
Gargantext.Core.AsyncUpdates.Dispatcher
Gargantext.Core.Mail.Types Gargantext.Core.Mail.Types
Gargantext.Core.Methods.Similarities Gargantext.Core.Methods.Similarities
Gargantext.Core.Methods.Similarities.Conditional Gargantext.Core.Methods.Similarities.Conditional
...@@ -556,6 +559,7 @@ library ...@@ -556,6 +559,7 @@ library
, morpheus-graphql-subscriptions >= 0.17.0 && < 0.25 , morpheus-graphql-subscriptions >= 0.17.0 && < 0.25
, morpheus-graphql-tests >= 0.17.0 && < 0.25 , morpheus-graphql-tests >= 0.17.0 && < 0.25
, mtl ^>= 2.2.2 , mtl ^>= 2.2.2
, nanomsg-haskell >= 0.2.4 && < 0.3
, natural-transformation ^>= 0.4 , natural-transformation ^>= 0.4
, network-uri ^>= 2.6.4.1 , network-uri ^>= 2.6.4.1
, opaleye ^>= 0.9.6.1 , opaleye ^>= 0.9.6.1
......
...@@ -2,8 +2,9 @@ ...@@ -2,8 +2,9 @@
let let
myBuildInputs = [ myBuildInputs = [
pkgs.pkgs.docker-compose pkgs.pkgs.docker-compose
pkgs.pkgs.haskell-language-server #pkgs.pkgs.haskell-language-server
pkgs.pkgs.stack #pkgs.pkgs.stack
pkgs.pkgs.websocat
]; ];
in in
pkgs.pkgs.mkShell { pkgs.pkgs.mkShell {
......
...@@ -13,6 +13,8 @@ module Gargantext.API.Admin.EnvTypes ( ...@@ -13,6 +13,8 @@ module Gargantext.API.Admin.EnvTypes (
, env_logger , env_logger
, env_manager , env_manager
, env_self_url , env_self_url
, env_central_exchange
, env_dispatcher
, menv_firewall , menv_firewall
, dev_env_logger , dev_env_logger
...@@ -35,6 +37,7 @@ import Gargantext.API.Admin.Types ...@@ -35,6 +37,7 @@ import Gargantext.API.Admin.Types
import Gargantext.API.Errors.Types import Gargantext.API.Errors.Types
import Gargantext.API.Job import Gargantext.API.Job
import Gargantext.API.Prelude (GargM) import Gargantext.API.Prelude (GargM)
import Gargantext.Core.AsyncUpdates.Dispatcher (Dispatcher)
import Gargantext.Core.Mail.Types (HasMail, mailSettings) import Gargantext.Core.Mail.Types (HasMail, mailSettings)
import Gargantext.Core.NLP (NLPServerMap, HasNLPServer(..)) import Gargantext.Core.NLP (NLPServerMap, HasNLPServer(..))
import Gargantext.Core.NodeStory import Gargantext.Core.NodeStory
...@@ -122,6 +125,8 @@ data Env = Env ...@@ -122,6 +125,8 @@ data Env = Env
, _env_config :: ~GargConfig , _env_config :: ~GargConfig
, _env_mail :: ~MailConfig , _env_mail :: ~MailConfig
, _env_nlp :: ~NLPServerMap , _env_nlp :: ~NLPServerMap
, _env_central_exchange :: ~ThreadId
, _env_dispatcher :: ~Dispatcher
} }
deriving (Generic) deriving (Generic)
......
...@@ -32,6 +32,8 @@ import Gargantext.API.Admin.Settings.CORS ...@@ -32,6 +32,8 @@ import Gargantext.API.Admin.Settings.CORS
import Gargantext.API.Admin.Types import Gargantext.API.Admin.Types
import Gargantext.API.Errors.Types import Gargantext.API.Errors.Types
import Gargantext.API.Prelude import Gargantext.API.Prelude
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.Dispatcher qualified as D
import Gargantext.Core.NLP (nlpServerMap) import Gargantext.Core.NLP (nlpServerMap)
import Gargantext.Core.NodeStory import Gargantext.Core.NodeStory
import Gargantext.Database.Prelude (databaseParameters, hasConfig) import Gargantext.Database.Prelude (databaseParameters, hasConfig)
...@@ -201,6 +203,10 @@ newEnv logger port file = do ...@@ -201,6 +203,10 @@ newEnv logger port file = do
!config_mail <- Mail.readConfig file !config_mail <- Mail.readConfig file
!nlp_env <- nlpServerMap <$> NLP.readConfig file !nlp_env <- nlpServerMap <$> NLP.readConfig file
!central_exchange <- forkIO CE.gServer
!dispatcher <- D.dispatcher
{- An 'Env' by default doesn't have strict fields, but when constructing one in production {- An 'Env' by default doesn't have strict fields, but when constructing one in production
we want to force them to WHNF to avoid accumulating unnecessary thunks. we want to force them to WHNF to avoid accumulating unnecessary thunks.
-} -}
...@@ -216,6 +222,8 @@ newEnv logger port file = do ...@@ -216,6 +222,8 @@ newEnv logger port file = do
, _env_config = config_env , _env_config = config_env
, _env_mail = config_mail , _env_mail = config_mail
, _env_nlp = nlp_env , _env_nlp = nlp_env
, _env_central_exchange = central_exchange
, _env_dispatcher = dispatcher
} }
newPool :: ConnectInfo -> IO (Pool Connection) newPool :: ConnectInfo -> IO (Pool Connection)
......
...@@ -20,7 +20,7 @@ import Data.Text.Encoding qualified as TE ...@@ -20,7 +20,7 @@ import Data.Text.Encoding qualified as TE
import Data.Version (showVersion) import Data.Version (showVersion)
import Gargantext.API.Admin.Auth (auth, forgotPassword, forgotPasswordAsync) import Gargantext.API.Admin.Auth (auth, forgotPassword, forgotPasswordAsync)
import Gargantext.API.Admin.Auth.Types (AuthContext) import Gargantext.API.Admin.Auth.Types (AuthContext)
import Gargantext.API.Admin.EnvTypes (Env) import Gargantext.API.Admin.EnvTypes (Env, env_dispatcher)
import Gargantext.API.Admin.FrontEnd (frontEndServer) import Gargantext.API.Admin.FrontEnd (frontEndServer)
import Gargantext.API.Auth.PolicyCheck () import Gargantext.API.Auth.PolicyCheck ()
import Gargantext.API.Errors import Gargantext.API.Errors
...@@ -30,7 +30,7 @@ import Gargantext.API.Public qualified as Public ...@@ -30,7 +30,7 @@ import Gargantext.API.Public qualified as Public
import Gargantext.API.Routes (API, GargVersion, GargAPI) import Gargantext.API.Routes (API, GargVersion, GargAPI)
import Gargantext.API.Swagger (swaggerDoc) import Gargantext.API.Swagger (swaggerDoc)
import Gargantext.API.ThrowAll (serverPrivateGargAPI) import Gargantext.API.ThrowAll (serverPrivateGargAPI)
import Gargantext.API.WebSockets qualified as WebSockets import Gargantext.Core.AsyncUpdates.Dispatcher qualified as Dispatcher
import Gargantext.Database.Prelude (hasConfig) import Gargantext.Database.Prelude (hasConfig)
import Gargantext.Prelude hiding (Handler, catch) import Gargantext.Prelude hiding (Handler, catch)
import Gargantext.Prelude.Config (gc_url_backend_api) import Gargantext.Prelude.Config (gc_url_backend_api)
...@@ -59,7 +59,7 @@ server :: Env -> IO (Server API) ...@@ -59,7 +59,7 @@ server :: Env -> IO (Server API)
server env = do server env = do
-- orchestrator <- scrapyOrchestrator env -- orchestrator <- scrapyOrchestrator env
pure $ \errScheme -> pure $ \errScheme ->
WebSockets.server (Dispatcher.d_ws_server $ env ^. env_dispatcher)
:<|> swaggerSchemaUIServer swaggerDoc :<|> swaggerSchemaUIServer swaggerDoc
:<|> hoistServerWithContext :<|> hoistServerWithContext
(Proxy :: Proxy GargAPI) (Proxy :: Proxy GargAPI)
......
{-|
Module : Gargantext.Core.AsyncUpdates.CentralExchange
Description : Central exchange (asynchronous notifications)
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918
-}
module Gargantext.Core.AsyncUpdates.CentralExchange where
-- import Control.Concurrent (threadDelay)
import Control.Concurrent.Async qualified as Async
import Data.Aeson qualified as Aeson
import Data.ByteString.Char8 qualified as C
import Data.ByteString.Lazy qualified as BSL
import Gargantext.Core.AsyncUpdates.CentralExchange.Types
import Gargantext.Prelude
import Nanomsg
{-
Central exchange is a service, which gathers messages from various
places and informs the Dispatcher (which will then inform users about
various events).
-}
gServer :: IO ()
gServer = do
withSocket Pull $ \s -> do
withSocket Push $ \s_dispatcher -> do
_ <- bind s "tcp://*:5560"
_ <- connect s_dispatcher "tcp://localhost:5561"
forever $ do
putText "[central_exchange] receiving"
r <- recv s
C.putStrLn r
case Aeson.decode (BSL.fromStrict r) of
Just (UpdateTreeFirstLevel node_id) -> do
putText $ "[central_exchange] update tree: " <> show node_id
putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id
-- To make this more robust, use withAsync so we don't
-- block the main thread (send is blocking)
-- NOTE: If we're flooded with messages, and send is
-- slow, we might be spawning many threads...
-- NOTE: Currently we just forward the message that we
-- got. So in theory central exchange isn't needed (we
-- could ping dispatcher directly). However, I think
-- it's better to have this as a separate
-- component. Currently I built this inside
-- gargantext-server but maybe it can be a separate
-- process, independent of the server.
Async.withAsync (pure ()) $ \_ -> do
send s_dispatcher r
_ -> putText "[central_exchange] unknown"
{-|
Module : Gargantext.Core.AsyncUpdates.CentralExchange.Types
Description : Types for asynchronous notifications (central exchange)
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918
-}
module Gargantext.Core.AsyncUpdates.CentralExchange.Types where
import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson.Types (prependFailure, typeMismatch)
import Gargantext.Core.Types (NodeId)
import Gargantext.Prelude
{-
Central exchange is a service, which gathers messages from various
places and informs the Dispatcher (which will then inform users about
various events).
-}
-- INTERNAL MESSAGES
data CEMessage =
UpdateTreeFirstLevel NodeId
deriving (Show, Eq)
instance FromJSON CEMessage where
parseJSON = withObject "CEMessage" $ \o -> do
type_ <- o .: "type"
case type_ of
"update_tree_first_level" -> do
node_id <- o .: "node_id"
pure $ UpdateTreeFirstLevel node_id
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON CEMessage where
toJSON (UpdateTreeFirstLevel node_id) = object [
"type" .= toJSON ("update_tree_first_level" :: Text)
, "node_id" .= toJSON node_id
]
{-|
Module : Gargantext.Core.AsyncUpdates.Dispatcher
Description : Dispatcher (handles websocket connections, accepts message from central exchange)
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918
-}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -fno-warn-unused-matches -fno-warn-unused-imports #-}
module Gargantext.Core.AsyncUpdates.Dispatcher where
import Control.Concurrent.Async qualified as Async
import Data.Aeson ((.:), (.=))
import Data.Aeson qualified as Aeson
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.ByteString.Char8 qualified as C
import Data.ByteString.Lazy qualified as BSL
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes
import Gargantext.Core.Types (NodeId, UserId)
import Gargantext.Prelude
import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar)
import Nanomsg
import Network.WebSockets qualified as WS
import Servant
import Servant.API.WebSocket qualified as WS
{-
Dispatcher is a service, which provides couple of functionalities:
- handles WebSocket connections and manages them
- accepts messages from central exchange
- dispatches these messages to connected users
-}
data Topic =
-- | Update given Servant Job (we currently send a request every
-- | second to get job status).
-- UpdateJob JobID
-- | Given parent node id, trigger update of the node and its
-- children (e.g. list is automatically created in a corpus)
UpdateTree NodeId
deriving (Eq, Show)
instance FromJSON Topic where
parseJSON = Aeson.withObject "Topic" $ \o -> do
type_ <- o .: "type"
case type_ of
"update_tree" -> do
node_id <- o .: "node_id"
pure $ UpdateTree node_id
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON Topic where
toJSON (UpdateTree node_id) = Aeson.object [
"type" .= toJSON ("update_tree" :: Text)
, "node_id" .= toJSON node_id
]
data ConnectedUser =
CUUser UserId
| CUPublic
deriving (Eq, Show)
newtype WSKeyConnection = WSKeyConnection (ByteString, WS.Connection)
eqWSKeyConnection :: WSKeyConnection -> WSKeyConnection -> Bool
eqWSKeyConnection ws1 ws2 = wsKey ws1 == wsKey ws2
showWSKeyConnection :: WSKeyConnection -> Text
showWSKeyConnection ws = "WSKeyConnection " <> show (wsKey ws)
wsKey :: WSKeyConnection -> ByteString
wsKey (WSKeyConnection (key, _conn)) = key
wsConn :: WSKeyConnection -> WS.Connection
wsConn (WSKeyConnection (_key, conn)) = conn
data Subscription =
Subscription {
s_connected_user :: ConnectedUser
, s_ws_key_connection :: WSKeyConnection
, s_topic :: Topic }
eqSub :: Subscription -> Subscription -> Bool
eqSub sub1 sub2 =
s_connected_user sub1 == s_connected_user sub2 &&
s_ws_key_connection sub2 `eqWSKeyConnection` s_ws_key_connection sub2 &&
s_topic sub1 == s_topic sub2
showSub :: Subscription -> Text
showSub sub =
"Subscription " <> show (s_connected_user sub) <>
" " <> showWSKeyConnection (s_ws_key_connection sub) <>
" " <> show (s_topic sub)
subKey :: Subscription -> ByteString
subKey sub = wsKey $ s_ws_key_connection sub
{-
We accept requests for subscription/unsubscription via websocket.
We could instead handle 1 websocket connection per every topic
subscription (e.g. parse headers in WS.PendingConnection. However, WS
by default can handle 65k concurrent connections. With multiple users
having multiple components open, we could exhaust that limit quickly.
Hence, we architect this to have 1 websocket connection per web
browser.
-}
data WSRequest =
WSSubscribe Topic
| WSUnsubscribe Topic
deriving (Eq, Show)
instance FromJSON WSRequest where
parseJSON = Aeson.withObject "WSRequest" $ \o -> do
request <- o .: "request"
case request of
"subscribe" -> do
topic <- o .: "topic"
pure $ WSSubscribe topic
"unsubscribe" -> do
topic <- o .: "topic"
pure $ WSUnsubscribe topic
s -> prependFailure "parsing request type failed, " (typeMismatch "request" s)
data Dispatcher =
Dispatcher { d_subscriptions :: TVar [Subscription]
, d_ws_server :: Server WSAPI
, d_ce_listener :: ThreadId
}
dispatcher :: IO Dispatcher
dispatcher = do
subscriptions <- newTVarIO ([] :: [Subscription])
let server = wsServer subscriptions
d_ce_listener <- forkIO (ce_listener subscriptions)
pure $ Dispatcher { d_subscriptions = subscriptions
, d_ws_server = server
, d_ce_listener = d_ce_listener }
-- | TODO Allow only 1 topic subscription per connection. It doesn't
-- | make sense to send multiple notifications of the same type to the
-- | same connection.
insertSubscription :: TVar [Subscription] -> Subscription -> IO [Subscription]
insertSubscription subscriptions sub =
atomically $ do
s <- readTVar subscriptions
let ss = s <> [sub]
writeTVar subscriptions ss
pure ss
removeSubscription :: TVar [Subscription] -> Subscription -> IO [Subscription]
removeSubscription subscriptions sub =
atomically $ do
s <- readTVar subscriptions
let ss = filter (\sub' -> not $ sub `eqSub` sub') s
writeTVar subscriptions ss
pure ss
removeSubscriptionsForWSKey :: TVar [Subscription] -> WSKeyConnection -> IO [Subscription]
removeSubscriptionsForWSKey subscriptions ws =
atomically $ do
s <- readTVar subscriptions
let ss = filter (\sub -> subKey sub /= wsKey ws) s
writeTVar subscriptions ss
pure ss
type WSAPI = "ws" :> WS.WebSocketPending
wsServer :: TVar [Subscription] -> Server WSAPI
wsServer subscriptions = streamData
where
streamData :: MonadIO m => WS.PendingConnection -> m ()
streamData pc = do
let reqHead = WS.pendingRequest pc
-- WebSocket specification says that a pending request should send
-- some unique, Sec-WebSocket-Key string. We use this to compare
-- connections (WS.Connection doesn't implement an Eq instance).
let mKey = head $ filter (\(k, _) -> k == "Sec-WebSocket-Key") $ WS.requestHeaders reqHead
let key = snd $ fromMaybe (panicTrace "Sec-WebSocket-Key not found!") mKey
putText $ show $ WS.requestHeaders reqHead
c <- liftIO $ WS.acceptRequest pc
let ws = WSKeyConnection (key, c)
_ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws)
pure ()
wsLoop ws = flip finally disconnect $ do
putText "[wsLoop] connecting"
forever $ do
dm <- WS.receiveDataMessage (wsConn ws)
case dm of
WS.Text dm' _ -> do
case Aeson.decode dm' of
Nothing -> putText "[wsLoop] unknown message"
Just (WSSubscribe topic) -> do
-- TODO Fix s_connected_user based on header
let sub = Subscription { s_connected_user = CUPublic
, s_ws_key_connection = ws
, s_topic = topic }
ss <- insertSubscription subscriptions sub
putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
Just (WSUnsubscribe topic) -> do
-- TODO Fix s_connected_user based on header
let sub = Subscription { s_connected_user = CUPublic
, s_ws_key_connection = ws
, s_topic = topic }
ss <- removeSubscription subscriptions sub
putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
_ -> putText "[wsLoop] binary ws messages not supported"
where
disconnect = do
putText "[wsLoop] disconnecting..."
ss <- removeSubscriptionsForWSKey subscriptions ws
putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
data Notification = Notification Topic
deriving (Eq, Show)
instance ToJSON Notification where
toJSON (Notification topic) = Aeson.object [
"notification" .= toJSON topic
]
ce_listener :: TVar [Subscription] -> IO ()
ce_listener subscriptions = do
withSocket Pull $ \s -> do
_ <- bind s "tcp://*:5561"
forever $ do
putText "[ce_listener] receiving"
r <- recv s
C.putStrLn r
case Aeson.decode (BSL.fromStrict r) of
Nothing -> putText "[ce_listener] unknown message from central exchange"
Just ceMessage -> do
subs <- atomically $ readTVar subscriptions
-- TODO This isn't safe: we atomically fetch subscriptions,
-- then send notifications one by one. In the meantime, a
-- subscription could end or new ones could appear
let filteredSubs = filterCEMessageSubs ceMessage subs
mapM_ (sendNotification ceMessage) filteredSubs
where
sendNotification :: CETypes.CEMessage -> Subscription -> IO ()
sendNotification ceMessage sub = do
let ws = s_ws_key_connection sub
-- send the same thing to everyone for now
WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode $ Notification $ s_topic sub) Nothing)
-- Custom filtering of list of Subscriptions based on
-- CETypes.CEMessage.
-- For example, we can add CEMessage.Broadcast to propagate a
-- notification to all connections.
filterCEMessageSubs :: CETypes.CEMessage -> [Subscription] -> [Subscription]
filterCEMessageSubs (CETypes.UpdateTreeFirstLevel node_id) subscriptions =
filter (\sub -> s_topic sub == UpdateTree node_id) subscriptions
...@@ -24,7 +24,8 @@ import qualified Test.Offline.Stemming.Lancaster as Lancaster ...@@ -24,7 +24,8 @@ import qualified Test.Offline.Stemming.Lancaster as Lancaster
import qualified Test.Parsers.Date as PD import qualified Test.Parsers.Date as PD
import qualified Test.Utils.Crypto as Crypto import qualified Test.Utils.Crypto as Crypto
import qualified Test.Utils.Jobs as Jobs import qualified Test.Utils.Jobs as Jobs
import qualified Test.Core.Similarity as Similarity import qualified Test.Core.Similarity as Similarity
import qualified Test.Core.AsyncUpdates as AsyncUpdates
import Test.Tasty import Test.Tasty
import Test.Tasty.Hspec import Test.Tasty.Hspec
...@@ -38,6 +39,7 @@ main = do ...@@ -38,6 +39,7 @@ main = do
nlpSpec <- testSpec "NLP" NLP.test nlpSpec <- testSpec "NLP" NLP.test
jobsSpec <- testSpec "Jobs" Jobs.test jobsSpec <- testSpec "Jobs" Jobs.test
similaritySpec <- testSpec "Similarity" Similarity.test similaritySpec <- testSpec "Similarity" Similarity.test
asyncUpdatesSpec <- testSpec "AsyncUpdates" AsyncUpdates.test
defaultMain $ testGroup "Gargantext" defaultMain $ testGroup "Gargantext"
[ utilSpec [ utilSpec
...@@ -53,4 +55,5 @@ main = do ...@@ -53,4 +55,5 @@ main = do
, similaritySpec , similaritySpec
, Phylo.tests , Phylo.tests
, testGroup "Stemming" [ Lancaster.tests ] , testGroup "Stemming" [ Lancaster.tests ]
, asyncUpdatesSpec
] ]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment