Some refactoring, docs fixes, worker ts strategies

parent 8b05f5cc
{-# LANGUAGE QuasiQuotes #-}
{-
Testing exception catch for PSQL.
-}
module Main
where
import Control.Exception (Exception, SomeException(..), catch, fromException, throwIO, toException)
import Control.Monad (void)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PostgreSQL.Simple.SqlQQ (sql)
main :: IO ()
main = do
let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
, PSQL.connectDatabase = "postgres" }
conn <- PSQL.connect connInfo
void (PSQL.execute_ conn [sql| CREATE EXTENSION IF NOT EXISTS pgmq |])
let queue = "test" :: String
-- catch exception of deleting a queue that doesn't exist
catch (
void (PSQL.query conn [sql| SELECT pgmq.drop_queue(?) |] (PSQL.Only queue) :: IO [PSQL.Only Bool])
) (\e@(SomeException _) -> putStrLn $ "err: " <> show e)
return ()
......@@ -12,6 +12,7 @@ import Database.PostgreSQL.Simple qualified as PSQL
import Database.PGMQ.Simple qualified as PGMQ
import Database.PGMQ.Types qualified as PGMQ
main :: IO ()
main = do
let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
......@@ -20,6 +21,9 @@ main = do
PGMQ.initialize conn
PGMQ.dropQueue conn "test"
metrics <- PGMQ.getMetrics conn "test"
let queue = "test"
PGMQ.createQueue conn "test"
PGMQ.sendMessage conn queue ("test message 1" :: String) 0
......
......@@ -101,7 +101,8 @@ main = do
) tasksLst
-- a job that will timeout
let timedOut = (mkJob (Wait 5)) { PGMQW.vt = Just 1 }
let timedOut = (mkJob (Wait 5)) { PGMQW.vt = Just 1
, PGMQW.toStrat = PGMQW.TSRepeatNElseArchive 3 }
PGMQW.sendJob timedOut
threadDelay (10*second)
......
......@@ -98,6 +98,29 @@ library
OverloadedStrings
RecordWildCards
executable simple-exception-catch
-- Import common warning flags.
import: warnings
build-depends: base ^>=4.17.2.0
, postgresql-simple >= 0.6 && < 0.8
, haskell-pgmq
-- Directories containing source files.
hs-source-dirs: bin/simple-exception-catch
main-is: Main.hs
-- Base language which the package is written in.
default-language: Haskell2010
default-extensions:
DuplicateRecordFields
GeneralizedNewtypeDeriving
ImportQualifiedPost
NamedFieldPuns
OverloadedStrings
RecordWildCards
executable simple-test
-- Import common warning flags.
......@@ -177,3 +200,33 @@ test-suite test-pgmq-unit
NamedFieldPuns
OverloadedStrings
RecordWildCards
test-suite test-pgmq-integration
-- Import common warning flags.
import: warnings
type: exitcode-stdio-1.0
build-depends: base ^>=4.17.2.0
, aeson >= 1.4 && < 2.3
, hspec >= 2.11 && < 3
, postgresql-simple >= 0.6 && < 0.8
, tasty >= 1.5 && < 1.6
, tasty-hspec >= 1.2.0 && < 2
, haskell-pgmq
-- Directories containing source files.
hs-source-dirs: tests
main-is: integration-tests.hs
-- Base language which the package is written in.
default-language: Haskell2010
default-extensions:
DuplicateRecordFields
GeneralizedNewtypeDeriving
ImportQualifiedPost
NamedFieldPuns
OverloadedStrings
RecordWildCards
{-|
Module : Database.PGMQ
Description : PGMQ haskell bindings
Description : PGMQ Haskell bindings
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
......@@ -9,10 +9,14 @@ Portability : POSIX
-}
module Database.PGMQ
( module Database.PGMQ.Simple
, module Database.PGMQ.Types
, module Database.PGMQ.Worker
, module Database.PGMQ.Worker.Types )
where
-- import Database.PostgreSQL.Simple qualified as PSQL
-- data PGMQOptions =
-- PGMQOptions { connectInfo :: PSQL.ConnectInfo }
import Database.PGMQ.Simple
import Database.PGMQ.Types
import Database.PGMQ.Worker
import Database.PGMQ.Worker.Types
{-|
Module : Database.PGMQ.Simple
Description : PGMQ basic Haskell bindings
Description : Basic functionality
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
......@@ -39,6 +39,7 @@ where
-- 'pgmq.send' functions accept only a ::jsonb type).
-- import Control.Exception (Exception, SomeException(..), catch, fromException, throwIO, toException)
import Control.Monad (void)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PostgreSQL.Simple.Newtypes qualified as PSQL (Aeson(..))
......@@ -48,52 +49,84 @@ import Database.PGMQ.Types (Delay, MaxPollSeconds, Message, MessageCount, Messag
import Safe (headMay)
-- | Initialize PostgreSQL extension
{-| Initialize PGMQ given a PostgreSQL connection. Mainly concerned
with creating the 'pgmq' extension. -}
initialize :: PSQL.Connection -> IO ()
initialize conn = void $ PSQL.execute_ conn [sql| CREATE EXTENSION IF NOT EXISTS pgmq |]
-- | Archives message in given queue for given id
-- https://tembo.io/pgmq/api/sql/functions/#archive-single
{-| Archives message in given queue for given id
https://tembo.io/pgmq/api/sql/functions/#archive-single -}
archiveMessage :: PSQL.Connection -> Queue -> MessageId -> IO ()
archiveMessage conn queue msgId =
void (PSQL.query conn [sql| SELECT pgmq.archive(?, ?) |] (queue, msgId) :: IO [PSQL.Only Bool])
-- | Archives messages in given queue for given ids
-- https://tembo.io/pgmq/api/sql/functions/#archive-batch
{-| Archives messages in given queue for given ids
https://tembo.io/pgmq/api/sql/functions/#archive-batch -}
archiveMessages :: PSQL.Connection -> Queue -> [MessageId] -> IO ()
archiveMessages conn queue msgIds =
void (PSQL.query conn [sql| SELECT pgmq.archive(?, ?::bigint[]) |] (queue, PSQL.PGArray msgIds) :: IO [PSQL.Only Int])
-- | Creates a queue
-- https://tembo.io/pgmq/api/sql/functions/#create
{-| Creates a queue
https://tembo.io/pgmq/api/sql/functions/#create -}
createQueue :: PSQL.Connection -> Queue -> IO ()
createQueue conn queue =
void (PSQL.query conn [sql| SELECT pgmq.create(?) |] (PSQL.Only queue) :: IO [PSQL.Only ()])
-- | Deletes given message from given queue
-- https://tembo.io/pgmq/api/sql/functions/#delete-single
{-| Deletes given message from given queue
https://tembo.io/pgmq/api/sql/functions/#delete-single -}
deleteMessage :: PSQL.Connection -> Queue -> MessageId -> IO ()
deleteMessage conn queue msgId =
void (PSQL.query conn [sql| SELECT pgmq.delete(?, ?) |] (queue, msgId) :: IO [PSQL.Only Bool])
-- | Deletes given messages from given queue
-- https://tembo.io/pgmq/api/sql/functions/#delete-batch
{-| Deletes given messages from given queue
https://tembo.io/pgmq/api/sql/functions/#delete-batch -}
deleteMessages :: PSQL.Connection -> Queue -> [MessageId] -> IO ()
deleteMessages conn queue msgIds =
void (PSQL.query conn [sql| SELECT pgmq.delete(?, ?) |] (queue, PSQL.PGArray msgIds) :: IO [PSQL.Only Int])
-- | Deletes given queue
-- https://tembo.io/pgmq/api/sql/functions/#drop_queue
{-| Deletes given queue
https://tembo.io/pgmq/api/sql/functions/#drop_queue -}
dropQueue :: PSQL.Connection -> Queue -> IO ()
dropQueue conn queue =
void (PSQL.query conn [sql| SELECT pgmq.drop_queue(?) |] (PSQL.Only queue) :: IO [PSQL.Only Bool])
-- | Read metrics for a given queue
-- https://tembo.io/pgmq/api/sql/functions/#metrics
{-| Read metrics for a given queue
https://tembo.io/pgmq/api/sql/functions/#metrics -}
getMetrics :: PSQL.Connection -> Queue -> IO (Maybe Metrics)
getMetrics conn queue =
PSQL.query conn [sql| SELECT * FROM pgmq.metrics(?) |] (PSQL.Only queue) >>= return . headMay
-- catch
-- (PSQL.query conn [sql| SELECT * FROM pgmq.metrics(?) |] (PSQL.Only queue) >>= return . headMay)
-- handleError
-- where
-- -- support the case when a table does not exist
-- -- handleError :: Exception e => e -> IO (Maybe Metrics)
-- handleError err@(SomeException _) = do
-- putStrLn $ "Error: " <> show err
-- return Nothing
-- handleError :: SomeException -> IO (Maybe Metrics)
-- handleError err = do
-- putStrLn $ "Error: " <> show err
-- putStrLn "x"
-- return Nothing
-- case fromException err of
-- -- Just (PSQL.SomePostgreSqlException e) ->
-- -- case fromException (toException e) of
-- -- Just (PSQL.SqlError { sqlState = "42P01" }) -> return Nothing
-- -- -- re-raise other errors
-- -- _ -> throwIO err
-- Just (PSQL.SqlError { sqlState = "42P01" }) -> return Nothing
-- -- re-raise other errors
-- _ -> throwIO err
-- | Read metrics for all queues
-- https://tembo.io/pgmq/api/sql/functions/#metrics_all
......@@ -135,8 +168,8 @@ readMessages conn queue vt count =
PSQL.query conn [sql| SELECT * FROM pgmq.read(?, ?, ?) |] (queue, vt, count)
-- | Reads a single message, polling for given duration if the queue
-- | is empty.
-- | NOTE This is a blocking operation.
-- is empty.
-- NOTE This is a blocking operation.
-- https://tembo.io/pgmq/api/sql/functions/#read_with_poll
readMessageWithPoll :: (MessageClass a)
=> PSQL.Connection
......@@ -149,8 +182,8 @@ readMessageWithPoll conn queue vt maxPollSeconds pollIntervalMs =
readMessagesWithPoll conn queue vt 1 maxPollSeconds pollIntervalMs >>= return . headMay
-- | Reads given number of messages, polling for given duration if the
-- | queue is empty.
-- | NOTE This is a blocking operation.
-- queue is empty.
-- NOTE This is a blocking operation.
-- https://tembo.io/pgmq/api/sql/functions/#read_with_poll
readMessagesWithPoll :: (MessageClass a)
=> PSQL.Connection
......@@ -183,3 +216,15 @@ sendMessages conn queue msgs delay =
setMessageVt :: PSQL.Connection -> Queue -> MessageId -> VisibilityTimeout -> IO ()
setMessageVt conn queue msgId vt =
void (PSQL.query conn [sql| SELECT 1 FROM pgmq.set_vt(?, ?, ?) |] (queue, msgId, vt) :: IO [PSQL.Only Int])
{-|
A utility function: sometimes pgmq throws an error that a table (for
queue) doesn't exist and we want to ignore it as it's not critical in
the given function.
'42P01' means 'undefined_table' in postgres:
https://www.postgresql.org/docs/current/errcodes-appendix.html
-}
-- withNoDoesNotExistError :: IO a -> IO a
{-|
Module : Database.PGMQ.Types
Description : PGMQ haskell types
Description : PGMQ types
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
......
......@@ -20,6 +20,7 @@ module Database.PGMQ.Worker
( -- | reexports from Types
ArchiveStrategy(..)
, ErrorStrategy(..)
, TimeoutStrategy(..)
, JobMessage
, Job(..)
, getJob
......@@ -128,14 +129,29 @@ run' state@(State { visibilityTimeout = stateVisibilityTimeout, .. }) conn = do
-- Handle errors of 'performAction'.
let errorHandler :: SomeException -> IO ()
errorHandler err = do
putStrLn $ formatStr state $ "Error ocurred: `" <> show err
case fromException err of
Just (JobTimeout { messageId }) -> do
let PGMQ.Message { readCt } = msg
putStrLn $ formatStr state $ "timeout occured: `" <> show timeoutStrategy <> " (readCt: " <> show readCt <> ", messageId = " <> show messageId <> ")"
case timeoutStrategy of
TSDelete -> PGMQ.deleteMessage conn queue messageId
TSArchive -> PGMQ.archiveMessage conn queue messageId
TSRepeat -> pure ()
TSRepeatNElseArchive n -> do
-- OK so this can be repeated at most 'n' times, compare 'readCt' with 'n'
if readCt > n then
PGMQ.archiveMessage conn queue messageId
else
pure ()
TSRepeatNElseDelete n -> do
-- OK so this can be repeated at most 'n' times, compare 'readCt' with 'n'
if readCt > n then
PGMQ.deleteMessage conn queue messageId
else
pure ()
_ -> do
putStrLn $ formatStr state $ "Error occured: `" <> show err
case errorStrategy of
ESDelete -> PGMQ.deleteMessage conn queue msgId
ESArchive -> PGMQ.deleteMessage conn queue msgId
......
{-|
Module : Database.PGMQ.Worker.Types
Description : PGMQ async worker implementation
Description : Types for the async worker
Copyright : (c) Gargantext, 2024-Present
License : AGPL
Maintainer : gargantext@iscpif.fr
......@@ -23,6 +23,7 @@ module Database.PGMQ.Worker.Types
, JobTimeout(..) )
where
import Control.Applicative ((<|>))
import Control.Exception (Exception)
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
......@@ -74,20 +75,38 @@ data TimeoutStrategy =
TSDelete
-- | Archive job when it timed out
| TSArchive
-- | Repeat job when it timed out
-- | Repeat job when it timed out (inifinitely)
| TSRepeat
-- | Repeat job when it timed out (but only maximum number of times), otherwise archive it
| TSRepeatNElseArchive Int
-- | Repeat job when it timed out (but only maximum number of times), otherwise delete it
| TSRepeatNElseDelete Int
deriving (Eq, Show)
instance ToJSON TimeoutStrategy where
toJSON TSDelete = toJSON ("TSDelete" :: String)
toJSON TSArchive = toJSON ("TSArchive" :: String)
toJSON TSRepeat = toJSON ("TSRepeat" :: String)
toJSON (TSRepeatNElseArchive n) = toJSON $ object [ ("TSRepeatNElseArchive" .= n) ]
toJSON (TSRepeatNElseDelete n) = toJSON $ object [ ("TSRepeatNElseDelete" .= n) ]
instance FromJSON TimeoutStrategy where
parseJSON = withText "TimeoutStrategy" $ \s -> do
case s of
"TSDelete" -> pure TSDelete
"TSArchive" -> pure TSArchive
"TSRepeat" -> pure TSRepeat
s' -> fail $ T.unpack s'
parseJSON v = parseText v
<|> parseTSRepeatNElseArchive v
<|> parseTSRepeatNElseDelete v
where
-- | Parser for textual formats
parseText = withText "TimeoutStrategy (text)" $ \s -> do
case s of
"TSDelete" -> pure TSDelete
"TSArchive" -> pure TSArchive
"TSRepeat" -> pure TSRepeat
s' -> fail $ T.unpack s'
-- | Parser for 'TSRepeatN' object
parseTSRepeatNElseArchive = withObject "TimeoutStrategy (TSRepeatNElseArchive)" $ \o -> do
n <- o .: "TSRepeatNElseArchive"
pure $ TSRepeatNElseArchive n
parseTSRepeatNElseDelete = withObject "TimeoutStrategy (TSRepeatNElseDelete)" $ \o -> do
n <- o .: "TSRepeatNElseDelete"
pure $ TSRepeatNElseDelete n
-- | Job metadata
......
{-# OPTIONS_GHC -Wno-orphans -Wno-missing-signatures #-}
module Main where
import Control.Exception (bracket)
import Data.Maybe (isJust, isNothing)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PGMQ.Simple qualified as PGMQ
import Database.PGMQ.Worker.Types qualified as PGMQW
import Database.PGMQ.Types qualified as PGMQ
import Test.Hspec
import Test.Tasty
import Test.Tasty.Hspec
main :: IO ()
main = do
simpleSpec <- testSpec "simpleTests" simpleTests
defaultMain $ testGroup "integration tests"
[ simpleSpec ]
data TestEnv =
TestEnv {
conn :: PSQL.Connection
, queue :: String
}
-- NOTE These tests expect a local pgmq server runnign on port 5432.
testQueue :: String
testQueue = "test"
setUpConn :: IO TestEnv
setUpConn = do
let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
, PSQL.connectDatabase = "postgres" }
conn <- PSQL.connect connInfo
return $ TestEnv { conn, queue = testQueue }
dropConn :: TestEnv -> IO ()
dropConn (TestEnv { conn }) = do
PSQL.close conn
withConn :: (TestEnv -> IO ()) -> IO ()
withConn = bracket setUpConn dropConn
withPGMQ :: (TestEnv -> IO ()) -> IO ()
withPGMQ f = withConn $ \testEnv -> bracket (setUpPGMQ testEnv) (\_ -> pure ()) (\_ -> f testEnv)
where
setUpPGMQ (TestEnv { conn, queue }) = do
PGMQ.initialize conn
PGMQ.dropQueue conn queue
simpleTests :: Spec
simpleTests = sequential $ aroundAll withPGMQ $ describe "Database" $ do
it "can get metrics for non-existing queue" $ \(TestEnv { conn, queue }) -> do
-- first of all, this should also work for non-existing queues
metrics <- PGMQ.getMetrics conn queue
metrics `shouldSatisfy` isNothing
-- let (Just (PGMQ.Metrics { .. })) = metrics
-- queueName `shouldBe` queue
-- totalMessages `shouldBe` 0
......@@ -25,9 +25,16 @@ aesonPropTests = testGroup "Aeson (de-)serialization property tests" $
instance QC.Arbitrary PGMQW.ArchiveStrategy where
arbitrary = QC.elements [ PGMQW.ASDelete, PGMQW.ASArchive ]
instance QC.Arbitrary PGMQW.ErrorStrategy where
arbitrary = QC.elements [ PGMQW.ESDelete, PGMQW.ESArchive, PGMQW.ESRepeat ]
arbitrary = QC.elements [ PGMQW.ESDelete, PGMQW.ESArchive, PGMQW.ESRepeat ]
instance QC.Arbitrary PGMQW.TimeoutStrategy where
arbitrary = QC.elements [ PGMQW.TSDelete, PGMQW.TSArchive, PGMQW.TSRepeat ]
arbitrary = do
n <- arbitrary
m <- arbitrary
QC.elements [ PGMQW.TSDelete
, PGMQW.TSArchive
, PGMQW.TSRepeat
, PGMQW.TSRepeatNElseArchive n
, PGMQW.TSRepeatNElseDelete m ]
instance QC.Arbitrary PGMQW.JobMetadata where
arbitrary = do
archiveStrategy <- arbitrary
......@@ -37,7 +44,7 @@ instance QC.Arbitrary PGMQW.JobMetadata where
return $ PGMQW.JobMetadata { .. }
aesonPropJobMetadataTests = testGroup "Aeson PGMQW.JobMetadata (de-)serialization tests" $
[ QC.testProperty "Aeson.encode . Aeson.decode == id" $
[ QC.testProperty "Aeson.decode . Aeson.encode == id" $
\jm ->
Aeson.decode (Aeson.encode (jm :: PGMQW.JobMetadata)) == Just jm
]
......@@ -46,7 +53,7 @@ instance QC.Arbitrary a => QC.Arbitrary (PGMQW.Job a) where
arbitrary = PGMQW.Job <$> arbitrary <*> arbitrary
aesonPropJobTests = testGroup "Aeson PGMQW.Job (de-)serialization tests" $
[ QC.testProperty "Aeson.encode . Aeson.decode == id" $
[ QC.testProperty "Aeson.decode . Aeson.encode == id" $
\j ->
Aeson.decode (Aeson.encode (j :: PGMQW.Job String)) == Just j
]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment