[refactor] leave only pgmq stuff, move worker to separate repo

parent 4f9cd5e5
Pipeline #6423 failed with stages
in 6 minutes and 48 seconds
......@@ -7,7 +7,6 @@ cache:
stages:
- build
- unit-tests
- integration-tests
build:
......@@ -29,26 +28,6 @@ build:
script:
- cabal v2-build
unit-tests:
stage: unit-tests
image: haskell:9.4.7-slim
# https://stackoverflow.com/questions/76340763/what-gitlab-ci-configuration-will-reliably-cache-haskell-package-dependencies-bu
cache:
key: "v0-cabal"
paths:
- ".cabal"
before_script:
- |
cat >cabal.project.local <<EOF
store-dir: $CI_PROJECT_DIR/.cabal
remote-repo-cache: $CI_PROJECT_DIR/.cabal/packages
EOF
- apt-get update -qq && apt-get install -y -qq libpq-dev
- cabal update
script:
#- cabal v2-test test-pgmq-unit
- cabal v2-run test-unit -- +RTS -N -RTS
integration-tests:
stage: integration-tests
image: haskell:9.4.7-slim
......
{-# LANGUAGE QuasiQuotes #-}
{-
A very simple worker to test Database.PGMQ.Worker.
-}
module Main
where
import Async.Worker (sendJob', mkDefaultSendJob, mkDefaultSendJob', SendJob(..), run)
import Async.Worker.Broker.PGMQ (PGMQBroker, BrokerInitParams(PGMQBrokerInitParams))
import Async.Worker.Broker.Types (Broker, getMessage, toA, initBroker)
import Async.Worker.Types (State(..), PerformAction, getJob, formatStr, TimeoutStrategy(..), Job)
import Control.Applicative ((<|>))
import Control.Concurrent (forkIO, threadDelay)
import Control.Exception (Exception, throwIO)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PGMQ.Simple qualified as PGMQ
-- import Database.PGMQ.Worker qualified as PGMQW
data SimpleException = SimpleException String
deriving Show
instance Exception SimpleException
data Message =
Ping
| Echo String
| Wait Int
| Error String
deriving (Eq, Show)
instance ToJSON Message where
toJSON Ping = toJSON ("ping" :: String)
toJSON (Echo s) = toJSON $ object [ ("echo" .= s) ]
toJSON (Wait s) = toJSON $ object [ ("wait" .= s) ]
toJSON (Error e) = toJSON $ object [ ("error" .= e) ]
instance FromJSON Message where
parseJSON v = parsePing v <|> parseEcho v <|> parseWait v <|> parseError v
where
parsePing = withText "Message (Ping)" $ \s -> do
case s of
"ping" -> pure Ping
s' -> fail $ T.unpack s'
parseEcho = withObject "Message (Echo)" $ \o -> do
s <- o .: "echo"
return $ Echo s
parseWait = withObject "Message (Wait)" $ \o -> do
s <- o .: "wait"
return $ Wait s
parseError = withObject "Message (Error)" $ \o -> do
e <- o .: "error"
return $ Error e
performAction' :: PerformAction PGMQBroker Message
performAction' s brokerMessage = do
let job = toA $ getMessage brokerMessage
case getJob job of
Ping -> putStrLn $ formatStr s "ping"
Echo str -> putStrLn $ formatStr s ("echo " <> str)
Wait int -> do
putStrLn $ formatStr s ( "waiting " <> show int <> " seconds")
threadDelay (int*second)
Error err -> do
putStrLn $ formatStr s ("error " <> err)
throwIO $ SimpleException $ "Error " <> err
second :: Int
second = 1000000
main :: IO ()
main = do
let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
, PSQL.connectDatabase = "postgres" }
let brokerInitParams = PGMQBrokerInitParams connInfo :: BrokerInitParams PGMQBroker (Job Message)
let queue = "simple_worker"
-- let workersLst = [1, 2, 3, 4] :: [Int]
let workersLst = [1, 2] :: [Int]
-- let tasksLst = [101, 102, 102, 103, 104, 105, 106, 107] :: [Int]
let tasksLst = [101] :: [Int]
-- let tasksLst = [] :: [Int]
mapM_ (\idx -> do
broker <- initBroker brokerInitParams :: IO (Broker PGMQBroker (Job Message))
let state = State { broker
, queueName = queue
, name = "worker " <> show idx
, performAction = performAction'
, onMessageReceived = Nothing
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing }
forkIO $ run state
) workersLst
-- delay so that the worker can initialize and settle
threadDelay second
conn <- PSQL.connect connInfo
broker <- initBroker brokerInitParams :: IO (Broker PGMQBroker (Job Message))
-- SendJob wrapper
let mkJob msg = mkDefaultSendJob' broker queue msg
mapM_ (\idx -> do
sendJob' $ mkJob $ Ping
sendJob' $ mkJob $ Wait 1
sendJob' $ mkJob $ Echo $ "hello " <> show idx
sendJob' $ mkJob $ Error $ "error " <> show idx
) tasksLst
-- a job that will timeout
let timedOut =
(mkDefaultSendJob broker queue (Wait 5) 1)
{ toStrat = TSRepeatNElseArchive 3 }
sendJob' timedOut
threadDelay (10*second)
metrics <- PGMQ.getMetrics conn queue
putStrLn $ "metrics: " <> show metrics
return ()
......@@ -65,13 +65,6 @@ library
exposed-modules: Database.PGMQ
, Database.PGMQ.Simple
, Database.PGMQ.Types
, Async.Worker
, Async.Worker.Broker
, Async.Worker.Broker.PGMQ
, Async.Worker.Broker.Redis
, Async.Worker.Broker.Types
, Async.Worker.Types
-- Modules included in this library but not exported.
-- other-modules:
......@@ -83,11 +76,8 @@ library
build-depends: base ^>=4.17.2.0
, aeson >= 1.4 && < 2.3
, bytestring >= 0.11 && < 0.13
, hedis >= 0.15.2 && < 0.16
, mtl >= 2.2 && < 2.4
, postgresql-simple >= 0.6 && < 0.8
, safe >= 0.3 && < 0.4
, safe-exceptions >= 0.1.7 && < 0.2
, text >= 1.2 && < 2.2
, time >= 1.10 && < 1.15
, units >= 2.4 && < 2.5
......@@ -156,66 +146,6 @@ executable simple-test
RecordWildCards
ghc-options: -threaded
executable simple-worker
-- Import common warning flags.
import: warnings
build-depends: base ^>=4.17.2.0
, aeson >= 1.4 && < 2.3
, mtl >= 2.2 && < 2.4
, postgresql-simple >= 0.6 && < 0.8
, text >= 1.2 && < 2.2
, haskell-pgmq
-- Directories containing source files.
hs-source-dirs: bin/simple-worker
main-is: Main.hs
-- Base language which the package is written in.
default-language: Haskell2010
default-extensions:
DuplicateRecordFields
GeneralizedNewtypeDeriving
ImportQualifiedPost
NamedFieldPuns
OverloadedStrings
RecordWildCards
ghc-options: -threaded
test-suite test-unit
-- Import common warning flags.
import: warnings
type: exitcode-stdio-1.0
build-depends: base ^>=4.17.2.0
, aeson >= 1.4 && < 2.3
, tasty >= 1.5 && < 1.6
, tasty-hunit >= 0.10 && < 0.11
, tasty-quickcheck >= 0.10 && < 0.12
, haskell-pgmq
-- Directories containing source files.
hs-source-dirs: tests
main-is: unit-tests.hs
-- Base language which the package is written in.
default-language: Haskell2010
default-extensions:
DuplicateRecordFields
GeneralizedNewtypeDeriving
ImportQualifiedPost
NamedFieldPuns
OverloadedStrings
RecordWildCards
ghc-options: -threaded
test-suite test-integration
-- Import common warning flags.
......@@ -223,15 +153,12 @@ test-suite test-integration
type: exitcode-stdio-1.0
other-modules: Test.Integration.Broker
, Test.Integration.PGMQ.Simple
other-modules: Test.Integration.PGMQ.Simple
, Test.Integration.Utils
, Test.Integration.Worker
build-depends: base ^>=4.17.2.0
, aeson >= 1.4 && < 2.3
, containers >= 0.6 && < 0.8
, hedis >= 0.15.2 && < 0.16
, hspec >= 2.11 && < 3
, postgresql-simple >= 0.6 && < 0.8
, random-strings == 0.1.1.0
......
{-|
Module : Async.Worker
Description : Abstract async worker implementation using the 'Queue' typeclass
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Async.Worker
( run
, sendJob
, mkDefaultSendJob
, mkDefaultSendJob'
, sendJob'
, SendJob(..) )
where
import Async.Worker.Broker
import Async.Worker.Types
import Control.Exception.Safe (catch, fromException, throwIO, SomeException)
import Control.Monad (forever)
import System.Timeout qualified as Timeout
run :: (HasWorkerBroker b a) => State b a -> IO ()
run state@(State { .. }) = do
createQueue broker queueName
forever loop
where
loop :: IO ()
loop = do
-- TODO try...catch for main loop. This should catch exceptions
-- but also job timeout events (we want to stick to the practice
-- of keeping only one try...catch in the whole function)
catch (do
brokerMessage <- readMessageWaiting broker queueName
handleMessage state brokerMessage
callWorkerJobEvent onJobFinish state brokerMessage
) (\err ->
case fromException err of
Just jt@(JobTimeout {}) -> handleTimeoutError state jt
Nothing -> case fromException err of
Just je@(JobException {}) -> handleJobError state je
_ -> handleUnknownError state err)
handleMessage :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
handleMessage state@(State { .. }) brokerMessage = do
callWorkerJobEvent onMessageReceived state brokerMessage
let msgId = messageId brokerMessage
let msg = getMessage brokerMessage
let job' = toA msg
putStrLn $ formatStr state $ "received job: " <> show (job job')
let mdata = metadata job'
let t = jobTimeout job'
mTimeout <- Timeout.timeout (t * microsecond) (wrapPerformActionInJobException state brokerMessage)
let archiveHandler = do
case archiveStrategy mdata of
ASDelete -> do
-- putStrLn $ formatStr state $ "deleting completed job " <> show msgId <> " (strategy: " <> show archiveStrategy <> ")"
deleteMessage broker queueName msgId
ASArchive -> do
-- putStrLn $ formatStr state $ "archiving completed job " <> show msgId <> " (strategy: " <> show archiveStrategy <> ")"
archiveMessage broker queueName msgId
case mTimeout of
Just _ -> archiveHandler
Nothing -> do
callWorkerJobEvent onJobTimeout state brokerMessage
throwIO $ JobTimeout { jtBMessage = brokerMessage
, jtTimeout = t }
-- onMessageFetched broker queue msg
-- | It's important to know if an exception occured inside a job. This
-- way we can apply error recovering strategy and adjust this job in
-- the broker
wrapPerformActionInJobException :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
wrapPerformActionInJobException state@(State { onJobError }) brokerMessage = do
catch (do
runAction state brokerMessage
)
(\err -> do
callWorkerJobEvent onJobError state brokerMessage
let wrappedErr = JobException { jeBMessage = brokerMessage,
jeException = err }
throwIO wrappedErr
)
callWorkerJobEvent :: (HasWorkerBroker b a)
=> WorkerJobEvent b a
-> State b a
-> BrokerMessage b (Job a)
-> IO ()
callWorkerJobEvent Nothing _ _ = pure ()
callWorkerJobEvent (Just event) state brokerMessage = event state brokerMessage
handleTimeoutError :: (HasWorkerBroker b a) => State b a -> JobTimeout b a -> IO ()
handleTimeoutError state@(State { .. }) jt@(JobTimeout { .. }) = do
putStrLn $ formatStr state $ show jt
let msgId = messageId jtBMessage
let job = toA $ getMessage jtBMessage
putStrLn $ formatStr state $ "timeout for job: " <> show job
let mdata = metadata job
case timeoutStrategy mdata of
TSDelete -> deleteMessage broker queueName msgId
TSArchive -> archiveMessage broker queueName msgId
TSRepeat -> pure ()
TSRepeatNElseArchive n -> do
let readCt = readCount mdata
-- OK so this can be repeated at most 'n' times, compare 'readCt' with 'n'
if readCt >= n then
archiveMessage broker queueName msgId
else do
-- NOTE In rare cases, when worker hangs, we might lose a job
-- here? (i.e. delete, then resend)
-- Also, be aware that messsage id will change with resend
-- Delete original job first
deleteMessage broker queueName msgId
-- Send this job again, with increased 'readCount'
sendJob broker queueName (job { metadata = mdata { readCount = readCt + 1 } })
TSRepeatNElseDelete _n -> do
-- TODO Implement 'readCt'
undefined
-- OK so this can be repeated at most 'n' times, compare 'readCt' with 'n'
-- if readCt > n then
-- PGMQ.deleteMessage conn queue messageId
-- else
-- pure ()
handleJobError :: (HasWorkerBroker b a) => State b a -> JobException b a -> IO ()
handleJobError state@(State { .. }) je@(JobException { .. }) = do
let msgId = messageId jeBMessage
let job = toA $ getMessage jeBMessage
putStrLn $ formatStr state $ "error: " <> show je <> " for job " <> show job
let mdata = metadata job
case errorStrategy mdata of
ESDelete -> deleteMessage broker queueName msgId
ESArchive -> deleteMessage broker queueName msgId
ESRepeat -> return ()
handleUnknownError :: (HasWorkerBroker b a) => State b a -> SomeException -> IO ()
handleUnknownError state err = do
putStrLn $ formatStr state $ "unknown error: " <> show err
sendJob :: (HasWorkerBroker b a) => Broker b (Job a) -> Queue -> Job a -> IO ()
sendJob broker queueName job = do
sendMessage broker queueName $ toMessage job
microsecond :: Int
microsecond = 10^(6 :: Int)
-- Job has quite a few metadata. Here are some utilities for
-- constructing them more easily
-- | Wraps parameters for the 'sendJob' function
data (HasWorkerBroker b a) => SendJob b a =
SendJob { broker :: Broker b (Job a)
, queue :: Queue
, msg :: a
-- , delay :: Delay
, archStrat :: ArchiveStrategy
, errStrat :: ErrorStrategy
, toStrat :: TimeoutStrategy
, timeout :: Timeout }
-- | Create a 'SendJob' data with some defaults
mkDefaultSendJob :: HasWorkerBroker b a
=> Broker b (Job a)
-> Queue
-> a
-> Timeout
-> SendJob b a
mkDefaultSendJob broker queue msg timeout =
SendJob { broker
, queue
, msg
-- , delay = 0
-- | remove finished jobs
, archStrat = ASDelete
-- | archive errored jobs (for inspection later)
, errStrat = ESArchive
-- | repeat timed out jobs
, toStrat = TSRepeat
, timeout }
-- | Like 'mkDefaultSendJob' but with default timeout
mkDefaultSendJob' :: HasWorkerBroker b a
=> Broker b (Job a)
-> Queue
-> a
-> SendJob b a
mkDefaultSendJob' b q m = mkDefaultSendJob b q m defaultTimeout
where
defaultTimeout = 10
-- | Call 'sendJob' with 'SendJob b a' data
sendJob' :: (HasWorkerBroker b a) => SendJob b a -> IO ()
sendJob' (SendJob { .. }) = do
let metadata = JobMetadata { archiveStrategy = archStrat
, errorStrategy = errStrat
, timeoutStrategy = toStrat
, timeout = timeout
, readCount = 0 }
let job = Job { job = msg, metadata }
sendJob broker queue job
{-|
Module : Async.Worker.Broker
Description : Typeclass for the underlying broker that powers the worker
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
Stability : experimental
Portability : POSIX
-}
module Async.Worker.Broker
( module Async.Worker.Broker.Types )
where
import Async.Worker.Broker.Types
{-|
Module : Async.Worker.Broker.PGMQ
Description : PGMQ broker implementation
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Async.Worker.Broker.PGMQ
( PGMQBroker
, BrokerInitParams(..) )
where
import Async.Worker.Broker.Types (HasBroker(..), SerializableMessage)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PGMQ.Simple qualified as PGMQ
import Database.PGMQ.Types qualified as PGMQ
data PGMQBroker
instance (SerializableMessage a, Show a) => HasBroker PGMQBroker a where
data Broker PGMQBroker a =
PGMQBroker' {
conn :: PSQL.Connection
}
data BrokerMessage PGMQBroker a = PGMQBM (PGMQ.Message a)
deriving (Show)
data Message PGMQBroker a = PGMQM a
data MessageId PGMQBroker = PGMQMid Int
deriving (Eq, Show)
data BrokerInitParams PGMQBroker a = PGMQBrokerInitParams PSQL.ConnectInfo
messageId (PGMQBM (PGMQ.Message { msgId })) = PGMQMid msgId
getMessage (PGMQBM (PGMQ.Message { message })) = PGMQM message
toMessage message = PGMQM message
toA (PGMQM message) = message
initBroker (PGMQBrokerInitParams connInfo) = do
conn <- PSQL.connect connInfo
PGMQ.initialize conn
pure $ PGMQBroker' { conn }
deinitBroker (PGMQBroker' { conn }) = PSQL.close conn
createQueue (PGMQBroker' { conn }) queue = do
PGMQ.createQueue conn queue
dropQueue (PGMQBroker' { conn }) queue = do
PGMQ.dropQueue conn queue
readMessageWaiting q@(PGMQBroker' { conn }) queue = loop
where
-- loop :: PGMQ.SerializableMessage a => IO (BrokerMessage PGMQBroker' a)
loop = do
mMsg <- PGMQ.readMessageWithPoll conn queue 10 5 100
case mMsg of
Nothing -> readMessageWaiting q queue
Just msg -> return $ PGMQBM msg
sendMessage (PGMQBroker' { conn }) queue (PGMQM message) =
PGMQ.sendMessage conn queue message 0
deleteMessage (PGMQBroker' { conn }) queue (PGMQMid msgId) = do
PGMQ.deleteMessage conn queue msgId
archiveMessage (PGMQBroker' { conn }) queue (PGMQMid msgId) = do
PGMQ.archiveMessage conn queue msgId
getQueueSize (PGMQBroker' { conn }) queue = do
mMetrics <- PGMQ.getMetrics conn queue
case mMetrics of
Nothing -> return 0
Just (PGMQ.Metrics { queueLength }) -> return queueLength
{-|
Module : Async.Worker.Broker.Redis
Description : Redis broker functionality
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
Stability : experimental
Portability : POSIX
Based on lists:
https://redis.io/glossary/redis-queue/
-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Async.Worker.Broker.Redis
( RedisBroker
, BrokerInitParams(..) )
where
import Async.Worker.Broker.Types (HasBroker(..), SerializableMessage)
import Control.Monad (void)
import Data.Aeson qualified as Aeson
import Data.ByteString.Char8 qualified as BS
import Data.ByteString.Lazy qualified as BSL
import Database.Redis qualified as Redis
data RedisBroker
instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where
data Broker RedisBroker a =
RedisBroker' {
conn :: Redis.Connection
}
data BrokerMessage RedisBroker a = RedisBM a
deriving (Show)
data Message RedisBroker a = RedisM a
data MessageId RedisBroker = RedisMid Int
deriving (Eq, Show)
data BrokerInitParams RedisBroker a = RedisBrokerInitParams Redis.ConnectInfo
-- We're using simple QUEUE so we don't care about message id as we
-- won't be deleting/archiving the messages
messageId _ = RedisMid 0
getMessage (RedisBM msg) = RedisM msg
toMessage message = RedisM message
toA (RedisM message) = message
initBroker (RedisBrokerInitParams connInfo) = do
conn <- Redis.checkedConnect connInfo
pure $ RedisBroker' { conn }
deinitBroker (RedisBroker' { conn }) = Redis.disconnect conn
-- createQueue (RedisBroker' { conn }) queue = do
createQueue _broker _queue = do
-- No need to actually pre-create queues
return ()
-- dropQueue (RedisBroker' { conn }) queue = do
dropQueue _broker _queue = do
-- We don't care about this
return ()
readMessageWaiting q@(RedisBroker' { conn }) queue = loop
where
loop = do
eMsg <- Redis.runRedis conn $ Redis.blpop [BS.pack queue] 10
case eMsg of
Left _ -> undefined
Right Nothing -> readMessageWaiting q queue
Right (Just (_queue, msg)) -> case Aeson.decode (BSL.fromStrict msg) of
Just dmsg -> return $ RedisBM dmsg
Nothing -> undefined
sendMessage (RedisBroker' { conn }) queue (RedisM message) =
void $ Redis.runRedis conn $ Redis.lpush (BS.pack queue) [BSL.toStrict $ Aeson.encode message]
-- deleteMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do
deleteMessage _broker _queue _msgId = do
-- Nothing
return ()
-- archiveMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do
archiveMessage _broker _queue _msgId = do
-- Nothing
return ()
getQueueSize (RedisBroker' { conn }) queue = do
eLen <- Redis.runRedis conn $ Redis.llen (BS.pack queue)
case eLen of
Right len -> return $ fromIntegral len
Left _ -> return 0
{-|
Module : Async.Worker.Broker.Types
Description : Typeclass for the underlying broker that powers the worker
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Async.Worker.Broker.Types
( Queue
-- , HasMessageId(..)
, HasBroker(..)
, SerializableMessage
)
where
import Data.Aeson (FromJSON, ToJSON)
import Data.Kind (Type)
import Data.Typeable (Typeable)
type Queue = String
{-| A message in the queue system must have some properties. In
particular, it must have some sort of 'id'.
-}
-- class (Eq msgId, Show msgId, Typeable msgId) => HasMessageId msg msgId where
-- messageId :: msg -> msgId
-- class HasMessageId m where
-- data family Message m :: Type
-- type family MessageId m :: Type
-- messageId :: Message m -> MessageId m
{- NOTE There are 3 types of messages here:
- 'a' the underlying, user-defined message
- 'Job a' worker definition, containing message metadata
- 'BrokerMessage message m', i.e. for PGMQ, it returns things like:
msgId, readCt, enqueuedAt, vt
- 'a' is read-write
- 'Job a' is read-write
- 'BrokerMessage' is read-only, i.e. we can't save it to broker and
it doesn't make sense to construct it on Haskell side. Instead, we
save 'Job a' and get 'BrokerMessage' when reading. In this sense,
read and send are not symmetrical (similarly, Opaleye has Read and
Write tables).
-}
-- | So this is the broker message that contains our message inside
-- class BrokerMessage brokerMessage message where
-- getMessage :: brokerMessage -> message
type SerializableMessage a = ( FromJSON a
, ToJSON a
-- NOTE This shouldn't be necessary
, Typeable a )
{-|
This is an interface for basic broker functionality.
-}
-- class Broker broker brokerMessage message msgId | brokerMessage -> message, brokerMessage -> msgId where
class (
Eq (MessageId b)
, Show (MessageId b)
, Show (BrokerMessage b a)
) => HasBroker b a where
-- | Data representing the broker
data family Broker b a :: Type
-- | Data represenging message that is returned by broker
data family BrokerMessage b a :: Type
-- | Data that we serialize into broker
data family Message b a :: Type
-- | How to get the message id (needed for delete/archive operations)
data family MessageId b :: Type
data family BrokerInitParams b a :: Type
-- The following are just constructors and deconstructors for the
-- 'BrokerMessage', 'Message' data types
-- | Operation for getting the message id from 'BrokerMessage'
-- messageId :: (Eq (MessageId b), Show (MessageId b)) => BrokerMessage b a -> MessageId b
messageId :: BrokerMessage b a -> MessageId b
-- | 'BrokerMessage' contains 'Message' inside, this is a
-- deconstructor for 'BrokerMessage'
getMessage :: BrokerMessage b a -> Message b a
-- | Convert 'a' to 'Message b a'
toMessage :: a -> Message b a
-- | Convert 'Message b a' to 'a'
toA :: Message b a -> a
-- | Initialize broker
initBroker :: BrokerInitParams b a -> IO (Broker b a)
-- | Deconstruct broker (e.g. close DB connection)
deinitBroker :: Broker b a -> IO ()
{-| Create new queue with given name. Optionally any other
initializations can be added here. -}
createQueue :: Broker b a -> Queue -> IO ()
{-| Drop queue -}
dropQueue :: Broker b a -> Queue -> IO ()
{-| Read message, waiting for it if not present -}
-- readMessageWaiting :: SerializableMessage a => Broker b a -> Queue -> IO (BrokerMessage b a)
readMessageWaiting :: Broker b a -> Queue -> IO (BrokerMessage b a)
{-| Send message -}
-- sendMessage :: SerializableMessage a => Broker b a -> Queue -> Message b a -> IO ()
sendMessage :: Broker b a -> Queue -> Message b a -> IO ()
{-| Delete message -}
deleteMessage :: Broker b a -> Queue -> MessageId b -> IO ()
{-| Archive message -}
archiveMessage :: Broker b a -> Queue -> MessageId b -> IO ()
{-| Queue size -}
getQueueSize :: Broker b a -> Queue -> IO Int
{-|
Module : Async.Worker.Types
Description : Types for the async worker
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE RankNTypes #-}
module Async.Worker.Types
( ArchiveStrategy(..)
, ErrorStrategy(..)
, TimeoutStrategy(..)
, JobMetadata(..)
, Job(..)
, getJob
, jobTimeout
, JobMessage
, State(..)
, PerformAction
, JobTimeout(..) )
where
import Control.Applicative ((<|>))
import Control.Exception (Exception)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
import Database.PGMQ.Types qualified as PGMQ
import Database.PostgreSQL.Simple qualified as PSQL
type Timeout = Int
-- | Strategy for archiving jobs
data ArchiveStrategy =
-- | Delete message when it's done
ASDelete
-- | Archive message when it's done
| ASArchive
deriving (Eq, Show)
instance ToJSON ArchiveStrategy where
toJSON ASDelete = toJSON ("ASDelete" :: String)
toJSON ASArchive = toJSON ("ASArchive" :: String)
instance FromJSON ArchiveStrategy where
parseJSON = withText "ArchiveStrategy" $ \s -> do
case s of
"ASDelete" -> pure ASDelete
"ASArchive" -> pure ASArchive
s' -> fail $ T.unpack s'
-- | Strategy of handling jobs with errors
data ErrorStrategy =
-- | Delete job when it threw an error
ESDelete
-- | Archive job when it threw an error
| ESArchive
-- | Try to repeat the job when an error ocurred
| ESRepeat
deriving (Eq, Show)
instance ToJSON ErrorStrategy where
toJSON ESDelete = toJSON ("ESDelete" :: String)
toJSON ESArchive = toJSON ("ESArchive" :: String)
toJSON ESRepeat = toJSON ("ESRepeat" :: String)
instance FromJSON ErrorStrategy where
parseJSON = withText "ErrorStrategy" $ \s -> do
case s of
"ESDelete" -> pure ESDelete
"ESArchive" -> pure ESArchive
"ESRepeat" -> pure ESRepeat
s' -> fail $ T.unpack s'
-- | Strategy for handling timeouts
data TimeoutStrategy =
-- | Delete job when it timed out
TSDelete
-- | Archive job when it timed out
| TSArchive
-- | Repeat job when it timed out (inifinitely)
| TSRepeat
-- | Repeat job when it timed out (but only maximum number of times), otherwise archive it
| TSRepeatNElseArchive Int
-- | Repeat job when it timed out (but only maximum number of times), otherwise delete it
| TSRepeatNElseDelete Int
deriving (Eq, Show)
instance ToJSON TimeoutStrategy where
toJSON TSDelete = toJSON ("TSDelete" :: String)
toJSON TSArchive = toJSON ("TSArchive" :: String)
toJSON TSRepeat = toJSON ("TSRepeat" :: String)
toJSON (TSRepeatNElseArchive n) = toJSON $ object [ ("TSRepeatNElseArchive" .= n) ]
toJSON (TSRepeatNElseDelete n) = toJSON $ object [ ("TSRepeatNElseDelete" .= n) ]
instance FromJSON TimeoutStrategy where
parseJSON v = parseText v
<|> parseTSRepeatNElseArchive v
<|> parseTSRepeatNElseDelete v
where
-- | Parser for textual formats
parseText = withText "TimeoutStrategy (text)" $ \s -> do
case s of
"TSDelete" -> pure TSDelete
"TSArchive" -> pure TSArchive
"TSRepeat" -> pure TSRepeat
s' -> fail $ T.unpack s'
-- | Parser for 'TSRepeatN' object
parseTSRepeatNElseArchive = withObject "TimeoutStrategy (TSRepeatNElseArchive)" $ \o -> do
n <- o .: "TSRepeatNElseArchive"
pure $ TSRepeatNElseArchive n
parseTSRepeatNElseDelete = withObject "TimeoutStrategy (TSRepeatNElseDelete)" $ \o -> do
n <- o .: "TSRepeatNElseDelete"
pure $ TSRepeatNElseDelete n
-- | Job metadata
data JobMetadata =
JobMetadata { archiveStrategy :: ArchiveStrategy
, errorStrategy :: ErrorStrategy
, timeoutStrategy :: TimeoutStrategy
-- | If not-empty, sets a custom visibility timeout for
-- | a job. Otherwise, worker's State.visibilityTimeout
-- | will be used.
, timeout :: Timeout }
deriving (Eq, Show)
instance ToJSON JobMetadata where
toJSON (JobMetadata { .. }) =
toJSON $ object [
( "astrat" .= archiveStrategy )
, ( "estrat" .= errorStrategy )
, ( "tstrat" .= timeoutStrategy )
, ( "timeout" .= timeout )
]
instance FromJSON JobMetadata where
parseJSON = withObject "JobMetadata" $ \o -> do
archiveStrategy <- o .: "astrat"
errorStrategy <- o .: "estrat"
timeoutStrategy <- o .: "tstrat"
timeout <- o .: "timeout"
return $ JobMetadata { .. }
-- | Worker has specific message type, because each message carries
-- | around some metadata for the worker itself
data Job a =
Job { job :: a
, metadata :: JobMetadata }
deriving (Eq, Show)
getJob :: Job a -> a
getJob (Job { job }) = job
instance ToJSON a => ToJSON (Job a) where
toJSON (Job { .. }) =
toJSON $ object [
("metadata" .= metadata)
, ("job" .= job)
]
instance FromJSON a => FromJSON (Job a) where
parseJSON = withObject "Job" $ \o -> do
metadata <- o .: "metadata"
job <- o .: "job"
return $ Job { .. }
jobTimeout :: Job a -> PGMQ.VisibilityTimeout
jobTimeout (Job { metadata }) = timeout metadata
-- | The worker job, as it is serialized in a PGMQ table
type JobMessage a = PGMQ.Message (Job a)
-- | Main state for a running worker
data State a =
State { connectInfo :: PSQL.ConnectInfo
-- custom name for this worker
, name :: String
, performAction :: PerformAction a
, queue :: PGMQ.Queue
-- -- | Time in seconds that the message become invisible after reading.
-- , visibilityTimeout :: PGMQ.VisibilityTimeout
-- | Time in seconds to wait for new messages to reach the queue. Defaults to 5.
, maxPollSeconds :: PGMQ.Delay
-- | Milliseconds between the internal poll operations. Defaults to 100.
, pollIntervalMs :: PGMQ.PollIntervalMs }
-- | Callback definition (what to execute when a message arrives)
type PerformAction a = State a -> JobMessage a -> IO ()
-- | Thrown when job times out
data JobTimeout =
JobTimeout { messageId :: Int
, vt :: PGMQ.VisibilityTimeout }
deriving Show
instance Exception JobTimeout
{-|
Module : Async.Worker.Types
Description : Types for the async worker
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}
module Async.Worker.Types
( ArchiveStrategy(..)
, ErrorStrategy(..)
, TimeoutStrategy(..)
, JobMetadata(..)
, defaultMetadata
, Job(..)
, getJob
, jobTimeout
, Timeout
-- , JobMessage
-- , WorkerBroker
, State(..)
, WorkerJobEvent
, runAction
, PerformAction
, HasWorkerBroker
, formatStr
, JobTimeout(..)
, JobException(..) )
where
import Async.Worker.Broker.Types (Broker, BrokerMessage, HasBroker, Queue)
import Control.Applicative ((<|>))
import Control.Exception.Safe (Exception, SomeException)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
import Data.Typeable (Typeable)
type ReadCount = Int
type Timeout = Int
-- | Strategy for archiving jobs
data ArchiveStrategy =
-- | Delete message when it's done
ASDelete
-- | Archive message when it's done
| ASArchive
deriving (Eq, Show)
instance ToJSON ArchiveStrategy where
toJSON ASDelete = toJSON ("ASDelete" :: String)
toJSON ASArchive = toJSON ("ASArchive" :: String)
instance FromJSON ArchiveStrategy where
parseJSON = withText "ArchiveStrategy" $ \s -> do
case s of
"ASDelete" -> pure ASDelete
"ASArchive" -> pure ASArchive
s' -> fail $ T.unpack s'
-- | Strategy of handling jobs with errors
data ErrorStrategy =
-- | Delete job when it threw an error
ESDelete
-- | Archive job when it threw an error
| ESArchive
-- | Try to repeat the job when an error ocurred
| ESRepeat
-- TODO Repeat N times
deriving (Eq, Show)
instance ToJSON ErrorStrategy where
toJSON ESDelete = toJSON ("ESDelete" :: String)
toJSON ESArchive = toJSON ("ESArchive" :: String)
toJSON ESRepeat = toJSON ("ESRepeat" :: String)
instance FromJSON ErrorStrategy where
parseJSON = withText "ErrorStrategy" $ \s -> do
case s of
"ESDelete" -> pure ESDelete
"ESArchive" -> pure ESArchive
"ESRepeat" -> pure ESRepeat
s' -> fail $ T.unpack s'
-- | Strategy for handling timeouts
data TimeoutStrategy =
-- | Delete job when it timed out
TSDelete
-- | Archive job when it timed out
| TSArchive
-- | Repeat job when it timed out (inifinitely)
| TSRepeat
-- | Repeat job when it timed out (but only maximum number of times), otherwise archive it
| TSRepeatNElseArchive Int
-- | Repeat job when it timed out (but only maximum number of times), otherwise delete it
| TSRepeatNElseDelete Int
deriving (Eq, Show)
instance ToJSON TimeoutStrategy where
toJSON TSDelete = toJSON ("TSDelete" :: String)
toJSON TSArchive = toJSON ("TSArchive" :: String)
toJSON TSRepeat = toJSON ("TSRepeat" :: String)
toJSON (TSRepeatNElseArchive n) = toJSON $ object [ ("TSRepeatNElseArchive" .= n) ]
toJSON (TSRepeatNElseDelete n) = toJSON $ object [ ("TSRepeatNElseDelete" .= n) ]
instance FromJSON TimeoutStrategy where
parseJSON v = parseText v
<|> parseTSRepeatNElseArchive v
<|> parseTSRepeatNElseDelete v
where
-- | Parser for textual formats
parseText = withText "TimeoutStrategy (text)" $ \s -> do
case s of
"TSDelete" -> pure TSDelete
"TSArchive" -> pure TSArchive
"TSRepeat" -> pure TSRepeat
s' -> fail $ T.unpack s'
-- | Parser for 'TSRepeatN' object
parseTSRepeatNElseArchive = withObject "TimeoutStrategy (TSRepeatNElseArchive)" $ \o -> do
n <- o .: "TSRepeatNElseArchive"
pure $ TSRepeatNElseArchive n
parseTSRepeatNElseDelete = withObject "TimeoutStrategy (TSRepeatNElseDelete)" $ \o -> do
n <- o .: "TSRepeatNElseDelete"
pure $ TSRepeatNElseDelete n
-- | Job metadata
data JobMetadata =
JobMetadata { archiveStrategy :: ArchiveStrategy
, errorStrategy :: ErrorStrategy
, timeoutStrategy :: TimeoutStrategy
-- | Time after which the job is considered to time-out
-- (in seconds).
, timeout :: Timeout
-- | Read count so we know how many times this message
-- was processed
, readCount :: ReadCount }
deriving (Eq, Show)
instance ToJSON JobMetadata where
toJSON (JobMetadata { .. }) =
toJSON $ object [
( "astrat" .= archiveStrategy )
, ( "estrat" .= errorStrategy )
, ( "tstrat" .= timeoutStrategy )
, ( "timeout" .= timeout )
, ( "readCount" .= readCount )
]
instance FromJSON JobMetadata where
parseJSON = withObject "JobMetadata" $ \o -> do
archiveStrategy <- o .: "astrat"
errorStrategy <- o .: "estrat"
timeoutStrategy <- o .: "tstrat"
timeout <- o .: "timeout"
readCount <- o .: "readCount"
return $ JobMetadata { .. }
defaultMetadata :: JobMetadata
defaultMetadata =
JobMetadata { archiveStrategy = ASArchive
, errorStrategy = ESArchive
, timeoutStrategy = TSArchive
, timeout = 10
, readCount = 0 }
-- | Worker has specific message type, because each message carries
-- | around some metadata for the worker itself
data Job a =
Job { job :: a
, metadata :: JobMetadata }
deriving (Eq, Show)
getJob :: Job a -> a
getJob (Job { job }) = job
instance ToJSON a => ToJSON (Job a) where
toJSON (Job { .. }) =
toJSON $ object [
("metadata" .= metadata)
, ("job" .= job)
]
instance FromJSON a => FromJSON (Job a) where
parseJSON = withObject "Job" $ \o -> do
metadata <- o .: "metadata"
job <- o .: "job"
return $ Job { .. }
jobTimeout :: Job a -> Timeout
jobTimeout (Job { metadata }) = timeout metadata
-- | The worker job, as it is serialized in a queue
-- type JobMessage a msgId = HasMessageId (Job a) msgId
-- | Broker associated with the abstract worker
-- type WorkerBroker b brokerMessage a msgId = Broker b brokerMessage (Job a) msgId
-- | Main state for a running worker ('b' is broker, 'a' is the
-- underlying message).
--
-- For a worker, 'a' is the underlying message specification,
-- implementation-dependent (e.g. can be of form {'function': ...,
-- 'arguments: [...]}), 'Job a' is worker's wrapper around that
-- message with metadata (corresponds to broker 'Message b a' and
-- 'BrokerMessage b a' is what we get when the broker reads that
-- message)
data (HasWorkerBroker b a) => State b a =
State { broker :: Broker b (Job a)
, queueName :: Queue -- name of queue
-- custom name for this worker
, name :: String
, performAction :: PerformAction b a
-- | These events are useful for debugging or adding custom functionality
, onMessageReceived :: WorkerJobEvent b a
, onJobFinish :: WorkerJobEvent b a
, onJobTimeout :: WorkerJobEvent b a
, onJobError :: WorkerJobEvent b a }
runAction :: (HasWorkerBroker b a) => State b a -> BrokerMessage b (Job a) -> IO ()
runAction state brokerMessage = (performAction state) state brokerMessage
type WorkerJobEvent b a = Maybe (State b a -> BrokerMessage b (Job a) -> IO ())
-- | Callback definition (what to execute when a message arrives)
-- type PerformAction broker brokerMessage a msgId =
-- (WorkerBroker broker brokerMessage a msgId, JobMessage a msgId)
-- => State broker brokerMessage a msgId -> brokerMessage -> IO ()
type PerformAction b a =
State b a -> BrokerMessage b (Job a) -> IO ()
-- TODO 'Show a' could be removed. Any logging can be done as part of
-- 'on' events in 'State'.
type HasWorkerBroker b a = ( HasBroker b (Job a), Typeable a, Typeable b, Show a )
-- | Helper function to format a string with worker name (for logging)
formatStr :: (HasWorkerBroker b a) => State b a -> String -> String
formatStr (State { name }) msg =
"[" <> name <> "] " <> msg
-- -- | Thrown when job times out
data JobTimeout b a =
JobTimeout { jtBMessage :: BrokerMessage b (Job a)
, jtTimeout :: Timeout }
deriving instance (HasWorkerBroker b a) => Show (JobTimeout b a)
instance (HasWorkerBroker b a) => Exception (JobTimeout b a)
data JobException b a =
JobException { jeBMessage :: BrokerMessage b (Job a)
, jeException :: SomeException }
deriving instance (HasWorkerBroker b a) => Show (JobException b a)
instance (HasWorkerBroker b a) => Exception (JobException b a)
{-|
Generic Broker tests. All brokers should satisfy them.
-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.Integration.Broker
( brokerTests
, pgmqBrokerInitParams
, redisBrokerInitParams )
where
import Async.Worker.Broker.PGMQ qualified as PGMQ
import Async.Worker.Broker.Redis qualified as Redis
import Async.Worker.Broker.Types qualified as BT
import Control.Exception (bracket)
import Data.Aeson (ToJSON(..), FromJSON(..), withText)
import Data.Text qualified as T
import Test.Hspec
import Test.Integration.Utils (getPSQLEnvConnectInfo, getRedisEnvConnectInfo, randomQueueName)
data TestEnv b =
TestEnv { broker :: BT.Broker b Message
, queue :: BT.Queue }
testQueuePrefix :: BT.Queue
testQueuePrefix = "test_broker"
data Message =
Message { text :: String }
deriving (Show, Eq)
instance ToJSON Message where
toJSON (Message { text }) = toJSON text
instance FromJSON Message where
parseJSON = withText "Message" $ \text -> do
pure $ Message { text = T.unpack text }
withBroker :: (BT.HasBroker b Message)
=> BT.BrokerInitParams b Message
-> (TestEnv b -> IO ())
-> IO ()
withBroker bInitParams = bracket (setUpBroker bInitParams) tearDownBroker
where
-- NOTE I need to pass 'b' again, otherwise GHC can't infer the
-- type of 'b' (even with 'ScopedTypeVariables' turned on)
setUpBroker :: (BT.HasBroker b Message)
=> BT.BrokerInitParams b Message -> IO (TestEnv b)
setUpBroker bInit = do
b <- BT.initBroker bInit
queue <- randomQueueName testQueuePrefix
BT.dropQueue b queue
BT.createQueue b queue
return $ TestEnv { broker = b
, queue }
tearDownBroker (TestEnv { broker, queue }) = do
BT.dropQueue broker queue
BT.deinitBroker broker
brokerTests :: (BT.HasBroker b Message)
=> BT.BrokerInitParams b Message -> Spec
brokerTests bInitParams =
parallel $ around (withBroker bInitParams) $ describe "Broker tests" $ do
it "can send and receive a message" $ \(TestEnv { broker, queue }) -> do
BT.dropQueue broker queue
BT.createQueue broker queue
let msg = Message { text = "test" }
BT.sendMessage broker queue (BT.toMessage msg)
msg2 <- BT.readMessageWaiting broker queue
putStrLn $ "[messageId] " <> show (BT.messageId msg2)
msg `shouldBe` (BT.toA $ BT.getMessage msg2)
pgmqBrokerInitParams :: IO (BT.BrokerInitParams PGMQ.PGMQBroker Message)
pgmqBrokerInitParams = do
PGMQ.PGMQBrokerInitParams <$> getPSQLEnvConnectInfo
redisBrokerInitParams :: IO (BT.BrokerInitParams Redis.RedisBroker Message)
redisBrokerInitParams = do
Redis.RedisBrokerInitParams <$> getRedisEnvConnectInfo
......@@ -2,7 +2,6 @@ module Test.Integration.PGMQ.Simple
( pgmqSimpleTests )
where
import Async.Worker.Broker qualified as B
import Control.Exception (bracket)
import Data.Maybe (isJust)
import Database.PostgreSQL.Simple qualified as PSQL
......@@ -15,12 +14,12 @@ import Test.Integration.Utils (getPSQLEnvConnectInfo, randomQueueName)
data TestEnv =
TestEnv {
conn :: PSQL.Connection
, queue :: B.Queue
, queue :: PGMQ.Queue
}
-- NOTE These tests expect a local pgmq server runnign on port 5432.
testQueuePrefix :: B.Queue
testQueuePrefix :: PGMQ.Queue
testQueuePrefix = "test_pgmq"
setUpConn :: IO TestEnv
......
module Test.Integration.Utils
( getPSQLEnvConnectInfo
, getRedisEnvConnectInfo
, randomQueueName )
where
import Async.Worker.Broker qualified as B
import Data.Maybe (fromMaybe)
import Database.PGMQ.Types (Queue)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.Redis qualified as Redis
import System.Environment (lookupEnv)
import Test.RandomStrings (randomASCII, randomString, onlyAlphaNum)
......@@ -24,13 +22,7 @@ getPSQLEnvConnectInfo = do
, PSQL.connectHost = fromMaybe "localhost" pgHost
, PSQL.connectPassword = fromMaybe "postgres" pgPass }
getRedisEnvConnectInfo :: IO Redis.ConnectInfo
getRedisEnvConnectInfo = do
redisHost <- lookupEnv "REDIS_HOST"
-- https://hackage.haskell.org/package/hedis-0.15.2/docs/Database-Redis.html#v:defaultConnectInfo
pure $ Redis.defaultConnectInfo { Redis.connectHost = fromMaybe "localhost" redisHost }
randomQueueName :: B.Queue -> IO B.Queue
randomQueueName :: Queue -> IO Queue
randomQueueName prefix = do
postfix <- randomString (onlyAlphaNum randomASCII) 10
return $ prefix <> "_" <> postfix
This diff is collapsed.
module Main where
-- import Test.Integration.Broker (brokerTests, pgmqBrokerInitParams, redisBrokerInitParams)
-- import Test.Integration.PGMQ.Simple (pgmqSimpleTests)
import Test.Integration.Worker (workerTests, {- pgmqWorkerBrokerInitParams, -} redisWorkerBrokerInitParams)
import Test.Integration.PGMQ.Simple (pgmqSimpleTests)
import Test.Tasty
import Test.Tasty.Hspec
......@@ -10,24 +8,10 @@ import Test.Tasty.Hspec
main :: IO ()
main = do
-- pgmqBInitParams <- pgmqBrokerInitParams
-- pgmqBrokerSpec <- testSpec "brokerTests (pgmq)" (brokerTests pgmqBInitParams)
-- pgmqWBInitParams <- pgmqWorkerBrokerInitParams
-- pgmqWorkerSpec <- testSpec "workerTests (pgmq)" (workerTests pgmqWBInitParams)
-- pgmqSimpleSpec <- testSpec "pgmqSimpleTests" pgmqSimpleTests
-- redisBInitParams <- redisBrokerInitParams
-- redisBrokerSpec <- testSpec "brokerTests (redis)" (brokerTests redisBInitParams)
redisWBInitParams <- redisWorkerBrokerInitParams
redisWorkerSpec <- testSpec "workerTests (redis)" (workerTests redisWBInitParams)
pgmqSimpleSpec <- testSpec "pgmqSimpleTests" pgmqSimpleTests
defaultMain $ testGroup "integration tests"
[
-- pgmqBrokerSpec
-- , pgmqSimpleSpec
-- , pgmqWorkerSpec
-- , redisBrokerSpec
redisWorkerSpec
pgmqSimpleSpec
]
{-# OPTIONS_GHC -Wno-orphans -Wno-missing-signatures #-}
module Main where
import Async.Worker.Types qualified as WT
import Data.Aeson qualified as Aeson
import Test.Tasty
import Test.Tasty.QuickCheck as QC
main = defaultMain tests
tests :: TestTree
tests = testGroup "Tests" [propertyTests, unitTests]
propertyTests = testGroup "Property tests" [aesonPropTests]
aesonPropTests = testGroup "Aeson (de-)serialization property tests" $
[ aesonPropJobMetadataTests
, aesonPropJobTests ]
instance QC.Arbitrary WT.ArchiveStrategy where
arbitrary = QC.elements [ WT.ASDelete, WT.ASArchive ]
instance QC.Arbitrary WT.ErrorStrategy where
arbitrary = QC.elements [ WT.ESDelete, WT.ESArchive, WT.ESRepeat ]
instance QC.Arbitrary WT.TimeoutStrategy where
arbitrary = do
n <- arbitrary
m <- arbitrary
QC.elements [ WT.TSDelete
, WT.TSArchive
, WT.TSRepeat
, WT.TSRepeatNElseArchive n
, WT.TSRepeatNElseDelete m ]
instance QC.Arbitrary WT.JobMetadata where
arbitrary = do
archiveStrategy <- arbitrary
errorStrategy <- arbitrary
timeoutStrategy <- arbitrary
timeout <- arbitrary
readCount <- arbitrary
return $ WT.JobMetadata { .. }
aesonPropJobMetadataTests = testGroup "Aeson WT.JobMetadata (de-)serialization tests" $
[ QC.testProperty "Aeson.decode . Aeson.encode == id" $
\jm ->
Aeson.decode (Aeson.encode (jm :: WT.JobMetadata)) == Just jm
]
instance QC.Arbitrary a => QC.Arbitrary (WT.Job a) where
arbitrary = WT.Job <$> arbitrary <*> arbitrary
aesonPropJobTests = testGroup "Aeson WT.Job (de-)serialization tests" $
[ QC.testProperty "Aeson.decode . Aeson.encode == id" $
\j ->
Aeson.decode (Aeson.encode (j :: WT.Job String)) == Just j
]
unitTests = testGroup "Unit tests" []
-- [ testCase "List comparison (different length)" $
-- [1, 2, 3] `compare` [1,2] @?= GT
-- -- the following test does not hold
-- , testCase "List comparison (same length)" $
-- [1, 2, 3] `compare` [1,2,2] @?= LT
-- ]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment