[websockets] ce/dispatcher bind/connect constants

This makes it easier to change protocols, ports etc.
parent 0d3eb314
Pipeline #6226 failed with stages
...@@ -18,7 +18,7 @@ import Control.Monad (join, mapM_) ...@@ -18,7 +18,7 @@ import Control.Monad (join, mapM_)
import Data.ByteString.Char8 qualified as C import Data.ByteString.Char8 qualified as C
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.Core.AsyncUpdates.CentralExchange (gServer) import Gargantext.Core.AsyncUpdates.CentralExchange (gServer)
import Gargantext.Core.AsyncUpdates.Constants (cePort) import Gargantext.Core.AsyncUpdates.Constants (ceBind, ceConnect)
import Gargantext.Prelude import Gargantext.Prelude
import Nanomsg import Nanomsg
import Options.Applicative import Options.Applicative
...@@ -45,7 +45,7 @@ main = join $ execParser (info parser idm) ...@@ -45,7 +45,7 @@ main = join $ execParser (info parser idm)
simpleServer :: IO () simpleServer :: IO ()
simpleServer = do simpleServer = do
withSocket Pull $ \s -> do withSocket Pull $ \s -> do
_ <- bind s "tcp://*:5560" _ <- bind s ceBind
putText "[simpleServer] receiving" putText "[simpleServer] receiving"
forever $ do forever $ do
mr <- recvMalloc s 1024 mr <- recvMalloc s 1024
...@@ -67,14 +67,14 @@ wsServer = do ...@@ -67,14 +67,14 @@ wsServer = do
gClient :: IO () gClient :: IO ()
gClient = do gClient = do
withSocket Push $ \s -> do withSocket Push $ \s -> do
_ <- connect s ("tcp://localhost:" <> show cePort) _ <- connect s ceConnect
-- let str = C.unwords (take 10 $ repeat "hello") -- let str = C.unwords (take 10 $ repeat "hello")
let str = "{\"type\": \"update_tree_first_level\", \"node_id\": -1}" let str = "{\"type\": \"update_tree_first_level\", \"node_id\": -1}"
C.putStrLn $ C.pack "sending: " <> str C.putStrLn $ C.pack "sending: " <> str
send s str send s str
withSocket Push $ \s -> do withSocket Push $ \s -> do
_ <- connect s ("tcp://localhost:" <> show cePort) _ <- connect s ceConnect
let str2 = "{\"type\": \"update_tree_first_level\", \"node_id\": -2}" let str2 = "{\"type\": \"update_tree_first_level\", \"node_id\": -2}"
C.putStrLn $ C.pack "sending: " <> str2 C.putStrLn $ C.pack "sending: " <> str2
send s str2 send s str2
......
...@@ -93,8 +93,8 @@ portRouteInfo port = do ...@@ -93,8 +93,8 @@ portRouteInfo port = do
putStrLn $ " - Web GarganText Frontend..................: " <> "http://localhost:" <> toUrlPiece port <> "/index.html" putStrLn $ " - Web GarganText Frontend..................: " <> "http://localhost:" <> toUrlPiece port <> "/index.html"
putStrLn $ " - Swagger UI (API documentation)...........: " <> "http://localhost:" <> toUrlPiece port <> "/swagger-ui" putStrLn $ " - Swagger UI (API documentation)...........: " <> "http://localhost:" <> toUrlPiece port <> "/swagger-ui"
putStrLn $ " - Playground GraphQL (API documentation)...: " <> "http://localhost:" <> toUrlPiece port <> "/gql" putStrLn $ " - Playground GraphQL (API documentation)...: " <> "http://localhost:" <> toUrlPiece port <> "/gql"
putStrLn $ " - Central exchange.........................: " <> "nanomsg://localhost:" <> show AUConstants.cePort putStrLn $ " - Central exchange.........................: " <> "nanomsg: " <> pack AUConstants.ceBind
putStrLn $ " - Dispatcher internal......................: " <> "nanomsg://localhost:" <> show AUConstants.dispatcherInternalPort putStrLn $ " - Dispatcher internal......................: " <> "nanomsg: " <> pack AUConstants.dispatcherBind
putStrLn $ " - WebSocket address........................: " <> "ws://localhost:" <> toUrlPiece port <> "/ws" putStrLn $ " - WebSocket address........................: " <> "ws://localhost:" <> toUrlPiece port <> "/ws"
putStrLn "==========================================================================================================" putStrLn "=========================================================================================================="
......
...@@ -23,7 +23,7 @@ import Data.Aeson qualified as Aeson ...@@ -23,7 +23,7 @@ import Data.Aeson qualified as Aeson
import Data.ByteString.Char8 qualified as C import Data.ByteString.Char8 qualified as C
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Gargantext.Core.AsyncUpdates.CentralExchange.Types import Gargantext.Core.AsyncUpdates.CentralExchange.Types
import Gargantext.Core.AsyncUpdates.Constants (cePort, dispatcherInternalPort) import Gargantext.Core.AsyncUpdates.Constants (ceBind, ceConnect, dispatcherConnect)
-- import Gargantext.Core.AsyncUpdates.Nanomsg (withSafeSocket) -- import Gargantext.Core.AsyncUpdates.Nanomsg (withSafeSocket)
import Gargantext.Prelude import Gargantext.Prelude
import Nanomsg (Pull(..), Push(..), bind, connect, recvMalloc, send, withSocket) import Nanomsg (Pull(..), Push(..), bind, connect, recvMalloc, send, withSocket)
...@@ -45,8 +45,8 @@ gServer :: IO () ...@@ -45,8 +45,8 @@ gServer :: IO ()
gServer = do gServer = do
withSocket Pull $ \s -> do withSocket Pull $ \s -> do
withSocket Push $ \s_dispatcher -> do withSocket Push $ \s_dispatcher -> do
_ <- bind s ("tcp://*:" <> show cePort) _ <- bind s ceBind
_ <- connect s_dispatcher ("tcp://localhost:" <> show dispatcherInternalPort) _ <- connect s_dispatcher dispatcherConnect
tChan <- TChan.newTChanIO tChan <- TChan.newTChanIO
...@@ -94,6 +94,6 @@ notify :: CEMessage -> IO () ...@@ -94,6 +94,6 @@ notify :: CEMessage -> IO ()
notify ceMessage = do notify ceMessage = do
Async.withAsync (pure ()) $ \_ -> do Async.withAsync (pure ()) $ \_ -> do
withSocket Push $ \s -> do withSocket Push $ \s -> do
_ <- connect s ("tcp://localhost:" <> show cePort) _ <- connect s ceConnect
let str = Aeson.encode ceMessage let str = Aeson.encode ceMessage
send s $ BSL.toStrict str send s $ BSL.toStrict str
...@@ -16,13 +16,26 @@ https://dev.sub.gargantext.org/#/share/Notes/187918 ...@@ -16,13 +16,26 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
module Gargantext.Core.AsyncUpdates.Constants where module Gargantext.Core.AsyncUpdates.Constants where
import Gargantext.Prelude import Prelude qualified
-- | Port where the central exchange listens (on localhost) -- NOTE IDP is fast and we're on local network so it shouldn't be a
cePort :: Int -- problem with dropping packets. Otherwise, use TCP
cePort = 5560 -- https://nanomsg.org
-- | Port where the dispatcher listens (on localhost) for messages from central exchange
dispatcherInternalPort :: Int
dispatcherInternalPort = 5561
-- | Bind address for central exchange (for tcp: tcp://*:5560)
ceBind :: Prelude.String
ceBind = "ipc:///tmp/central-exchange.ipc"
-- ceBind = "tcp://*:5560"
-- | Connect address for central exchange (for tcp: tcp://localhost:5560)
ceConnect :: Prelude.String
ceConnect = "ipc:///tmp/central-exchange.ipc"
-- ceConnect = "tcp://localhost:5560"
-- | Bind address for dispatcher (for tcp: tcp://*:5561)
dispatcherBind :: Prelude.String
dispatcherBind = "ipc:///tmp/dispatcher.ipc"
-- dispatcherBind = "tcp://*:5561"
-- | Connect address for dispatcher (for tcp: tcp://localhost:5561)
dispatcherConnect :: Prelude.String
dispatcherConnect = "ipc:///tmp/dispatcher.ipc"
-- dispatcherConnect = "tcp://localhost:5561"
...@@ -208,7 +208,7 @@ wsServer = WSAPI { wsAPIServer = streamData } ...@@ -208,7 +208,7 @@ wsServer = WSAPI { wsAPIServer = streamData }
dispatcher_listener :: SSet.Set Subscription -> IO () dispatcher_listener :: SSet.Set Subscription -> IO ()
dispatcher_listener subscriptions = do dispatcher_listener subscriptions = do
withSocket Pull $ \s -> do withSocket Pull $ \s -> do
_ <- bind s ("tcp://*:" <> show AUConstants.dispatcherInternalPort) _ <- bind s AUConstants.dispatcherBind
tChan <- TChan.newTChanIO tChan <- TChan.newTChanIO
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment