[tests] notifications: test async notifications for update tree

Related to
#418
parent 874785e9
Pipeline #7256 canceled with stages
in 3 minutes and 34 seconds
......@@ -11,6 +11,7 @@ Portability : POSIX
{-# OPTIONS_GHC -Wno-orphans #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
......@@ -20,28 +21,37 @@ module Test.API.Notifications (
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TSem (newTSem, signalTSem)
import Control.Concurrent.STM.TSem (newTSem, signalTSem, TSem)
import Control.Lens ((^.))
import Control.Monad (void)
import Control.Monad.STM (atomically)
import Data.Aeson qualified as Aeson
import Fmt ((+|), (|+))
import Gargantext.API.Admin.Auth.Types (AuthResponse, authRes_token, authRes_tree_id)
import Gargantext.Core.Config (gc_notifications_config)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.Core.Types.Individu (GargPassword(..))
import Gargantext.System.Logging (withLogger)
import Network.WebSockets qualified as WS
import Prelude
import System.Timeout qualified as Timeout
import Test.API.Setup (SpecContext(..), withTestDBAndPort)
import Test.API.Prelude (newCorpusForUser)
import Test.API.Routes (mkUrl)
import Test.API.Setup (SpecContext(..), dbEnvSetup, withTestDBAndPort)
import Test.Database.Types (test_config)
import Test.Hspec
import Test.Hspec.Wai.Internal (withApplication)
import Test.Instances ()
import Test.Utils (waitForTChanValue, waitForTSem)
import Text.RawString.QQ (r)
import Test.Utils (protected, waitForTChanValue, waitForTSem, withValidLoginA)
import Test.Utils.Notifications (withAsyncWSConnection)
tests :: Spec
tests = sequential $ aroundAll withTestDBAndPort $ do
tests = sequential $ around withTestDBAndPort $ do
describe "Notifications" $ do
it "ping WS notification works" $ \(SpecContext testEnv port _app _) -> do
let nc = (test_config testEnv) ^. gc_notifications_config
......@@ -54,20 +64,8 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
-- locking mechanisms than blindly call 'threadDelay'.
wsTSem <- atomically $ newTSem 0
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection
let wsConnect conn = withLogger () $ \_ioL -> do
-- logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
-- inform the test process that we sent the subscription request
atomically $ signalTSem wsTSem
-- logMsg ioL DEBUG $ "[wsConnect] waiting for notification"
d <- WS.receiveData conn
-- logMsg ioL DEBUG $ "[wsConnect] received " <> show d
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
withAsyncWSConnection ("127.0.0.1", port) wsConnect $ \_a -> do
withAsyncWSConnection ("127.0.0.1", port) (wsConnection topic wsTSem tchan) $ \_a -> do
-- wait for ws process to inform us about topic subscription
waitForTSem wsTSem 500
......@@ -133,14 +131,78 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
-- wait for the value
waitForTChanValue tchan Nothing 1_000
it "simple update tree WS notification works" $ \(SpecContext testEnv port _app _) -> do
describe "Update tree notifications" $ do
it "simple WS notification works" $ \(SpecContext testEnv port _app _) -> do
let nc = (test_config testEnv) ^. gc_notifications_config
let topic = DT.UpdateTree 0
wsTSem <- atomically $ newTSem 0 -- initially locked
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection
let wsConnect conn = withLogger () $ \_ioL -> do
withAsyncWSConnection ("127.0.0.1", port) (wsConnection topic wsTSem tchan) $ \_a -> do
waitForTSem wsTSem 500
let nodeId = 0
CE.notify nc $ CET.UpdateTreeFirstLevel nodeId
waitForTChanValue tchan (Just $ DT.NUpdateTree nodeId) 1_000
it "WS notification on node creation works" $ \ctx@(SpecContext _testEnv port app _) -> do
checkNotification ctx $ \authRes -> do
let token = authRes ^. authRes_token
let treeId = authRes ^. authRes_tree_id
let query = [r| {"pn_name": "test", "pn_typename": "NodeCorpus"} |]
void $ withApplication app $ do
protected token "POST" (mkUrl port $ "/node/" +| treeId |+ "") query
it "WS notification on node deletion works" $ \ctx@(SpecContext testEnv port app _) -> do
checkNotification ctx $ \authRes -> do
let token = authRes ^. authRes_token
cId <- newCorpusForUser testEnv "alice"
void $ withApplication app $ do
protected token "DELETE" (mkUrl port $ "/node/" +| cId |+ "") ""
it "WS notification on node rename works" $ \ctx@(SpecContext testEnv port app _) -> do
checkNotification ctx $ \authRes -> do
let token = authRes ^. authRes_token
cId <- newCorpusForUser testEnv "alice"
void $ withApplication app $ do
let query = [r| {"name": "newName"} |]
protected token "PUT" (mkUrl port $ "/node/" +| cId |+ "/rename") query
checkNotification :: SpecContext a
-> (AuthResponse -> IO ())
-> IO ()
checkNotification ctx@(SpecContext _testEnv port _app _) act = do
_ <- dbEnvSetup ctx
withValidLoginA port "alice" (GargPassword "alice") $ \_clientEnv authRes -> do
-- Subscribe to user tree notifications
let treeId = authRes ^. authRes_tree_id
let topic = DT.UpdateTree treeId
wsTSem <- atomically $ newTSem 0 -- initially locked
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
withAsyncWSConnection ("127.0.0.1", port) (wsConnection topic wsTSem tchan) $ \_a -> do
waitForTSem wsTSem 500
act authRes
waitForTChanValue tchan (Just $ DT.NUpdateTree treeId) 1_000
wsConnection :: DT.Topic
-> TSem
-> TChan (Maybe DT.Notification)
-> WS.Connection
-> IO ()
wsConnection topic wsTSem tchan conn = withLogger () $ \_ioL -> do
-- logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
-- inform the test process that we sent the subscription request
......@@ -152,12 +214,3 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
withAsyncWSConnection ("127.0.0.1", port) wsConnect $ \_a -> do
waitForTSem wsTSem 500
let nodeId = 0
CE.notify nc $ CET.UpdateTreeFirstLevel nodeId
waitForTChanValue tchan (Just $ DT.NUpdateTree nodeId) 1_000
......@@ -16,8 +16,7 @@ import Data.Aeson qualified as JSON
import Data.Text qualified as T
import Gargantext.API.Errors
import Gargantext.Core.Types.Individu
import Gargantext.Core.Types (NodeId)
import Gargantext.Core.Types (NodeType(..))
import Gargantext.Core.Types (NodeId, NodeType(..))
import Gargantext.Core.Worker.Env () -- instance HasNodeError
import Gargantext.Database.Action.User
import Gargantext.Database.Admin.Types.Hyperdata.Corpus
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment