[toml] notifications

parent 9e4f7476
Pipeline #6564 canceled with stages
in 11 minutes and 3 seconds
{-|
Module : Main.hs
Description : Gargantext central exchange for async notifications
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE Strict #-}
module Main where
import Control.Concurrent (threadDelay)
import Control.Monad (join, mapM_)
import Data.ByteString.Char8 qualified as C
import Data.Text qualified as T
import Gargantext.Core.AsyncUpdates.CentralExchange (gServer)
import Gargantext.Core.AsyncUpdates.Constants (ceBind, ceConnect)
import Gargantext.Prelude
import Nanomsg
import Options.Applicative
data Command =
CEServer
| SimpleServer
| WSServer
| Client
parser :: Parser (IO ())
parser = subparser
( command "ce-server" (info (pure gServer) idm)
<> command "simple-server" (info (pure simpleServer) idm)
<> command "ws-server" (info (pure wsServer) idm)
<> command "client" (info (pure gClient) idm) )
main :: IO ()
main = join $ execParser (info parser idm)
simpleServer :: IO ()
simpleServer = do
withSocket Pull $ \s -> do
_ <- bind s ceBind
putText "[simpleServer] receiving"
forever $ do
mr <- recvMalloc s 1024
C.putStrLn mr
-- case mr of
-- Nothing -> pure ()
-- Just r -> C.putStrLn r
-- threadDelay 10000
wsServer :: IO ()
wsServer = do
withSocket Pull $ \ws -> do
_ <- bind ws "ws://*:5560"
forever $ do
putText "[wsServer] receiving"
r <- recv ws
C.putStrLn r
gClient :: IO ()
gClient = do
withSocket Push $ \s -> do
_ <- connect s ceConnect
-- let str = C.unwords (take 10 $ repeat "hello")
let str = "{\"type\": \"update_tree_first_level\", \"node_id\": -1}"
C.putStrLn $ C.pack "sending: " <> str
send s str
withSocket Push $ \s -> do
_ <- connect s ceConnect
let str2 = "{\"type\": \"update_tree_first_level\", \"node_id\": -2}"
C.putStrLn $ C.pack "sending: " <> str2
send s str2
...@@ -120,6 +120,11 @@ smtp_host = "localhost" ...@@ -120,6 +120,11 @@ smtp_host = "localhost"
# HOST_password = password # HOST_password = password
[notifications]
central-exchange = { bind = "tcp://*:5560", connect = "tcp://localhost:5560" }
dispatcher = { bind = "tcp://*:5561", connect = "tcp://localhost:5561" }
[nlp] [nlp]
# Possible choices (see Gargantext.Core.NLP): # Possible choices (see Gargantext.Core.NLP):
# - spacy:// (for http:// Spacy) # - spacy:// (for http:// Spacy)
......
...@@ -168,7 +168,6 @@ library ...@@ -168,7 +168,6 @@ library
Gargantext.Core.AsyncUpdates Gargantext.Core.AsyncUpdates
Gargantext.Core.AsyncUpdates.CentralExchange Gargantext.Core.AsyncUpdates.CentralExchange
Gargantext.Core.AsyncUpdates.CentralExchange.Types Gargantext.Core.AsyncUpdates.CentralExchange.Types
Gargantext.Core.AsyncUpdates.Constants
Gargantext.Core.AsyncUpdates.Dispatcher Gargantext.Core.AsyncUpdates.Dispatcher
Gargantext.Core.AsyncUpdates.Dispatcher.Subscriptions Gargantext.Core.AsyncUpdates.Dispatcher.Subscriptions
Gargantext.Core.AsyncUpdates.Dispatcher.Types Gargantext.Core.AsyncUpdates.Dispatcher.Types
...@@ -799,27 +798,6 @@ executable gargantext-server ...@@ -799,27 +798,6 @@ executable gargantext-server
, unordered-containers ^>= 0.2.16.0 , unordered-containers ^>= 0.2.16.0
, vector ^>= 0.7.3 , vector ^>= 0.7.3
executable gargantext-central-exchange
import:
defaults
main-is: Main.hs
other-modules:
Paths_gargantext
hs-source-dirs:
bin/gargantext-central-exchange
build-depends:
bytestring ^>= 0.10.12.0
, gargantext
, gargantext-prelude
, nanomsg-haskell >= 0.2.4 && < 0.3
-- , nng-haskell
, optparse-applicative >= 0.18.1.0 && < 0.19
, optparse-generic ^>= 1.4.7
, postgresql-simple ^>= 0.6.4
, text ^>= 1.2.4.1
, unordered-containers ^>= 0.2.16.0
, vector ^>= 0.7.3
common testDependencies common testDependencies
build-depends: build-depends:
base >=4.7 && <5 base >=4.7 && <5
......
...@@ -44,15 +44,15 @@ import Data.Text.Encoding qualified as TE ...@@ -44,15 +44,15 @@ import Data.Text.Encoding qualified as TE
import Data.Text.IO (putStrLn) import Data.Text.IO (putStrLn)
import Data.Validity import Data.Validity
import Gargantext.API.Admin.Auth.Types (AuthContext) import Gargantext.API.Admin.Auth.Types (AuthContext)
import Gargantext.API.Admin.EnvTypes (Env, Mode(..)) import Gargantext.API.Admin.EnvTypes (Env, Mode(..), _env_config)
import Gargantext.API.Admin.Settings (newEnv) import Gargantext.API.Admin.Settings (newEnv)
import Gargantext.API.Admin.Types (FireWall(..), PortNumber, cookieSettings, jwtSettings, settings, corsSettings, microservicesSettings) import Gargantext.API.Admin.Types (FireWall(..), PortNumber, cookieSettings, jwtSettings, settings, corsSettings, microservicesSettings)
import Gargantext.API.Middleware (logStdoutDevSanitised) import Gargantext.API.Middleware (logStdoutDevSanitised)
import Gargantext.API.Routes.Named (API) import Gargantext.API.Routes.Named (API)
import Gargantext.API.Routes.Named.EKG import Gargantext.API.Routes.Named.EKG
import Gargantext.API.Server.Named (server) import Gargantext.API.Server.Named (server)
import Gargantext.Core.AsyncUpdates.Constants qualified as AUConstants import Gargantext.Core.Config (_gc_notifications_config)
import Gargantext.Core.Config.Types (CORSOrigin(..), CORSSettings, SettingsFile(..), corsAllowedOrigins, msProxyPort) import Gargantext.Core.Config.Types (CORSOrigin(..), CORSSettings, NotificationsConfig(..), SettingsFile(..), corsAllowedOrigins, msProxyPort)
import Gargantext.Database.Prelude qualified as DB import Gargantext.Database.Prelude qualified as DB
import Gargantext.MicroServices.ReverseProxy (microServicesProxyApp) import Gargantext.MicroServices.ReverseProxy (microServicesProxyApp)
import Gargantext.Prelude hiding (putStrLn) import Gargantext.Prelude hiding (putStrLn)
...@@ -75,7 +75,7 @@ startGargantext mode port sf@(SettingsFile settingsFile) = withLoggerHoisted mod ...@@ -75,7 +75,7 @@ startGargantext mode port sf@(SettingsFile settingsFile) = withLoggerHoisted mod
env <- newEnv logger port sf env <- newEnv logger port sf
let proxyPort = env ^. settings.microservicesSettings.msProxyPort let proxyPort = env ^. settings.microservicesSettings.msProxyPort
runDbCheck env runDbCheck env
portRouteInfo port proxyPort portRouteInfo (_gc_notifications_config $ _env_config env) port proxyPort
app <- makeApp env app <- makeApp env
mid <- makeGargMiddleware (env ^. settings.corsSettings) mode mid <- makeGargMiddleware (env ^. settings.corsSettings) mode
periodicActions <- schedulePeriodicActions env periodicActions <- schedulePeriodicActions env
...@@ -96,8 +96,8 @@ startGargantext mode port sf@(SettingsFile settingsFile) = withLoggerHoisted mod ...@@ -96,8 +96,8 @@ startGargantext mode port sf@(SettingsFile settingsFile) = withLoggerHoisted mod
"' before running gargantext-server (only the first time)." "' before running gargantext-server (only the first time)."
oneHour = Clock.fromNanoSecs 3600_000_000_000 oneHour = Clock.fromNanoSecs 3600_000_000_000
portRouteInfo :: PortNumber -> PortNumber -> IO () portRouteInfo :: NotificationsConfig -> PortNumber -> PortNumber -> IO ()
portRouteInfo mainPort proxyPort = do portRouteInfo nc mainPort proxyPort = do
putStrLn "==========================================================================================================" putStrLn "=========================================================================================================="
putStrLn " GarganText Main Routes" putStrLn " GarganText Main Routes"
putStrLn "==========================================================================================================" putStrLn "=========================================================================================================="
...@@ -105,8 +105,8 @@ portRouteInfo mainPort proxyPort = do ...@@ -105,8 +105,8 @@ portRouteInfo mainPort proxyPort = do
putStrLn $ " - Swagger UI (API documentation)...........: " <> "http://localhost:" <> toUrlPiece mainPort <> "/swagger-ui" putStrLn $ " - Swagger UI (API documentation)...........: " <> "http://localhost:" <> toUrlPiece mainPort <> "/swagger-ui"
putStrLn $ " - Playground GraphQL (API documentation)...: " <> "http://localhost:" <> toUrlPiece mainPort <> "/gql" putStrLn $ " - Playground GraphQL (API documentation)...: " <> "http://localhost:" <> toUrlPiece mainPort <> "/gql"
putStrLn $ " - Microservices proxy .....................: " <> "http://localhost:" <> toUrlPiece proxyPort putStrLn $ " - Microservices proxy .....................: " <> "http://localhost:" <> toUrlPiece proxyPort
putStrLn $ " - Central exchange.........................: " <> "nanomsg: " <> pack AUConstants.ceBind putStrLn $ " - Central exchange.........................: " <> "nanomsg: " <> _nc_central_exchange_bind nc
putStrLn $ " - Dispatcher internal......................: " <> "nanomsg: " <> pack AUConstants.dispatcherBind putStrLn $ " - Dispatcher internal......................: " <> "nanomsg: " <> _nc_dispatcher_bind nc
putStrLn $ " - WebSocket address........................: " <> "ws://localhost:" <> toUrlPiece mainPort <> "/ws" putStrLn $ " - WebSocket address........................: " <> "ws://localhost:" <> toUrlPiece mainPort <> "/ws"
putStrLn "==========================================================================================================" putStrLn "=========================================================================================================="
......
...@@ -174,7 +174,9 @@ instance Jobs.MonadJob (GargM Env err) GargJob (Seq JobLog) JobLog where ...@@ -174,7 +174,9 @@ instance Jobs.MonadJob (GargM Env err) GargJob (Seq JobLog) JobLog where
getJobEnv = asks (view env_jobs) getJobEnv = asks (view env_jobs)
instance CET.HasCentralExchangeNotification Env where instance CET.HasCentralExchangeNotification Env where
ce_notify m = liftBase $ CE.notify m ce_notify m = do
nc <- asks (view env_config)
liftBase $ CE.notify (_gc_notifications_config nc) m
-- | The /concrete/ 'JobHandle' in use with our 'GargM' (production) monad. Its -- | The /concrete/ 'JobHandle' in use with our 'GargM' (production) monad. Its
-- constructor it's not exported, to not leak internal details of its implementation. -- constructor it's not exported, to not leak internal details of its implementation.
...@@ -297,7 +299,9 @@ data DevEnv = DevEnv ...@@ -297,7 +299,9 @@ data DevEnv = DevEnv
makeLenses ''DevEnv makeLenses ''DevEnv
instance CET.HasCentralExchangeNotification DevEnv where instance CET.HasCentralExchangeNotification DevEnv where
ce_notify m = liftBase $ CE.notify m ce_notify m = do
nc <- asks (view dev_env_config)
liftBase $ CE.notify (_gc_notifications_config nc) m
-- | Our /mock/ job handle. -- | Our /mock/ job handle.
data DevJobHandle = DevJobHandle data DevJobHandle = DevJobHandle
......
...@@ -207,8 +207,8 @@ newEnv logger port settingsFile@(SettingsFile sf) = do ...@@ -207,8 +207,8 @@ newEnv logger port settingsFile@(SettingsFile sf) = do
& Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_js_id_timeout) & Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_js_id_timeout)
!jobs_env <- Jobs.newJobEnv jobs_settings prios' manager_env !jobs_env <- Jobs.newJobEnv jobs_settings prios' manager_env
!central_exchange <- forkIO CE.gServer !central_exchange <- forkIO $ CE.gServer (_gc_notifications_config config_env)
!dispatcher <- D.dispatcher !dispatcher <- D.dispatcher (_gc_notifications_config config_env)
{- An 'Env' by default doesn't have strict fields, but when constructing one in production {- An 'Env' by default doesn't have strict fields, but when constructing one in production
we want to force them to WHNF to avoid accumulating unnecessary thunks. we want to force them to WHNF to avoid accumulating unnecessary thunks.
......
...@@ -20,10 +20,12 @@ import Control.Concurrent.Async qualified as Async ...@@ -20,10 +20,12 @@ import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM.TChan qualified as TChan import Control.Concurrent.STM.TChan qualified as TChan
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types import Gargantext.Core.AsyncUpdates.CentralExchange.Types
import Gargantext.Core.AsyncUpdates.Constants (ceBind, ceConnect, dispatcherConnect) import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg) import Gargantext.System.Logging (LogLevel(..), withLogger, logMsg)
import Nanomsg (Pull(..), Push(..), bind, connect, recvMalloc, send, withSocket) import Nanomsg (Pull(..), Push(..), bind, connect, recvMalloc, send, withSocket)
{- {-
...@@ -39,12 +41,12 @@ with many users having updates. ...@@ -39,12 +41,12 @@ with many users having updates.
-} -}
gServer :: IO () gServer :: NotificationsConfig -> IO ()
gServer = do gServer (NotificationsConfig { .. }) = do
withSocket Pull $ \s -> do withSocket Pull $ \s -> do
withSocket Push $ \s_dispatcher -> do withSocket Push $ \s_dispatcher -> do
_ <- bind s ceBind _ <- bind s $ T.unpack _nc_central_exchange_bind
_ <- connect s_dispatcher dispatcherConnect _ <- connect s_dispatcher $ T.unpack _nc_dispatcher_connect
tChan <- TChan.newTChanIO tChan <- TChan.newTChanIO
...@@ -53,9 +55,11 @@ gServer = do ...@@ -53,9 +55,11 @@ gServer = do
-- | the 'tChan' and calls Dispatcher accordingly. This is to -- | the 'tChan' and calls Dispatcher accordingly. This is to
-- | make reading nanomsg as fast as possible. -- | make reading nanomsg as fast as possible.
void $ Async.concurrently (worker s_dispatcher tChan) $ do void $ Async.concurrently (worker s_dispatcher tChan) $ do
withLogger () $ \ioLogger -> do
forever $ do forever $ do
-- putText "[central_exchange] receiving" -- putText "[central_exchange] receiving"
r <- recvMalloc s 1024 r <- recvMalloc s 4096
logMsg ioLogger INFO $ "[central_exchange] received: " <> show r
-- C.putStrLn $ "[central_exchange] " <> r -- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
...@@ -69,7 +73,7 @@ gServer = do ...@@ -69,7 +73,7 @@ gServer = do
-- send the same message that we received -- send the same message that we received
send s_dispatcher r send s_dispatcher r
Just (UpdateTreeFirstLevel node_id) -> do Just (UpdateTreeFirstLevel node_id) -> do
logMsg ioLogger DEBUG $ "[central_exchange] update tree: " <> show node_id logMsg ioLogger INFO $ "[central_exchange] update tree: " <> show node_id
-- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id -- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id
-- To make this more robust, use withAsync so we don't -- To make this more robust, use withAsync so we don't
-- block the main thread (send is blocking) -- block the main thread (send is blocking)
...@@ -89,10 +93,12 @@ gServer = do ...@@ -89,10 +93,12 @@ gServer = do
_ -> logMsg ioLogger DEBUG $ "[central_exchange] unknown message" _ -> logMsg ioLogger DEBUG $ "[central_exchange] unknown message"
notify :: CEMessage -> IO () notify :: NotificationsConfig -> CEMessage -> IO ()
notify ceMessage = do notify (NotificationsConfig { _nc_central_exchange_connect }) ceMessage = do
Async.withAsync (pure ()) $ \_ -> do Async.withAsync (pure ()) $ \_ -> do
withSocket Push $ \s -> do withSocket Push $ \s -> do
_ <- connect s ceConnect _ <- connect s $ T.unpack _nc_central_exchange_connect
let str = Aeson.encode ceMessage let str = Aeson.encode ceMessage
withLogger () $ \ioLogger ->
logMsg ioLogger INFO $ "[central_exchange] sending: " <> (T.unpack $ TE.decodeUtf8 $ BSL.toStrict str)
send s $ BSL.toStrict str send s $ BSL.toStrict str
{-|
Module : Gargantext.Core.AsyncUpdates.Constants
Description : Various constants
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918
-}
module Gargantext.Core.AsyncUpdates.Constants where
import Prelude qualified
-- NOTE IDP is fast and we're on local network so it shouldn't be a
-- problem with dropping packets. Otherwise, use TCP
-- https://nanomsg.org
-- | Bind address for central exchange (for tcp: tcp://*:5560)
ceBind :: Prelude.String
ceBind = "ipc:///tmp/central-exchange.ipc"
-- ceBind = "tcp://*:5560"
-- | Connect address for central exchange (for tcp: tcp://localhost:5560)
ceConnect :: Prelude.String
ceConnect = "ipc:///tmp/central-exchange.ipc"
-- ceConnect = "tcp://localhost:5560"
-- | Bind address for dispatcher (for tcp: tcp://*:5561)
dispatcherBind :: Prelude.String
dispatcherBind = "ipc:///tmp/dispatcher.ipc"
-- dispatcherBind = "tcp://*:5561"
-- | Connect address for dispatcher (for tcp: tcp://localhost:5561)
dispatcherConnect :: Prelude.String
dispatcherConnect = "ipc:///tmp/dispatcher.ipc"
-- dispatcherConnect = "tcp://localhost:5561"
...@@ -23,10 +23,11 @@ import Control.Concurrent.STM.TChan qualified as TChan ...@@ -23,10 +23,11 @@ import Control.Concurrent.STM.TChan qualified as TChan
import Control.Concurrent.Throttle (throttle) import Control.Concurrent.Throttle (throttle)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Data.Text qualified as T
import DeferredFolds.UnfoldlM qualified as UnfoldlM import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes
import Gargantext.Core.AsyncUpdates.Constants as AUConstants
import Gargantext.Core.AsyncUpdates.Dispatcher.Types import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg) import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
import Nanomsg (Pull(..), bind, recvMalloc, withSocket) import Nanomsg (Pull(..), bind, recvMalloc, withSocket)
...@@ -44,13 +45,13 @@ Dispatcher is a service, which provides couple of functionalities: ...@@ -44,13 +45,13 @@ Dispatcher is a service, which provides couple of functionalities:
-} -}
dispatcher :: IO Dispatcher dispatcher :: NotificationsConfig -> IO Dispatcher
dispatcher = do dispatcher nc = do
subscriptions <- SSet.newIO subscriptions <- SSet.newIO
-- let server = wsServer authSettings subscriptions -- let server = wsServer authSettings subscriptions
d_ce_listener <- forkIO (dispatcherListener subscriptions) d_ce_listener <- forkIO (dispatcherListener nc subscriptions)
pure $ Dispatcher { d_subscriptions = subscriptions pure $ Dispatcher { d_subscriptions = subscriptions
-- , d_ws_server = server -- , d_ws_server = server
...@@ -61,10 +62,10 @@ dispatcher = do ...@@ -61,10 +62,10 @@ dispatcher = do
-- | This is a nanomsg socket listener. We want to read the messages -- | This is a nanomsg socket listener. We want to read the messages
-- | as fast as possible and then process them gradually in a separate -- | as fast as possible and then process them gradually in a separate
-- | thread. -- | thread.
dispatcherListener :: SSet.Set Subscription -> IO () dispatcherListener :: NotificationsConfig -> SSet.Set Subscription -> IO ()
dispatcherListener subscriptions = do dispatcherListener (NotificationsConfig { _nc_dispatcher_bind }) subscriptions = do
withSocket Pull $ \s -> do withSocket Pull $ \s -> do
_ <- bind s AUConstants.dispatcherBind _ <- bind s $ T.unpack _nc_dispatcher_bind
tChan <- TChan.newTChanIO tChan <- TChan.newTChanIO
......
...@@ -34,7 +34,6 @@ import Gargantext.API.Admin.Orchestrator.Types (JobLog) ...@@ -34,7 +34,6 @@ import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.API.Admin.Types (jwtSettings, Settings, jwtSettings) import Gargantext.API.Admin.Types (jwtSettings, Settings, jwtSettings)
import Gargantext.API.Prelude (IsGargServer) import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes
import Gargantext.Core.AsyncUpdates.Constants as AUConstants
import Gargantext.Core.Types (NodeId, UserId) import Gargantext.Core.Types (NodeId, UserId)
import Gargantext.Prelude import Gargantext.Prelude
import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar) import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar)
......
...@@ -39,6 +39,7 @@ module Gargantext.Core.Config ( ...@@ -39,6 +39,7 @@ module Gargantext.Core.Config (
, gc_mail_config , gc_mail_config
, gc_database_config , gc_database_config
, gc_nlp_config , gc_nlp_config
, gc_notifications_config
, mkProxyUrl , mkProxyUrl
) where ) where
...@@ -88,6 +89,7 @@ data GargConfig = GargConfig { _gc_backend_name :: !T.Text ...@@ -88,6 +89,7 @@ data GargConfig = GargConfig { _gc_backend_name :: !T.Text
, _gc_mail_config :: !MailConfig , _gc_mail_config :: !MailConfig
, _gc_database_config :: !PSQL.ConnectInfo , _gc_database_config :: !PSQL.ConnectInfo
, _gc_nlp_config :: !NLPConfig , _gc_nlp_config :: !NLPConfig
, _gc_notifications_config :: !NotificationsConfig
} }
deriving (Generic, Show) deriving (Generic, Show)
...@@ -104,6 +106,7 @@ instance FromValue GargConfig where ...@@ -104,6 +106,7 @@ instance FromValue GargConfig where
frames <- reqKeyOf "external" $ parseTableFromValue $ reqKey "frames" frames <- reqKeyOf "external" $ parseTableFromValue $ reqKey "frames"
jobs <- reqKey "jobs" jobs <- reqKey "jobs"
apis <- reqKey "apis" apis <- reqKey "apis"
_gc_notifications_config <- reqKey "notifications"
return $ GargConfig { _gc_backend_name = _fc_backend_name return $ GargConfig { _gc_backend_name = _fc_backend_name
, _gc_url = _fc_url , _gc_url = _fc_url
, _gc_url_backend_api = _fc_url_backend_api , _gc_url_backend_api = _fc_url_backend_api
...@@ -124,7 +127,8 @@ instance FromValue GargConfig where ...@@ -124,7 +127,8 @@ instance FromValue GargConfig where
, _gc_frontend_config , _gc_frontend_config
, _gc_mail_config , _gc_mail_config
, _gc_database_config = unTOMLConnectInfo db_config , _gc_database_config = unTOMLConnectInfo db_config
, _gc_nlp_config } , _gc_nlp_config
, _gc_notifications_config }
......
...@@ -23,6 +23,7 @@ module Gargantext.Core.Config.Types ...@@ -23,6 +23,7 @@ module Gargantext.Core.Config.Types
, FrontendConfig(..) , FrontendConfig(..)
, JobsConfig(..) , JobsConfig(..)
, APIsConfig(..) , APIsConfig(..)
, NotificationsConfig(..)
, SecretsConfig(..) , SecretsConfig(..)
, corsUseOriginsForHosts , corsUseOriginsForHosts
...@@ -186,3 +187,25 @@ instance FromValue APIsConfig where ...@@ -186,3 +187,25 @@ instance FromValue APIsConfig where
_ac_pubmed_api_key <- reqKeyOf "pubmed" $ parseTableFromValue $ reqKey "api_key" _ac_pubmed_api_key <- reqKeyOf "pubmed" $ parseTableFromValue $ reqKey "api_key"
_ac_epo_api_url <- reqKeyOf "epo" $ parseTableFromValue $ reqKey "api_url" _ac_epo_api_url <- reqKeyOf "epo" $ parseTableFromValue $ reqKey "api_url"
return $ APIsConfig { .. } return $ APIsConfig { .. }
data NotificationsConfig =
NotificationsConfig { _nc_central_exchange_bind :: !T.Text
, _nc_central_exchange_connect :: !T.Text
, _nc_dispatcher_bind :: !T.Text
, _nc_dispatcher_connect :: !T.Text }
deriving (Show, Eq)
instance FromValue NotificationsConfig where
fromValue = parseTableFromValue $ do
(_nc_central_exchange_bind, _nc_central_exchange_connect) <-
reqKeyOf "central-exchange" $ parseTableFromValue $ do
b <- reqKey "bind"
c <- reqKey "connect"
pure (b, c)
(_nc_dispatcher_bind, _nc_dispatcher_connect) <-
reqKeyOf "dispatcher" $ parseTableFromValue $ do
b <- reqKey "bind"
c <- reqKey "connect"
pure (b, c)
return $ NotificationsConfig { .. }
[cors]
allowed-origins = [
"https://demo.gargantext.org"
, "https://formation.gargantext.org"
, "https://academia.sub.gargantext.org"
, "https://cnrs.gargantext.org"
, "https://imt.sub.gargantext.org"
, "https://helloword.gargantext.org"
, "https://complexsystems.gargantext.org"
, "https://europa.gargantext.org"
, "https://earth.sub.gargantext.org"
, "https://health.sub.gargantext.org"
, "https://msh.sub.gargantext.org"
, "https://dev.sub.gargantext.org"
, "http://localhost:8008"
]
use-origins-for-hosts = true
[microservices.proxy]
port = 8009
enabled = false
...@@ -58,6 +58,11 @@ from = "" ...@@ -58,6 +58,11 @@ from = ""
login_type = "Normal" login_type = "Normal"
[notifications]
central-exchange = { bind = "tcp://*:15560", connect = "tcp://localhost:15560" }
dispatcher = { bind = "tcp://*:15561", connect = "tcp://localhost:15561" }
[nlp] [nlp]
EN = "corenlp://localhost:9000" EN = "corenlp://localhost:9000"
FR = "spacy://localhost:8001" FR = "spacy://localhost:8001"
......
module Test.API where module Test.API where
import Gargantext.Core.Config.Types (NotificationsConfig)
import Prelude import Prelude
import Test.Hspec import Test.Hspec
import qualified Test.API.Authentication as Auth import qualified Test.API.Authentication as Auth
...@@ -10,8 +11,8 @@ import qualified Test.API.Notifications as Notifications ...@@ -10,8 +11,8 @@ import qualified Test.API.Notifications as Notifications
import qualified Test.API.Private as Private import qualified Test.API.Private as Private
import qualified Test.API.UpdateList as UpdateList import qualified Test.API.UpdateList as UpdateList
tests :: Spec tests :: NotificationsConfig -> Spec
tests = describe "API" $ do tests _nc = describe "API" $ do
Auth.tests Auth.tests
Private.tests Private.tests
GraphQL.tests GraphQL.tests
...@@ -19,4 +20,4 @@ tests = describe "API" $ do ...@@ -19,4 +20,4 @@ tests = describe "API" $ do
UpdateList.tests UpdateList.tests
-- | TODO This would work if I managed to get forking dispatcher & -- | TODO This would work if I managed to get forking dispatcher &
-- exchange listeners properly -- exchange listeners properly
-- Notifications.tests -- Notifications.tests nc
...@@ -18,12 +18,13 @@ module Test.API.Notifications ( ...@@ -18,12 +18,13 @@ module Test.API.Notifications (
) where ) where
import Control.Concurrent (forkIO, killThread, threadDelay) import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.STM.TChan qualified as TChan import Control.Concurrent.STM.TVar qualified as TVar
import Control.Monad.STM (atomically) import Control.Monad.STM (atomically)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CET import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CET
import Gargantext.Core.AsyncUpdates.Dispatcher.Types qualified as DT import Gargantext.Core.AsyncUpdates.Dispatcher.Types qualified as DT
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Network.WebSockets.Client qualified as WS import Network.WebSockets.Client qualified as WS
import Network.WebSockets.Connection qualified as WS import Network.WebSockets.Connection qualified as WS
import Prelude import Prelude
...@@ -34,41 +35,47 @@ import Test.Instances () ...@@ -34,41 +35,47 @@ import Test.Instances ()
import Text.RawString.QQ (r) import Text.RawString.QQ (r)
tests :: Spec tests :: NotificationsConfig -> Spec
tests = sequential $ aroundAll withTestDBAndPort $ do tests nc = sequential $ aroundAll withTestDBAndPort $ do
describe "Dispatcher, Central Exchange, WebSockets" $ do describe "Dispatcher, Central Exchange, WebSockets" $ do
it "simple WS notification works" $ \((_testEnv, port), _) -> do it "simple WS notification works" $ \((_testEnv, port), _) -> do
tchan <- TChan.newTChanIO tvar <- TVar.newTVarIO Nothing
-- setup a websocket connection -- setup a websocket connection
let wsConnect = do let wsConnect = do
putStrLn $ "Creating WS client (port " <> show port <> ")" putStrLn $ "Creating WS client (port " <> show port <> ")"
WS.runClient "127.0.0.1" port "/ws" $ \conn -> do WS.runClient "127.0.0.1" port "/ws" $ \conn -> do
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe $ DT.UpdateTree 0) WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe $ DT.UpdateTree 0)
d <- WS.receiveData conn d <- WS.receiveData conn
atomically $ TChan.writeTChan tchan (Aeson.eitherDecode d) putStrLn ("received: " <> show d)
atomically $ TVar.writeTVar tvar (Aeson.decode d)
putStrLn "After WS client" putStrLn "After WS client"
-- wait a bit to settle -- wait a bit to settle
putStrLn "settling a bit initially" putStrLn "settling a bit initially"
threadDelay 1000000 threadDelay (500 * millisecond)
putStrLn "forking wsConnection" putStrLn "forking wsConnection"
wsConnection <- forkIO $ wsConnect wsConnection <- forkIO $ wsConnect
-- wait a bit to connect -- wait a bit to connect
threadDelay 1000000 threadDelay (500 * millisecond)
putStrLn "settling a bit for connection" putStrLn "settling a bit for connection"
threadDelay 1000000 threadDelay (500 * millisecond)
let msg = CET.UpdateTreeFirstLevel 0 let msg = CET.UpdateTreeFirstLevel 0
putStrLn "Notifying CE" putStrLn "Notifying CE"
CE.notify msg CE.notify nc msg
threadDelay (500 * millisecond)
putStrLn "Reading tvar with timeout" putStrLn "Reading tvar with timeout"
d <- Timeout.timeout 1000000 (atomically $ TChan.readTChan tchan) d <- TVar.readTVarIO tvar
putStrLn "Killing wsConnection thread" putStrLn "Killing wsConnection thread"
killThread wsConnection killThread wsConnection
putStrLn "Checking d" putStrLn "Checking d"
d `shouldBe` (Just $ Right msg) d `shouldBe` (Just msg)
millisecond :: Int
millisecond = 1000
...@@ -10,6 +10,7 @@ import Data.Text (isInfixOf) ...@@ -10,6 +10,7 @@ import Data.Text (isInfixOf)
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.Dispatcher qualified as D import Gargantext.Core.AsyncUpdates.Dispatcher qualified as D
import Gargantext.Core.AsyncUpdates.Dispatcher.Types qualified as DT import Gargantext.Core.AsyncUpdates.Dispatcher.Types qualified as DT
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Shelly hiding (FilePath) import Shelly hiding (FilePath)
import System.IO import System.IO
import System.Process import System.Process
...@@ -42,15 +43,21 @@ stopCoreNLPServer :: ProcessHandle -> IO () ...@@ -42,15 +43,21 @@ stopCoreNLPServer :: ProcessHandle -> IO ()
stopCoreNLPServer = interruptProcessGroupOf stopCoreNLPServer = interruptProcessGroupOf
startNotifications :: IO (ThreadId, DT.Dispatcher) nc :: NotificationsConfig
nc = NotificationsConfig { _nc_central_exchange_bind = "tcp://*:15560"
, _nc_central_exchange_connect = "tcp://localhost:15560"
, _nc_dispatcher_bind = "tcp://*:15561"
, _nc_dispatcher_connect = "tcp://localhost:15561" }
startNotifications :: IO (NotificationsConfig, ThreadId, DT.Dispatcher)
startNotifications = do startNotifications = do
central_exchange <- forkIO CE.gServer central_exchange <- forkIO $ CE.gServer nc
dispatcher <- D.dispatcher dispatcher <- D.dispatcher nc
pure (central_exchange, dispatcher) pure (nc, central_exchange, dispatcher)
stopNotifications :: (ThreadId, DT.Dispatcher) -> IO () stopNotifications :: (NotificationsConfig, ThreadId, DT.Dispatcher) -> IO ()
stopNotifications (central_exchange, dispatcher) = do stopNotifications (_nc, central_exchange, dispatcher) = do
killThread central_exchange killThread central_exchange
killThread $ DT.d_ce_listener dispatcher killThread $ DT.d_ce_listener dispatcher
...@@ -70,9 +77,9 @@ main = do ...@@ -70,9 +77,9 @@ main = do
hSetBuffering stdout NoBuffering hSetBuffering stdout NoBuffering
-- TODO Ideally remove start/stop notifications and use -- TODO Ideally remove start/stop notifications and use
-- Test/API/Setup to initialize this in env -- Test/API/Setup to initialize this in env
bracket startNotifications stopNotifications $ \_ -> do bracket startNotifications stopNotifications $ \(nc', _, _) -> do
bracket startCoreNLPServer stopCoreNLPServer $ \_ -> hspec $ do bracket startCoreNLPServer stopCoreNLPServer $ \_ -> hspec $ do
API.tests API.tests nc'
ReverseProxy.tests ReverseProxy.tests
DB.tests DB.tests
DB.nodeStoryTests DB.nodeStoryTests
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment