More tests, refactorings

parent ffa0f57f
Pipeline #6509 failed with stages
in 22 minutes and 9 seconds
......@@ -14,6 +14,7 @@ import Async.Worker.Types (State(..), PerformAction, getJob, formatStr, TimeoutS
import Control.Applicative ((<|>))
import Control.Concurrent (forkIO, threadDelay)
import Control.Exception (Exception, throwIO)
import Control.Monad (void)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
import Database.PostgreSQL.Simple qualified as PSQL
......@@ -75,7 +76,7 @@ main :: IO ()
main = do
let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
, PSQL.connectDatabase = "postgres" }
let brokerInitParams = PGMQBrokerInitParams connInfo :: BrokerInitParams PGMQBroker (Job Message)
let brokerInitParams = PGMQBrokerInitParams connInfo 10 :: BrokerInitParams PGMQBroker (Job Message)
let queue = "simple_worker"
......@@ -108,17 +109,17 @@ main = do
let mkJob msg = mkDefaultSendJob' broker queue msg
mapM_ (\idx -> do
sendJob' $ mkJob $ Ping
sendJob' $ mkJob $ Wait 1
sendJob' $ mkJob $ Echo $ "hello " <> show idx
sendJob' $ mkJob $ Error $ "error " <> show idx
void $ sendJob' $ mkJob $ Ping
void $ sendJob' $ mkJob $ Wait 1
void $ sendJob' $ mkJob $ Echo $ "hello " <> show idx
void $ sendJob' $ mkJob $ Error $ "error " <> show idx
) tasksLst
-- a job that will timeout
let timedOut =
(mkDefaultSendJob broker queue (Wait 5) 1)
{ toStrat = TSRepeatNElseArchive 3 }
sendJob' timedOut
void $ sendJob' timedOut
threadDelay (10*second)
......
......@@ -8,6 +8,6 @@ packages:
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-pgmq
tag: 268398735f61008af099918a24b3fb57f9533ba3
tag: fcb7d4fb811e5b7239078b48268c469c8d28fdf9
tests: true
......@@ -83,9 +83,11 @@ library
, haskell-pgmq >= 0.1.0.0 && < 0.2
, hedis >= 0.15.2 && < 0.16
, mtl >= 2.2 && < 2.4
, postgresql-libpq >= 0.10 && < 0.11
, postgresql-simple >= 0.6 && < 0.8
, safe >= 0.3 && < 0.4
, safe-exceptions >= 0.1.7 && < 0.2
, stm >= 2.5.3 && < 3
, text >= 1.2 && < 2.2
, time >= 1.10 && < 1.15
, units >= 2.4 && < 2.5
......@@ -204,4 +206,4 @@ test-suite test-integration
OverloadedStrings
RecordWildCards
ghc-options: -threaded
ghc-options: -threaded -fprof-auto
......@@ -17,8 +17,9 @@ Asynchronous worker.
module Async.Worker
( -- * Running
run
( KillWorkerSafely(..)
-- * Running
, run
-- * Sending jobs
, sendJob
-- ** 'SendJob' wrappers
......@@ -36,11 +37,28 @@ import Async.Worker.Broker
{- | Various worker types, in particular 'State'
-}
import Async.Worker.Types
import Control.Exception.Safe (catch, fromException, throwIO, SomeException)
import Control.Monad (forever)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (readTVarIO, newTVarIO, writeTVar)
import Control.Exception.Safe (catch, fromException, throwIO, SomeException, Exception)
import Control.Monad (forever, void, when)
import Debug.Trace (traceStack)
import System.Timeout qualified as Timeout
-- | If you want to stop a worker safely, use `throwTo'
-- 'workerThreadId' 'KillWorkerSafely'. This way the worker will stop
-- whatever is doing now and resend the message back to the
-- broker. This way you won't lose your jobs. If you don't care about
-- resuming a job, just set 'resendWhenWorkerKilled' property to
-- 'False'.
data KillWorkerSafely = KillWorkerSafely
deriving (Show)
instance Exception KillWorkerSafely
-- | This is the main function to start a worker. It's an infinite
-- loop of reading the next broker message, processing it and handling
-- any errors, issues that might arrise in the meantime.
run :: (HasWorkerBroker b a) => State b a -> IO ()
run state@(State { .. }) = do
createQueue broker queueName
......@@ -48,30 +66,61 @@ run state@(State { .. }) = do
where
loop :: IO ()
loop = do
-- TODO try...catch for main loop. This should catch exceptions
-- but also job timeout events (we want to stick to the practice
-- of keeping only one try...catch in the whole function)
-- TVar to hold currently processed job. This is used for
-- exception handling.
mBrokerMessageTVar <- newTVarIO Nothing -- :: IO (TVar (Maybe (BrokerMessage b (Job a))))
catch (do
brokerMessage <- readMessageWaiting broker queueName
atomically $ writeTVar mBrokerMessageTVar (Just brokerMessage)
handleMessage state brokerMessage
callWorkerJobEvent onJobFinish state brokerMessage
) (\err ->
atomically $ writeTVar mBrokerMessageTVar Nothing
) (\err -> do
mBrokerMessage <- readTVarIO mBrokerMessageTVar
case fromException err of
Just jt@(JobTimeout {}) -> handleTimeoutError state jt
Just KillWorkerSafely -> do
case mBrokerMessage of
Just brokerMessage -> do
let job = toA $ getMessage brokerMessage
let mdata = metadata job
-- Should we resend this message?
when (resendWhenWorkerKilled mdata) $ do
putStrLn $ formatStr state $ "resending job: " <> show job
void $ sendJob broker queueName (job { metadata = mdata { readCount = readCount mdata + 1 } })
size <- getQueueSize broker queueName
putStrLn $ formatStr state $ "queue size: " <> show size
-- In any case, deinit the broker (i.e. close connection)
-- deinitBroker broker
-- kill worker
throwIO KillWorkerSafely
Nothing -> pure ()
Nothing -> case fromException err of
Just je@(JobException {}) -> handleJobError state je
_ -> handleUnknownError state err)
Just jt@(JobTimeout {}) -> handleTimeoutError state jt
Nothing -> case mBrokerMessage of
Just brokerMessage -> do
callWorkerJobEvent onJobError state brokerMessage
handleJobError state brokerMessage
_ -> handleUnknownError state err)
handleMessage :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
handleMessage state@(State { .. }) brokerMessage = do
callWorkerJobEvent onMessageReceived state brokerMessage
let msgId = messageId brokerMessage
let msg = getMessage brokerMessage
let job' = toA msg
putStrLn $ formatStr state $ "received job: " <> show (job job')
-- putStrLn $ formatStr state $ "received job: " <> show (job job')
let mdata = metadata job'
let t = jobTimeout job'
mTimeout <- Timeout.timeout (t * microsecond) (wrapPerformActionInJobException state brokerMessage)
let timeoutS = t * microsecond
-- Inform the broker how long a task could take. This way we prevent
-- the broker from sending this task to another worker (e.g. 'vt' in
-- PGMQ).
setMessageTimeout broker queueName msgId timeoutS
-- mTimeout <- Timeout.timeout timeoutS (wrapPerformActionInJobException state brokerMessage)
mTimeout <- Timeout.timeout timeoutS (runAction state brokerMessage)
let archiveHandler = do
case archiveStrategy mdata of
......@@ -91,20 +140,18 @@ handleMessage state@(State { .. }) brokerMessage = do
-- onMessageFetched broker queue msg
-- | It's important to know if an exception occured inside a job. This
-- way we can apply error recovering strategy and adjust this job in
-- the broker
wrapPerformActionInJobException :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
wrapPerformActionInJobException state@(State { onJobError }) brokerMessage = do
catch (do
runAction state brokerMessage
)
(\err -> do
callWorkerJobEvent onJobError state brokerMessage
let wrappedErr = JobException { jeBMessage = brokerMessage,
jeException = err }
throwIO wrappedErr
)
-- -- | It's important to know if an exception occured inside a job. This
-- -- way we can apply error recovering strategy and adjust this job in
-- -- the broker
-- wrapPerformActionInJobException :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
-- wrapPerformActionInJobException state@(State { onJobError }) brokerMessage = do
-- catch (do
-- runAction state brokerMessage
-- )
-- (\(err :: SomeException) -> do
-- callWorkerJobEvent onJobError state brokerMessage
-- throwIO err
-- )
callWorkerJobEvent :: (HasWorkerBroker b a)
......@@ -116,11 +163,11 @@ callWorkerJobEvent Nothing _ _ = pure ()
callWorkerJobEvent (Just event) state brokerMessage = event state brokerMessage
handleTimeoutError :: (HasWorkerBroker b a) => State b a -> JobTimeout b a -> IO ()
handleTimeoutError state@(State { .. }) jt@(JobTimeout { .. }) = do
putStrLn $ formatStr state $ show jt
handleTimeoutError _state@(State { .. }) _jt@(JobTimeout { .. }) = do
-- putStrLn $ formatStr state $ show jt
let msgId = messageId jtBMessage
let job = toA $ getMessage jtBMessage
putStrLn $ formatStr state $ "timeout for job: " <> show job
-- putStrLn $ formatStr state $ "timeout for job: " <> show job
let mdata = metadata job
case timeoutStrategy mdata of
TSDelete -> deleteMessage broker queueName msgId
......@@ -136,24 +183,32 @@ handleTimeoutError state@(State { .. }) jt@(JobTimeout { .. }) = do
-- here? (i.e. delete, then resend)
-- Also, be aware that messsage id will change with resend
-- Delete original job first
-- Delete this job first, otherwise we'll be duplicating jobs.
deleteMessage broker queueName msgId
-- Send this job again, with increased 'readCount'
sendJob broker queueName (job { metadata = mdata { readCount = readCt + 1 } })
TSRepeatNElseDelete _n -> do
-- TODO Implement 'readCt'
undefined
void $ sendJob broker queueName (job { metadata = mdata { readCount = readCt + 1 } })
TSRepeatNElseDelete n -> do
let readCt = readCount mdata
-- OK so this can be repeated at most 'n' times, compare 'readCt' with 'n'
-- if readCt > n then
-- PGMQ.deleteMessage conn queue messageId
-- else
-- pure ()
handleJobError :: (HasWorkerBroker b a) => State b a -> JobException b a -> IO ()
handleJobError state@(State { .. }) je@(JobException { .. }) = do
let msgId = messageId jeBMessage
let job = toA $ getMessage jeBMessage
putStrLn $ formatStr state $ "error: " <> show je <> " for job " <> show job
if readCt >= n then
deleteMessage broker queueName msgId
else do
-- NOTE In rare cases, when worker hangs, we might lose a job
-- here? (i.e. delete, then resend)
-- Also, be aware that messsage id will change with resend
-- Delete this job first, otherwise we'll be duplicating jobs.
deleteMessage broker queueName msgId
-- Send this job again, with increased 'readCount'
void $ sendJob broker queueName (job { metadata = mdata { readCount = readCt + 1 } })
handleJobError :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
handleJobError _state@(State { .. }) brokerMessage = do
let msgId = messageId brokerMessage
let job = toA $ getMessage brokerMessage
-- putStrLn $ formatStr state $ "error: " <> show je <> " for job " <> show job
let mdata = metadata job
case errorStrategy mdata of
ESDelete -> deleteMessage broker queueName msgId
......@@ -162,14 +217,18 @@ handleJobError state@(State { .. }) je@(JobException { .. }) = do
let readCt = readCount mdata
if readCt >= n then
archiveMessage broker queueName msgId
else
sendJob broker queueName (job { metadata = mdata { readCount = readCt + 1 } })
else do
-- Delete this job first, otherwise we'll be duplicating jobs.
deleteMessage broker queueName msgId
void $ sendJob broker queueName (job { metadata = mdata { readCount = readCt + 1 } })
handleUnknownError :: (HasWorkerBroker b a) => State b a -> SomeException -> IO ()
handleUnknownError state err = do
let _ = traceStack ("unknown error: " <> show err)
putStrLn $ formatStr state $ "unknown error: " <> show err
sendJob :: (HasWorkerBroker b a) => Broker b (Job a) -> Queue -> Job a -> IO ()
sendJob :: (HasWorkerBroker b a) => Broker b (Job a) -> Queue -> Job a -> IO (MessageId b)
sendJob broker queueName job = do
sendMessage broker queueName $ toMessage job
......@@ -184,14 +243,15 @@ microsecond = 10^(6 :: Int)
-- | Wraps parameters for the 'sendJob' function
data (HasWorkerBroker b a) => SendJob b a =
SendJob { broker :: Broker b (Job a)
, queue :: Queue
, msg :: a
SendJob { broker :: Broker b (Job a)
, queue :: Queue
, msg :: a
-- , delay :: Delay
, archStrat :: ArchiveStrategy
, errStrat :: ErrorStrategy
, toStrat :: TimeoutStrategy
, timeout :: Timeout }
, archStrat :: ArchiveStrategy
, errStrat :: ErrorStrategy
, toStrat :: TimeoutStrategy
, timeout :: Timeout
, resendOnKill :: Bool}
-- | Create a 'SendJob' data with some defaults
mkDefaultSendJob :: HasWorkerBroker b a
......@@ -211,7 +271,8 @@ mkDefaultSendJob broker queue msg timeout =
, errStrat = ESArchive
-- | repeat timed out jobs
, toStrat = TSRepeat
, timeout }
, timeout
, resendOnKill = True }
-- | Like 'mkDefaultSendJob' but with default timeout
......@@ -226,12 +287,12 @@ mkDefaultSendJob' b q m = mkDefaultSendJob b q m defaultTimeout
-- | Call 'sendJob' with 'SendJob b a' data
sendJob' :: (HasWorkerBroker b a) => SendJob b a -> IO ()
sendJob' :: (HasWorkerBroker b a) => SendJob b a -> IO (MessageId b)
sendJob' (SendJob { .. }) = do
let metadata = JobMetadata { archiveStrategy = archStrat
, errorStrategy = errStrat
, timeoutStrategy = toStrat
, timeout = timeout
, readCount = 0 }
let metadata = defaultMetadata { archiveStrategy = archStrat
, errorStrategy = errStrat
, timeoutStrategy = toStrat
, timeout = timeout
, resendWhenWorkerKilled = resendOnKill }
let job = Job { job = msg, metadata }
sendJob broker queue job
......@@ -19,7 +19,11 @@ module Async.Worker.Broker.PGMQ
where
import Async.Worker.Broker.Types (HasBroker(..), SerializableMessage)
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (withMVar)
import Database.PostgreSQL.LibPQ qualified as LibPQ
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PostgreSQL.Simple.Internal qualified as PSQLInternal
import Database.PGMQ.Simple qualified as PGMQ
import Database.PGMQ.Types qualified as PGMQ
......@@ -30,22 +34,30 @@ instance (SerializableMessage a, Show a) => HasBroker PGMQBroker a where
data Broker PGMQBroker a =
PGMQBroker' {
conn :: PSQL.Connection
, defaultVt :: PGMQ.VisibilityTimeout
}
data BrokerMessage PGMQBroker a = PGMQBM (PGMQ.Message a)
deriving (Show)
data Message PGMQBroker a = PGMQM a
data MessageId PGMQBroker = PGMQMid Int
deriving (Eq, Show)
data BrokerInitParams PGMQBroker a = PGMQBrokerInitParams PSQL.ConnectInfo
data BrokerInitParams PGMQBroker a = PGMQBrokerInitParams PSQL.ConnectInfo PGMQ.VisibilityTimeout
messageId (PGMQBM (PGMQ.Message { msgId })) = PGMQMid msgId
getMessage (PGMQBM (PGMQ.Message { message })) = PGMQM message
toMessage message = PGMQM message
toA (PGMQM message) = message
initBroker (PGMQBrokerInitParams connInfo) = do
initBroker (PGMQBrokerInitParams connInfo defaultVt) = do
conn <- PSQL.connect connInfo
-- PGMQ is quite verbose because of initialization. We can disable
-- notices
-- https://hackage.haskell.org/package/postgresql-simple-0.7.0.0/docs/src/Database.PostgreSQL.Simple.Internal.html#Connection
-- https://hackage.haskell.org/package/postgresql-libpq-0.10.1.0/docs/Database-PostgreSQL-LibPQ.html#g:13
-- https://www.postgresql.org/docs/current/libpq-notice-processing.html
withMVar (PSQLInternal.connectionHandle conn) $ \c -> do
LibPQ.disableNoticeReporting c
PGMQ.initialize conn
pure $ PGMQBroker' { conn }
pure $ PGMQBroker' { conn, defaultVt }
deinitBroker (PGMQBroker' { conn }) = PSQL.close conn
createQueue (PGMQBroker' { conn }) queue = do
......@@ -54,17 +66,45 @@ instance (SerializableMessage a, Show a) => HasBroker PGMQBroker a where
dropQueue (PGMQBroker' { conn }) queue = do
PGMQ.dropQueue conn queue
readMessageWaiting q@(PGMQBroker' { conn }) queue = loop
readMessageWaiting q@(PGMQBroker' { conn, defaultVt }) queue = loop
where
-- loop :: PGMQ.SerializableMessage a => IO (BrokerMessage PGMQBroker' a)
loop = do
mMsg <- PGMQ.readMessageWithPoll conn queue 10 5 100
-- NOTE readMessageWithPoll is not thread-safe, i.e. the
-- blocking is outside of GHC (in PostgreSQL itself) and we
-- can't reliably use it in a highly concurrent situation.
-- mMsg <- PGMQ.readMessageWithPoll conn queue 10 5 100
mMsg <- PGMQ.readMessage conn queue defaultVt
case mMsg of
Nothing -> readMessageWaiting q queue
Just msg -> return $ PGMQBM msg
Nothing -> do
-- wait a bit, then retry
threadDelay (50 * 1000)
readMessageWaiting q queue
Just msg -> do
-- TODO! we want to set message visibility timeout so that other workers don't start this job
return $ PGMQBM msg
popMessageWaiting q@(PGMQBroker' { conn }) queue = loop
where
-- loop :: PGMQ.SerializableMessage a => IO (BrokerMessage PGMQBroker' a)
loop = do
-- mMsg <- PGMQ.readMessageWithPoll conn queue 10 5 100
mMsg <- PGMQ.popMessage conn queue
case mMsg of
Nothing -> do
-- wait a bit, then retry
threadDelay (50 * 1000)
popMessageWaiting q queue
Just msg -> do
-- TODO! we want to set message visibility timeout so that other workers don't start this job
return $ PGMQBM msg
setMessageTimeout (PGMQBroker' { conn }) queue (PGMQMid msgId) timeoutS =
PGMQ.setMessageVt conn queue msgId timeoutS
sendMessage (PGMQBroker' { conn }) queue (PGMQM message) =
PGMQ.sendMessage conn queue message 0
PGMQMid <$> PGMQ.sendMessage conn queue message 0
deleteMessage (PGMQBroker' { conn }) queue (PGMQMid msgId) = do
PGMQ.deleteMessage conn queue msgId
......@@ -73,10 +113,14 @@ instance (SerializableMessage a, Show a) => HasBroker PGMQBroker a where
PGMQ.archiveMessage conn queue msgId
getQueueSize (PGMQBroker' { conn }) queue = do
mMetrics <- PGMQ.getMetrics conn queue
case mMetrics of
Nothing -> return 0
Just (PGMQ.Metrics { queueLength }) -> return queueLength
-- NOTE: pgmq.metrics is NOT a proper way to deal with messages
-- that have vt in the future
-- (c.f. https://github.com/tembo-io/pgmq/issues/301)
-- mMetrics <- PGMQ.getMetrics conn queue
-- case mMetrics of
-- Nothing -> return 0
-- Just (PGMQ.Metrics { queueLength }) -> return queueLength
PGMQ.queueAvailableLength conn queue
getArchivedMessage (PGMQBroker' { conn }) queue (PGMQMid msgId) = do
mMsg <- PGMQ.readMessageFromArchive conn queue msgId
......
......@@ -12,10 +12,13 @@ https://redis.io/glossary/redis-queue/
The design is as follows:
- for each queue we have an 'id counter'
- each queue is represented as a list of message ids
- each queue is represented as a set of message ids
- each message is stored under unique key, derived from its id
- the above allows us to have an archive with messages
- deleting a message means removing it's unique key from Redis
The queue itself is a list, the archive is a set (so that we can use
SISMEMBER).
-}
......@@ -31,7 +34,7 @@ module Async.Worker.Broker.Redis
where
import Async.Worker.Broker.Types (HasBroker(..), Queue, SerializableMessage)
import Control.Concurrent (threadDelay)
-- import Control.Concurrent (threadDelay)
import Control.Monad (void)
import Data.Aeson qualified as Aeson
import Data.Aeson (FromJSON(..), ToJSON(..), (.:), (.=), withObject, object)
......@@ -55,8 +58,6 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where
deriving (Eq, Show)
data BrokerInitParams RedisBroker a = RedisBrokerInitParams Redis.ConnectInfo
-- We're using simple QUEUE so we don't care about message id as we
-- won't be deleting/archiving the messages
messageId (RedisBM (RedisWithMsgId { rmidId })) = RedisMid rmidId
getMessage (RedisBM (RedisWithMsgId { rmida })) = RedisM rmida
toMessage message = RedisM message
......@@ -76,23 +77,45 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where
let queueK = queueKey queue
void $ Redis.runRedis conn $ Redis.del [queueK]
readMessageWaiting b@(RedisBroker' { conn }) queue = loop
-- TODO This is simplified
readMessageWaiting = popMessageWaiting
popMessageWaiting b@(RedisBroker' { conn }) queue = loop
where
queueK = queueKey queue
loop = do
eMsgId <- Redis.runRedis conn $ Redis.spop queueK
case eMsgId of
-- 0 means block indefinitely
-- https://redis.io/docs/latest/commands/blpop/
eData <- Redis.runRedis conn $ Redis.blpop [queueK] 0
case eData of
Left _ -> undefined
Right Nothing -> do
threadDelay 100
readMessageWaiting b queue
Right (Just msgIdBS) -> case bsToId msgIdBS of
Right Nothing -> undefined
Right (Just (_queueK, msgIdBS)) -> case bsToId msgIdBS of
Nothing -> undefined
Just msgId -> do
mMsg <- getRedisMessage b queue msgId
case mMsg of
Nothing -> undefined
Just msg -> return msg
maybe undefined return mMsg
-- popMessageWaiting b@(RedisBroker' { conn }) queue = loop
-- where
-- queueK = queueKey queue
-- loop = do
-- eMsgId <- Redis.runRedis conn $ Redis.spop queueK
-- case eMsgId of
-- Left _ -> undefined
-- Right Nothing -> do
-- threadDelay (10*1000)
-- popMessageWaiting b queue
-- Right (Just msgIdBS) -> case bsToId msgIdBS of
-- Nothing -> undefined
-- Just msgId -> do
-- mMsg <- getRedisMessage b queue msgId
-- case mMsg of
-- Nothing -> undefined
-- Just msg -> return msg
setMessageTimeout _broker _queue _msgId _timeoutS =
pure ()
sendMessage b@(RedisBroker' { conn }) queue (RedisM message) = do
mId <- nextId b queue
......@@ -104,34 +127,44 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where
let msgK = messageKey queue msgId
let queueK = queueKey queue
void $ Redis.runRedis conn $ do
-- write the message itself under unique key
_ <- Redis.set msgK (BSL.toStrict $ Aeson.encode m)
Redis.sadd queueK [idToBS msgId]
-- add message id to the list
-- Redis.sadd queueK [idToBS msgId]
Redis.lpush queueK [idToBS msgId]
return msgId
-- deleteMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do
deleteMessage (RedisBroker' { conn }) queue msgId = do
let queueK = queueKey queue
void $ Redis.runRedis conn $ Redis.srem queueK [idToBS msgId]
let messageK = messageKey queue msgId
void $ Redis.runRedis conn $ Redis.del [messageK]
-- void $ Redis.runRedis conn $ Redis.srem queueK [idToBS msgId]
void $ Redis.runRedis conn $ do
_ <- Redis.lrem queueK 1 (idToBS msgId)
Redis.del [messageK]
-- archiveMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do
archiveMessage (RedisBroker' { conn }) queue msgId = do
let queueK = queueKey queue
let archiveK = archiveKey queue
eMove <- Redis.runRedis conn $ Redis.smove queueK archiveK (idToBS msgId)
case eMove of
Left _ -> undefined
Right True -> return ()
Right False -> do
-- OK so the queue might not have the id, we just add it to archive to make sure
void $ Redis.runRedis conn $ Redis.sadd archiveK [idToBS msgId]
void $ Redis.runRedis conn $ do
_ <- Redis.lrem queueK 1 (idToBS msgId)
Redis.sadd archiveK [idToBS msgId]
-- eMove <- Redis.runRedis conn $ Redis.smove queueK archiveK (idToBS msgId)
-- case eMove of
-- Left _ -> undefined
-- Right True -> return ()
-- Right False -> do
-- -- OK so the queue might not have the id, we just add it to archive to make sure
-- void $ Redis.runRedis conn $ Redis.sadd archiveK [idToBS msgId]
getQueueSize (RedisBroker' { conn }) queue = do
let queueK = queueKey queue
eLen <- Redis.runRedis conn $ Redis.scard queueK
-- eLen <- Redis.runRedis conn $ Redis.scard queueK
eLen <- Redis.runRedis conn $ Redis.llen queueK
case eLen of
Right len -> return $ fromIntegral len
Left _ -> return 0
Left _ -> undefined
getArchivedMessage b@(RedisBroker' { conn }) queue msgId = do
let archiveK = archiveKey queue
......@@ -169,7 +202,7 @@ nextId (RedisBroker' { conn }) queue = do
-- | Key under which a message is stored
messageKey :: Queue -> MessageId RedisBroker -> BS.ByteString
messageKey queue (RedisMid msgId) = BS.pack $ beePrefix <> "queue-" <> queue <> "-message-" <> show msgId
messageKey queue (RedisMid msgId) = queueKey queue <> BS.pack ("-message-" <> show msgId)
getRedisMessage :: FromJSON a
=> Broker RedisBroker a
......
......@@ -19,6 +19,7 @@ Broker typeclass definition.
module Async.Worker.Broker.Types
( Queue
, TimeoutS
-- * Main broker typeclass
-- $broker
, HasBroker(..)
......@@ -32,6 +33,7 @@ import Data.Typeable (Typeable)
type Queue = String
type TimeoutS = Int -- timeout for a message, in seconds
{- $broker
......@@ -117,11 +119,23 @@ class (
{-| Drop queue -}
dropQueue :: Broker b a -> Queue -> IO ()
{-| Read message, waiting for it if not present -}
{-| Read message from queue, waiting for it if not present (this leaves
the message in queue, you need to use 'setMessageTimeout' to prevent
other workers from seeing this message). -}
readMessageWaiting :: Broker b a -> Queue -> IO (BrokerMessage b a)
{-| Pop message from queue, waiting for it if not present -}
popMessageWaiting :: Broker b a -> Queue -> IO (BrokerMessage b a)
{-| We sometimes need a way to tell the broker that a message shouldn't
be visible for given amount of time (e.g. 'visibility timeout'
setting in PGMQ). The broker operates only on 'a' level and isn't
aware of 'Job' with its 'JobMetadata'. Hence, it's the worker's
responsibility to properly set timeout after message is read. -}
setMessageTimeout :: Broker b a -> Queue -> MessageId b -> TimeoutS -> IO ()
{-| Send message -}
sendMessage :: Broker b a -> Queue -> Message b a -> IO ()
sendMessage :: Broker b a -> Queue -> Message b a -> IO (MessageId b)
{-| Delete message -}
deleteMessage :: Broker b a -> Queue -> MessageId b -> IO ()
......
......@@ -41,13 +41,12 @@ module Async.Worker.Types
-- * Other useful types and functions
, HasWorkerBroker
, formatStr
, JobTimeout(..)
, JobException(..) )
, JobTimeout(..) )
where
import Async.Worker.Broker.Types (Broker, BrokerMessage, HasBroker, Queue)
import Control.Applicative ((<|>))
import Control.Exception.Safe (Exception, SomeException)
import Control.Exception.Safe (Exception)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
import Data.Typeable (Typeable)
......@@ -156,7 +155,12 @@ data JobMetadata =
, timeout :: Timeout
-- | Read count so we know how many times this message
-- was processed
, readCount :: ReadCount }
, readCount :: ReadCount
-- | A worker might have processed a task and be
-- killed. If 'resendWhenWorkerKilled' is 'True', this
-- job will be resent to broker and picked up
-- later. Otherwise it will be discarded.
, resendWhenWorkerKilled :: Bool }
deriving (Eq, Show)
instance ToJSON JobMetadata where
toJSON (JobMetadata { .. }) =
......@@ -166,6 +170,7 @@ instance ToJSON JobMetadata where
, "tstrat" .= timeoutStrategy
, "timeout" .= timeout
, "readCount" .= readCount
, "resendWhenWorkerKilled" .= resendWhenWorkerKilled
]
instance FromJSON JobMetadata where
parseJSON = withObject "JobMetadata" $ \o -> do
......@@ -174,6 +179,7 @@ instance FromJSON JobMetadata where
timeoutStrategy <- o .: "tstrat"
timeout <- o .: "timeout"
readCount <- o .: "readCount"
resendWhenWorkerKilled <- o .: "resendWhenWorkerKilled"
return $ JobMetadata { .. }
-- | For a typical 'Job' it's probably sane to just archive it no
......@@ -184,7 +190,8 @@ defaultMetadata =
, errorStrategy = ESArchive
, timeoutStrategy = TSArchive
, timeout = 10
, readCount = 0 }
, readCount = 0
, resendWhenWorkerKilled = True }
-- | Worker 'Job' is 'a' (defining action to call via 'performAction')
-- together with associated 'JobMetadata'.
......@@ -277,10 +284,3 @@ data JobTimeout b a =
, jtTimeout :: Timeout }
deriving instance (HasWorkerBroker b a) => Show (JobTimeout b a)
instance (HasWorkerBroker b a) => Exception (JobTimeout b a)
-- | An exception, thrown when job ends with error
data JobException b a =
JobException { jeBMessage :: BrokerMessage b (Job a)
, jeException :: SomeException }
deriving instance (HasWorkerBroker b a) => Show (JobException b a)
instance (HasWorkerBroker b a) => Exception (JobException b a)
......@@ -20,7 +20,8 @@ import Data.Aeson (ToJSON(..), FromJSON(..), withText)
import Data.Maybe (isJust)
import Data.Text qualified as T
import Test.Hspec
import Test.Integration.Utils (getPSQLEnvConnectInfo, getRedisEnvConnectInfo, randomQueueName, waitUntil)
import Test.Integration.Utils (defaultPGMQVt, getPSQLEnvConnectInfo, getRedisEnvConnectInfo, randomQueueName, waitUntil)
import Test.RandomStrings (randomASCII, randomString, onlyAlphaNum)
data TestEnv b =
......@@ -72,28 +73,56 @@ brokerTests bInitParams =
parallel $ around (withBroker bInitParams) $ describe "Broker tests" $ do
it "can send and receive a message" $ \(TestEnv { broker, queue }) -> do
let msg = Message { text = "test" }
BT.sendMessage broker queue (BT.toMessage msg)
msgId <- BT.sendMessage broker queue (BT.toMessage msg)
msg2 <- BT.readMessageWaiting broker queue
-- putStrLn $ "[messageId] " <> show (BT.messageId msg2)
msg `shouldBe` BT.toA (BT.getMessage msg2)
msgId `shouldBe` BT.messageId msg2
it "can send, archive and read message from archive" $ \(TestEnv { broker, queue }) -> do
let msg = Message { text = "test" }
BT.sendMessage broker queue (BT.toMessage msg)
msgId <- BT.sendMessage broker queue (BT.toMessage msg)
msg2 <- BT.readMessageWaiting broker queue
let msgId = BT.messageId msg2
msgId `shouldBe` BT.messageId msg2
BT.archiveMessage broker queue msgId
putStrLn $ "Reading msg " <> show msgId <> " from archive queue " <> queue
-- It might take some time to archive a message so we wait a bit
waitUntil (isJust <$> BT.getArchivedMessage broker queue msgId) 200
msgArchive <- BT.getArchivedMessage broker queue msgId
let msgIdArchive = BT.messageId <$> msgArchive
msgIdArchive `shouldBe` Just msgId
it "returns correct message id when sending message to broker" $ \(TestEnv { broker, queue }) -> do
let iter = [1..20] :: [Int] -- number of steps
mapM_ (\_i -> do
-- Generate random strings and make sure that the
-- message ids we get from sendMessage match our data
text <- randomString (onlyAlphaNum randomASCII) 20
let msg = Message { text }
msgId <- BT.sendMessage broker queue (BT.toMessage msg)
bMsg <- BT.readMessageWaiting broker queue
msg `shouldBe` BT.toA (BT.getMessage bMsg)
msgId `shouldBe` BT.messageId bMsg
BT.deleteMessage broker queue msgId
) iter
it "preserves msgId when archiving a message" $ \(TestEnv { broker, queue }) -> do
let iter = [1..20] :: [Int] -- number of steps
mapM_ (\_i -> do
-- Generate random strings and make sure that the
-- message ids we get from sendMessage match our data
text <- randomString (onlyAlphaNum randomASCII) 20
let msg = Message { text }
msgId <- BT.sendMessage broker queue (BT.toMessage msg)
BT.archiveMessage broker queue msgId
msgArchive <- BT.getArchivedMessage broker queue msgId
Just msg `shouldBe` (BT.toA . BT.getMessage <$> msgArchive)
) iter
pgmqBrokerInitParams :: IO (BT.BrokerInitParams PGMQ.PGMQBroker Message)
pgmqBrokerInitParams = do
PGMQ.PGMQBrokerInitParams <$> getPSQLEnvConnectInfo
conn <- getPSQLEnvConnectInfo
return $ PGMQ.PGMQBrokerInitParams conn defaultPGMQVt
redisBrokerInitParams :: IO (BT.BrokerInitParams Redis.RedisBroker Message)
redisBrokerInitParams = do
......
module Test.Integration.Utils
( getPSQLEnvConnectInfo
( defaultPGMQVt
, getPSQLEnvConnectInfo
, getRedisEnvConnectInfo
, randomQueueName
, waitUntil
, waitUntilTVarEq
, waitUntilTVarPred )
, waitUntilTVarPred
, waitUntilQueueSizeIs
, waitUntilQueueEmpty )
where
import Async.Worker.Broker qualified as B
......@@ -20,6 +23,26 @@ import Test.Hspec (expectationFailure, shouldBe, shouldSatisfy, Expectation, Has
import Test.RandomStrings (randomASCII, randomString, onlyLower)
-- | Timeout for 'wait' jobs, in ms.
newtype TimeoutMs = TimeoutMs Int
deriving (Eq, Show, Num, Integral, Real, Enum, Ord)
-- | Visibility timeout is a very important parameter for PGMQ. It is
-- mainly used when reading a job: it specifies for how many seconds
-- this job should be invisible for other workers. We need more tests
-- and setting this correctly, preferably in accordance with
-- 'Job.timeout'. Issue is that at the broker level we don't know
-- anything about 'Job'...
--
-- The lower the value, the more probable that some other worker will
-- pick up the same job at about the same time (before broker marks it
-- as invisible).
defaultPGMQVt :: Int
defaultPGMQVt = 1
-- | PSQL connect info that is fetched from env
getPSQLEnvConnectInfo :: IO PSQL.ConnectInfo
getPSQLEnvConnectInfo = do
......@@ -46,10 +69,14 @@ randomQueueName prefix = do
postfix <- randomString (onlyLower randomASCII) 10
return $ prefix <> "_" <> postfix
waitThreadDelay :: Int
waitThreadDelay = 50 * 1000
-- | Given a predicate IO action, test it for given number of
-- milliseconds or fail
waitUntil :: HasCallStack => IO Bool -> Int -> Expectation
waitUntil pred' timeoutMs = do
waitUntil :: HasCallStack => IO Bool -> TimeoutMs -> Expectation
waitUntil pred' (TimeoutMs timeoutMs) = do
_mTimeout <- Timeout.timeout (timeoutMs * 1000) performTest
-- shortcut for testing mTimeout
p <- pred'
......@@ -61,12 +88,12 @@ waitUntil pred' timeoutMs = do
if p
then return ()
else do
threadDelay 50
threadDelay waitThreadDelay
performTest
-- | Similar to 'waitUntil' but specialized to 'TVar' equality checking
waitUntilTVarEq :: (HasCallStack, Show a, Eq a) => TVar a -> a -> Int -> Expectation
waitUntilTVarEq tvar expected timeoutMs = do
waitUntilTVarEq :: (HasCallStack, Show a, Eq a) => TVar a -> a -> TimeoutMs -> Expectation
waitUntilTVarEq tvar expected (TimeoutMs timeoutMs) = do
_mTimeout <- Timeout.timeout (timeoutMs * 1000) performTest
-- shortcut for testing mTimeout
val <- readTVarIO tvar
......@@ -78,12 +105,12 @@ waitUntilTVarEq tvar expected timeoutMs = do
if val == expected
then return ()
else do
threadDelay 50
threadDelay waitThreadDelay
performTest
-- | Similar to 'waitUntilTVarEq' but with predicate checking
waitUntilTVarPred :: (HasCallStack, Show a, Eq a) => TVar a -> (a -> Bool) -> Int -> Expectation
waitUntilTVarPred tvar predicate timeoutMs = do
waitUntilTVarPred :: (HasCallStack, Show a, Eq a) => TVar a -> (a -> Bool) -> TimeoutMs -> Expectation
waitUntilTVarPred tvar predicate (TimeoutMs timeoutMs) = do
_mTimeout <- Timeout.timeout (timeoutMs * 1000) performTest
-- shortcut for testing mTimeout
val <- readTVarIO tvar
......@@ -95,5 +122,24 @@ waitUntilTVarPred tvar predicate timeoutMs = do
if predicate val
then return ()
else do
threadDelay 50
threadDelay waitThreadDelay
performTest
waitUntilQueueSizeIs :: (B.HasBroker b a) => B.Broker b a -> B.Queue -> Int -> TimeoutMs -> Expectation
waitUntilQueueSizeIs b queue size (TimeoutMs timeoutMs) = do
_mTimeout <- Timeout.timeout (timeoutMs * 1000) performTest
qSize <- B.getQueueSize b queue
qSize `shouldBe` size
where
performTest = do
qSize <- B.getQueueSize b queue
if qSize == size
then return ()
else do
threadDelay waitThreadDelay
performTest
waitUntilQueueEmpty :: (B.HasBroker b a) => B.Broker b a -> B.Queue -> TimeoutMs -> Expectation
waitUntilQueueEmpty b queue timeoutMs = waitUntilQueueSizeIs b queue 0 timeoutMs
......@@ -12,29 +12,37 @@
module Test.Integration.Worker
( workerTests
, multiWorkerTests
, pgmqWorkerBrokerInitParams
, redisWorkerBrokerInitParams )
where
import Async.Worker (run, mkDefaultSendJob, mkDefaultSendJob', sendJob', errStrat, toStrat)
import Async.Worker (run, mkDefaultSendJob, mkDefaultSendJob', sendJob', errStrat, toStrat, resendOnKill, KillWorkerSafely(..))
import Async.Worker.Broker.PGMQ qualified as PGMQ
import Async.Worker.Broker.Redis qualified as Redis
import Async.Worker.Broker.Types qualified as BT
import Async.Worker.Types
import Control.Concurrent (forkIO, killThread, threadDelay, ThreadId)
import Control.Concurrent (forkIO, killThread, threadDelay, ThreadId, throwTo)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (readTVarIO, newTVarIO, TVar, modifyTVar)
import Control.Exception (bracket, Exception, throwIO)
import Control.Monad (void)
import Data.Aeson (ToJSON(..), FromJSON(..), object, (.=), (.:), withObject)
import Data.Maybe (fromJust, isJust)
import Data.Set qualified as Set
import Test.Hspec
import Test.Integration.Utils (getPSQLEnvConnectInfo, getRedisEnvConnectInfo, randomQueueName, waitUntilTVarEq, waitUntilTVarPred)
import Test.Integration.Utils (defaultPGMQVt, getPSQLEnvConnectInfo, getRedisEnvConnectInfo, randomQueueName, waitUntil, waitUntilTVarEq, waitUntilTVarPred, waitUntil, waitUntilQueueEmpty)
data TestEnv b =
TestEnv { state :: State b Message
, events :: TVar [Event]
, threadId :: ThreadId }
, threadId :: ThreadId
, brokerInitParams :: BT.BrokerInitParams b (Job Message)
-- | a separate broker so that we use separate connection from the worker
, broker :: BT.Broker b (Job Message)
, queueName :: BT.Queue
}
testQueuePrefix :: BT.Queue
......@@ -85,10 +93,41 @@ pa :: (HasWorkerBroker b Message) => State b a -> BT.BrokerMessage b (Job Messag
pa _state bm = do
let job' = BT.toA $ BT.getMessage bm
case job job' of
Message { text } -> putStrLn text
Message { text = _text } -> return () -- putStrLn text
Error -> throwIO $ SimpleException "Error!"
Timeout { delay } -> threadDelay (delay * second)
pushEvent :: BT.HasBroker b (Job Message)
=> TVar [Event]
-> (Message -> Event)
-> BT.BrokerMessage b (Job Message)
-> IO ()
pushEvent events evt bm = atomically $ modifyTVar events (\e -> e ++ [evt $ job $ BT.toA $ BT.getMessage bm])
initState :: (HasWorkerBroker b Message)
=> BT.BrokerInitParams b (Job Message)
-> TVar [Event]
-> BT.Queue
-> String
-> IO (State b Message, ThreadId)
initState bInitParams events queue workerName = do
let pushEvt evt _s = pushEvent events evt
b' <- BT.initBroker bInitParams
let state = State { broker = b'
, queueName = queue
, name = workerName <> " for " <> queue
, performAction = pa
, onMessageReceived = Just (pushEvt EMessageReceived)
, onJobFinish = Just (pushEvt EJobFinished)
, onJobTimeout = Just (pushEvt EJobTimeout)
, onJobError = Just (pushEvt EJobError) }
threadId <- forkIO $ run state
pure (state, threadId)
withWorker :: (HasWorkerBroker b Message)
=> BT.BrokerInitParams b (Job Message)
......@@ -110,201 +149,199 @@ withWorker brokerInitParams = bracket (setUpWorker brokerInitParams) tearDownWor
BT.createQueue b queue
events <- newTVarIO []
let pushEvent evt bm = atomically $ modifyTVar events (\e -> e ++ [evt $ job $ BT.toA $ BT.getMessage bm])
let state = State { broker = b
, queueName = queue
, name = "test worker for " <> queue
, performAction = pa
, onMessageReceived = Just (\_s bm -> pushEvent EMessageReceived bm)
, onJobFinish = Just (\_s bm -> pushEvent EJobFinished bm)
, onJobTimeout = Just (\_s bm -> pushEvent EJobTimeout bm)
, onJobError = Just (\_s bm -> pushEvent EJobError bm) }
threadId <- forkIO $ run state
(state, threadId) <- initState bInitParams events queue "test worker"
return $ TestEnv { state, events, threadId }
return $ TestEnv { state
, events
, threadId
, brokerInitParams = bInitParams
, broker = b
, queueName = queue }
tearDownWorker :: (HasWorkerBroker b Message)
=> TestEnv b
-> IO ()
tearDownWorker (TestEnv { state = State { broker = b, queueName }, threadId }) = do
BT.dropQueue b queueName
tearDownWorker (TestEnv { broker = b, queueName, state = State { broker = b' }, threadId }) = do
killThread threadId
BT.deinitBroker b'
BT.dropQueue b queueName
BT.deinitBroker b
-- | Single 'Worker' tests, abstracting the 'Broker' away.
workerTests :: (HasWorkerBroker b Message)
=> BT.BrokerInitParams b (Job Message)
-> Spec
workerTests brokerInitParams =
parallel $ around (withWorker brokerInitParams) $ describe "Worker tests" $ do
it "can process a simple job" $ \(TestEnv { state = State { broker, queueName }, events }) -> do
it "can process a simple job" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let text = "simple test"
let msg = Message { text }
let job = mkDefaultSendJob' broker queueName msg
sendJob' job
void $ sendJob' job
waitUntilTVarEq events [ EMessageReceived msg, EJobFinished msg ] 500
-- queue should be empty
queueLen2 <- BT.getQueueSize broker queueName
queueLen2 `shouldBe` 0
-- queue should be empty
waitUntilQueueEmpty broker queueName 100
it "can handle a job with error" $ \(TestEnv { state = State { broker, queueName }, events }) -> do
it "can handle a job with error" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let msg = Error
let job = mkDefaultSendJob' broker queueName msg
sendJob' job
void $ sendJob' job
waitUntilTVarEq events [ EMessageReceived msg, EJobError msg ] 500
-- queue should be empty (error jobs archived by default)
queueLen2 <- BT.getQueueSize broker queueName
queueLen2 `shouldBe` 0
waitUntilQueueEmpty broker queueName 100
it "can handle a job with error (with archive)" $ \(TestEnv { state = State { broker, queueName }, events }) -> do
it "can handle a job with error (with archive)" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let msg = Error
let job = (mkDefaultSendJob' broker queueName msg) { errStrat = ESDelete }
sendJob' job
void $ sendJob' job
waitUntilTVarEq events [ EMessageReceived msg, EJobError msg ] 500
-- queue should be empty (error job deleted)
queueLen2 <- BT.getQueueSize broker queueName
queueLen2 `shouldBe` 0
waitUntilQueueEmpty broker queueName 100
it "can handle a job with error (with repeat n)" $ \(TestEnv { state = State { broker, queueName }, events }) -> do
it "can handle a job with error (with repeat n)" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let msg = Error
let job = (mkDefaultSendJob' broker queueName msg) { errStrat = ESRepeatNElseArchive 1 }
sendJob' job
void $ sendJob' job
waitUntilTVarEq events [ EMessageReceived msg, EJobError msg
, EMessageReceived msg, EJobError msg ] 500
-- NOTE It doesn't make sense to check queue size here, the
-- worker just continues to run the errored task in background
-- and currently there is no way to stop it. Maybe implementing
-- a custom test queue could help us here.
-- queue should be empty (error job archived)
waitUntilQueueEmpty broker queueName 100
it "can handle a job with timeout" $ \(TestEnv { state = State { broker, queueName }, events}) -> do
it "can handle a job with timeout (archive strategy)" $ \(TestEnv { broker, queueName, events}) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let msg = Timeout { delay = 2 }
let job = mkDefaultSendJob broker queueName msg 1
sendJob' job
let job' = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSArchive }
msgId <- sendJob' job'
waitUntilTVarPred events (\e -> take 2 e == [ EMessageReceived msg, EJobTimeout msg ]) 2500
waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg ] 1200
-- NOTE It doesn't make sense to check queue size here, the
-- worker just continues to run the errored task in background
-- and currently there is no way to stop it. Maybe implementing
-- a custom test queue could help us here.
-- There might be a slight delay before the message is archived
-- (handling exception step in the thread)
waitUntil (isJust <$> BT.getArchivedMessage broker queueName msgId) 100
it "can handle a job with timeout (archive strategy)" $ \(TestEnv { state = State { broker, queueName }, events}) -> do
-- The archive should contain our message
mMsgArchive <- BT.getArchivedMessage broker queueName msgId
mMsgArchive `shouldSatisfy` isJust
let msgArchive = fromJust mMsgArchive
job (BT.toA $ BT.getMessage msgArchive) `shouldBe` msg
-- Queue should be empty, since we archive timed out jobs
waitUntilQueueEmpty broker queueName 100
it "can handle a job with timeout (delete strategy)" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let msg = Timeout { delay = 2 }
let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSArchive }
sendJob' job
let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSDelete }
void $ sendJob' job
waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg ] 2500
waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg ] 1200
-- Queue should be empty, since we archive timed out jobs
queueLen2 <- BT.getQueueSize broker queueName
queueLen2 `shouldBe` 0
waitUntilQueueEmpty broker queueName 100
it "can handle a job with timeout (delete strategy)" $ \(TestEnv { state = State { broker, queueName }, events}) -> do
it "can handle a job with timeout (repeat strategy)" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let msg = Timeout { delay = 2 }
let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSDelete }
sendJob' job
let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSRepeat }
void $ sendJob' job
waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg ] 2500
-- Queue should be empty, since we archive timed out jobs
queueLen2 <- BT.getQueueSize broker queueName
queueLen2 `shouldBe` 0
-- NOTE It doesn't make sense to check queue size here, the
-- worker just continues to run the errored task in background
-- and currently there is no way to stop it. Maybe implementing
-- a custom test queue could help us here.
it "can handle a job with timeout (repeat N times, then archive strategy)" $ \(TestEnv { state = State { broker, queueName }, events}) -> do
it "can handle a job with timeout (repeat N times, then archive strategy)" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let msg = Timeout { delay = 2 }
let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSRepeatNElseArchive 1 }
sendJob' job
void $ sendJob' job
-- | Should have been run 2 times, then archived
waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg
, EMessageReceived msg, EJobTimeout msg ] 3500
, EMessageReceived msg, EJobTimeout msg ] 2500
-- Queue should be empty, since we archive timed out jobs
queueLen2 <- BT.getQueueSize broker queueName
queueLen2 `shouldBe` 0
waitUntilQueueEmpty broker queueName 100
it "can process two jobs" $ \(TestEnv { state = State { broker, queueName }, events }) -> do
it "can handle a job with timeout (repeat N times, then delete strategy)" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let msg = Timeout { delay = 2 }
let job = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSRepeatNElseDelete 1 }
void $ sendJob' job
-- | Should have been run 2 times, then archived
waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg
, EMessageReceived msg, EJobTimeout msg ] 2500
-- Queue should be empty, since we deleted timed out jobs
waitUntilQueueEmpty broker queueName 100
it "can process two jobs" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
readTVarIO events >>= shouldBe []
-- queue should be empty
BT.getQueueSize broker queueName >>= shouldBe 0
let text1 = "simple test 1"
let msg1 = Message { text = text1 }
let job1 = mkDefaultSendJob' broker queueName msg1
sendJob' job1
void $ sendJob' job1
let text2 = "simple test 2"
let msg2 = Message { text = text2 }
let job2 = mkDefaultSendJob' broker queueName msg2
sendJob' job2
void $ sendJob' job2
-- The jobs don't have to be process exactly in this order so we just use Set here
waitUntilTVarPred events (
......@@ -313,24 +350,21 @@ workerTests brokerInitParams =
, EMessageReceived msg2, EJobFinished msg2 ]) 500
-- queue should be empty
queueLen2 <- BT.getQueueSize broker queueName
queueLen2 `shouldBe` 0
waitUntilQueueEmpty broker queueName 100
it "after job with error, continue with another one" $ \(TestEnv { state = State { broker, queueName }, events }) -> do
it "after job with error, continue with another one" $ \(TestEnv { broker, queueName, events }) -> do
-- no events initially
events1 <- readTVarIO events
events1 `shouldBe` []
readTVarIO events >>= shouldBe []
-- queue should be empty
queueLen1 <- BT.getQueueSize broker queueName
queueLen1 `shouldBe` 0
BT.getQueueSize broker queueName >>= shouldBe 0
let msgErr = Error
let jobErr = mkDefaultSendJob' broker queueName msgErr
sendJob' jobErr
void $ sendJob' jobErr
let text = "simple test"
let msg = Message { text }
let job = mkDefaultSendJob' broker queueName msg
sendJob' job
void $ sendJob' job
waitUntilTVarPred events (
\e -> Set.fromList e ==
......@@ -338,8 +372,161 @@ workerTests brokerInitParams =
, EMessageReceived msg, EJobFinished msg ]) 500
-- queue should be empty
queueLen2 <- BT.getQueueSize broker queueName
queueLen2 `shouldBe` 0
waitUntilQueueEmpty broker queueName 100
it "killing worker should leave a currently processed message on queue (when resendWhenWorkerKilled is True)" $ \(TestEnv { broker, queueName, events, threadId }) -> do
-- no events initially
readTVarIO events >>= shouldBe []
-- queue should be empty
BT.getQueueSize broker queueName >>= shouldBe 0
-- Perform some long job
let msg = Timeout { delay = 2 }
let job = (mkDefaultSendJob broker queueName msg 2) { resendOnKill = True }
void $ sendJob' job
-- Let's wait a bit to make sure the message is picked up by the
-- worker
waitUntilTVarEq events [ EMessageReceived msg ] 300
-- Now let's kill the thread immediately
throwTo threadId KillWorkerSafely
putStrLn $ "After KillWorkerSafely: " <> queueName
-- The message should still be there
threadDelay (300 * 1000)
BT.getQueueSize broker queueName >>= \qs -> do
putStrLn $ "After threadDelay: " <> queueName <> " size: " <> show qs
qs `shouldBe` 1
it "killing worker should discard the currently processed message (when resendWhenWorkerKilled is False)" $ \(TestEnv { broker, queueName, events, threadId }) -> do
-- no events initially
readTVarIO events >>= shouldBe []
-- queue should be empty
BT.getQueueSize broker queueName >>= flip shouldBe 0
-- Perform some long job
let msg = Timeout { delay = 2 }
let job = (mkDefaultSendJob broker queueName msg 2) { resendOnKill = False }
void $ sendJob' job
-- Let's wait a bit to make sure the message is picked up by the
-- worker
waitUntilTVarEq events [ EMessageReceived msg ] 300
waitUntilQueueEmpty broker queueName 300
-- Now let's kill the thread immediately
throwTo threadId KillWorkerSafely
-- The message shouldn't be there
threadDelay (300 * 1000)
BT.getQueueSize broker queueName >>= shouldBe 0
data TestEnvMulti b =
TestEnvMulti { broker :: BT.Broker b (Job Message) -- a broker with which you can query its state
, statesWithThreadIds :: [(State b Message, ThreadId)]
, events :: TVar [Event]
, brokerInitParams :: BT.BrokerInitParams b (Job Message)
, queueName :: BT.Queue
, numWorkers :: Int }
testMultiQueuePrefix :: BT.Queue
testMultiQueuePrefix = "test_workers"
withWorkers :: (HasWorkerBroker b Message)
=> BT.BrokerInitParams b (Job Message)
-> Int
-> (TestEnvMulti b -> IO ())
-> IO ()
withWorkers brokerInitParams numWorkers = bracket (setUpWorkers brokerInitParams) tearDownWorkers
where
-- NOTE I need to pass 'b' again, otherwise GHC can't infer the
-- type of 'b' (even with 'ScopedTypeVariables' turned on)
setUpWorkers :: (HasWorkerBroker b Message)
=> BT.BrokerInitParams b (Job Message)
-> IO (TestEnvMulti b)
setUpWorkers bInitParams = do
b <- BT.initBroker bInitParams
queue <- randomQueueName testMultiQueuePrefix
BT.dropQueue b queue
BT.createQueue b queue
events <- newTVarIO []
statesWithThreadIds <-
mapM (\idx -> initState bInitParams events queue ("test worker " <> show idx)) [1..numWorkers]
return $ TestEnvMulti { broker = b
, queueName = queue
, statesWithThreadIds
, events
, brokerInitParams = bInitParams
, numWorkers }
tearDownWorkers :: (HasWorkerBroker b Message)
=> TestEnvMulti b
-> IO ()
tearDownWorkers (TestEnvMulti { broker = b, queueName, statesWithThreadIds }) = do
mapM_ (\(State { broker = b' }, threadId) -> do
killThread threadId
BT.deinitBroker b'
) statesWithThreadIds
BT.dropQueue b queueName
BT.deinitBroker b
-- | Multiple 'Worker' tests, abstracting the 'Broker' away. All these
-- workers operate on the same queue.
multiWorkerTests :: (HasWorkerBroker b Message)
=> BT.BrokerInitParams b (Job Message)
-> Int
-> Spec
multiWorkerTests brokerInitParams numWorkers =
parallel $ around (withWorkers brokerInitParams numWorkers) $ describe "Worker tests" $ do
it "can process simple jobs" $ \(TestEnvMulti { broker, queueName, events, numWorkers = nw }) -> do
-- no events initially
readTVarIO events >>= shouldBe []
-- queue should be empty
BT.getQueueSize broker queueName >>= shouldBe 0
-- create some messages and make sure they are processed (bombarding with messages)
let msgs = [Message { text = "task " <> show idx } | idx <- [1..20*nw]]
let jobs = map (mkDefaultSendJob' broker queueName) msgs
mapM_ sendJob' jobs
-- The jobs don't have to be process exactly in this order so we just use Set here
let expected = concat [[EMessageReceived msg, EJobFinished msg] | msg <- msgs]
waitUntilTVarPred events (\e -> Set.fromList e == Set.fromList expected) (fromIntegral $ 200*nw)
-- queue should be empty
waitUntilQueueEmpty broker queueName 100
it "multiple workers and one long message should result in one message processed" $ \(TestEnvMulti { broker, queueName, events }) -> do
let msg = Timeout { delay = 2 }
let job' = (mkDefaultSendJob broker queueName msg 1) { toStrat = TSArchive }
msgId <- sendJob' job'
waitUntilTVarEq events [ EMessageReceived msg, EJobTimeout msg ] 1200
-- There might be a slight delay before the message is archived
-- (handling exception step in the thread)
waitUntil (isJust <$> BT.getArchivedMessage broker queueName msgId) 100
-- The archive should contain our message
mMsgArchive <- BT.getArchivedMessage broker queueName msgId
mMsgArchive `shouldSatisfy` isJust
let msgArchive = fromJust mMsgArchive
job (BT.toA $ BT.getMessage msgArchive) `shouldBe` msg
-- Queue should be empty, since we archive timed out jobs
waitUntilQueueEmpty broker queueName 100
second :: Int
......@@ -351,7 +538,8 @@ millisecond = 1000
pgmqWorkerBrokerInitParams :: IO (BT.BrokerInitParams PGMQ.PGMQBroker (Job Message))
pgmqWorkerBrokerInitParams = do
PGMQ.PGMQBrokerInitParams <$> getPSQLEnvConnectInfo
conn <- getPSQLEnvConnectInfo
return $ PGMQ.PGMQBrokerInitParams conn defaultPGMQVt
redisWorkerBrokerInitParams :: IO (BT.BrokerInitParams Redis.RedisBroker (Job Message))
redisWorkerBrokerInitParams = do
......
module Main where
import Test.Integration.Broker (brokerTests, pgmqBrokerInitParams, redisBrokerInitParams)
import Test.Integration.Worker (workerTests, pgmqWorkerBrokerInitParams, redisWorkerBrokerInitParams)
import Test.Integration.Worker (workerTests, multiWorkerTests, pgmqWorkerBrokerInitParams, redisWorkerBrokerInitParams)
import Test.Tasty
import Test.Tasty.Hspec
......@@ -13,18 +13,22 @@ main = do
pgmqBrokerSpec <- testSpec "brokerTests (pgmq)" (brokerTests pgmqBInitParams)
pgmqWBInitParams <- pgmqWorkerBrokerInitParams
pgmqWorkerSpec <- testSpec "workerTests (pgmq)" (workerTests pgmqWBInitParams)
pgmqMultiWorkerSpec <- testSpec "multiWorkerTests (pgmq)" (multiWorkerTests pgmqWBInitParams 5)
redisBInitParams <- redisBrokerInitParams
redisBrokerSpec <- testSpec "brokerTests (redis)" (brokerTests redisBInitParams)
redisWBInitParams <- redisWorkerBrokerInitParams
redisWorkerSpec <- testSpec "workerTests (redis)" (workerTests redisWBInitParams)
redisMultiWorkerSpec <- testSpec "multiWorkerTests (redis)" (multiWorkerTests redisWBInitParams 5)
defaultMain $ testGroup "integration tests"
[
pgmqBrokerSpec
, pgmqWorkerSpec
, pgmqMultiWorkerSpec
, redisBrokerSpec
, redisWorkerSpec
, redisMultiWorkerSpec
]
......@@ -45,6 +45,7 @@ instance QC.Arbitrary WT.JobMetadata where
timeoutStrategy <- arbitrary
timeout <- arbitrary
readCount <- arbitrary
resendWhenWorkerKilled <- arbitrary
return $ WT.JobMetadata { .. }
aesonPropJobMetadataTests = testGroup "Aeson WT.JobMetadata (de-)serialization tests" $
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment