[notifications] improve tests, ref #418, #238, #341

parent 8fb583cb
Pipeline #6962 passed with stages
in 38 minutes and 38 seconds
......@@ -66,6 +66,7 @@ wsServer = WSAPI { wsAPIServer = streamData }
) [ Exc.Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.ConnectionClosed -> logM DEBUG $ "[wsServer] connection closed"
WS.CloseRequest _ _ -> logM DEBUG $ "[wsServer] close request"
_ -> Exc.throw err ]
......
......@@ -14,6 +14,7 @@ module Gargantext.System.Logging (
) where
import Control.Exception.Safe (MonadMask, bracket)
import Control.Monad (when)
import Control.Monad.IO.Class
import Control.Monad.Trans.Control
import Data.Kind (Type)
......@@ -142,9 +143,7 @@ instance HasLogger IO where
destroyLogger _ = pure ()
logMsg (IOLogger minLvl) lvl msg = do
t <- getCurrentTime
if lvl < minLvl
then pure ()
else do
when (lvl >= minLvl) $ do
let pfx = "[" <> show t <> "] [" <> show lvl <> "] "
putStrLn $ pfx <> msg
logTxt lgr lvl msg = logMsg lgr lvl (T.unpack msg)
......@@ -21,7 +21,7 @@ tests :: Spec
tests = parallel $ aroundAll withTestDBAndPort $ beforeAllWith dbEnvSetup $ do
describe "GraphQL" $ do
describe "get_user_infos" $ do
it "allows 'alice' to see her own info" $ \(SpecContext testEnv port app _) -> do
it "allows 'alice' to see her own info" $ \(SpecContext _testEnv port app _) -> do
withApplication app $ do
withValidLogin port "alice" (GargPassword "alice") $ \_clientEnv token -> do
let query = [r| { "query": "{ user_infos(user_id: 2) { ui_id, ui_email } }" } |]
......
......@@ -19,12 +19,11 @@ module Test.API.Notifications (
) where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TSem (newTSem, signalTSem)
import Control.Lens ((^.))
import Control.Monad.STM (atomically)
import Data.Aeson qualified as Aeson
import Data.Maybe (isJust)
import Gargantext.Core.Config (gc_notifications_config)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
......@@ -32,47 +31,76 @@ import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.System.Logging (logMsg, LogLevel(DEBUG), withLogger)
import Network.WebSockets qualified as WS
import Prelude
import System.Timeout qualified as Timeout
import Test.API.Setup (SpecContext(..), withTestDBAndPort)
import Test.Database.Types (test_config)
import Test.Hspec
import Test.Instances ()
import Test.Utils.Notifications
import Test.Utils (waitForTChanValue, waitForTSem)
import Test.Utils.Notifications (withAsyncWSConnection)
tests :: Spec
tests = sequential $ aroundAll withTestDBAndPort $ do
describe "Notifications" $ do
it "simple WS notification works" $ \(SpecContext testEnv port _app _) -> do
it "ping WS notification works" $ \(SpecContext testEnv port _app _) -> do
let nc = (test_config testEnv) ^. gc_notifications_config
-- withLogger () $ \ioL -> do
-- logMsg ioL DEBUG $ "[ping WS notification works] nc: " <> show nc
let topic = DT.UpdateTree 0
let topic = DT.Ping
-- This semaphore is used to inform the main thread that the WS
-- client has subscribed. I think it's better to use async
-- locking mechanisms than blindly call 'threadDelay'.
waitWSTSem <- atomically $ newTSem 0
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection
let wsConnect =
withWSConnection ("127.0.0.1", port) $ \conn -> do
-- We wait a bit before the server settles
-- threadDelay (100 * millisecond)
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic
let wsConnect conn = withLogger () $ \ioL -> do
-- logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
-- inform the test process that we sent the subscription request
atomically $ signalTSem waitWSTSem
-- logMsg ioL DEBUG $ "[wsConnect] waiting for notification"
d <- WS.receiveData conn
-- logMsg ioL DEBUG $ "[wsConnect] received " <> show d
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
-- wait a bit to settle
threadDelay (100 * millisecond)
withAsyncWSConnection ("127.0.0.1", port) wsConnect $ \_a -> do
-- wait for ws process to inform us about topic subscription
waitForTSem waitWSTSem 500
threadDelay 300_000
CE.notify nc $ CET.Ping
-- the ping value that should come from the notification
waitForTChanValue tchan (Just DT.NPing) 1_000
it "simple update tree WS notification works" $ \(SpecContext testEnv port _app _) -> do
let nc = (test_config testEnv) ^. gc_notifications_config
let topic = DT.UpdateTree 0
waitWSTSem <- atomically $ newTSem 0 -- initially locked
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection
let wsConnect conn = withLogger () $ \ioL -> do
-- logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
-- inform the test process that we sent the subscription request
atomically $ signalTSem waitWSTSem
-- logMsg ioL DEBUG $ "[wsConnect] waiting for notification"
d <- WS.receiveData conn
-- logMsg ioL DEBUG $ "[wsConnect] received " <> show d
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
withAsync wsConnect $ \_a -> do
-- wait a bit to connect
threadDelay (500 * millisecond)
withAsyncWSConnection ("127.0.0.1", port) wsConnect $ \_a -> do
waitForTSem waitWSTSem 500
let nodeId = 0
CE.notify nc $ CET.UpdateTreeFirstLevel nodeId
mTimeout <- Timeout.timeout (5 * 1000000) $ do
md <- atomically $ readTChan tchan
waitForTChanValue tchan (Just $ DT.NUpdateTree nodeId) 1_000
md `shouldBe` Just (DT.NUpdateTree nodeId)
mTimeout `shouldSatisfy` isJust
......@@ -115,7 +115,8 @@ withTestDBAndPort action = withNotifications nc $ \dispatcher -> do
-- An exception can be thrown by the websocket server (when client closes connection)
-- TODO I don't quite understand why the exception has to be caught here
-- and not under 'WS.runClient'
catches (Warp.testWithApplicationSettings stgs (pure app) $ \port -> action (SpecContext testEnv port app ()))
catches (Warp.testWithApplicationSettings stgs (pure app) $ \port ->
action (SpecContext testEnv port app ()))
[ Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.CloseRequest _ _ ->
......
......@@ -23,6 +23,7 @@ import Data.IORef
import Data.Map qualified as Map
import Data.Pool
import Data.Text qualified as T
import Data.Time.Clock (getCurrentTime)
import Database.PostgreSQL.Simple qualified as PG
import Database.Postgres.Temp qualified as Tmp
import Gargantext hiding (to)
......@@ -146,6 +147,7 @@ instance HasLogger (GargM TestEnv BackendInternalError) where
pure $ GargTestLogger mode test_logger_set
destroyLogger GargTestLogger{..} = liftIO $ FL.rmLoggerSet test_logger_set
logMsg (GargTestLogger mode logger_set) lvl msg = do
t <- liftIO $ getCurrentTime
let pfx = "[" <> show lvl <> "] " :: Text
when (lvl `elem` (modeToLoggingLevels mode)) $
liftIO $ FL.pushLogStrLn logger_set $ FL.toLogStr pfx <> msg
......
......@@ -3,6 +3,8 @@
module Test.Utils where
import Control.Concurrent.STM.TChan (TChan, readTChan)
import Control.Concurrent.STM.TSem (TSem, waitTSem)
import Control.Concurrent.STM.TVar (newTVarIO, writeTVar, readTVarIO)
import Control.Exception.Safe ()
import Control.Monad ()
......@@ -318,3 +320,24 @@ waitUntil pred' timeoutMs = do
else do
threadDelay 50000
performTest
-- wait for given number of milliseconds for a given tchan value
waitForTChanValue :: (HasCallStack, Eq a, Show a) => TChan a -> a -> Int -> IO ()
waitForTChanValue tchan expected timeoutMs = do
mTimeout <- Timeout.timeout (timeoutMs * 1000) $ do
v <- atomically $ readTChan tchan
unless (v == expected) $ panicTrace $ "[waitForTChanValue] v != expected (" <> show v <> " != " <> show expected <> ")"
-- v `shouldBe` expected
-- no timeout should have occurred
-- mTimeout `shouldSatisfy` isJust
when (isNothing mTimeout) $
panicTrace $ "[waitForTChanValue] timeout when waiting for " <> show expected <> " on tchan"
waitForTSem :: HasCallStack => TSem -> Int -> IO ()
waitForTSem tsem timeoutMs = do
mTimeout <- Timeout.timeout (timeoutMs * 1000) $ do
atomically $ waitTSem tsem
when (isNothing mTimeout) $
panicTrace $ "[waitForTSem] timeout when waiting TSem"
......@@ -5,6 +5,7 @@
module Test.Utils.Notifications where
import Control.Concurrent.Async (Async, withAsync)
import Control.Exception.Safe qualified as Exc
import Control.Monad (void)
import Data.ByteString qualified as BS
......@@ -46,3 +47,9 @@ withWSConnection' (host, port, path) cb = Exc.catches (
-- re-throw any other exceptions
, Exc.Handler $ \(err :: Exc.SomeException) -> Exc.throw err ]
-- | Same as 'withWSConnection', but spawns an async thread
withAsyncWSConnection :: (String, Int) -> WS.ClientApp () -> (Async () -> IO ()) -> IO ()
withAsyncWSConnection hp wsCb cb =
withAsync (withWSConnection hp wsCb) cb
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment