[tests] fixes, timeout worker test implemented

parent e25f1411
Pipeline #6414 passed with stages
in 14 minutes and 48 seconds
...@@ -7,8 +7,8 @@ A very simple worker to test Database.PGMQ.Worker. ...@@ -7,8 +7,8 @@ A very simple worker to test Database.PGMQ.Worker.
module Main module Main
where where
import Async.Worker (sendJob', mkDefaultSendJob, SendJob(..), run) import Async.Worker (sendJob', mkDefaultSendJob, mkDefaultSendJob', SendJob(..), run)
import Async.Worker.Broker.PGMQ (PGMQBroker(..)) import Async.Worker.Broker.PGMQ (PGMQBroker, BrokerInitParams(PGMQBrokerInitParams))
import Async.Worker.Broker.Types (Broker, getMessage, toA, initBroker) import Async.Worker.Broker.Types (Broker, getMessage, toA, initBroker)
import Async.Worker.Types (State(..), PerformAction, getJob, formatStr, TimeoutStrategy(..), Job) import Async.Worker.Types (State(..), PerformAction, getJob, formatStr, TimeoutStrategy(..), Job)
import Control.Applicative ((<|>)) import Control.Applicative ((<|>))
...@@ -75,6 +75,7 @@ main :: IO () ...@@ -75,6 +75,7 @@ main :: IO ()
main = do main = do
let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres" let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
, PSQL.connectDatabase = "postgres" } , PSQL.connectDatabase = "postgres" }
let brokerInitParams = PGMQBrokerInitParams connInfo :: BrokerInitParams PGMQBroker (Job Message)
let queue = "simple_worker" let queue = "simple_worker"
...@@ -85,7 +86,7 @@ main = do ...@@ -85,7 +86,7 @@ main = do
-- let tasksLst = [] :: [Int] -- let tasksLst = [] :: [Int]
mapM_ (\idx -> do mapM_ (\idx -> do
broker <- initBroker connInfo :: Broker PGMQBroker (Job Message) broker <- initBroker brokerInitParams :: IO (Broker PGMQBroker (Job Message))
let state = State { broker let state = State { broker
, queueName = queue , queueName = queue
, name = "worker " <> show idx , name = "worker " <> show idx
...@@ -101,10 +102,10 @@ main = do ...@@ -101,10 +102,10 @@ main = do
threadDelay second threadDelay second
conn <- PSQL.connect connInfo conn <- PSQL.connect connInfo
let broker = initBroker (PGMQBroker conn) :: Broker PGMQBroker (Job Message) broker <- initBroker brokerInitParams :: IO (Broker PGMQBroker (Job Message))
-- SendJob wrapper -- SendJob wrapper
let mkJob msg = mkDefaultSendJob broker queue msg let mkJob msg = mkDefaultSendJob' broker queue msg
mapM_ (\idx -> do mapM_ (\idx -> do
sendJob' $ mkJob $ Ping sendJob' $ mkJob $ Ping
...@@ -114,8 +115,9 @@ main = do ...@@ -114,8 +115,9 @@ main = do
) tasksLst ) tasksLst
-- a job that will timeout -- a job that will timeout
let timedOut = (mkJob (Wait 5)) { timeout = 1 let timedOut =
, toStrat = TSRepeatNElseArchive 3 } (mkDefaultSendJob broker queue (Wait 5) 1)
{ toStrat = TSRepeatNElseArchive 3 }
sendJob' timedOut sendJob' timedOut
threadDelay (10*second) threadDelay (10*second)
......
...@@ -17,6 +17,7 @@ module Async.Worker ...@@ -17,6 +17,7 @@ module Async.Worker
( run ( run
, sendJob , sendJob
, mkDefaultSendJob , mkDefaultSendJob
, mkDefaultSendJob'
, sendJob' , sendJob'
, SendJob(..) ) , SendJob(..) )
where where
...@@ -173,8 +174,9 @@ mkDefaultSendJob :: HasWorkerBroker b a ...@@ -173,8 +174,9 @@ mkDefaultSendJob :: HasWorkerBroker b a
=> Broker b (Job a) => Broker b (Job a)
-> Queue -> Queue
-> a -> a
-> Timeout
-> SendJob b a -> SendJob b a
mkDefaultSendJob broker queue msg = mkDefaultSendJob broker queue msg timeout =
SendJob { broker SendJob { broker
, queue , queue
, msg , msg
...@@ -185,8 +187,19 @@ mkDefaultSendJob broker queue msg = ...@@ -185,8 +187,19 @@ mkDefaultSendJob broker queue msg =
, errStrat = ESArchive , errStrat = ESArchive
-- | repeat timed out jobs -- | repeat timed out jobs
, toStrat = TSRepeat , toStrat = TSRepeat
, timeout = 10 } , timeout }
-- | Like 'mkDefaultSendJob' but with default timeout
mkDefaultSendJob' :: HasWorkerBroker b a
=> Broker b (Job a)
-> Queue
-> a
-> SendJob b a
mkDefaultSendJob' b q m = mkDefaultSendJob b q m defaultTimeout
where
defaultTimeout = 10
-- | Call 'sendJob' with 'SendJob b a' data -- | Call 'sendJob' with 'SendJob b a' data
sendJob' :: (HasWorkerBroker b a) => SendJob b a -> IO () sendJob' :: (HasWorkerBroker b a) => SendJob b a -> IO ()
......
...@@ -44,11 +44,11 @@ instance (SerializableMessage a, Show a) => HasBroker PGMQBroker a where ...@@ -44,11 +44,11 @@ instance (SerializableMessage a, Show a) => HasBroker PGMQBroker a where
toA (PGMQM message) = message toA (PGMQM message) = message
initBroker (PGMQBrokerInitParams connInfo) = do initBroker (PGMQBrokerInitParams connInfo) = do
conn <- PSQL.connect connInfo conn <- PSQL.connect connInfo
PGMQ.initialize conn
pure $ PGMQBroker' { conn } pure $ PGMQBroker' { conn }
deinitBroker (PGMQBroker' { conn }) = PSQL.close conn deinitBroker (PGMQBroker' { conn }) = PSQL.close conn
createQueue (PGMQBroker' { conn }) queue = do createQueue (PGMQBroker' { conn }) queue = do
PGMQ.initialize conn
PGMQ.createQueue conn queue PGMQ.createQueue conn queue
dropQueue (PGMQBroker' { conn }) queue = do dropQueue (PGMQBroker' { conn }) queue = do
......
...@@ -12,7 +12,7 @@ module Test.Integration.Worker ...@@ -12,7 +12,7 @@ module Test.Integration.Worker
, pgmqWorkerBrokerInitParams ) , pgmqWorkerBrokerInitParams )
where where
import Async.Worker (run, mkDefaultSendJob, sendJob') import Async.Worker (run, mkDefaultSendJob, mkDefaultSendJob', sendJob')
import Async.Worker.Broker.PGMQ (BrokerInitParams(..), PGMQBroker) import Async.Worker.Broker.PGMQ (BrokerInitParams(..), PGMQBroker)
import Async.Worker.Broker.Types qualified as BT import Async.Worker.Broker.Types qualified as BT
import Async.Worker.Types import Async.Worker.Types
...@@ -37,10 +37,12 @@ testQueue = "test" ...@@ -37,10 +37,12 @@ testQueue = "test"
data Message = data Message =
Message { text :: String } Message { text :: String }
| Error | Error
| Timeout { delay :: Int }
deriving (Show, Eq) deriving (Show, Eq)
instance ToJSON Message where instance ToJSON Message where
toJSON (Message { text }) = toJSON $ object [ "type" .= ("Message" :: String), "text" .= text ] toJSON (Message { text }) = toJSON $ object [ "type" .= ("Message" :: String), "text" .= text ]
toJSON Error = toJSON $ object [ "type" .= ("Error" :: String) ] toJSON Error = toJSON $ object [ "type" .= ("Error" :: String) ]
toJSON (Timeout { delay }) = toJSON $ object [ "type" .= ("Timeout" :: String), "delay" .= delay ]
instance FromJSON Message where instance FromJSON Message where
parseJSON = withObject "Message" $ \o -> do parseJSON = withObject "Message" $ \o -> do
type_ <- o .: "type" type_ <- o .: "type"
...@@ -49,6 +51,9 @@ instance FromJSON Message where ...@@ -49,6 +51,9 @@ instance FromJSON Message where
text <- o .: "text" text <- o .: "text"
pure $ Message { text } pure $ Message { text }
"Error" -> pure Error "Error" -> pure Error
"Timeout" -> do
delay <- o .: "delay"
pure $ Timeout { delay }
_ -> fail $ "Unknown type " <> type_ _ -> fail $ "Unknown type " <> type_
...@@ -71,6 +76,7 @@ pa _state bm = do ...@@ -71,6 +76,7 @@ pa _state bm = do
case job job' of case job job' of
Message { text } -> putStrLn text Message { text } -> putStrLn text
Error -> throwIO $ SimpleException "Error!" Error -> throwIO $ SimpleException "Error!"
Timeout { delay } -> threadDelay (delay * second)
withWorker :: (HasWorkerBroker b Message) withWorker :: (HasWorkerBroker b Message)
...@@ -127,7 +133,7 @@ workerTests brokerInitParams = ...@@ -127,7 +133,7 @@ workerTests brokerInitParams =
let text = "simple test" let text = "simple test"
let msg = Message { text } let msg = Message { text }
let job = mkDefaultSendJob broker queueName msg let job = mkDefaultSendJob' broker queueName msg
sendJob' job sendJob' job
threadDelay (1*second) threadDelay (1*second)
...@@ -135,18 +141,33 @@ workerTests brokerInitParams = ...@@ -135,18 +141,33 @@ workerTests brokerInitParams =
events2 <- atomically $ readTVar events events2 <- atomically $ readTVar events
events2 `shouldBe` [ EMessageReceived msg, EJobFinished msg ] events2 `shouldBe` [ EMessageReceived msg, EJobFinished msg ]
it "can process a job with error" $ \(TestEnv { state = State { broker, queueName }, events }) -> do it "can handle a job with error" $ \(TestEnv { state = State { broker, queueName }, events }) -> do
-- no events initially -- no events initially
events1 <- atomically $ readTVar events events1 <- atomically $ readTVar events
events1 `shouldBe` [] events1 `shouldBe` []
let job = mkDefaultSendJob broker queueName Error let msg = Error
let job = mkDefaultSendJob' broker queueName msg
sendJob' job sendJob' job
threadDelay (1*second) threadDelay (1*second)
events2 <- atomically $ readTVar events events2 <- atomically $ readTVar events
events2 `shouldBe` [ EMessageReceived Error, EJobError Error ] events2 `shouldBe` [ EMessageReceived msg, EJobError msg ]
it "can handle a job with timeout" $ \(TestEnv { state = State { broker, queueName }, events}) -> do
-- no events initially
events1 <- atomically $ readTVar events
events1 `shouldBe` []
let msg = Timeout 2
let job = (mkDefaultSendJob broker queueName msg 1)
sendJob' job
threadDelay (2*second)
events2 <- atomically $ readTVar events
events2 `shouldBe` [ EMessageReceived msg, EJobTimeout msg ]
second :: Int second :: Int
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment