[tests] tests with waitUntil

This is hopefully better than dummy threadDelay's
parent 521a44e9
Pipeline #6431 failed with stages
in 19 minutes and 59 seconds
...@@ -17,7 +17,6 @@ Portability : POSIX ...@@ -17,7 +17,6 @@ Portability : POSIX
module Async.Worker.Broker.Types module Async.Worker.Broker.Types
( Queue ( Queue
-- , HasMessageId(..)
, HasBroker(..) , HasBroker(..)
, SerializableMessage , SerializableMessage
) )
...@@ -31,19 +30,6 @@ import Data.Typeable (Typeable) ...@@ -31,19 +30,6 @@ import Data.Typeable (Typeable)
type Queue = String type Queue = String
{-| A message in the queue system must have some properties. In
particular, it must have some sort of 'id'.
-}
-- class (Eq msgId, Show msgId, Typeable msgId) => HasMessageId msg msgId where
-- messageId :: msg -> msgId
-- class HasMessageId m where
-- data family Message m :: Type
-- type family MessageId m :: Type
-- messageId :: Message m -> MessageId m
{- NOTE There are 3 types of messages here: {- NOTE There are 3 types of messages here:
- 'a' the underlying, user-defined message - 'a' the underlying, user-defined message
- 'Job a' worker definition, containing message metadata - 'Job a' worker definition, containing message metadata
......
module Test.Integration.Utils module Test.Integration.Utils
( getPSQLEnvConnectInfo ( getPSQLEnvConnectInfo
, getRedisEnvConnectInfo , getRedisEnvConnectInfo
, randomQueueName ) , randomQueueName
, waitUntil
, waitUntilTVarEq
, waitUntilTVarPred )
where where
import Async.Worker.Broker qualified as B import Async.Worker.Broker qualified as B
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM.TVar (TVar, readTVarIO)
import Control.Monad (unless)
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe)
import Database.PostgreSQL.Simple qualified as PSQL import Database.PostgreSQL.Simple qualified as PSQL
import Database.Redis qualified as Redis import Database.Redis qualified as Redis
import System.Environment (lookupEnv) import System.Environment (lookupEnv)
import System.Timeout qualified as Timeout
import Test.Hspec (expectationFailure, shouldBe, shouldSatisfy, Expectation, HasCallStack)
import Test.RandomStrings (randomASCII, randomString, onlyAlphaNum) import Test.RandomStrings (randomASCII, randomString, onlyAlphaNum)
-- | PSQL connect info that is fetched from env
getPSQLEnvConnectInfo :: IO PSQL.ConnectInfo getPSQLEnvConnectInfo :: IO PSQL.ConnectInfo
getPSQLEnvConnectInfo = do getPSQLEnvConnectInfo = do
pgUser <- lookupEnv "POSTGRES_USER" pgUser <- lookupEnv "POSTGRES_USER"
...@@ -24,13 +33,67 @@ getPSQLEnvConnectInfo = do ...@@ -24,13 +33,67 @@ getPSQLEnvConnectInfo = do
, PSQL.connectHost = fromMaybe "localhost" pgHost , PSQL.connectHost = fromMaybe "localhost" pgHost
, PSQL.connectPassword = fromMaybe "postgres" pgPass } , PSQL.connectPassword = fromMaybe "postgres" pgPass }
-- | Redis connect info that is fetched from env
getRedisEnvConnectInfo :: IO Redis.ConnectInfo getRedisEnvConnectInfo :: IO Redis.ConnectInfo
getRedisEnvConnectInfo = do getRedisEnvConnectInfo = do
redisHost <- lookupEnv "REDIS_HOST" redisHost <- lookupEnv "REDIS_HOST"
-- https://hackage.haskell.org/package/hedis-0.15.2/docs/Database-Redis.html#v:defaultConnectInfo -- https://hackage.haskell.org/package/hedis-0.15.2/docs/Database-Redis.html#v:defaultConnectInfo
pure $ Redis.defaultConnectInfo { Redis.connectHost = fromMaybe "localhost" redisHost } pure $ Redis.defaultConnectInfo { Redis.connectHost = fromMaybe "localhost" redisHost }
-- | Given a queue prefix, add a random suffix to create a queue name
randomQueueName :: B.Queue -> IO B.Queue randomQueueName :: B.Queue -> IO B.Queue
randomQueueName prefix = do randomQueueName prefix = do
postfix <- randomString (onlyAlphaNum randomASCII) 10 postfix <- randomString (onlyAlphaNum randomASCII) 10
return $ prefix <> "_" <> postfix return $ prefix <> "_" <> postfix
-- | Given a predicate IO action, test it for given number of
-- milliseconds or fail
waitUntil :: HasCallStack => IO Bool -> Int -> Expectation
waitUntil pred' timeoutMs = do
_mTimeout <- Timeout.timeout (timeoutMs * 1000) performTest
-- shortcut for testing mTimeout
p <- pred'
unless p (expectationFailure "Predicate test failed")
where
performTest = do
p <- pred'
if p
then return ()
else do
threadDelay 50
performTest
-- | Similar to 'waitUntil' but specialized to 'TVar' equality checking
waitUntilTVarEq :: (HasCallStack, Show a, Eq a) => TVar a -> a -> Int -> Expectation
waitUntilTVarEq tvar expected timeoutMs = do
_mTimeout <- Timeout.timeout (timeoutMs * 1000) performTest
-- shortcut for testing mTimeout
val <- readTVarIO tvar
val `shouldBe` expected
where
performTest = do
val <- readTVarIO tvar
if val == expected
then return ()
else do
threadDelay 50
performTest
-- | Similar to 'waitUntilTVarEq' but with predicate checking
waitUntilTVarPred :: (HasCallStack, Show a, Eq a) => TVar a -> (a -> Bool) -> Int -> Expectation
waitUntilTVarPred tvar predicate timeoutMs = do
_mTimeout <- Timeout.timeout (timeoutMs * 1000) performTest
-- shortcut for testing mTimeout
val <- readTVarIO tvar
val `shouldSatisfy` predicate
where
performTest = do
val <- readTVarIO tvar
if predicate val
then return ()
else do
threadDelay 50
performTest
...@@ -28,7 +28,7 @@ import Control.Exception (bracket, Exception, throwIO) ...@@ -28,7 +28,7 @@ import Control.Exception (bracket, Exception, throwIO)
import Data.Aeson (ToJSON(..), FromJSON(..), object, (.=), (.:), withObject) import Data.Aeson (ToJSON(..), FromJSON(..), object, (.=), (.:), withObject)
import Data.Set qualified as Set import Data.Set qualified as Set
import Test.Hspec import Test.Hspec
import Test.Integration.Utils (getPSQLEnvConnectInfo, getRedisEnvConnectInfo, randomQueueName) import Test.Integration.Utils (getPSQLEnvConnectInfo, getRedisEnvConnectInfo, randomQueueName, waitUntilTVarEq, waitUntilTVarPred)
data TestEnv b = data TestEnv b =
...@@ -152,10 +152,7 @@ workerTests brokerInitParams = ...@@ -152,10 +152,7 @@ workerTests brokerInitParams =
let job = mkDefaultSendJob' broker queueName msg let job = mkDefaultSendJob' broker queueName msg
sendJob' job sendJob' job
threadDelay (500 * millisecond) waitUntilTVarEq events [ EMessageReceived msg, EJobFinished msg ] 500
events2 <- readTVarIO events
events2 `shouldBe` [ EMessageReceived msg, EJobFinished msg ]
-- queue should be empty -- queue should be empty
queueLen2 <- BT.getQueueSize broker queueName queueLen2 <- BT.getQueueSize broker queueName
...@@ -173,10 +170,7 @@ workerTests brokerInitParams = ...@@ -173,10 +170,7 @@ workerTests brokerInitParams =
let job = mkDefaultSendJob' broker queueName msg let job = mkDefaultSendJob' broker queueName msg
sendJob' job sendJob' job
threadDelay (500 * millisecond) waitUntilTVarEq events [ EMessageReceived msg, EJobError msg ] 500
events2 <- readTVarIO events
events2 `shouldBe` [ EMessageReceived msg, EJobError msg ]
-- queue should be empty (error jobs archived by default) -- queue should be empty (error jobs archived by default)
queueLen2 <- BT.getQueueSize broker queueName queueLen2 <- BT.getQueueSize broker queueName
...@@ -194,10 +188,7 @@ workerTests brokerInitParams = ...@@ -194,10 +188,7 @@ workerTests brokerInitParams =
let job = (mkDefaultSendJob' broker queueName msg) { errStrat = ESDelete } let job = (mkDefaultSendJob' broker queueName msg) { errStrat = ESDelete }
sendJob' job sendJob' job
threadDelay (500 * millisecond) waitUntilTVarEq events [ EMessageReceived msg, EJobError msg ] 500
events2 <- readTVarIO events
events2 `shouldBe` [ EMessageReceived msg, EJobError msg ]
-- queue should be empty (error job deleted) -- queue should be empty (error job deleted)
queueLen2 <- BT.getQueueSize broker queueName queueLen2 <- BT.getQueueSize broker queueName
...@@ -215,10 +206,7 @@ workerTests brokerInitParams = ...@@ -215,10 +206,7 @@ workerTests brokerInitParams =
let job = (mkDefaultSendJob' broker queueName msg) { errStrat = ESRepeat } let job = (mkDefaultSendJob' broker queueName msg) { errStrat = ESRepeat }
sendJob' job sendJob' job
threadDelay (100 * millisecond) waitUntilTVarEq events [ EMessageReceived msg, EJobError msg ] 500
events2 <- readTVarIO events
events2 `shouldBe` [ EMessageReceived msg, EJobError msg ]
-- NOTE It doesn't make sense to check queue size here, the -- NOTE It doesn't make sense to check queue size here, the
-- worker just continues to run the errored task in background -- worker just continues to run the errored task in background
...@@ -237,10 +225,7 @@ workerTests brokerInitParams = ...@@ -237,10 +225,7 @@ workerTests brokerInitParams =
let job = mkDefaultSendJob broker queueName msg 1 let job = mkDefaultSendJob broker queueName msg 1
sendJob' job sendJob' job
threadDelay (2*second) waitUntilTVarPred events (\e -> take 2 e == [ EMessageReceived msg, EJobTimeout msg ]) 2500
events2 <- readTVarIO events
events2 `shouldBe` [ EMessageReceived msg, EJobTimeout msg ]
-- NOTE It doesn't make sense to check queue size here, the -- NOTE It doesn't make sense to check queue size here, the
-- worker just continues to run the errored task in background -- worker just continues to run the errored task in background
...@@ -259,10 +244,7 @@ workerTests brokerInitParams = ...@@ -259,10 +244,7 @@ workerTests brokerInitParams =
let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSArchive } let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSArchive }
sendJob' job sendJob' job
threadDelay (2*second) waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg ] 2500
events2 <- readTVarIO events
events2 `shouldBe` [ EMessageReceived msg, EJobTimeout msg ]
-- Queue should be empty, since we archive timed out jobs -- Queue should be empty, since we archive timed out jobs
queueLen2 <- BT.getQueueSize broker queueName queueLen2 <- BT.getQueueSize broker queueName
...@@ -280,10 +262,7 @@ workerTests brokerInitParams = ...@@ -280,10 +262,7 @@ workerTests brokerInitParams =
let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSDelete } let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSDelete }
sendJob' job sendJob' job
threadDelay (2*second) waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg ] 2500
events2 <- readTVarIO events
events2 `shouldBe` [ EMessageReceived msg, EJobTimeout msg ]
-- Queue should be empty, since we archive timed out jobs -- Queue should be empty, since we archive timed out jobs
queueLen2 <- BT.getQueueSize broker queueName queueLen2 <- BT.getQueueSize broker queueName
...@@ -301,12 +280,9 @@ workerTests brokerInitParams = ...@@ -301,12 +280,9 @@ workerTests brokerInitParams =
let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSRepeatNElseArchive 1 } let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSRepeatNElseArchive 1 }
sendJob' job sendJob' job
threadDelay (3*second)
events2 <- readTVarIO events
-- | Should have been run 2 times, then archived -- | Should have been run 2 times, then archived
events2 `shouldBe` [ EMessageReceived msg, EJobTimeout msg waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg
, EMessageReceived msg, EJobTimeout msg ] , EMessageReceived msg, EJobTimeout msg ] 3500
-- Queue should be empty, since we archive timed out jobs -- Queue should be empty, since we archive timed out jobs
queueLen2 <- BT.getQueueSize broker queueName queueLen2 <- BT.getQueueSize broker queueName
...@@ -329,13 +305,11 @@ workerTests brokerInitParams = ...@@ -329,13 +305,11 @@ workerTests brokerInitParams =
let job2 = mkDefaultSendJob' broker queueName msg2 let job2 = mkDefaultSendJob' broker queueName msg2
sendJob' job2 sendJob' job2
threadDelay (500 * millisecond)
events2 <- readTVarIO events
-- The jobs don't have to be process exactly in this order so we just use Set here -- The jobs don't have to be process exactly in this order so we just use Set here
Set.fromList events2 `shouldBe` waitUntilTVarPred events (
Set.fromList [ EMessageReceived msg1, EJobFinished msg1 \e -> Set.fromList e == Set.fromList
, EMessageReceived msg2, EJobFinished msg2 ] [ EMessageReceived msg1, EJobFinished msg1
, EMessageReceived msg2, EJobFinished msg2 ]) 500
-- queue should be empty -- queue should be empty
queueLen2 <- BT.getQueueSize broker queueName queueLen2 <- BT.getQueueSize broker queueName
...@@ -357,12 +331,10 @@ workerTests brokerInitParams = ...@@ -357,12 +331,10 @@ workerTests brokerInitParams =
let job = mkDefaultSendJob' broker queueName msg let job = mkDefaultSendJob' broker queueName msg
sendJob' job sendJob' job
threadDelay (500 * millisecond) waitUntilTVarPred events (
\e -> Set.fromList e ==
events2 <- readTVarIO events Set.fromList [ EMessageReceived msgErr, EJobError msgErr
Set.fromList events2 `shouldBe` , EMessageReceived msg, EJobFinished msg ]) 500
Set.fromList [ EMessageReceived msgErr, EJobError msgErr
, EMessageReceived msg, EJobFinished msg ]
-- queue should be empty -- queue should be empty
queueLen2 <- BT.getQueueSize broker queueName queueLen2 <- BT.getQueueSize broker queueName
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment