Timeout/error handling implemented in abstract worker

parent b45335b3
Pipeline #6409 failed with stages
in 13 minutes and 56 seconds
...@@ -8,7 +8,7 @@ Testing exception catch for PSQL. ...@@ -8,7 +8,7 @@ Testing exception catch for PSQL.
module Main module Main
where where
import Control.Exception (Exception, SomeException(..), catch, fromException, throwIO, toException) import Control.Exception (SomeException(..), catch)
import Control.Monad (void) import Control.Monad (void)
import Database.PostgreSQL.Simple qualified as PSQL import Database.PostgreSQL.Simple qualified as PSQL
import Database.PostgreSQL.Simple.SqlQQ (sql) import Database.PostgreSQL.Simple.SqlQQ (sql)
......
...@@ -22,7 +22,7 @@ main = do ...@@ -22,7 +22,7 @@ main = do
PGMQ.initialize conn PGMQ.initialize conn
PGMQ.dropQueue conn "test" PGMQ.dropQueue conn "test"
metrics <- PGMQ.getMetrics conn "test" _metrics <- PGMQ.getMetrics conn "test"
let queue = "test" let queue = "test"
PGMQ.createQueue conn "test" PGMQ.createQueue conn "test"
...@@ -81,8 +81,8 @@ main = do ...@@ -81,8 +81,8 @@ main = do
metrics <- PGMQ.getMetrics conn queue metrics <- PGMQ.getMetrics conn queue
putStrLn $ "before purge: " <> show metrics putStrLn $ "before purge: " <> show metrics
PGMQ.purgeQueue conn queue PGMQ.purgeQueue conn queue
metrics <- PGMQ.getMetrics conn queue metrics2 <- PGMQ.getMetrics conn queue
putStrLn $ "after purge: " <> show metrics putStrLn $ "after purge: " <> show metrics2
PGMQ.dropQueue conn queue PGMQ.dropQueue conn queue
......
...@@ -24,9 +24,8 @@ where ...@@ -24,9 +24,8 @@ where
import Async.Worker.Broker import Async.Worker.Broker
import Async.Worker.Types import Async.Worker.Types
import Control.Exception.Safe (SomeException, catch, fromException, throwIO) import Control.Exception.Safe (catch, fromException, throwIO)
import Control.Monad (forever) import Control.Monad (forever)
import Data.Typeable (Typeable)
import System.Timeout qualified as Timeout import System.Timeout qualified as Timeout
...@@ -42,7 +41,12 @@ run state@(State { .. }) = do ...@@ -42,7 +41,12 @@ run state@(State { .. }) = do
-- of keeping only one try...catch in the whole function) -- of keeping only one try...catch in the whole function)
catch (do catch (do
readMessageWaiting broker queueName >>= handleMessage state readMessageWaiting broker queueName >>= handleMessage state
) (handleLoopError state) ) (\err ->
case fromException err of
Just jt@(JobTimeout {}) -> handleTimeoutError state jt
Nothing -> case fromException err of
Just je@(JobException {}) -> handleJobError state je
_ -> undefined)
handleMessage :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO () handleMessage :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
handleMessage state@(State { .. }) brokerMessage = do handleMessage state@(State { .. }) brokerMessage = do
...@@ -52,7 +56,6 @@ handleMessage state@(State { .. }) brokerMessage = do ...@@ -52,7 +56,6 @@ handleMessage state@(State { .. }) brokerMessage = do
let job = toA msg let job = toA msg
let mdata = metadata job let mdata = metadata job
let t = jobTimeout job let t = jobTimeout job
putStrLn $ formatStr state $ "job timeout: " <> show t
mTimeout <- Timeout.timeout (t * microsecond) (performAction state brokerMessage) mTimeout <- Timeout.timeout (t * microsecond) (performAction state brokerMessage)
let archiveHandler = do let archiveHandler = do
...@@ -67,8 +70,7 @@ handleMessage state@(State { .. }) brokerMessage = do ...@@ -67,8 +70,7 @@ handleMessage state@(State { .. }) brokerMessage = do
case mTimeout of case mTimeout of
Just _ -> archiveHandler Just _ -> archiveHandler
Nothing -> throwIO $ JobTimeout { jtState = state Nothing -> throwIO $ JobTimeout { jtBMessage = brokerMessage
, jtBMessage = brokerMessage
, jtTimeout = t } , jtTimeout = t }
-- onMessageFetched broker queue msg -- onMessageFetched broker queue msg
...@@ -77,13 +79,44 @@ handleMessage state@(State { .. }) brokerMessage = do ...@@ -77,13 +79,44 @@ handleMessage state@(State { .. }) brokerMessage = do
-- mTimeout <- Timeout.timeout t (handleJob broker msg) -- mTimeout <- Timeout.timeout t (handleJob broker msg)
-- return ()) -- return ())
handleLoopError :: (HasWorkerBroker b a) => State b a -> SomeException -> IO () handleTimeoutError :: (HasWorkerBroker b a) => State b a -> JobTimeout b a -> IO ()
handleLoopError state err = do handleTimeoutError state@(State { .. }) jt@(JobTimeout { .. }) = do
case fromException err of putStrLn $ formatStr state $ show jt
Just (jobTimeout@(JobTimeout { }) :: JobTimeout b a) -> do let msgId = messageId jtBMessage
putStrLn $ formatStr state $ show jobTimeout let job = toA $ getMessage jtBMessage
_other -> do let mdata = metadata job
putStrLn $ formatStr state $ "other error" case timeoutStrategy mdata of
TSDelete -> deleteMessage broker queueName msgId
TSArchive -> archiveMessage broker queueName msgId
TSRepeat -> pure ()
TSRepeatNElseArchive _n -> do
-- TODO Implement 'readCt'
pure ()
-- OK so this can be repeated at most 'n' times, compare 'readCt' with 'n'
-- if readCt > n then
-- PGMQ.archiveMessage conn queue messageId
-- else
-- pure ()
TSRepeatNElseDelete _n -> do
-- TODO Implement 'readCt'
pure ()
-- OK so this can be repeated at most 'n' times, compare 'readCt' with 'n'
-- if readCt > n then
-- PGMQ.deleteMessage conn queue messageId
-- else
-- pure ()
handleJobError :: (HasWorkerBroker b a) => State b a -> JobException b a -> IO ()
handleJobError state@(State { .. }) je@(JobException { .. }) = do
putStrLn $ formatStr state $ show je
let msgId = messageId jeBMessage
let job = toA $ getMessage jeBMessage
let mdata = metadata job
case errorStrategy mdata of
ESDelete -> deleteMessage broker queueName msgId
ESArchive -> deleteMessage broker queueName msgId
ESRepeat -> return ()
sendJob :: (HasWorkerBroker b a) => Broker b (Job a) -> Queue -> Job a -> IO () sendJob :: (HasWorkerBroker b a) => Broker b (Job a) -> Queue -> Job a -> IO ()
sendJob broker queueName job = do sendJob broker queueName job = do
...@@ -97,7 +130,7 @@ microsecond = 10^(6 :: Int) ...@@ -97,7 +130,7 @@ microsecond = 10^(6 :: Int)
-- constructing them more easily -- constructing them more easily
-- | Wraps parameters for the 'sendJob' function -- | Wraps parameters for the 'sendJob' function
data SendJob b a = data (HasWorkerBroker b a) => SendJob b a =
SendJob { broker :: Broker b (Job a) SendJob { broker :: Broker b (Job a)
, queue :: Queue , queue :: Queue
, msg :: a , msg :: a
...@@ -108,7 +141,8 @@ data SendJob b a = ...@@ -108,7 +141,8 @@ data SendJob b a =
, timeout :: Timeout } , timeout :: Timeout }
-- | Create a 'SendJob' data with some defaults -- | Create a 'SendJob' data with some defaults
mkDefaultSendJob :: Broker b (Job a) mkDefaultSendJob :: HasWorkerBroker b a
=> Broker b (Job a)
-> Queue -> Queue
-> a -> a
-> SendJob b a -> SendJob b a
......
...@@ -25,17 +25,19 @@ import Database.PGMQ.Types qualified as PGMQ ...@@ -25,17 +25,19 @@ import Database.PGMQ.Types qualified as PGMQ
data PGMQBroker = PGMQBroker PSQL.Connection data PGMQBroker = PGMQBroker PSQL.Connection
instance (SerializableMessage a) => HasBroker PGMQBroker a where instance (SerializableMessage a, Show a) => HasBroker PGMQBroker a where
data Broker PGMQBroker a = data Broker PGMQBroker a =
PGMQBroker' { PGMQBroker' {
conn :: PSQL.Connection conn :: PSQL.Connection
} }
data BrokerMessage PGMQBroker a = PGMQBM (PGMQ.Message a) data BrokerMessage PGMQBroker a = PGMQBM (PGMQ.Message a)
deriving (Show)
data Message PGMQBroker a = PGMQM a data Message PGMQBroker a = PGMQM a
type MessageId PGMQBroker = Int data MessageId PGMQBroker = PGMQMid Int
deriving (Eq, Show)
type BrokerInitParams PGMQBroker = PGMQBroker type BrokerInitParams PGMQBroker = PGMQBroker
messageId (PGMQBM (PGMQ.Message { msgId })) = msgId messageId (PGMQBM (PGMQ.Message { msgId })) = PGMQMid msgId
getMessage (PGMQBM (PGMQ.Message { message })) = PGMQM message getMessage (PGMQBM (PGMQ.Message { message })) = PGMQM message
toMessage message = PGMQM message toMessage message = PGMQM message
toA (PGMQM message) = message toA (PGMQM message) = message
...@@ -60,8 +62,8 @@ instance (SerializableMessage a) => HasBroker PGMQBroker a where ...@@ -60,8 +62,8 @@ instance (SerializableMessage a) => HasBroker PGMQBroker a where
sendMessage (PGMQBroker' { conn }) queue (PGMQM message) = sendMessage (PGMQBroker' { conn }) queue (PGMQM message) =
PGMQ.sendMessage conn queue message 0 PGMQ.sendMessage conn queue message 0
deleteMessage (PGMQBroker' { conn }) queue msgId = do deleteMessage (PGMQBroker' { conn }) queue (PGMQMid msgId) = do
PGMQ.deleteMessage conn queue msgId PGMQ.deleteMessage conn queue msgId
archiveMessage (PGMQBroker' { conn }) queue msgId = do archiveMessage (PGMQBroker' { conn }) queue (PGMQMid msgId) = do
PGMQ.archiveMessage conn queue msgId PGMQ.archiveMessage conn queue msgId
...@@ -34,7 +34,9 @@ type Queue = String ...@@ -34,7 +34,9 @@ type Queue = String
{-| A message in the queue system must have some properties. In {-| A message in the queue system must have some properties. In
particular, it must have some sort of 'id'. particular, it must have some sort of 'id'.
-} -}
-- class HasMessageId message msgId where -- class (Eq msgId, Show msgId, Typeable msgId) => HasMessageId msg msgId where
-- messageId :: msg -> msgId
-- class HasMessageId m where -- class HasMessageId m where
-- data family Message m :: Type -- data family Message m :: Type
-- type family MessageId m :: Type -- type family MessageId m :: Type
...@@ -72,11 +74,11 @@ type SerializableMessage a = ( FromJSON a ...@@ -72,11 +74,11 @@ type SerializableMessage a = ( FromJSON a
-} -}
-- class Broker broker brokerMessage message msgId | brokerMessage -> message, brokerMessage -> msgId where -- class Broker broker brokerMessage message msgId | brokerMessage -> message, brokerMessage -> msgId where
class ( Eq (MessageId b) class (
Eq (MessageId b)
, Show (MessageId b) , Show (MessageId b)
, Typeable (MessageId b) , Show (BrokerMessage b a)
, Typeable b ) => HasBroker b a where
, Typeable a ) => HasBroker b a where
-- | Data representing the broker -- | Data representing the broker
data family Broker b a :: Type data family Broker b a :: Type
-- | Data represenging message that is returned by broker -- | Data represenging message that is returned by broker
...@@ -84,7 +86,7 @@ class ( Eq (MessageId b) ...@@ -84,7 +86,7 @@ class ( Eq (MessageId b)
-- | Data that we serialize into broker -- | Data that we serialize into broker
data family Message b a :: Type data family Message b a :: Type
-- | How to get the message id (needed for delete/archive operations) -- | How to get the message id (needed for delete/archive operations)
type family MessageId b :: Type data family MessageId b :: Type
type family BrokerInitParams b :: Type type family BrokerInitParams b :: Type
...@@ -92,7 +94,8 @@ class ( Eq (MessageId b) ...@@ -92,7 +94,8 @@ class ( Eq (MessageId b)
-- 'BrokerMessage', 'Message' data types -- 'BrokerMessage', 'Message' data types
-- | Operation for getting the message id from 'BrokerMessage' -- | Operation for getting the message id from 'BrokerMessage'
messageId :: (Eq (MessageId b), Show (MessageId b)) => BrokerMessage b a -> MessageId b -- messageId :: (Eq (MessageId b), Show (MessageId b)) => BrokerMessage b a -> MessageId b
messageId :: BrokerMessage b a -> MessageId b
-- | 'BrokerMessage' contains 'Message' inside, this is a -- | 'BrokerMessage' contains 'Message' inside, this is a
-- deconstructor for 'BrokerMessage' -- deconstructor for 'BrokerMessage'
......
...@@ -11,6 +11,7 @@ Portability : POSIX ...@@ -11,6 +11,7 @@ Portability : POSIX
{-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE UndecidableInstances #-}
module Async.Worker.Types module Async.Worker.Types
...@@ -29,10 +30,11 @@ module Async.Worker.Types ...@@ -29,10 +30,11 @@ module Async.Worker.Types
, PerformAction , PerformAction
, HasWorkerBroker , HasWorkerBroker
, formatStr , formatStr
, JobTimeout(..) ) , JobTimeout(..)
, JobException(..) )
where where
import Async.Worker.Broker.Types (Broker, BrokerMessage, HasBroker, Queue, messageId) import Async.Worker.Broker.Types (Broker, BrokerMessage, HasBroker, Queue)
import Control.Applicative ((<|>)) import Control.Applicative ((<|>))
import Control.Exception (Exception) import Control.Exception (Exception)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText) import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
...@@ -194,7 +196,7 @@ jobTimeout (Job { metadata }) = timeout metadata ...@@ -194,7 +196,7 @@ jobTimeout (Job { metadata }) = timeout metadata
-- message with metadata (corresponds to broker 'Message b a' and -- message with metadata (corresponds to broker 'Message b a' and
-- 'BrokerMessage b a' is what we get when the broker reads that -- 'BrokerMessage b a' is what we get when the broker reads that
-- message) -- message)
data State b a = data (HasWorkerBroker b a) => State b a =
State { broker :: Broker b (Job a) State { broker :: Broker b (Job a)
, queueName :: Queue -- name of queue , queueName :: Queue -- name of queue
-- custom name for this worker -- custom name for this worker
...@@ -210,21 +212,21 @@ type PerformAction b a = ...@@ -210,21 +212,21 @@ type PerformAction b a =
State b a -> BrokerMessage b (Job a) -> IO () State b a -> BrokerMessage b (Job a) -> IO ()
type HasWorkerBroker b a = ( HasBroker b (Job a), Typeable a ) type HasWorkerBroker b a = ( HasBroker b (Job a), Typeable a, Typeable b )
-- | Helper function to format a string with worker name (for logging) -- | Helper function to format a string with worker name (for logging)
formatStr :: State b a -> String -> String formatStr :: (HasWorkerBroker b a) => State b a -> String -> String
formatStr (State { name }) msg = formatStr (State { name }) msg =
"[" <> name <> "] " <> msg "[" <> name <> "] " <> msg
-- -- | Thrown when job times out -- -- | Thrown when job times out
data JobTimeout b a = data JobTimeout b a =
JobTimeout { jtState :: State b a JobTimeout { jtBMessage :: BrokerMessage b (Job a)
, jtBMessage :: BrokerMessage b (Job a)
, jtTimeout :: Timeout } , jtTimeout :: Timeout }
instance (HasWorkerBroker b a) => Show (JobTimeout b a) where deriving instance (HasWorkerBroker b a) => Show (JobTimeout b a)
show (JobTimeout { .. }) =
"JobTimeout worker = " <> name jtState <>
", jtMessageId = " <> show (messageId jtBMessage) <>
", jtTimeout = " <> show jtTimeout
instance (HasWorkerBroker b a) => Exception (JobTimeout b a) instance (HasWorkerBroker b a) => Exception (JobTimeout b a)
data JobException b a =
JobException { jeBMessage :: BrokerMessage b (Job a) }
deriving instance (HasWorkerBroker b a) => Show (JobException b a)
instance (HasWorkerBroker b a) => Exception (JobException b a)
...@@ -62,6 +62,7 @@ data Message a = ...@@ -62,6 +62,7 @@ data Message a =
, enqueuedAt :: ZonedTime , enqueuedAt :: ZonedTime
, vt :: ZonedTime , vt :: ZonedTime
, message :: a } , message :: a }
deriving (Show)
-- NOTE I'm not sure if this is needed -- NOTE I'm not sure if this is needed
instance Eq a => Eq (Message a) where instance Eq a => Eq (Message a) where
(==) msg1 msg2 = (==) msg1 msg2 =
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment