[ws] dispatcher & central exchange ports in constants module

parent a66c60ed
Pipeline #6195 failed with stages
in 57 minutes and 8 seconds
...@@ -18,6 +18,7 @@ import Control.Monad (join, mapM_) ...@@ -18,6 +18,7 @@ import Control.Monad (join, mapM_)
import Data.ByteString.Char8 qualified as C import Data.ByteString.Char8 qualified as C
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.Core.AsyncUpdates.CentralExchange (gServer) import Gargantext.Core.AsyncUpdates.CentralExchange (gServer)
import Gargantext.Core.AsyncUpdates.Constants (cePort)
import Gargantext.Prelude import Gargantext.Prelude
import Nanomsg import Nanomsg
import Options.Applicative import Options.Applicative
...@@ -51,14 +52,14 @@ wsServer = do ...@@ -51,14 +52,14 @@ wsServer = do
gClient :: IO () gClient :: IO ()
gClient = do gClient = do
withSocket Push $ \s -> do withSocket Push $ \s -> do
_ <- connect s "tcp://localhost:5560" _ <- connect s (tcp://localhost:" <> show cePort)
-- let str = C.unwords (take 10 $ repeat "hello") -- let str = C.unwords (take 10 $ repeat "hello")
let str = "{\"type\": \"update_tree_first_level\", \"node_id\": -1}" let str = "{\"type\": \"update_tree_first_level\", \"node_id\": -1}"
C.putStrLn $ C.pack "sending: " <> str C.putStrLn $ C.pack "sending: " <> str
send s str send s str
withSocket Push $ \s -> do withSocket Push $ \s -> do
_ <- connect s "tcp://localhost:5560" _ <- connect s ("tcp://localhost:" <> show cePort)
let str2 = "{\"type\": \"update_tree_first_level\", \"node_id\": -2}" let str2 = "{\"type\": \"update_tree_first_level\", \"node_id\": -2}"
C.putStrLn $ C.pack "sending: " <> str2 C.putStrLn $ C.pack "sending: " <> str2
send s str2 send s str2
......
...@@ -157,11 +157,11 @@ library ...@@ -157,11 +157,11 @@ library
Gargantext.API.Routes.Named.Tree Gargantext.API.Routes.Named.Tree
Gargantext.API.Routes.Named.Viz Gargantext.API.Routes.Named.Viz
Gargantext.API.Routes.Types Gargantext.API.Routes.Types
Gargantext.API.WebSockets
Gargantext.Core Gargantext.Core
Gargantext.Core.AsyncUpdates Gargantext.Core.AsyncUpdates
Gargantext.Core.AsyncUpdates.CentralExchange Gargantext.Core.AsyncUpdates.CentralExchange
Gargantext.Core.AsyncUpdates.CentralExchange.Types Gargantext.Core.AsyncUpdates.CentralExchange.Types
Gargantext.Core.AsyncUpdates.Constants
Gargantext.Core.AsyncUpdates.Dispatcher Gargantext.Core.AsyncUpdates.Dispatcher
Gargantext.Core.Mail.Types Gargantext.Core.Mail.Types
Gargantext.Core.Methods.Similarities Gargantext.Core.Methods.Similarities
......
...@@ -50,6 +50,7 @@ import Gargantext.API.EKG ...@@ -50,6 +50,7 @@ import Gargantext.API.EKG
import Gargantext.API.Middleware (logStdoutDevSanitised) import Gargantext.API.Middleware (logStdoutDevSanitised)
import Gargantext.API.Routes import Gargantext.API.Routes
import Gargantext.API.Server (server) import Gargantext.API.Server (server)
import Gargantext.Core.AsyncUpdates.Constants qualified as AUConstants
import Gargantext.Database.Prelude qualified as DB import Gargantext.Database.Prelude qualified as DB
import Gargantext.Prelude hiding (putStrLn) import Gargantext.Prelude hiding (putStrLn)
import Gargantext.System.Logging import Gargantext.System.Logging
...@@ -91,6 +92,8 @@ portRouteInfo port = do ...@@ -91,6 +92,8 @@ portRouteInfo port = do
putStrLn $ " - Web GarganText Frontend..................: " <> "http://localhost:" <> toUrlPiece port <> "/index.html" putStrLn $ " - Web GarganText Frontend..................: " <> "http://localhost:" <> toUrlPiece port <> "/index.html"
putStrLn $ " - Swagger UI (API documentation)...........: " <> "http://localhost:" <> toUrlPiece port <> "/swagger-ui" putStrLn $ " - Swagger UI (API documentation)...........: " <> "http://localhost:" <> toUrlPiece port <> "/swagger-ui"
putStrLn $ " - Playground GraphQL (API documentation)...: " <> "http://localhost:" <> toUrlPiece port <> "/gql" putStrLn $ " - Playground GraphQL (API documentation)...: " <> "http://localhost:" <> toUrlPiece port <> "/gql"
putStrLn $ " - Central exchange.........................: " <> "nanomsg://localhost:" <> show AUConstants.cePort
putStrLn $ " - Dispatcher internal......................: " <> "namosg://localhost:" <> show AUConstants.dispatcherInternalPort
putStrLn "==========================================================================================================" putStrLn "=========================================================================================================="
-- | Stops the gargantext server and cancels all the periodic actions -- | Stops the gargantext server and cancels all the periodic actions
......
...@@ -48,7 +48,7 @@ import Gargantext.API.Node.ShareURL qualified as ShareURL ...@@ -48,7 +48,7 @@ import Gargantext.API.Node.ShareURL qualified as ShareURL
import Gargantext.API.Prelude import Gargantext.API.Prelude
import Gargantext.API.Public qualified as Public import Gargantext.API.Public qualified as Public
import Gargantext.API.Routes.Types import Gargantext.API.Routes.Types
import Gargantext.API.WebSockets qualified as WebSockets import Gargantext.Core.AsyncUpdates.Dispatcher qualified as Dispatcher
import Gargantext.Core.Types.Individu (User(..)) import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Viz.Graph.API import Gargantext.Core.Viz.Graph.API
import Gargantext.Database.Admin.Types.Hyperdata import Gargantext.Database.Admin.Types.Hyperdata
...@@ -224,7 +224,11 @@ type GargPrivateAPI' = ...@@ -224,7 +224,11 @@ type GargPrivateAPI' =
-- :<|> "auth" :> Capture "node_id" Int :> NodeAPI -- :<|> "auth" :> Capture "node_id" Int :> NodeAPI
--------------------------------------------------------------------- ---------------------------------------------------------------------
type API = WithCustomErrorScheme (WebSockets.API :<|> SwaggerAPI :<|> GargAPI :<|> GraphQL.API :<|> FrontEndAPI) type API = WithCustomErrorScheme ( Dispatcher.WSAPI
:<|> SwaggerAPI
:<|> GargAPI
:<|> GraphQL.API
:<|> FrontEndAPI )
-- | API for serving @swagger.json@ -- | API for serving @swagger.json@
type SwaggerAPI = SwaggerSchemaUI "swagger-ui" "swagger.json" type SwaggerAPI = SwaggerSchemaUI "swagger-ui" "swagger.json"
......
{-|
Module : Gargantext.API.WebSockets
Description : WebSockets API
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.WebSockets
where
import Data.Text (pack)
import Network.WebSockets (PendingConnection, acceptRequest, sendTextData, withPingThread)
import Protolude
import Servant
import Servant.API.WebSocket qualified as WS
type API = "ws" :> WS.WebSocketPending
server :: Server API
server = streamData
where
streamData :: MonadIO m => PendingConnection -> m ()
streamData pc = do
c <- liftIO $ acceptRequest pc
liftIO $ withPingThread c 10 (pure ()) $ do
forM_ [1..] $ \i -> do
sendTextData c (pack $ show (i :: Int)) >> threadDelay (1*ms)
ms = 1000000
...@@ -22,6 +22,7 @@ import Data.Aeson qualified as Aeson ...@@ -22,6 +22,7 @@ import Data.Aeson qualified as Aeson
import Data.ByteString.Char8 qualified as C import Data.ByteString.Char8 qualified as C
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Gargantext.Core.AsyncUpdates.CentralExchange.Types import Gargantext.Core.AsyncUpdates.CentralExchange.Types
import Gargantext.Core.AsyncUpdates.Constants (cePort, dispatcherInternalPort)
import Gargantext.Prelude import Gargantext.Prelude
import Nanomsg import Nanomsg
...@@ -37,8 +38,8 @@ gServer :: IO () ...@@ -37,8 +38,8 @@ gServer :: IO ()
gServer = do gServer = do
withSocket Pull $ \s -> do withSocket Pull $ \s -> do
withSocket Push $ \s_dispatcher -> do withSocket Push $ \s_dispatcher -> do
_ <- bind s "tcp://*:5560" _ <- bind s ("tcp://*:" <> show cePort)
_ <- connect s_dispatcher "tcp://localhost:5561" _ <- connect s_dispatcher ("tcp://localhost:" <> show dispatcherInternalPort)
forever $ do forever $ do
putText "[central_exchange] receiving" putText "[central_exchange] receiving"
r <- recv s r <- recv s
...@@ -68,6 +69,6 @@ gServer = do ...@@ -68,6 +69,6 @@ gServer = do
notify :: CEMessage -> IO () notify :: CEMessage -> IO ()
notify ceMessage = do notify ceMessage = do
withSocket Push $ \s -> do withSocket Push $ \s -> do
_ <- connect s "tcp://localhost:5560" _ <- connect s ("tcp://localhost:" <> show cePort)
let str = Aeson.encode ceMessage let str = Aeson.encode ceMessage
send s $ BSL.toStrict str send s $ BSL.toStrict str
{-|
Module : Gargantext.Core.AsyncUpdates.Constants
Description : Various constants
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918
-}
module Gargantext.Core.AsyncUpdates.Constants where
import Gargantext.Prelude
-- | Port where the central exchange listens (on localhost)
cePort :: Int
cePort = 5560
-- | Port where the dispatcher listens (on localhost) for messages from central exchange
dispatcherInternalPort :: Int
dispatcherInternalPort = 5561
...@@ -31,6 +31,7 @@ import Data.UUID.V4 as UUID ...@@ -31,6 +31,7 @@ import Data.UUID.V4 as UUID
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id)) import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id))
import Gargantext.API.Admin.Types (jwtSettings, Settings, jwtSettings) import Gargantext.API.Admin.Types (jwtSettings, Settings, jwtSettings)
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes
import Gargantext.Core.AsyncUpdates.Constants as AUConstants
import Gargantext.Core.Types (NodeId, UserId) import Gargantext.Core.Types (NodeId, UserId)
import Gargantext.Prelude import Gargantext.Prelude
import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar) import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar)
...@@ -158,7 +159,7 @@ dispatcher authSettings = do ...@@ -158,7 +159,7 @@ dispatcher authSettings = do
let server = wsServer authSettings subscriptions let server = wsServer authSettings subscriptions
d_ce_listener <- forkIO (ce_listener subscriptions) d_ce_listener <- forkIO (dispatcher_listener subscriptions)
pure $ Dispatcher { d_subscriptions = subscriptions pure $ Dispatcher { d_subscriptions = subscriptions
, d_ws_server = server , d_ws_server = server
...@@ -295,10 +296,10 @@ instance ToJSON Notification where ...@@ -295,10 +296,10 @@ instance ToJSON Notification where
] ]
ce_listener :: SSet.Set Subscription -> IO () dispatcher_listener :: SSet.Set Subscription -> IO ()
ce_listener subscriptions = do dispatcher_listener subscriptions = do
withSocket Pull $ \s -> do withSocket Pull $ \s -> do
_ <- bind s "tcp://*:5561" _ <- bind s ("tcp://*:" <> show AUConstants.dispatcherInternalPort)
forever $ do forever $ do
putText "[ce_listener] receiving" putText "[ce_listener] receiving"
r <- recv s r <- recv s
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment