[worker] simplify exception catching

parent 29113f05
Pipeline #6510 failed with stages
in 8 minutes and 14 seconds
......@@ -39,7 +39,7 @@ import Async.Worker.Broker
import Async.Worker.Types
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (readTVarIO, newTVarIO, writeTVar)
import Control.Exception.Safe (catch, fromException, throwIO, SomeException, Exception)
import Control.Exception.Safe (catches, Handler(..), throwIO, SomeException, Exception)
import Control.Monad (forever, void, when)
import Debug.Trace (traceStack)
import System.Timeout qualified as Timeout
......@@ -70,40 +70,40 @@ run state@(State { .. }) = do
-- exception handling.
mBrokerMessageTVar <- newTVarIO Nothing -- :: IO (TVar (Maybe (BrokerMessage b (Job a))))
catch (do
catches (do
brokerMessage <- readMessageWaiting broker queueName
atomically $ writeTVar mBrokerMessageTVar (Just brokerMessage)
handleMessage state brokerMessage
callWorkerJobEvent onJobFinish state brokerMessage
atomically $ writeTVar mBrokerMessageTVar Nothing
) (\err -> do
mBrokerMessage <- readTVarIO mBrokerMessageTVar
case fromException err of
Just KillWorkerSafely -> do
case mBrokerMessage of
Just brokerMessage -> do
let job = toA $ getMessage brokerMessage
let mdata = metadata job
-- Should we resend this message?
when (resendWhenWorkerKilled mdata) $ do
putStrLn $ formatStr state $ "resending job: " <> show job
void $ sendJob broker queueName (job { metadata = mdata { readCount = readCount mdata + 1 } })
size <- getQueueSize broker queueName
putStrLn $ formatStr state $ "queue size: " <> show size
-- In any case, deinit the broker (i.e. close connection)
-- deinitBroker broker
) [
Handler $ \(_err :: KillWorkerSafely) -> do
mBrokerMessage <- readTVarIO mBrokerMessageTVar
case mBrokerMessage of
Just brokerMessage -> do
let job = toA $ getMessage brokerMessage
let mdata = metadata job
-- Should we resend this message?
when (resendWhenWorkerKilled mdata) $ do
putStrLn $ formatStr state $ "resending job: " <> show job
void $ sendJob broker queueName (job { metadata = mdata { readCount = readCount mdata + 1 } })
size <- getQueueSize broker queueName
putStrLn $ formatStr state $ "queue size: " <> show size
-- kill worker
throwIO KillWorkerSafely
Nothing -> pure ()
Nothing -> case fromException err of
Just jt@(JobTimeout {}) -> handleTimeoutError state jt
Nothing -> case mBrokerMessage of
-- In any case, deinit the broker (i.e. close connection)
-- deinitBroker broker
-- kill worker
throwIO KillWorkerSafely
Nothing -> pure ()
, Handler $ \(err :: JobTimeout b a) -> handleTimeoutError state err
, Handler $ \err -> do
mBrokerMessage <- readTVarIO mBrokerMessageTVar
case mBrokerMessage of
Just brokerMessage -> do
callWorkerJobEvent onJobError state brokerMessage
handleJobError state brokerMessage
_ -> handleUnknownError state err)
Nothing -> handleUnknownError state err
]
handleMessage :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
handleMessage state@(State { .. }) brokerMessage = do
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment