[tests] add missing Test.Integration.Worker

parent 6eca306c
Pipeline #6411 canceled with stages
in 6 minutes and 34 seconds
{-|
Generic Worker tests
-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MonoLocalBinds #-} -- TODO can remove this if 'Show a' is removed from 'HasWorkerBroker'
{-# LANGUAGE ScopedTypeVariables #-}
module Test.Integration.Worker
( workerTests
, pgmqWorkerBrokerInitParams )
where
import Async.Worker (run, mkDefaultSendJob, sendJob')
import Async.Worker.Broker.PGMQ (BrokerInitParams(..), PGMQBroker)
import Async.Worker.Broker.Types qualified as BT
import Async.Worker.Types
import Control.Concurrent (forkIO, killThread, threadDelay, ThreadId)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar
import Control.Exception (bracket, Exception, throwIO)
import Data.Aeson (ToJSON(..), FromJSON(..), object, (.=), (.:), withObject)
import Database.PostgreSQL.Simple qualified as PSQL
import Test.Hspec
data TestEnv b =
TestEnv { state :: State b Message
, events :: TVar [Event]
, threadId :: ThreadId }
testQueue :: BT.Queue
testQueue = "test"
data Message =
Message { text :: String }
| Error
deriving (Show, Eq)
instance ToJSON Message where
toJSON (Message { text }) = toJSON $ object [ "type" .= ("Message" :: String), "text" .= text ]
toJSON Error = toJSON $ object [ "type" .= ("Error" :: String) ]
instance FromJSON Message where
parseJSON = withObject "Message" $ \o -> do
type_ <- o .: "type"
case type_ of
"Message" -> do
text <- o .: "text"
pure $ Message { text }
"Error" -> pure Error
_ -> fail $ "Unknown type " <> type_
data Event =
EMessageReceived Message
| EJobFinished Message
| EJobTimeout Message
| EJobError Message
deriving (Eq, Show)
data SimpleException = SimpleException String
deriving Show
instance Exception SimpleException
pa :: (HasWorkerBroker b Message) => State b a -> BT.BrokerMessage b (Job Message) -> IO ()
pa _state bm = do
let job' = BT.toA $ BT.getMessage bm
case job job' of
Message { text } -> putStrLn text
Error -> throwIO $ SimpleException "Error!"
withWorker :: (HasWorkerBroker b Message)
=> BT.BrokerInitParams b (Job Message)
-> (TestEnv b -> IO ())
-> IO ()
withWorker brokerInitParams = bracket (setUpWorker brokerInitParams) tearDownWorker
where
-- NOTE I need to pass 'b' again, otherwise GHC can't infer the
-- type of 'b' (even with 'ScopedTypeVariables' turned on)
setUpWorker :: (HasWorkerBroker b Message)
=> BT.BrokerInitParams b (Job Message)
-> IO (TestEnv b)
setUpWorker bInitParams = do
b <- BT.initBroker bInitParams
BT.dropQueue b testQueue
BT.createQueue b testQueue
events <- newTVarIO []
let pushEvent evt bm = atomically $ modifyTVar events (\e -> e ++ [evt $ job $ BT.toA $ BT.getMessage bm])
let state = State { broker = b
, queueName = testQueue
, name = "test worker"
, performAction = pa
, onMessageReceived = Just (\_s bm -> pushEvent EMessageReceived bm)
, onJobFinish = Just (\_s bm -> pushEvent EJobFinished bm)
, onJobTimeout = Just (\_s bm -> pushEvent EJobTimeout bm)
, onJobError = Just (\_s bm -> pushEvent EJobError bm) }
threadId <- forkIO $ run state
return $ TestEnv { state, events, threadId }
tearDownWorker :: (HasWorkerBroker b Message)
=> TestEnv b
-> IO ()
tearDownWorker (TestEnv { state = State { broker = b, queueName }, threadId }) = do
BT.dropQueue b queueName
killThread threadId
BT.deinitBroker b
workerTests :: (HasWorkerBroker b Message)
=> BT.BrokerInitParams b (Job Message)
-> Spec
workerTests brokerInitParams =
sequential $ around (withWorker brokerInitParams) $ describe "Worker tests" $ do
it "can process a simple job" $ \(TestEnv { state = State { broker, queueName }, events }) -> do
-- no events initially
events1 <- atomically $ readTVar events
events1 `shouldBe` []
let text = "simple test"
let msg = Message { text }
let job = mkDefaultSendJob broker queueName msg
sendJob' job
threadDelay (1*second)
events2 <- atomically $ readTVar events
events2 `shouldBe` [ EMessageReceived msg, EJobFinished msg ]
it "can process a job with error" $ \(TestEnv { state = State { broker, queueName }, events }) -> do
-- no events initially
events1 <- atomically $ readTVar events
events1 `shouldBe` []
let job = mkDefaultSendJob broker queueName Error
sendJob' job
threadDelay (1*second)
events2 <- atomically $ readTVar events
events2 `shouldBe` [ EMessageReceived Error, EJobError Error ]
second :: Int
second = 1000000
pgmqWorkerBrokerInitParams :: BT.BrokerInitParams PGMQBroker (Job Message)
pgmqWorkerBrokerInitParams =
PGMQBrokerInitParams $
PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
, PSQL.connectDatabase = "postgres" }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment