Commit 1bfa37fd authored by Przemyslaw Kaminski's avatar Przemyslaw Kaminski

Merge branch 'polymorphic-worker' into 'master'

Polymorphic worker

See merge request !1
parents 7b2b77cc 8b6dedca
Pipeline #6426 canceled with stages
# You can lint your .gitlab-ci.yml file here:
# https://gitlab.iscpif.fr/gargantext/haskell-pgmq/-/ci/lint
cache:
paths:
- dist-newstyle/
stages:
- build
- integration-tests
build:
stage: build
image: haskell:9.4.7-slim
# https://stackoverflow.com/questions/76340763/what-gitlab-ci-configuration-will-reliably-cache-haskell-package-dependencies-bu
cache:
key: "v0-cabal"
paths:
- ".cabal"
before_script:
- |
cat >cabal.project.local <<EOF
store-dir: $CI_PROJECT_DIR/.cabal
remote-repo-cache: $CI_PROJECT_DIR/.cabal/packages
EOF
- apt-get update -qq && apt-get install -y -qq libpq-dev
- cabal update
script:
- cabal v2-build
integration-tests:
stage: integration-tests
image: haskell:9.4.7-slim
services:
- name: docker.io/cgenie/pgmq:16-1.3.3.1
alias: pgmq
variables:
# Configure postgres service (https://hub.docker.com/_/postgres/)
POSTGRES_DB: pgmq
POSTGRES_USER: pgmq
POSTGRES_PASSWORD: pgmq
# https://stackoverflow.com/questions/76340763/what-gitlab-ci-configuration-will-reliably-cache-haskell-package-dependencies-bu
cache:
key: "v0-cabal"
paths:
- ".cabal"
before_script:
- |
cat >cabal.project.local <<EOF
store-dir: $CI_PROJECT_DIR/.cabal
remote-repo-cache: $CI_PROJECT_DIR/.cabal/packages
EOF
- apt-get update -qq && apt-get install -y -qq libpq-dev
- cabal update
script:
- export POSTGRES_HOST=pgmq
#- cabal v2-test test-pgmq-integration
# cabal v2-test -j seems not to work correctly, so we just run the integration test
- cabal v2-run test-integration -- +RTS -N -RTS
...@@ -9,13 +9,27 @@ Based on [elixir pgmq](https://hexdocs.pm/pgmq/Pgmq.html) bindings. ...@@ -9,13 +9,27 @@ Based on [elixir pgmq](https://hexdocs.pm/pgmq/Pgmq.html) bindings.
There is a binary in [./bin/simple-test/Main.hs](./bin/simple-test/Main.hs), it should contain all test cases for the basic module `Database.PGMQ.Simple`. There is a binary in [./bin/simple-test/Main.hs](./bin/simple-test/Main.hs), it should contain all test cases for the basic module `Database.PGMQ.Simple`.
First, let's decide which container to use:
```shell
export PGMQ_CONTAINER=docker.io/cgenie/pgmq:16-1.3.3.1
```
However with the above, some tests might fail because some functions
throw errors when a table is not there (e.g. `pgmq.drop_queue`).
Some fixes are made in our custom repo:
https://github.com/garganscript/pgmq
and you can use:
```shell
export PGMQ_CONTAINER=cgenie/pgmq:16-1.3.3.1
```
To run it, first start a pgmq container: To run it, first start a pgmq container:
```shell ```shell
podman run --name pgmq -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest podman run --name pgmq -e POSTGRES_PASSWORD=postgres -p 5432:5432 $PG_CONTAINER
``` ```
or with Docker: or with Docker:
```shell ```shell
docker run --name pgmq -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest docker run --name pgmq -e POSTGRES_PASSWORD=postgres -p 5432:5432 $PG_CONTAINER
``` ```
Then run the tests: Then run the tests:
```shell ```shell
......
...@@ -8,7 +8,7 @@ Testing exception catch for PSQL. ...@@ -8,7 +8,7 @@ Testing exception catch for PSQL.
module Main module Main
where where
import Control.Exception (Exception, SomeException(..), catch, fromException, throwIO, toException) import Control.Exception (SomeException(..), catch)
import Control.Monad (void) import Control.Monad (void)
import Database.PostgreSQL.Simple qualified as PSQL import Database.PostgreSQL.Simple qualified as PSQL
import Database.PostgreSQL.Simple.SqlQQ (sql) import Database.PostgreSQL.Simple.SqlQQ (sql)
......
...@@ -22,7 +22,7 @@ main = do ...@@ -22,7 +22,7 @@ main = do
PGMQ.initialize conn PGMQ.initialize conn
PGMQ.dropQueue conn "test" PGMQ.dropQueue conn "test"
metrics <- PGMQ.getMetrics conn "test" _metrics <- PGMQ.getMetrics conn "test"
let queue = "test" let queue = "test"
PGMQ.createQueue conn "test" PGMQ.createQueue conn "test"
...@@ -81,8 +81,8 @@ main = do ...@@ -81,8 +81,8 @@ main = do
metrics <- PGMQ.getMetrics conn queue metrics <- PGMQ.getMetrics conn queue
putStrLn $ "before purge: " <> show metrics putStrLn $ "before purge: " <> show metrics
PGMQ.purgeQueue conn queue PGMQ.purgeQueue conn queue
metrics <- PGMQ.getMetrics conn queue metrics2 <- PGMQ.getMetrics conn queue
putStrLn $ "after purge: " <> show metrics putStrLn $ "after purge: " <> show metrics2
PGMQ.dropQueue conn queue PGMQ.dropQueue conn queue
......
{-# LANGUAGE QuasiQuotes #-}
{-
A very simple worker to test Database.PGMQ.Worker.
-}
module Main
where
import Control.Applicative ((<|>))
import Control.Concurrent (forkIO, threadDelay)
import Control.Exception (Exception, throwIO)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PGMQ.Simple qualified as PGMQ
import Database.PGMQ.Types qualified as PGMQ
import Database.PGMQ.Worker qualified as PGMQW
data SimpleException = SimpleException String
deriving Show
instance Exception SimpleException
data Message =
Ping
| Echo String
| Wait Int
| Error String
deriving (Eq, Show)
instance ToJSON Message where
toJSON Ping = toJSON ("ping" :: String)
toJSON (Echo s) = toJSON $ object [ ("echo" .= s) ]
toJSON (Wait s) = toJSON $ object [ ("wait" .= s) ]
toJSON (Error e) = toJSON $ object [ ("error" .= e) ]
instance FromJSON Message where
parseJSON v = parsePing v <|> parseEcho v <|> parseWait v <|> parseError v
where
parsePing = withText "Message (Ping)" $ \s -> do
case s of
"ping" -> pure Ping
s' -> fail $ T.unpack s'
parseEcho = withObject "Message (Echo)" $ \o -> do
s <- o .: "echo"
return $ Echo s
parseWait = withObject "Message (Wait)" $ \o -> do
s <- o .: "wait"
return $ Wait s
parseError = withObject "Message (Error)" $ \o -> do
e <- o .: "error"
return $ Error e
performAction :: PGMQW.State Message -> PGMQW.JobMessage Message -> IO ()
performAction s (PGMQ.Message { message = job }) = do
case PGMQW.getJob job of
Ping -> putStrLn $ PGMQW.formatStr s "ping"
Echo str -> putStrLn $ PGMQW.formatStr s ("echo " <> str)
Wait int -> do
putStrLn $ PGMQW.formatStr s ( "waiting " <> show int <> " seconds")
threadDelay (int*second)
Error err -> do
putStrLn $ PGMQW.formatStr s ("error " <> err)
throwIO $ SimpleException $ "Error " <> err
second :: Int
second = 1000000
main :: IO ()
main = do
let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
, PSQL.connectDatabase = "postgres" }
let queue = "simple_worker"
-- let workersLst = [1, 2, 3, 4] :: [Int]
let workersLst = [1, 2] :: [Int]
-- let tasksLst = [101, 102, 102, 103, 104, 105, 106, 107] :: [Int]
let tasksLst = [101] :: [Int]
-- let tasksLst = [] :: [Int]
mapM_ (\idx -> do
let state = PGMQW.newState connInfo ("worker " <> show idx) performAction queue
forkIO $ PGMQW.run state
) workersLst
-- delay so that the worker can initialize and settle
threadDelay second
conn <- PSQL.connect connInfo
-- SendJob wrapper
let mkJob msg = PGMQW.mkDefaultSendJob conn queue msg
mapM_ (\idx -> do
PGMQW.sendJob $ mkJob $ Ping
PGMQW.sendJob $ mkJob $ Wait 1
PGMQW.sendJob $ mkJob $ Echo $ "hello " <> show idx
PGMQW.sendJob $ mkJob $ Error $ "error " <> show idx
) tasksLst
-- a job that will timeout
let timedOut = (mkJob (Wait 5)) { PGMQW.vt = Just 1
, PGMQW.toStrat = PGMQW.TSRepeatNElseArchive 3 }
PGMQW.sendJob timedOut
threadDelay (10*second)
metrics <- PGMQ.getMetrics conn queue
putStrLn $ "metrics: " <> show metrics
return ()
...@@ -65,8 +65,6 @@ library ...@@ -65,8 +65,6 @@ library
exposed-modules: Database.PGMQ exposed-modules: Database.PGMQ
, Database.PGMQ.Simple , Database.PGMQ.Simple
, Database.PGMQ.Types , Database.PGMQ.Types
, Database.PGMQ.Worker
, Database.PGMQ.Worker.Types
-- Modules included in this library but not exported. -- Modules included in this library but not exported.
-- other-modules: -- other-modules:
...@@ -77,7 +75,7 @@ library ...@@ -77,7 +75,7 @@ library
-- Other library packages from which modules are imported. -- Other library packages from which modules are imported.
build-depends: base ^>=4.17.2.0 build-depends: base ^>=4.17.2.0
, aeson >= 1.4 && < 2.3 , aeson >= 1.4 && < 2.3
, mtl >= 2.2 && < 2.4 , bytestring >= 0.11 && < 0.13
, postgresql-simple >= 0.6 && < 0.8 , postgresql-simple >= 0.6 && < 0.8
, safe >= 0.3 && < 0.4 , safe >= 0.3 && < 0.4
, text >= 1.2 && < 2.2 , text >= 1.2 && < 2.2
...@@ -122,6 +120,8 @@ executable simple-exception-catch ...@@ -122,6 +120,8 @@ executable simple-exception-catch
OverloadedStrings OverloadedStrings
RecordWildCards RecordWildCards
ghc-options: -threaded
executable simple-test executable simple-test
-- Import common warning flags. -- Import common warning flags.
import: warnings import: warnings
...@@ -144,75 +144,28 @@ executable simple-test ...@@ -144,75 +144,28 @@ executable simple-test
NamedFieldPuns NamedFieldPuns
OverloadedStrings OverloadedStrings
RecordWildCards RecordWildCards
executable simple-worker
-- Import common warning flags.
import: warnings
build-depends: base ^>=4.17.2.0
, aeson >= 1.4 && < 2.3
, mtl >= 2.2 && < 2.4
, postgresql-simple >= 0.6 && < 0.8
, text >= 1.2 && < 2.2
, haskell-pgmq
-- Directories containing source files.
hs-source-dirs: bin/simple-worker
main-is: Main.hs
-- Base language which the package is written in. ghc-options: -threaded
default-language: Haskell2010
default-extensions: test-suite test-integration
DuplicateRecordFields
GeneralizedNewtypeDeriving
ImportQualifiedPost
NamedFieldPuns
OverloadedStrings
RecordWildCards
test-suite test-pgmq-unit
-- Import common warning flags. -- Import common warning flags.
import: warnings import: warnings
type: exitcode-stdio-1.0 type: exitcode-stdio-1.0
build-depends: base ^>=4.17.2.0
, aeson >= 1.4 && < 2.3
, tasty >= 1.5 && < 1.6
, tasty-hunit >= 0.10 && < 0.11
, tasty-quickcheck >= 0.10 && < 0.12
, haskell-pgmq
-- Directories containing source files.
hs-source-dirs: tests
main-is: unit-tests.hs
-- Base language which the package is written in. other-modules: Test.Integration.PGMQ.Simple
default-language: Haskell2010 , Test.Integration.Utils
default-extensions:
DuplicateRecordFields
GeneralizedNewtypeDeriving
ImportQualifiedPost
NamedFieldPuns
OverloadedStrings
RecordWildCards
test-suite test-pgmq-integration
-- Import common warning flags.
import: warnings
type: exitcode-stdio-1.0
build-depends: base ^>=4.17.2.0 build-depends: base ^>=4.17.2.0
, aeson >= 1.4 && < 2.3 , aeson >= 1.4 && < 2.3
, containers >= 0.6 && < 0.8
, hspec >= 2.11 && < 3 , hspec >= 2.11 && < 3
, postgresql-simple >= 0.6 && < 0.8 , postgresql-simple >= 0.6 && < 0.8
, random-strings == 0.1.1.0
, stm >= 2.5.3 && < 3
, tasty >= 1.5 && < 1.6 , tasty >= 1.5 && < 1.6
, tasty-hspec >= 1.2.0 && < 2 , tasty-hspec >= 1.2.0 && < 2
, text >= 1.2 && < 2.2
, haskell-pgmq , haskell-pgmq
...@@ -230,3 +183,5 @@ test-suite test-pgmq-integration ...@@ -230,3 +183,5 @@ test-suite test-pgmq-integration
NamedFieldPuns NamedFieldPuns
OverloadedStrings OverloadedStrings
RecordWildCards RecordWildCards
ghc-options: -threaded
...@@ -10,13 +10,9 @@ Portability : POSIX ...@@ -10,13 +10,9 @@ Portability : POSIX
module Database.PGMQ module Database.PGMQ
( module Database.PGMQ.Simple ( module Database.PGMQ.Simple
, module Database.PGMQ.Types , module Database.PGMQ.Types )
, module Database.PGMQ.Worker
, module Database.PGMQ.Worker.Types )
where where
import Database.PGMQ.Simple import Database.PGMQ.Simple
import Database.PGMQ.Types import Database.PGMQ.Types
import Database.PGMQ.Worker
import Database.PGMQ.Worker.Types
...@@ -45,14 +45,24 @@ import Database.PostgreSQL.Simple qualified as PSQL ...@@ -45,14 +45,24 @@ import Database.PostgreSQL.Simple qualified as PSQL
import Database.PostgreSQL.Simple.Newtypes qualified as PSQL (Aeson(..)) import Database.PostgreSQL.Simple.Newtypes qualified as PSQL (Aeson(..))
import Database.PostgreSQL.Simple.SqlQQ (sql) import Database.PostgreSQL.Simple.SqlQQ (sql)
import Database.PostgreSQL.Simple.Types qualified as PSQL (PGArray(..)) import Database.PostgreSQL.Simple.Types qualified as PSQL (PGArray(..))
import Database.PGMQ.Types (Delay, MaxPollSeconds, Message, MessageCount, MessageId, Metrics, MessageClass, PollIntervalMs, Queue, QueueInfo, VisibilityTimeout) import Database.PGMQ.Types (Delay, MaxPollSeconds, Message, MessageCount, MessageId, Metrics, SerializableMessage, PollIntervalMs, Queue, QueueInfo, VisibilityTimeout)
import Safe (headMay) import Safe (headMay)
{-| Initialize PGMQ given a PostgreSQL connection. Mainly concerned {-| Initialize PGMQ given a PostgreSQL connection. Mainly concerned
with creating the 'pgmq' extension. -} with creating the 'pgmq' extension. -}
initialize :: PSQL.Connection -> IO () initialize :: PSQL.Connection -> IO ()
initialize conn = void $ PSQL.execute_ conn [sql| CREATE EXTENSION IF NOT EXISTS pgmq |] initialize conn =
-- OK so this is a bit tricky because of the usage of IF NOT EXISTS:
-- https://stackoverflow.com/questions/29900845/create-schema-if-not-exists-raises-duplicate-key-error
-- PostgreSQL will complain badly if 'initialize' is called
-- from multiple threads at once.
-- Hence, we use 'pg_advisory_xact_lock' to lock ourselves
-- out of this situation.
PSQL.withTransaction conn $ do
let magicLockId = 1122334455 :: Int
_ <- PSQL.query conn [sql| SELECT pg_advisory_xact_lock(?) |] (PSQL.Only magicLockId) :: IO [PSQL.Only ()]
void $ PSQL.execute_ conn [sql| CREATE EXTENSION IF NOT EXISTS pgmq |]
{-| Archives message in given queue for given id {-| Archives message in given queue for given id
...@@ -142,7 +152,7 @@ listQueues conn = ...@@ -142,7 +152,7 @@ listQueues conn =
-- | Read a message and immediately delete it from the queue. Returns `None` if the queue is empty. -- | Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.
-- https://tembo.io/pgmq/api/sql/functions/#pop -- https://tembo.io/pgmq/api/sql/functions/#pop
popMessage :: (MessageClass a) popMessage :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> IO (Maybe (Message a)) => PSQL.Connection -> Queue -> IO (Maybe (Message a))
popMessage conn queue = do popMessage conn queue = do
PSQL.query conn [sql| SELECT * FROM pgmq.pop(?) |] (PSQL.Only queue) >>= return . headMay PSQL.query conn [sql| SELECT * FROM pgmq.pop(?) |] (PSQL.Only queue) >>= return . headMay
...@@ -155,23 +165,26 @@ purgeQueue conn queue = do ...@@ -155,23 +165,26 @@ purgeQueue conn queue = do
-- | Read a message from given queue, with given visibility timeout (in seconds) -- | Read a message from given queue, with given visibility timeout (in seconds)
-- https://tembo.io/pgmq/api/sql/functions/#read -- https://tembo.io/pgmq/api/sql/functions/#read
readMessage :: (MessageClass a) readMessage :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> VisibilityTimeout -> IO (Maybe (Message a)) => PSQL.Connection -> Queue -> VisibilityTimeout -> IO (Maybe (Message a))
readMessage conn queue vt = readMessage conn queue vt =
readMessages conn queue vt 1 >>= return . headMay readMessages conn queue vt 1 >>= return . headMay
-- | Reads given number of messages from given queue {-| Reads given number of messages from given queue
-- https://tembo.io/pgmq/api/sql/functions/#read
readMessages :: (MessageClass a) https://tembo.io/pgmq/api/sql/functions/#read -}
readMessages :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> VisibilityTimeout -> MessageCount -> IO [Message a] => PSQL.Connection -> Queue -> VisibilityTimeout -> MessageCount -> IO [Message a]
readMessages conn queue vt count = readMessages conn queue vt count =
PSQL.query conn [sql| SELECT * FROM pgmq.read(?, ?, ?) |] (queue, vt, count) PSQL.query conn [sql| SELECT * FROM pgmq.read(?, ?, ?) |] (queue, vt, count)
-- | Reads a single message, polling for given duration if the queue {-| Reads a single message, polling for given duration if the queue
-- is empty. is empty.
-- NOTE This is a blocking operation.
-- https://tembo.io/pgmq/api/sql/functions/#read_with_poll NOTE This is a blocking operation.
readMessageWithPoll :: (MessageClass a)
https://tembo.io/pgmq/api/sql/functions/#read_with_poll -}
readMessageWithPoll :: (SerializableMessage a)
=> PSQL.Connection => PSQL.Connection
-> Queue -> Queue
-> VisibilityTimeout -> VisibilityTimeout
...@@ -185,7 +198,7 @@ readMessageWithPoll conn queue vt maxPollSeconds pollIntervalMs = ...@@ -185,7 +198,7 @@ readMessageWithPoll conn queue vt maxPollSeconds pollIntervalMs =
-- queue is empty. -- queue is empty.
-- NOTE This is a blocking operation. -- NOTE This is a blocking operation.
-- https://tembo.io/pgmq/api/sql/functions/#read_with_poll -- https://tembo.io/pgmq/api/sql/functions/#read_with_poll
readMessagesWithPoll :: (MessageClass a) readMessagesWithPoll :: (SerializableMessage a)
=> PSQL.Connection => PSQL.Connection
-> Queue -> Queue
-> VisibilityTimeout -> VisibilityTimeout
...@@ -199,14 +212,14 @@ readMessagesWithPoll conn queue vt count maxPollSeconds pollIntervalMs = ...@@ -199,14 +212,14 @@ readMessagesWithPoll conn queue vt count maxPollSeconds pollIntervalMs =
-- | Sends one message to a queue -- | Sends one message to a queue
-- https://tembo.io/pgmq/api/sql/functions/#send -- https://tembo.io/pgmq/api/sql/functions/#send
sendMessage :: (MessageClass a) sendMessage :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> a -> Delay -> IO () => PSQL.Connection -> Queue -> a -> Delay -> IO ()
sendMessage conn queue msg delay = sendMessage conn queue msg delay =
void (PSQL.query conn [sql| SELECT pgmq.send(?, ?::jsonb, ?) |] (queue, PSQL.Aeson msg, delay) :: IO [PSQL.Only Int]) void (PSQL.query conn [sql| SELECT pgmq.send(?, ?::jsonb, ?) |] (queue, PSQL.Aeson msg, delay) :: IO [PSQL.Only Int])
-- | Sends a batch of messages -- | Sends a batch of messages
-- https://tembo.io/pgmq/api/sql/functions/#send_batch -- https://tembo.io/pgmq/api/sql/functions/#send_batch
sendMessages :: (MessageClass a) sendMessages :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> [a] -> Delay -> IO () => PSQL.Connection -> Queue -> [a] -> Delay -> IO ()
sendMessages conn queue msgs delay = sendMessages conn queue msgs delay =
void (PSQL.query conn [sql| SELECT pgmq.send_batch(?, ?::jsonb[], ?) |] (queue, PSQL.PGArray (PSQL.Aeson <$> msgs), delay) :: IO [PSQL.Only Int]) void (PSQL.query conn [sql| SELECT pgmq.send_batch(?, ?::jsonb[], ?) |] (queue, PSQL.PGArray (PSQL.Aeson <$> msgs), delay) :: IO [PSQL.Only Int])
......
...@@ -9,6 +9,8 @@ Portability : POSIX ...@@ -9,6 +9,8 @@ Portability : POSIX
-} -}
{-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Database.PGMQ.Types module Database.PGMQ.Types
where where
...@@ -48,10 +50,10 @@ type PollIntervalMs = Int ...@@ -48,10 +50,10 @@ type PollIntervalMs = Int
-- | Basic message typeclass for PGMQ: it has to be jsonb-serializable -- | Basic message typeclass for PGMQ: it has to be jsonb-serializable
type MessageClass a = ( FromJSON a type SerializableMessage a = ( FromJSON a
, ToJSON a , ToJSON a
-- NOTE This shouldn't be necessary -- NOTE This shouldn't be necessary
, Typeable a ) , Typeable a )
-- | Message, as returned by the 'pgmq.read' function -- | Message, as returned by the 'pgmq.read' function
data Message a = data Message a =
...@@ -60,6 +62,7 @@ data Message a = ...@@ -60,6 +62,7 @@ data Message a =
, enqueuedAt :: ZonedTime , enqueuedAt :: ZonedTime
, vt :: ZonedTime , vt :: ZonedTime
, message :: a } , message :: a }
deriving (Show)
-- NOTE I'm not sure if this is needed -- NOTE I'm not sure if this is needed
instance Eq a => Eq (Message a) where instance Eq a => Eq (Message a) where
(==) msg1 msg2 = (==) msg1 msg2 =
...@@ -67,7 +70,7 @@ instance Eq a => Eq (Message a) where ...@@ -67,7 +70,7 @@ instance Eq a => Eq (Message a) where
(readCt msg1) == (readCt msg2) && (readCt msg1) == (readCt msg2) &&
(zonedTimeToUTC $ vt msg1) == (zonedTimeToUTC $ vt msg2) && (zonedTimeToUTC $ vt msg1) == (zonedTimeToUTC $ vt msg2) &&
(message msg1) == (message msg2) (message msg1) == (message msg2)
instance MessageClass a => PSQL.FromRow (Message a) where instance SerializableMessage a => PSQL.FromRow (Message a) where
fromRow = do fromRow = do
msgId <- PSQL.field msgId <- PSQL.field
readCt <- PSQL.field readCt <- PSQL.field
......
{-|
Module : Database.PGMQ.Worker
Description : PGMQ async worker implementation
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
Stability : experimental
Portability : POSIX
NOTE: Although this module depends on 'Database.PGMQ.Simple', it does
so only in a small way. In fact we could have a different mechanism
for sending and reading tasks, as long as it can handle JSON
serialization of our jobs with respective metadata.
-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.PGMQ.Worker
( -- | reexports from Types
ArchiveStrategy(..)
, ErrorStrategy(..)
, TimeoutStrategy(..)
, JobMessage
, Job(..)
, getJob
, State
-- | worker functions
, newState
, run
, run'
, formatStr
, SendJob(..)
, mkDefaultSendJob
, sendJob )
where
import Control.Exception (SomeException, catch, fromException, throwIO)
import Control.Monad (forever)
import Data.Maybe (fromMaybe)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PGMQ.Simple qualified as PGMQ
import Database.PGMQ.Types qualified as PGMQ
import Database.PGMQ.Worker.Types
import System.Timeout (timeout)
-- | Helper function to define worker 'State'
newState :: (PGMQ.MessageClass a)
=> PSQL.ConnectInfo
-> String
-> PerformAction a
-> PGMQ.Queue
-> State a
newState connectInfo name performAction queue = State { .. }
where
visibilityTimeout = 10
maxPollSeconds = 5
pollIntervalMs = 100
-- | Main function to start the worker
run :: (PGMQ.MessageClass a)
=> State a -> IO ()
run state = do
conn <- PSQL.connect $ connectInfo state
run' state conn
-- | Main function to start the worker (given a 'PSQL.Connection')
run' :: forall a. (PGMQ.MessageClass a)
=> State a -> PSQL.Connection -> IO ()
run' state@(State { visibilityTimeout = stateVisibilityTimeout, .. }) conn = do
PGMQ.initialize conn
PGMQ.createQueue conn queue
-- The whole worker is just an infinite loop
forever loop
where
-- | Main looping function (describes a single loop iteration)
loop :: IO ()
loop = do
-- We catch any errors that could happen so that the worker runs
-- safely
catch (do
-- Read a message from queue and handle it.
PGMQ.readMessageWithPoll conn queue stateVisibilityTimeout maxPollSeconds pollIntervalMs >>=
handleMessage) handleLoopError
-- | Error handling function for the whole loop
handleLoopError :: SomeException -> IO ()
handleLoopError err = do
case fromException err of
-- TODO It can happen that the queue contains
-- ill-formatted messages. I don't yet know how to
-- handle this case - one would have to obtain message
-- id to properly delete that using pgmq.
Just (PSQL.ConversionFailed {}) -> do
putStrLn $ formatStr state $ show err
Just _ -> do
putStrLn $ formatStr state $ show err
Nothing -> do
putStrLn $ formatStr state $ "Exception: " <> show err
-- | Main job handling function. The message could not have
-- | arrived before read poll ended, hence the 'Maybe JobMessage'
-- | type.
handleMessage :: Maybe (JobMessage a) -> IO ()
handleMessage Nothing = return ()
handleMessage (Just msg@PGMQ.Message { message = Job { metadata = JobMetadata { .. } }, msgId }) = do
putStrLn $ formatStr state $ "handling message " <> show msgId <> " (AS: " <> show archiveStrategy <> ", ES: " <> show errorStrategy <> ")"
-- Immediately set visibility timeout of a job, when it's
-- specified in job's metadata (stateVisibilityTimeout is a
-- global property of the worker and might not reflect job's
-- timeout specs)
case visibilityTimeout of
Nothing -> pure ()
Just vt -> PGMQ.setMessageVt conn queue msgId vt
let vt = fromMaybe stateVisibilityTimeout visibilityTimeout
let archiveHandler = do
case archiveStrategy of
ASDelete -> do
-- putStrLn $ formatStr state $ "deleting completed job " <> show msgId <> " (strategy: " <> show archiveStrategy <> ")"
PGMQ.deleteMessage conn queue msgId
ASArchive -> do
-- putStrLn $ formatStr state $ "archiving completed job " <> show msgId <> " (strategy: " <> show archiveStrategy <> ")"
PGMQ.archiveMessage conn queue msgId
-- Handle errors of 'performAction'.
let errorHandler :: SomeException -> IO ()
errorHandler err = do
case fromException err of
Just (JobTimeout { messageId }) -> do
let PGMQ.Message { readCt } = msg
putStrLn $ formatStr state $ "timeout occured: `" <> show timeoutStrategy <> " (readCt: " <> show readCt <> ", messageId = " <> show messageId <> ")"
case timeoutStrategy of
TSDelete -> PGMQ.deleteMessage conn queue messageId
TSArchive -> PGMQ.archiveMessage conn queue messageId
TSRepeat -> pure ()
TSRepeatNElseArchive n -> do
-- OK so this can be repeated at most 'n' times, compare 'readCt' with 'n'
if readCt > n then
PGMQ.archiveMessage conn queue messageId
else
pure ()
TSRepeatNElseDelete n -> do
-- OK so this can be repeated at most 'n' times, compare 'readCt' with 'n'
if readCt > n then
PGMQ.deleteMessage conn queue messageId
else
pure ()
_ -> do
putStrLn $ formatStr state $ "Error occured: `" <> show err
case errorStrategy of
ESDelete -> PGMQ.deleteMessage conn queue msgId
ESArchive -> PGMQ.deleteMessage conn queue msgId
ESRepeat -> return ()
(do
mTimeout <- timeout (vt * microsecond) (performAction state msg)
case mTimeout of
Just _ -> archiveHandler
Nothing -> throwIO $ JobTimeout { messageId = msgId, vt = vt }
) `catch` errorHandler
-- | Helper function to format a string with worker name (for logging)
formatStr :: State a -> String -> String
formatStr (State { name }) msg =
"[" <> name <> "] " <> msg
microsecond :: Int
microsecond = 10^6
-- | Wraps parameters for the 'sendJob' function
data SendJob a =
SendJob { conn :: PSQL.Connection
, queue :: PGMQ.Queue
, msg :: a
, delay :: PGMQ.Delay
, archStrat :: ArchiveStrategy
, errStrat :: ErrorStrategy
, toStrat :: TimeoutStrategy
, vt :: Maybe PGMQ.VisibilityTimeout }
-- | Create a 'SendJob' data with some defaults
mkDefaultSendJob :: PSQL.Connection
-> PGMQ.Queue
-> a
-> SendJob a
mkDefaultSendJob conn queue msg =
SendJob { conn
, queue
, msg
, delay = 0
-- | remove finished jobs
, archStrat = ASDelete
-- | archive errored jobs (for inspection later)
, errStrat = ESArchive
-- | repeat timed out jobs
, toStrat = TSRepeat
, vt = Nothing }
-- | Send given message as a worker job to pgmq. This wraps
-- | 'PGMQ.sendMessage' with worker job metadata.
sendJob :: (PGMQ.MessageClass a)
=> SendJob a -> IO ()
sendJob (SendJob { .. }) = do
let metadata = JobMetadata { archiveStrategy = archStrat
, errorStrategy = errStrat
, timeoutStrategy = toStrat
, visibilityTimeout = vt }
PGMQ.sendMessage conn queue (Job { job = msg
, metadata = metadata }) delay
{-|
Module : Database.PGMQ.Worker.Types
Description : Types for the async worker
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE RankNTypes #-}
module Database.PGMQ.Worker.Types
( ArchiveStrategy(..)
, ErrorStrategy(..)
, TimeoutStrategy(..)
, JobMetadata(..)
, Job(..)
, getJob
, JobMessage
, State(..)
, PerformAction
, JobTimeout(..) )
where
import Control.Applicative ((<|>))
import Control.Exception (Exception)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
import Database.PGMQ.Types qualified as PGMQ
import Database.PostgreSQL.Simple qualified as PSQL
-- | Strategy for archiving jobs
data ArchiveStrategy =
-- | Delete message when it's done
ASDelete
-- | Archive message when it's done
| ASArchive
deriving (Eq, Show)
instance ToJSON ArchiveStrategy where
toJSON ASDelete = toJSON ("ASDelete" :: String)
toJSON ASArchive = toJSON ("ASArchive" :: String)
instance FromJSON ArchiveStrategy where
parseJSON = withText "ArchiveStrategy" $ \s -> do
case s of
"ASDelete" -> pure ASDelete
"ASArchive" -> pure ASArchive
s' -> fail $ T.unpack s'
-- | Strategy of handling jobs with errors
data ErrorStrategy =
-- | Delete job when it threw an error
ESDelete
-- | Archive job when it threw an error
| ESArchive
-- | Try to repeat the job when an error ocurred
| ESRepeat
deriving (Eq, Show)
instance ToJSON ErrorStrategy where
toJSON ESDelete = toJSON ("ESDelete" :: String)
toJSON ESArchive = toJSON ("ESArchive" :: String)
toJSON ESRepeat = toJSON ("ESRepeat" :: String)
instance FromJSON ErrorStrategy where
parseJSON = withText "ErrorStrategy" $ \s -> do
case s of
"ESDelete" -> pure ESDelete
"ESArchive" -> pure ESArchive
"ESRepeat" -> pure ESRepeat
s' -> fail $ T.unpack s'
-- | Strategy for handling timeouts
data TimeoutStrategy =
-- | Delete job when it timed out
TSDelete
-- | Archive job when it timed out
| TSArchive
-- | Repeat job when it timed out (inifinitely)
| TSRepeat
-- | Repeat job when it timed out (but only maximum number of times), otherwise archive it
| TSRepeatNElseArchive Int
-- | Repeat job when it timed out (but only maximum number of times), otherwise delete it
| TSRepeatNElseDelete Int
deriving (Eq, Show)
instance ToJSON TimeoutStrategy where
toJSON TSDelete = toJSON ("TSDelete" :: String)
toJSON TSArchive = toJSON ("TSArchive" :: String)
toJSON TSRepeat = toJSON ("TSRepeat" :: String)
toJSON (TSRepeatNElseArchive n) = toJSON $ object [ ("TSRepeatNElseArchive" .= n) ]
toJSON (TSRepeatNElseDelete n) = toJSON $ object [ ("TSRepeatNElseDelete" .= n) ]
instance FromJSON TimeoutStrategy where
parseJSON v = parseText v
<|> parseTSRepeatNElseArchive v
<|> parseTSRepeatNElseDelete v
where
-- | Parser for textual formats
parseText = withText "TimeoutStrategy (text)" $ \s -> do
case s of
"TSDelete" -> pure TSDelete
"TSArchive" -> pure TSArchive
"TSRepeat" -> pure TSRepeat
s' -> fail $ T.unpack s'
-- | Parser for 'TSRepeatN' object
parseTSRepeatNElseArchive = withObject "TimeoutStrategy (TSRepeatNElseArchive)" $ \o -> do
n <- o .: "TSRepeatNElseArchive"
pure $ TSRepeatNElseArchive n
parseTSRepeatNElseDelete = withObject "TimeoutStrategy (TSRepeatNElseDelete)" $ \o -> do
n <- o .: "TSRepeatNElseDelete"
pure $ TSRepeatNElseDelete n
-- | Job metadata
data JobMetadata =
JobMetadata { archiveStrategy :: ArchiveStrategy
, errorStrategy :: ErrorStrategy
, timeoutStrategy :: TimeoutStrategy
-- | If not-empty, sets a custom visibility timeout for
-- | a job. Otherwise, worker's State.visibilityTimeout
-- | will be used.
, visibilityTimeout :: Maybe PGMQ.VisibilityTimeout }
deriving (Eq, Show)
instance ToJSON JobMetadata where
toJSON (JobMetadata { .. }) =
toJSON $ object [
( "astrat" .= archiveStrategy )
, ( "estrat" .= errorStrategy )
, ( "tstrat" .= timeoutStrategy )
, ( "vt" .= visibilityTimeout )
]
instance FromJSON JobMetadata where
parseJSON = withObject "JobMetadata" $ \o -> do
archiveStrategy <- o .: "astrat"
errorStrategy <- o .: "estrat"
timeoutStrategy <- o .: "tstrat"
visibilityTimeout <- o .: "vt"
return $ JobMetadata { .. }
-- | Worker has specific message type, because each message carries
-- | around some metadata for the worker itself
data Job a =
Job { job :: a
, metadata :: JobMetadata }
deriving (Eq, Show)
getJob :: Job a -> a
getJob (Job { job }) = job
instance ToJSON a => ToJSON (Job a) where
toJSON (Job { .. }) =
toJSON $ object [
("metadata" .= metadata)
, ("job" .= job)
]
instance FromJSON a => FromJSON (Job a) where
parseJSON = withObject "Job" $ \o -> do
metadata <- o .: "metadata"
job <- o .: "job"
return $ Job { .. }
-- | The worker job, as it is serialized in a PGMQ table
type JobMessage a = PGMQ.Message (Job a)
-- | Main state for a running worker
data State a =
State { connectInfo :: PSQL.ConnectInfo
-- custom name for this worker
, name :: String
, performAction :: PerformAction a
, queue :: PGMQ.Queue
-- | Time in seconds that the message become invisible after reading.
, visibilityTimeout :: PGMQ.VisibilityTimeout
-- | Time in seconds to wait for new messages to reach the queue. Defaults to 5.
, maxPollSeconds :: PGMQ.Delay
-- | Milliseconds between the internal poll operations. Defaults to 100.
, pollIntervalMs :: PGMQ.PollIntervalMs }
-- | Callback definition (what to execute when a message arrives)
type PerformAction a = State a -> JobMessage a -> IO ()
-- | Thrown when job times out
data JobTimeout =
JobTimeout { messageId :: Int
, vt :: PGMQ.VisibilityTimeout }
deriving Show
instance Exception JobTimeout
module Test.Integration.PGMQ.Simple
( pgmqSimpleTests )
where
import Control.Exception (bracket)
import Data.Maybe (isJust)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PGMQ.Simple qualified as PGMQ
import Database.PGMQ.Types qualified as PGMQ
import Test.Hspec
import Test.Integration.Utils (getPSQLEnvConnectInfo, randomQueueName)
data TestEnv =
TestEnv {
conn :: PSQL.Connection
, queue :: PGMQ.Queue
}
-- NOTE These tests expect a local pgmq server runnign on port 5432.
testQueuePrefix :: PGMQ.Queue
testQueuePrefix = "test_pgmq"
setUpConn :: IO TestEnv
setUpConn = do
connInfo <- getPSQLEnvConnectInfo
conn <- PSQL.connect connInfo
queue <- randomQueueName testQueuePrefix
return $ TestEnv { conn, queue }
dropConn :: TestEnv -> IO ()
dropConn (TestEnv { conn }) = do
PSQL.close conn
withConn :: (TestEnv -> IO ()) -> IO ()
withConn = bracket setUpConn dropConn
withPGMQ :: (TestEnv -> IO ()) -> IO ()
withPGMQ f = withConn $ \testEnv -> bracket (setUpPGMQ testEnv) (tearDownPGMQ testEnv) (\_ -> f testEnv)
where
setUpPGMQ (TestEnv { conn, queue }) = do
PGMQ.initialize conn
PGMQ.dropQueue conn queue
PGMQ.createQueue conn queue
tearDownPGMQ (TestEnv { conn, queue }) _ = do
PGMQ.dropQueue conn queue
pgmqSimpleTests :: Spec
pgmqSimpleTests = parallel $ around withPGMQ $ describe "PGMQ Simple" $ do
-- it "can get metrics for non-existing queue" $ \(TestEnv { conn, queue }) -> do
-- -- first of all, this should also work for non-existing queues
-- metrics <- PGMQ.getMetrics conn queue
-- metrics `shouldSatisfy` isNothing
it "can get metrics for an empty queue" $ \(TestEnv { conn, queue }) -> do
metrics <- PGMQ.getMetrics conn queue
metrics `shouldSatisfy` isJust
(PGMQ.queueLength <$> metrics) `shouldBe` (Just 0)
it "listQueues properly returns our queue" $ \(TestEnv { conn, queue }) -> do
queues <- PGMQ.listQueues conn
((\(PGMQ.QueueInfo { queueName }) -> queueName) <$> queues) `shouldContain` [queue]
module Test.Integration.Utils
( getPSQLEnvConnectInfo
, randomQueueName )
where
import Data.Maybe (fromMaybe)
import Database.PGMQ.Types (Queue)
import Database.PostgreSQL.Simple qualified as PSQL
import System.Environment (lookupEnv)
import Test.RandomStrings (randomASCII, randomString, onlyAlphaNum)
getPSQLEnvConnectInfo :: IO PSQL.ConnectInfo
getPSQLEnvConnectInfo = do
pgUser <- lookupEnv "POSTGRES_USER"
pgDb <- lookupEnv "POSTGRES_DB"
pgPass <- lookupEnv "POSTGRES_PASSWORD"
pgHost <- lookupEnv "POSTGRES_HOST"
-- https://hackage.haskell.org/package/postgresql-simple-0.7.0.0/docs/Database-PostgreSQL-Simple.html#t:ConnectInfo
pure $ PSQL.defaultConnectInfo { PSQL.connectUser = fromMaybe "postgres" pgUser
, PSQL.connectDatabase = fromMaybe "postgres" pgDb
, PSQL.connectHost = fromMaybe "localhost" pgHost
, PSQL.connectPassword = fromMaybe "postgres" pgPass }
randomQueueName :: Queue -> IO Queue
randomQueueName prefix = do
postfix <- randomString (onlyAlphaNum randomASCII) 10
return $ prefix <> "_" <> postfix
{-# OPTIONS_GHC -Wno-orphans -Wno-missing-signatures #-}
module Main where module Main where
import Control.Exception (bracket) import Test.Integration.PGMQ.Simple (pgmqSimpleTests)
import Data.Maybe (isJust, isNothing)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PGMQ.Simple qualified as PGMQ
import Database.PGMQ.Worker.Types qualified as PGMQW
import Database.PGMQ.Types qualified as PGMQ
import Test.Hspec
import Test.Tasty import Test.Tasty
import Test.Tasty.Hspec import Test.Tasty.Hspec
...@@ -16,53 +8,10 @@ import Test.Tasty.Hspec ...@@ -16,53 +8,10 @@ import Test.Tasty.Hspec
main :: IO () main :: IO ()
main = do main = do
simpleSpec <- testSpec "simpleTests" simpleTests pgmqSimpleSpec <- testSpec "pgmqSimpleTests" pgmqSimpleTests
defaultMain $ testGroup "integration tests" defaultMain $ testGroup "integration tests"
[ simpleSpec ] [
pgmqSimpleSpec
]
data TestEnv =
TestEnv {
conn :: PSQL.Connection
, queue :: String
}
-- NOTE These tests expect a local pgmq server runnign on port 5432.
testQueue :: String
testQueue = "test"
setUpConn :: IO TestEnv
setUpConn = do
let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
, PSQL.connectDatabase = "postgres" }
conn <- PSQL.connect connInfo
return $ TestEnv { conn, queue = testQueue }
dropConn :: TestEnv -> IO ()
dropConn (TestEnv { conn }) = do
PSQL.close conn
withConn :: (TestEnv -> IO ()) -> IO ()
withConn = bracket setUpConn dropConn
withPGMQ :: (TestEnv -> IO ()) -> IO ()
withPGMQ f = withConn $ \testEnv -> bracket (setUpPGMQ testEnv) (\_ -> pure ()) (\_ -> f testEnv)
where
setUpPGMQ (TestEnv { conn, queue }) = do
PGMQ.initialize conn
PGMQ.dropQueue conn queue
simpleTests :: Spec
simpleTests = sequential $ aroundAll withPGMQ $ describe "Database" $ do
it "can get metrics for non-existing queue" $ \(TestEnv { conn, queue }) -> do
-- first of all, this should also work for non-existing queues
metrics <- PGMQ.getMetrics conn queue
metrics `shouldSatisfy` isNothing
-- let (Just (PGMQ.Metrics { .. })) = metrics
-- queueName `shouldBe` queue
-- totalMessages `shouldBe` 0
{-# OPTIONS_GHC -Wno-orphans -Wno-missing-signatures #-}
module Main where
import Data.Aeson qualified as Aeson
import Database.PGMQ.Worker.Types qualified as PGMQW
import Test.Tasty
import Test.Tasty.QuickCheck as QC
import Test.Tasty.HUnit
main = defaultMain tests
tests :: TestTree
tests = testGroup "Tests" [propertyTests, unitTests]
propertyTests = testGroup "Property tests" [aesonPropTests]
aesonPropTests = testGroup "Aeson (de-)serialization property tests" $
[ aesonPropJobMetadataTests
, aesonPropJobTests ]
instance QC.Arbitrary PGMQW.ArchiveStrategy where
arbitrary = QC.elements [ PGMQW.ASDelete, PGMQW.ASArchive ]
instance QC.Arbitrary PGMQW.ErrorStrategy where
arbitrary = QC.elements [ PGMQW.ESDelete, PGMQW.ESArchive, PGMQW.ESRepeat ]
instance QC.Arbitrary PGMQW.TimeoutStrategy where
arbitrary = do
n <- arbitrary
m <- arbitrary
QC.elements [ PGMQW.TSDelete
, PGMQW.TSArchive
, PGMQW.TSRepeat
, PGMQW.TSRepeatNElseArchive n
, PGMQW.TSRepeatNElseDelete m ]
instance QC.Arbitrary PGMQW.JobMetadata where
arbitrary = do
archiveStrategy <- arbitrary
errorStrategy <- arbitrary
timeoutStrategy <- arbitrary
visibilityTimeout <- arbitrary
return $ PGMQW.JobMetadata { .. }
aesonPropJobMetadataTests = testGroup "Aeson PGMQW.JobMetadata (de-)serialization tests" $
[ QC.testProperty "Aeson.decode . Aeson.encode == id" $
\jm ->
Aeson.decode (Aeson.encode (jm :: PGMQW.JobMetadata)) == Just jm
]
instance QC.Arbitrary a => QC.Arbitrary (PGMQW.Job a) where
arbitrary = PGMQW.Job <$> arbitrary <*> arbitrary
aesonPropJobTests = testGroup "Aeson PGMQW.Job (de-)serialization tests" $
[ QC.testProperty "Aeson.decode . Aeson.encode == id" $
\j ->
Aeson.decode (Aeson.encode (j :: PGMQW.Job String)) == Just j
]
unitTests = testGroup "Unit tests" []
-- [ testCase "List comparison (different length)" $
-- [1, 2, 3] `compare` [1,2] @?= GT
-- -- the following test does not hold
-- , testCase "List comparison (same length)" $
-- [1, 2, 3] `compare` [1,2,2] @?= LT
-- ]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment