Commit f0f88f92 authored by Alfredo Di Napoli's avatar Alfredo Di Napoli Committed by Alfredo Di Napoli

refactor(tests): use IPC communication for dispatcher and CE

parent c814a806
......@@ -14,6 +14,8 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
-}
{-# LANGUAGE TemplateHaskell #-}
module Gargantext.Core.Notifications.CentralExchange (
gServer
, notify
......@@ -29,8 +31,8 @@ import Gargantext.Core.Config (GargConfig, gc_notifications_config, gc_logging)
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Core.Notifications.CentralExchange.Types
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(..), withLogger, logMsg)
import Nanomsg (Pull(..), Push(..), bind, connect, recv, send, withSocket)
import Gargantext.System.Logging (LogLevel(..), withLogger, logLoc)
import Nanomsg (Pull(..), Push(..), bind, connect, recv, send, withSocket, shutdown)
import System.Timeout (timeout)
{-
......@@ -48,29 +50,29 @@ with many users having updates.
gServer :: GargConfig -> IO ()
gServer cfg = do
withSocket Pull $ \s -> do
withSocket Push $ \s_dispatcher -> do
withLogger log_cfg $ \ioLogger -> do
logMsg ioLogger DEBUG $ "[central_exchange] binding to " <> T.unpack _nc_central_exchange_bind
_ <- bind s $ T.unpack _nc_central_exchange_bind
withLogger log_cfg $ \ioLogger -> do
logMsg ioLogger DEBUG $ "[central_exchange] connecting to " <> T.unpack _nc_dispatcher_bind
_ <- connect s_dispatcher $ T.unpack _nc_dispatcher_connect
tChan <- TChan.newTChanIO
-- | We have 2 threads: one that listens for nanomsg messages
-- | and puts them on the 'tChan' and the second one that reads
-- | the 'tChan' and calls Dispatcher accordingly. This is to
-- | make reading nanomsg as fast as possible.
void $ Async.concurrently (worker s_dispatcher tChan) $ do
withLogger log_cfg $ \ioLogger -> do
forever $ do
-- putText "[central_exchange] receiving"
r <- recv s
logMsg ioLogger DEBUG $ "[central_exchange] received: " <> show r
-- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r
withLogger log_cfg $ \ioLogger -> do
withSocket Pull $ \s -> do
withSocket Push $ \s_dispatcher -> do
$(logLoc) ioLogger DEBUG $ "[central_exchange] binding to " <> _nc_central_exchange_bind
bindEndpoint <- bind s $ T.unpack _nc_central_exchange_bind
$(logLoc) ioLogger DEBUG $ "[central_exchange] bound to " <> show bindEndpoint
$(logLoc) ioLogger DEBUG $ "[central_exchange] connecting to " <> _nc_dispatcher_bind
dispatchEndpoint <- connect s_dispatcher $ T.unpack _nc_dispatcher_connect
$(logLoc) ioLogger DEBUG $ "[central_exchange] connected to " <> show dispatchEndpoint
tChan <- TChan.newTChanIO
-- | We have 2 threads: one that listens for nanomsg messages
-- | and puts them on the 'tChan' and the second one that reads
-- | the 'tChan' and calls Dispatcher accordingly. This is to
-- | make reading nanomsg as fast as possible.
void $ Async.concurrently (worker s_dispatcher tChan) $ do
forever $ do
$(logLoc) ioLogger DEBUG $ "[central_exchange] receiving"
r <- recv s
$(logLoc) ioLogger DEBUG $ "[central_exchange] received: " <> show r
-- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r
where
NotificationsConfig{..} = cfg ^. gc_notifications_config
log_cfg = cfg ^. gc_logging
......@@ -80,7 +82,7 @@ gServer cfg = do
r <- atomically $ TChan.readTChan tChan
case Aeson.decode (BSL.fromStrict r) of
Just (UpdateTreeFirstLevel _node_id) -> do
-- logMsg ioLogger DEBUG $ "[central_exchange] update tree: " <> show node_id
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] update tree: " <> show node_id
-- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id
-- To make this more robust, use withAsync so we don't
-- block the main thread (send is blocking)
......@@ -99,25 +101,32 @@ gServer cfg = do
-- void $ sendNonblocking s_dispatcher r
void $ timeout 100_000 $ send s_dispatcher r
Just (UpdateWorkerProgress _ji _jl) -> do
-- logMsg ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
void $ timeout 100_000 $ send s_dispatcher r
Just Ping -> do
void $ timeout 100_000 $ send s_dispatcher r
Nothing ->
logMsg ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r
$(logLoc) ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r
notify :: GargConfig -> CEMessage -> IO ()
notify cfg ceMessage = do
notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do
Async.withAsync (pure ()) $ \_ -> do
withSocket Push $ \s -> do
_ <- connect s $ T.unpack _nc_central_exchange_connect
let str = Aeson.encode ceMessage
withLogger log_cfg $ \ioLogger ->
logMsg ioLogger DEBUG $ "[central_exchange] sending: " <> (T.unpack $ TE.decodeUtf8 $ BSL.toStrict str)
-- err <- sendNonblocking s $ BSL.toStrict str
-- putText $ "[notify] err: " <> show err
void $ timeout 100_000 $ send s $ BSL.toStrict str
connectEndpoint <- connect s $ T.unpack _nc_central_exchange_connect
let do_work = do
let str = Aeson.encode ceMessage
$(logLoc) ioLogger DEBUG $ "[central_exchange] sending to " <> _nc_central_exchange_connect
$(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str)
-- err <- sendNonblocking s $ BSL.toStrict str
-- putText $ "[notify] err: " <> show err
timeoutKickedIn <- timeout 100_000 $ send s $ BSL.toStrict str
case timeoutKickedIn of
Nothing ->
$(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion."
Just () ->
$(logLoc) ioLogger DEBUG $ "[central_exchange] message sent."
do_work `finally` shutdown s connectEndpoint
where
NotificationsConfig { _nc_central_exchange_connect } = cfg ^. gc_notifications_config
log_cfg = cfg ^. gc_logging
......@@ -67,10 +67,10 @@ login_type = "Normal"
[notifications]
central-exchange = { bind = "tcp://*:15560", connect = "tcp://localhost:15560" }
dispatcher = { bind = "tcp://*:15561", connect = "tcp://localhost:15561" }
# central-exchange = { bind = "ipc:///tmp/ce.ipc", connect = "ipc:///tmp/ce.ipc" }
# dispatcher = { bind = "ipc:///tmp/d.ipc", connect = "ipc:///tmp/d.ipc" }
# We do not hardcode the bind and connect here, because the test infrastructure
# will randomize the connection endpoints via IPC.
central-exchange = { bind = "", connect = "" }
dispatcher = { bind = "", connect = "" }
[nlp]
......
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.API.Setup (
......@@ -20,14 +20,12 @@ import Control.Monad.Reader
import Data.ByteString.Lazy.Char8 qualified as C8L
import Data.Cache qualified as InMemory
import Data.Streaming.Network (bindPortTCP)
import Gargantext.API (makeApp)
import Gargantext.API.Admin.EnvTypes (Env (..), env_dispatcher)
import Gargantext.API.Errors.Types
import Gargantext.API (makeApp)
import Gargantext.API.Prelude
import Gargantext.Core.Config (gc_logging)
import Gargantext.Core.Config (gc_notifications_config)
import Gargantext.Core.Config (_gc_secrets, gc_frontend_config)
import Gargantext.Core.Config.Types (NotificationsConfig(..), fc_appPort, jwtSettings)
import Gargantext.Core.Config hiding (jwtSettings)
import Gargantext.Core.Config.Types (fc_appPort, jwtSettings)
import Gargantext.Core.Notifications (withNotifications)
import Gargantext.Core.Types.Individu
import Gargantext.Database.Action.Flow
......@@ -45,10 +43,9 @@ import Gargantext.System.Logging
import Network.HTTP.Client.TLS (newTlsManager)
import Network.HTTP.Types
import Network.Wai (Application, responseLBS)
import Network.Wai.Handler.Warp.Internal
import Network.Wai.Handler.Warp qualified as Warp
import Network.Wai.Handler.Warp (runSettingsSocket)
import Network.Wai qualified as Wai
import Network.Wai.Handler.Warp qualified as Warp
import Network.Wai.Handler.Warp.Internal
import Network.WebSockets qualified as WS
import Prelude hiding (show)
import Servant.Auth.Client ()
......@@ -99,20 +96,16 @@ newTestEnv testEnv logger port = do
, _env_jwt_settings
}
nc :: NotificationsConfig
nc = NotificationsConfig { _nc_central_exchange_bind = "tcp://*:15560"
, _nc_central_exchange_connect = "tcp://localhost:15560"
, _nc_dispatcher_bind = "tcp://*:15561"
, _nc_dispatcher_connect = "tcp://localhost:15561" }
-- | Run the gargantext server on a random port, picked by Warp, which allows
-- for concurrent tests to be executed in parallel, if we need to.
-- NOTE: We don't need to change the 'NotificationConfig' at this point, because
-- the 'TestEnv' will already contain the correct values for us to use.
-- (cfg 'setup' in 'Test.Database.Setup').
withTestDBAndPort :: (SpecContext () -> IO ()) -> IO ()
withTestDBAndPort action = withTestDB $ \testEnv -> do
withNotifications (cfg testEnv) $ \dispatcher -> do
withLoggerIO (log_cfg testEnv) $ \ioLogger -> do
let cfg = test_config testEnv
withNotifications cfg $ \dispatcher -> do
withLoggerIO (log_cfg cfg) $ \ioLogger -> do
env <- newTestEnv testEnv ioLogger 8080
<&> env_dispatcher .~ dispatcher
app <- makeApp env
......@@ -126,21 +119,20 @@ withTestDBAndPort action = withTestDB $ \testEnv -> do
[ Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.CloseRequest _ _ ->
withLogger (log_cfg testEnv) $ \ioLogger' ->
withLogger (log_cfg cfg) $ \ioLogger' ->
logTxt ioLogger' DEBUG "[withTestDBAndPort] CloseRequest caught"
WS.ConnectionClosed ->
withLogger (log_cfg testEnv) $ \ioLogger' ->
withLogger (log_cfg cfg) $ \ioLogger' ->
logTxt ioLogger' DEBUG "[withTestDBAndPort] ConnectionClosed caught"
_ -> do
withLogger (log_cfg testEnv) $ \ioLogger' ->
withLogger (log_cfg cfg) $ \ioLogger' ->
logTxt ioLogger' ERROR $ "[withTestDBAndPort] unknown exception: " <> show err
throw err
-- re-throw any other exceptions
, Handler $ \(err :: SomeException) -> throw err ]
where
cfg te = (test_config te) & gc_notifications_config .~ nc
log_cfg te = (cfg te) ^. gc_logging
log_cfg cfg = cfg ^. gc_logging
-- | Starts the backend server /and/ the microservices proxy, the former at
-- a random port, the latter at a predictable port.
......@@ -216,7 +208,7 @@ testWithApplicationOnPort mkApp userPort action = do
sock <- bindPortTCP userPort "127.0.0.1"
result <-
Async.race
(runSettingsSocket appSettings sock app)
(Warp.runSettingsSocket appSettings sock app)
(waitFor started >> action)
case result of
Left () -> UnliftIO.throwString "Unexpected: runSettingsSocket exited"
......
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ViewPatterns #-}
module Test.Database.Setup (
withTestDB
......@@ -16,10 +17,9 @@ import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Database.PostgreSQL.Simple qualified as PG
import Database.PostgreSQL.Simple.Options qualified as Client
import Database.PostgreSQL.Simple.Options qualified as Opts
import Database.Postgres.Temp qualified as Tmp
import Gargantext.Core.Config
import Gargantext.Core.Config.Types (SettingsFile(..))
import Gargantext.Core.Config.Types
import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.Config.Worker (wsDatabase, wsDefinitions)
import Gargantext.Core.NLP (nlpServerMap)
......@@ -32,8 +32,10 @@ import Paths_gargantext
import Prelude qualified
import Shelly hiding (FilePath, run)
import Shelly qualified as SH
import System.Environment
import Test.Database.Types
import Test.Utils.Db (tmpDBToConnInfo)
import UnliftIO.Temporary (withTempFile)
-- | Test DB settings.
......@@ -73,6 +75,22 @@ tmpPgConfig = Tmp.defaultConfig <>
, Client.password = pure dbPassword
}
-- | Overrides the 'NotificationsConfig' of the input 'GargConfig' with something suitable
-- for testing. It will replace the bind sites for the CE and the dispatcher with random temp files
-- to be used for IPC communication.
withTestNotificationConfig :: GargConfig -> (GargConfig -> IO a) -> IO a
withTestNotificationConfig cfg action = do
tmpDir <- fromMaybe "/tmp" <$> lookupEnv "TMPDIR"
withTempFile tmpDir "ce_test.ipc" $ \(T.pack -> ce_fp) _hdl1 -> do
withTempFile tmpDir "ds_test.ipc" $ \(T.pack -> ds_fp) _hdl2 -> do
action $ cfg & gc_notifications_config
.~ NotificationsConfig { _nc_central_exchange_bind = "ipc://" <> ce_fp
, _nc_central_exchange_connect = "ipc://" <> ce_fp
, _nc_dispatcher_bind = "ipc://" <> ds_fp
, _nc_dispatcher_connect = "ipc://" <> ds_fp
}
setup :: IO TestEnv
setup = do
res <- Tmp.startConfig tmpPgConfig
......@@ -80,51 +98,52 @@ setup = do
Left err -> Prelude.fail $ show err
Right db -> do
let connInfo = tmpDBToConnInfo db
gargConfig <- testTomlPath >>= readConfig
gargConfig0 <- testTomlPath >>= readConfig
-- fix db since we're using tmp-postgres
<&> (gc_database_config .~ connInfo)
-- <&> (gc_worker . wsDatabase .~ connInfo)
<&> (gc_worker . wsDatabase .~ (connInfo { PG.connectDatabase = "pgmq_test" }))
-- putText $ "[setup] database: " <> show (gargConfig ^. gc_database_config)
-- putText $ "[setup] worker db: " <> show (gargConfig ^. gc_worker . wsDatabase)
let log_cfg = gargConfig ^. gc_logging
let idleTime = 60.0
let maxResources = 2
let poolConfig = defaultPoolConfig (PG.connectPostgreSQL (Tmp.toConnectionString db))
PG.close
idleTime
maxResources
pool <- newPool (setNumStripes (Just 2) poolConfig)
bootstrapDB db pool gargConfig
ugen <- emptyCounter
test_nodeStory <- fromDBNodeStoryEnv pool
withLoggerIO log_cfg $ \logger -> do
let wPoolConfig = defaultPoolConfig (PG.connectPostgreSQL (Tmp.toConnectionString db))
PG.close
idleTime
maxResources
wPool <- newPool (setNumStripes (Just 2) wPoolConfig)
wNodeStory <- fromDBNodeStoryEnv wPool
_w_env_job_state <- newTVarIO Nothing
withLoggerIO log_cfg $ \wioLogger -> do
let wEnv = WorkerEnv { _w_env_config = gargConfig
, _w_env_logger = wioLogger
, _w_env_pool = wPool
, _w_env_nodeStory = wNodeStory
, _w_env_mail = errorTrace "[wEnv] w_env_mail requested but not available"
, _w_env_nlp = nlpServerMap $ gargConfig ^. gc_nlp_config
, _w_env_job_state }
wState <- initWorkerState wEnv (fromJust $ head $ gargConfig ^. gc_worker . wsDefinitions)
test_worker_tid <- forkIO $ Worker.run wState
pure $ TestEnv { test_db = DBHandle pool db
, test_config = gargConfig
, test_nodeStory
, test_usernameGen = ugen
, test_logger = logger
, test_worker_tid
}
withTestNotificationConfig gargConfig0 $ \gargConfig -> do
let log_cfg = gargConfig ^. gc_logging
let idleTime = 60.0
let maxResources = 2
let poolConfig = defaultPoolConfig (PG.connectPostgreSQL (Tmp.toConnectionString db))
PG.close
idleTime
maxResources
pool <- newPool (setNumStripes (Just 2) poolConfig)
bootstrapDB db pool gargConfig
ugen <- emptyCounter
test_nodeStory <- fromDBNodeStoryEnv pool
withLoggerIO log_cfg $ \logger -> do
let wPoolConfig = defaultPoolConfig (PG.connectPostgreSQL (Tmp.toConnectionString db))
PG.close
idleTime
maxResources
wPool <- newPool (setNumStripes (Just 2) wPoolConfig)
wNodeStory <- fromDBNodeStoryEnv wPool
_w_env_job_state <- newTVarIO Nothing
withLoggerIO log_cfg $ \wioLogger -> do
let wEnv = WorkerEnv { _w_env_config = gargConfig
, _w_env_logger = wioLogger
, _w_env_pool = wPool
, _w_env_nodeStory = wNodeStory
, _w_env_mail = errorTrace "[wEnv] w_env_mail requested but not available"
, _w_env_nlp = nlpServerMap $ gargConfig ^. gc_nlp_config
, _w_env_job_state }
wState <- initWorkerState wEnv (fromJust $ head $ gargConfig ^. gc_worker . wsDefinitions)
test_worker_tid <- forkIO $ Worker.run wState
pure $ TestEnv { test_db = DBHandle pool db
, test_config = gargConfig
, test_nodeStory
, test_usernameGen = ugen
, test_logger = logger
, test_worker_tid
}
withTestDB :: (TestEnv -> IO ()) -> IO ()
withTestDB = bracket setup teardown
......@@ -134,7 +153,7 @@ testEnvToPgConnectionInfo TestEnv{..} =
PG.ConnectInfo { PG.connectHost = "0.0.0.0"
, PG.connectPort = fromIntegral $ fromMaybe 5432
$ getLast
$ Opts.port
$ Client.port
$ Tmp.toConnectionOptions
$ _DBTmp test_db
, PG.connectUser = dbUser
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment