[tests] fix notifications test

This was because WS connection wasn't closed properly.
parent 0457d4c4
Pipeline #6860 failed with stages
...@@ -14,12 +14,11 @@ https://dev.sub.gargantext.org/#/share/Notes/187918 ...@@ -14,12 +14,11 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
-} -}
{-# LANGUAGE TypeOperators #-}
module Gargantext.Core.Notifications.Dispatcher ( module Gargantext.Core.Notifications.Dispatcher (
Dispatcher -- opaque Dispatcher -- opaque
, newDispatcher , newDispatcher
, terminateDispatcher , terminateDispatcher
, withDispatcher
-- * Querying a dispatcher -- * Querying a dispatcher
, dispatcherSubscriptions , dispatcherSubscriptions
...@@ -53,7 +52,6 @@ Dispatcher is a service, which provides couple of functionalities: ...@@ -53,7 +52,6 @@ Dispatcher is a service, which provides couple of functionalities:
data Dispatcher = data Dispatcher =
Dispatcher { d_subscriptions :: SSet.Set Subscription Dispatcher { d_subscriptions :: SSet.Set Subscription
-- , d_ws_server :: WSAPI AsServer
, d_ce_listener :: ThreadId , d_ce_listener :: ThreadId
} }
...@@ -72,9 +70,16 @@ newDispatcher nc = do ...@@ -72,9 +70,16 @@ newDispatcher nc = do
d_ce_listener <- forkIO (dispatcherListener nc subscriptions) d_ce_listener <- forkIO (dispatcherListener nc subscriptions)
pure $ Dispatcher { d_subscriptions = subscriptions pure $ Dispatcher { d_subscriptions = subscriptions
-- , d_ws_server = server
, d_ce_listener = d_ce_listener } , d_ce_listener = d_ce_listener }
withDispatcher :: NotificationsConfig -> (Dispatcher -> IO a) -> IO a
withDispatcher nc cb = do
subscriptions <- SSet.newIO
Async.withAsync (dispatcherListener nc subscriptions) $ \a -> do
let dispatcher = Dispatcher { d_subscriptions = subscriptions
, d_ce_listener = Async.asyncThreadId a }
cb dispatcher
-- | This is a nanomsg socket listener. We want to read the messages -- | This is a nanomsg socket listener. We want to read the messages
......
...@@ -142,6 +142,7 @@ getWSKey pc = do ...@@ -142,6 +142,7 @@ getWSKey pc = do
-- Sec-WebSocket-Key so we want to make that even more unique. -- Sec-WebSocket-Key so we want to make that even more unique.
uuid <- liftBase $ UUID.nextRandom uuid <- liftBase $ UUID.nextRandom
let key = key' <> "-" <> show uuid let key = key' <> "-" <> show uuid
liftBase $ putText $ "[getWSKey] request headers: " <> (show $ WS.requestHeaders reqHead) liftBase $ withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG $ "[wsLoop, getWSKey] request headers: " <> (show $ WS.requestHeaders reqHead)
pure key pure key
...@@ -11,25 +11,30 @@ Portability : POSIX ...@@ -11,25 +11,30 @@ Portability : POSIX
{-# OPTIONS_GHC -Wno-orphans #-} {-# OPTIONS_GHC -Wno-orphans #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.API.Notifications ( module Test.API.Notifications (
tests tests
) where ) where
import Control.Concurrent (forkIO, killThread, threadDelay) import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
import Control.Exception.Safe qualified as Exc
import Control.Monad (void)
import Control.Monad.STM (atomically) import Control.Monad.STM (atomically)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.ByteString qualified as BS
import Data.Maybe (isJust) import Data.Maybe (isJust)
import Gargantext.Core.Notifications.CentralExchange qualified as CE import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Notifications.Dispatcher qualified as D import Gargantext.Core.Notifications.Dispatcher qualified as D
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.Core.Config.Types (NotificationsConfig(..)) import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Network.WebSockets.Client qualified as WS import Network.WebSockets qualified as WS
import Network.WebSockets.Connection qualified as WS
import Prelude import Prelude
import Test.API.Setup (withTestDBAndNotifications) -- , setupEnvironment, createAliceAndBob) import Test.API.Setup (withTestDBAndNotifications)
import Test.Hspec import Test.Hspec
import Test.Instances () import Test.Instances ()
...@@ -37,35 +42,29 @@ import Test.Instances () ...@@ -37,35 +42,29 @@ import Test.Instances ()
tests :: NotificationsConfig -> D.Dispatcher -> Spec tests :: NotificationsConfig -> D.Dispatcher -> Spec
tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatcher) $ do tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatcher) $ do
describe "Dispatcher, Central Exchange, WebSockets" $ do describe "Dispatcher, Central Exchange, WebSockets" $ do
it "simple WS notification works" $ \((_testEnv, port), _) -> do it "simple WS notification works" $ \((testEnv, port), _) -> do
let topic = DT.UpdateTree 0 let topic = DT.UpdateTree 0
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification)) tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection -- setup a websocket connection
let wsConnect = do let wsConnect =
WS.runClient "127.0.0.1" port "/ws" $ \conn -> do withWSConnection ("127.0.0.1", port, "/ws") $ \conn -> do
-- We wait a bit before the server settles -- We wait a bit before the server settles
threadDelay (100 * millisecond) threadDelay (100 * millisecond)
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic) WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
d <- WS.receiveData conn d <- WS.receiveData conn
let dec = Aeson.decode d :: Maybe DT.Notification let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec atomically $ writeTChan tchan dec
-- atomically $ TVar.writeTVar tvar (Aeson.decode d)
putStrLn "[WSClient] after"
-- wait a bit to settle -- wait a bit to settle
threadDelay (100 * millisecond) threadDelay (100 * millisecond)
wsConnection <- forkIO $ wsConnect withAsync wsConnect $ \_a -> do
-- wait a bit to connect -- wait a bit to connect
threadDelay (100 * millisecond)
threadDelay (500 * millisecond) threadDelay (500 * millisecond)
CE.notify nc $ CET.UpdateTreeFirstLevel 0 CE.notify nc $ CET.UpdateTreeFirstLevel 0
-- d <- TVar.readTVarIO tvar
md <- atomically $ readTChan tchan md <- atomically $ readTChan tchan
killThread wsConnection
md `shouldSatisfy` isJust md `shouldSatisfy` isJust
let (Just (DT.Notification topic' message')) = md let (Just (DT.Notification topic' message')) = md
...@@ -75,3 +74,23 @@ tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatc ...@@ -75,3 +74,23 @@ tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatc
millisecond :: Int millisecond :: Int
millisecond = 1000 millisecond = 1000
-- | Wrap the logic of asynchronous connection closing
-- https://hackage.haskell.org/package/websockets-0.13.0.0/docs/Network-WebSockets-Connection.html#v:sendClose
withWSConnection :: (String, Int, String) -> WS.ClientApp () -> IO ()
withWSConnection (host, port, path) cb =
WS.runClient host port path $ \conn -> do
cb conn
-- shut down gracefully, otherwise a 'ConnectionException' is thrown
WS.sendClose conn ("" :: BS.ByteString)
-- wait for close response, should throw a 'CloseRequest' exception
Exc.catches (void $ WS.receiveDataMessage conn)
[ Exc.Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.CloseRequest _ _ -> putStrLn "[withWSConnection] closeRequest caught"
_ -> Exc.throw err
-- re-throw any other exceptions
, Exc.Handler $ \(err :: Exc.SomeException) -> Exc.throw err ]
{-# LANGUAGE BangPatterns #-} {-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.API.Setup ( module Test.API.Setup (
SpecContext(..) SpecContext(..)
...@@ -46,6 +47,7 @@ import Network.Wai qualified as Wai ...@@ -46,6 +47,7 @@ import Network.Wai qualified as Wai
import Network.Wai.Handler.Warp (runSettingsSocket) import Network.Wai.Handler.Warp (runSettingsSocket)
import Network.Wai.Handler.Warp qualified as Warp import Network.Wai.Handler.Warp qualified as Warp
import Network.Wai.Handler.Warp.Internal import Network.Wai.Handler.Warp.Internal
import Network.WebSockets qualified as WS
import Prelude import Prelude
import Servant.Auth.Client () import Servant.Auth.Client ()
import Servant.Client import Servant.Client
...@@ -157,7 +159,20 @@ withTestDBAndNotifications dispatcher action = do ...@@ -157,7 +159,20 @@ withTestDBAndNotifications dispatcher action = do
app <- makeApp env app <- makeApp env
let stgs = Warp.defaultSettings { settingsOnExceptionResponse = showDebugExceptions } let stgs = Warp.defaultSettings { settingsOnExceptionResponse = showDebugExceptions }
Warp.testWithApplicationSettings stgs (pure app) $ \port -> action ((testEnv, port), app) -- An exception can be thrown by the websocket server (when client closes connection)
-- TODO I don't quite understand why the exception has to be caught here
-- and not under 'WS.runClient'
catches (Warp.testWithApplicationSettings stgs (pure app) $ \port -> action ((testEnv, port), app))
[ Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.CloseRequest _ _ ->
withLogger () $ \ioLogger' ->
logTxt ioLogger' DEBUG "[withTestDBAndNotifications] closeRequest caught"
_ -> throw err
-- re-throw any other exceptions
, Handler $ \(err :: SomeException) -> throw err ]
-- | Starts the backend server /and/ the microservices proxy, the former at -- | Starts the backend server /and/ the microservices proxy, the former at
-- a random port, the latter at a predictable port. -- a random port, the latter at a predictable port.
......
...@@ -4,6 +4,7 @@ module Main where ...@@ -4,6 +4,7 @@ module Main where
import Gargantext.Prelude hiding (isInfixOf) import Gargantext.Prelude hiding (isInfixOf)
import Control.Concurrent.Async (asyncThreadId, withAsync)
import Control.Monad import Control.Monad
import Data.Text (isInfixOf) import Data.Text (isInfixOf)
import Gargantext.Core.Notifications.CentralExchange qualified as CE import Gargantext.Core.Notifications.CentralExchange qualified as CE
...@@ -44,20 +45,13 @@ stopCoreNLPServer ph = do ...@@ -44,20 +45,13 @@ stopCoreNLPServer ph = do
interruptProcessGroupOf ph interruptProcessGroupOf ph
putText "calling stop core nlp - done" putText "calling stop core nlp - done"
withNotifications :: ((NotificationsConfig, ThreadId, D.Dispatcher) -> IO a) -> IO a withNotifications :: ((NotificationsConfig, ThreadId, D.Dispatcher) -> IO a) -> IO a
withNotifications = bracket startNotifications stopNotifications withNotifications cb = do
where withAsync (CE.gServer nc) $ \ceA -> do
startNotifications :: IO (NotificationsConfig, ThreadId, D.Dispatcher) D.withDispatcher nc $ \d -> do
startNotifications = do cb (nc, asyncThreadId ceA, d)
central_exchange <- forkIO $ CE.gServer nc
dispatcher <- D.newDispatcher nc
pure (nc, central_exchange, dispatcher)
stopNotifications :: (NotificationsConfig, ThreadId, D.Dispatcher) -> IO ()
stopNotifications (_nc, central_exchange, dispatcher) = do
putText "calling stop notifications"
killThread central_exchange
D.terminateDispatcher dispatcher
putText "calling stop notifications - done"
nc :: NotificationsConfig nc :: NotificationsConfig
nc = NotificationsConfig { _nc_central_exchange_bind = "tcp://*:15560" nc = NotificationsConfig { _nc_central_exchange_bind = "tcp://*:15560"
...@@ -81,9 +75,10 @@ main = do ...@@ -81,9 +75,10 @@ main = do
hSetBuffering stdout NoBuffering hSetBuffering stdout NoBuffering
-- TODO Ideally remove start/stop notifications and use -- TODO Ideally remove start/stop notifications and use
-- Test/API/Setup to initialize this in env -- Test/API/Setup to initialize this in env
withNotifications $ \(nc, _ce, dispatcher) -> do bracket startCoreNLPServer stopCoreNLPServer $ \_ -> do
bracket startCoreNLPServer stopCoreNLPServer $ \_ -> hspec $ do withNotifications $ \(nc, _ce, dispatcher) -> hspec $ do
API.tests nc dispatcher API.tests nc dispatcher
hspec $ do
ReverseProxy.tests ReverseProxy.tests
DB.tests DB.tests
DB.nodeStoryTests DB.nodeStoryTests
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment