[notification] test WS unsubscription

Ref: #238, #341, #418
parent 1bec4e19
Pipeline #6963 passed with stages
in 33 minutes and 54 seconds
...@@ -20,7 +20,7 @@ module Test.API.Notifications ( ...@@ -20,7 +20,7 @@ module Test.API.Notifications (
import Control.Concurrent (threadDelay) import Control.Concurrent (threadDelay)
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TSem (newTSem, signalTSem) import Control.Concurrent.STM.TSem (newTSem, signalTSem, waitTSem)
import Control.Lens ((^.)) import Control.Lens ((^.))
import Control.Monad.STM (atomically) import Control.Monad.STM (atomically)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
...@@ -31,6 +31,7 @@ import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT ...@@ -31,6 +31,7 @@ import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.System.Logging (logMsg, LogLevel(DEBUG), withLogger) import Gargantext.System.Logging (logMsg, LogLevel(DEBUG), withLogger)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import Prelude import Prelude
import System.Timeout qualified as Timeout
import Test.API.Setup (SpecContext(..), withTestDBAndPort) import Test.API.Setup (SpecContext(..), withTestDBAndPort)
import Test.Database.Types (test_config) import Test.Database.Types (test_config)
import Test.Hspec import Test.Hspec
...@@ -51,14 +52,14 @@ tests = sequential $ aroundAll withTestDBAndPort $ do ...@@ -51,14 +52,14 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
-- This semaphore is used to inform the main thread that the WS -- This semaphore is used to inform the main thread that the WS
-- client has subscribed. I think it's better to use async -- client has subscribed. I think it's better to use async
-- locking mechanisms than blindly call 'threadDelay'. -- locking mechanisms than blindly call 'threadDelay'.
waitWSTSem <- atomically $ newTSem 0 wsTSem <- atomically $ newTSem 0
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification)) tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection -- setup a websocket connection
let wsConnect conn = withLogger () $ \ioL -> do let wsConnect conn = withLogger () $ \ioL -> do
-- logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic -- logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic) WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
-- inform the test process that we sent the subscription request -- inform the test process that we sent the subscription request
atomically $ signalTSem waitWSTSem atomically $ signalTSem wsTSem
-- logMsg ioL DEBUG $ "[wsConnect] waiting for notification" -- logMsg ioL DEBUG $ "[wsConnect] waiting for notification"
d <- WS.receiveData conn d <- WS.receiveData conn
...@@ -68,7 +69,7 @@ tests = sequential $ aroundAll withTestDBAndPort $ do ...@@ -68,7 +69,7 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
withAsyncWSConnection ("127.0.0.1", port) wsConnect $ \_a -> do withAsyncWSConnection ("127.0.0.1", port) wsConnect $ \_a -> do
-- wait for ws process to inform us about topic subscription -- wait for ws process to inform us about topic subscription
waitForTSem waitWSTSem 500 waitForTSem wsTSem 500
threadDelay 300_000 threadDelay 300_000
CE.notify nc $ CET.Ping CE.notify nc $ CET.Ping
...@@ -76,18 +77,74 @@ tests = sequential $ aroundAll withTestDBAndPort $ do ...@@ -76,18 +77,74 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
-- the ping value that should come from the notification -- the ping value that should come from the notification
waitForTChanValue tchan (Just DT.NPing) 1_000 waitForTChanValue tchan (Just DT.NPing) 1_000
it "ping WS unsubscribe works" $ \(SpecContext testEnv port _app _) -> do
let nc = (test_config testEnv) ^. gc_notifications_config
let topic = DT.Ping
-- Setup a WS client connection. Subscribe to a topic and
-- confirm the notification works. Then unsubscribe from it, and
-- check that a new notification didn't arrive.
wsTSem <- atomically $ newTSem 0
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection
let wsConnect conn = withLogger () $ \ioL -> do
-- logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
-- inform the test process that we sent the subscription request
atomically $ signalTSem wsTSem
-- logMsg ioL DEBUG $ "[wsConnect] waiting for notification"
d <- WS.receiveData conn
-- logMsg ioL DEBUG $ "[wsConnect] received " <> show d
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
-- now ubsubscribe from a topic and make sure nothing arrives
WS.sendTextData conn $ Aeson.encode (DT.WSUnsubscribe topic)
-- Signal that we finished unsubscribing
atomically $ signalTSem wsTSem
mTimeout <- Timeout.timeout (200_000) $ do
-- NOTE This shouldn't happen now, we will test the tchan
d <- WS.receiveData conn
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
case mTimeout of
-- It should have timed out
Nothing -> atomically $ writeTChan tchan Nothing
-- | write something incorrect so the test will fail
Just _ -> atomically $ writeTChan tchan (Just DT.NPing)
withAsyncWSConnection ("127.0.0.1", port) wsConnect $ \_a -> do
-- wait for ws process to inform us about topic subscription
waitForTSem wsTSem 500
threadDelay 300_000
CE.notify nc $ CET.Ping
-- the ping value that should come from the notification
waitForTChanValue tchan (Just DT.NPing) 1_000
-- wait for lock from ws (it should have unsubscribed by now)
waitForTSem wsTSem 500
-- send the notification (which the client shouldn't receive)
CE.notify nc $ CET.Ping
-- wait for the value
waitForTChanValue tchan Nothing 1_000
it "simple update tree WS notification works" $ \(SpecContext testEnv port _app _) -> do it "simple update tree WS notification works" $ \(SpecContext testEnv port _app _) -> do
let nc = (test_config testEnv) ^. gc_notifications_config let nc = (test_config testEnv) ^. gc_notifications_config
let topic = DT.UpdateTree 0 let topic = DT.UpdateTree 0
waitWSTSem <- atomically $ newTSem 0 -- initially locked wsTSem <- atomically $ newTSem 0 -- initially locked
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification)) tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection -- setup a websocket connection
let wsConnect conn = withLogger () $ \ioL -> do let wsConnect conn = withLogger () $ \ioL -> do
-- logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic -- logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic) WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
-- inform the test process that we sent the subscription request -- inform the test process that we sent the subscription request
atomically $ signalTSem waitWSTSem atomically $ signalTSem wsTSem
-- logMsg ioL DEBUG $ "[wsConnect] waiting for notification" -- logMsg ioL DEBUG $ "[wsConnect] waiting for notification"
d <- WS.receiveData conn d <- WS.receiveData conn
...@@ -96,7 +153,7 @@ tests = sequential $ aroundAll withTestDBAndPort $ do ...@@ -96,7 +153,7 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
atomically $ writeTChan tchan dec atomically $ writeTChan tchan dec
withAsyncWSConnection ("127.0.0.1", port) wsConnect $ \_a -> do withAsyncWSConnection ("127.0.0.1", port) wsConnect $ \_a -> do
waitForTSem waitWSTSem 500 waitForTSem wsTSem 500
let nodeId = 0 let nodeId = 0
CE.notify nc $ CET.UpdateTreeFirstLevel nodeId CE.notify nc $ CET.UpdateTreeFirstLevel nodeId
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment