[tests] fix notifications test

This was because WS connection wasn't closed properly.
parent 2df4c3bd
Pipeline #6896 passed with stages
in 54 minutes and 41 seconds
......@@ -14,12 +14,11 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
{-# LANGUAGE TypeOperators #-}
module Gargantext.Core.Notifications.Dispatcher (
Dispatcher -- opaque
, newDispatcher
, terminateDispatcher
, withDispatcher
-- * Querying a dispatcher
, dispatcherSubscriptions
......@@ -53,7 +52,6 @@ Dispatcher is a service, which provides couple of functionalities:
data Dispatcher =
Dispatcher { d_subscriptions :: SSet.Set Subscription
-- , d_ws_server :: WSAPI AsServer
, d_ce_listener :: ThreadId
......@@ -72,9 +70,16 @@ newDispatcher nc = do
d_ce_listener <- forkIO (dispatcherListener nc subscriptions)
pure $ Dispatcher { d_subscriptions = subscriptions
-- , d_ws_server = server
, d_ce_listener = d_ce_listener }
withDispatcher :: NotificationsConfig -> (Dispatcher -> IO a) -> IO a
withDispatcher nc cb = do
subscriptions <- SSet.newIO
Async.withAsync (dispatcherListener nc subscriptions) $ \a -> do
let dispatcher = Dispatcher { d_subscriptions = subscriptions
, d_ce_listener = Async.asyncThreadId a }
cb dispatcher
-- | This is a nanomsg socket listener. We want to read the messages
......@@ -142,6 +142,7 @@ getWSKey pc = do
-- Sec-WebSocket-Key so we want to make that even more unique.
uuid <- liftBase $ UUID.nextRandom
let key = key' <> "-" <> show uuid
liftBase $ putText $ "[getWSKey] request headers: " <> (show $ WS.requestHeaders reqHead)
liftBase $ withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG $ "[wsLoop, getWSKey] request headers: " <> (show $ WS.requestHeaders reqHead)
pure key
......@@ -11,25 +11,30 @@ Portability : POSIX
{-# OPTIONS_GHC -Wno-orphans #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.API.Notifications (
) where
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM.TChan
import Control.Exception.Safe qualified as Exc
import Control.Monad (void)
import Control.Monad.STM (atomically)
import Data.Aeson qualified as Aeson
import Data.ByteString qualified as BS
import Data.Maybe (isJust)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Notifications.Dispatcher qualified as D
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Network.WebSockets.Client qualified as WS
import Network.WebSockets.Connection qualified as WS
import Network.WebSockets qualified as WS
import Prelude
import Test.API.Setup (withTestDBAndNotifications) -- , setupEnvironment, createAliceAndBob)
import Test.API.Setup (withTestDBAndNotifications)
import Test.Hspec
import Test.Instances ()
......@@ -37,41 +42,55 @@ import Test.Instances ()
tests :: NotificationsConfig -> D.Dispatcher -> Spec
tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatcher) $ do
describe "Dispatcher, Central Exchange, WebSockets" $ do
it "simple WS notification works" $ \((_testEnv, port), _) -> do
it "simple WS notification works" $ \((testEnv, port), _) -> do
let topic = DT.UpdateTree 0
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection
let wsConnect = do
WS.runClient "" port "/ws" $ \conn -> do
let wsConnect =
withWSConnection ("", port, "/ws") $ \conn -> do
-- We wait a bit before the server settles
threadDelay (100 * millisecond)
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
d <- WS.receiveData conn
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
-- atomically $ TVar.writeTVar tvar (Aeson.decode d)
putStrLn "[WSClient] after"
-- wait a bit to settle
threadDelay (100 * millisecond)
wsConnection <- forkIO $ wsConnect
-- wait a bit to connect
threadDelay (100 * millisecond)
threadDelay (500 * millisecond)
CE.notify nc $ CET.UpdateTreeFirstLevel 0
withAsync wsConnect $ \_a -> do
-- wait a bit to connect
threadDelay (500 * millisecond)
CE.notify nc $ CET.UpdateTreeFirstLevel 0
md <- atomically $ readTChan tchan
md `shouldSatisfy` isJust
let (Just (DT.Notification topic' message')) = md
topic' `shouldBe` topic
message' `shouldBe` DT.MEmpty
-- d <- TVar.readTVarIO tvar
md <- atomically $ readTChan tchan
killThread wsConnection
millisecond :: Int
millisecond = 1000
md `shouldSatisfy` isJust
let (Just (DT.Notification topic' message')) = md
topic' `shouldBe` topic
message' `shouldBe` DT.MEmpty
-- | Wrap the logic of asynchronous connection closing
-- https://hackage.haskell.org/package/websockets-
withWSConnection :: (String, Int, String) -> WS.ClientApp () -> IO ()
withWSConnection (host, port, path) cb =
WS.runClient host port path $ \conn -> do
cb conn
millisecond :: Int
millisecond = 1000
-- shut down gracefully, otherwise a 'ConnectionException' is thrown
WS.sendClose conn ("" :: BS.ByteString)
-- wait for close response, should throw a 'CloseRequest' exception
Exc.catches (void $ WS.receiveDataMessage conn)
[ Exc.Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.CloseRequest _ _ -> putStrLn "[withWSConnection] closeRequest caught"
_ -> Exc.throw err
-- re-throw any other exceptions
, Exc.Handler $ \(err :: Exc.SomeException) -> Exc.throw err ]
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.API.Setup (
......@@ -18,7 +19,7 @@ import Data.ByteString.Lazy.Char8 qualified as C8L
import Data.Cache qualified as InMemory
import Data.Streaming.Network (bindPortTCP)
import Gargantext.API (makeApp)
import Gargantext.API.Admin.EnvTypes (Mode(Mock), Env (..))
import Gargantext.API.Admin.EnvTypes (Mode(Mock), Env(..), env_dispatcher)
import Gargantext.API.Admin.Settings
import Gargantext.API.Errors.Types
import Gargantext.API.Prelude
......@@ -49,6 +50,7 @@ import Network.Wai qualified as Wai
import Network.Wai.Handler.Warp (runSettingsSocket)
import Network.Wai.Handler.Warp qualified as Warp
import Network.Wai.Handler.Warp.Internal
import Network.WebSockets qualified as WS
import Prelude
import Servant.Auth.Client ()
import Servant.Client
......@@ -150,11 +152,26 @@ withTestDBAndPort action =
withTestDBAndNotifications :: D.Dispatcher -> (((TestEnv, Warp.Port), Application) -> IO ()) -> IO ()
withTestDBAndNotifications dispatcher action = do
withTestDB $ \testEnv -> do
app <- withLoggerHoisted Mock $ \ioLogger -> do
env <- newTestEnv testEnv ioLogger 8080
makeApp $ env { _env_dispatcher = dispatcher }
let stgs = Warp.defaultSettings { settingsOnExceptionResponse = showDebugExceptions }
Warp.testWithApplicationSettings stgs (pure app) $ \port -> action ((testEnv, port), app)
withLoggerHoisted Mock $ \ioLogger -> do
env <- newTestEnv testEnv ioLogger 8080
<&> env_dispatcher .~ dispatcher
app <- makeApp env
let stgs = Warp.defaultSettings { settingsOnExceptionResponse = showDebugExceptions }
-- An exception can be thrown by the websocket server (when client closes connection)
-- TODO I don't quite understand why the exception has to be caught here
-- and not under 'WS.runClient'
catches (Warp.testWithApplicationSettings stgs (pure app) $ \port -> action ((testEnv, port), app))
[ Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.CloseRequest _ _ ->
withLogger () $ \ioLogger' ->
logTxt ioLogger' DEBUG "[withTestDBAndNotifications] closeRequest caught"
_ -> throw err
-- re-throw any other exceptions
, Handler $ \(err :: SomeException) -> throw err ]
-- | Starts the backend server /and/ the microservices proxy, the former at
-- a random port, the latter at a predictable port.
......@@ -4,6 +4,7 @@ module Main where
import Gargantext.Prelude hiding (isInfixOf)
import Control.Concurrent.Async (asyncThreadId, withAsync)
import Control.Monad
import Data.Text (isInfixOf)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
......@@ -45,20 +46,13 @@ stopCoreNLPServer ph = do
interruptProcessGroupOf ph
putText "calling stop core nlp - done"
withNotifications :: ((NotificationsConfig, ThreadId, D.Dispatcher) -> IO a) -> IO a
withNotifications = bracket startNotifications stopNotifications
startNotifications :: IO (NotificationsConfig, ThreadId, D.Dispatcher)
startNotifications = do
central_exchange <- forkIO $ CE.gServer nc
dispatcher <- D.newDispatcher nc
pure (nc, central_exchange, dispatcher)
stopNotifications :: (NotificationsConfig, ThreadId, D.Dispatcher) -> IO ()
stopNotifications (_nc, central_exchange, dispatcher) = do
putText "calling stop notifications"
killThread central_exchange
D.terminateDispatcher dispatcher
putText "calling stop notifications - done"
withNotifications cb = do
withAsync (CE.gServer nc) $ \ceA -> do
D.withDispatcher nc $ \d -> do
cb (nc, asyncThreadId ceA, d)
nc :: NotificationsConfig
nc = NotificationsConfig { _nc_central_exchange_bind = "tcp://*:15560"
......@@ -82,9 +76,10 @@ main = do
hSetBuffering stdout NoBuffering
-- TODO Ideally remove start/stop notifications and use
-- Test/API/Setup to initialize this in env
withNotifications $ \(nc, _ce, dispatcher) -> do
bracket startCoreNLPServer stopCoreNLPServer $ \_ -> hspec $ do
bracket startCoreNLPServer stopCoreNLPServer $ \_ -> do
withNotifications $ \(nc, _ce, dispatcher) -> hspec $ do
API.tests nc dispatcher
hspec $ do
