Add queueAvailableLength to count number of readily available messages

parent f827b577
Pipeline #6508 passed with stages
in 8 minutes and 10 seconds
...@@ -32,7 +32,8 @@ module Database.PGMQ.Simple ...@@ -32,7 +32,8 @@ module Database.PGMQ.Simple
, readMessagesWithPoll , readMessagesWithPoll
, sendMessage , sendMessage
, sendMessages , sendMessages
, setMessageVt ) , setMessageVt
, queueAvailableLength )
where where
...@@ -254,6 +255,18 @@ setMessageVt conn queue msgId vt = ...@@ -254,6 +255,18 @@ setMessageVt conn queue msgId vt =
-- | This function returns the number of readily available messages.
-- 'metrics' returns total queue size, but even then, when vt of
-- these messages is set in the future, the read operation might
-- return nothing (see queueAvailableNow).
queueAvailableLength :: PSQL.Connection -> Queue -> IO Int
queueAvailableLength conn queue = do
let queueTable = "q_" <> queue
let tableId = PSQL.QualifiedIdentifier (Just "pgmq") $ T.pack queueTable
[PSQL.Only len] <- PSQL.query conn [sql| SELECT COUNT(*) FROM ? WHERE vt <= now() |] (PSQL.Only tableId) :: IO [PSQL.Only Int]
return len
{-| {-|
A utility function: sometimes pgmq throws an error that a table (for A utility function: sometimes pgmq throws an error that a table (for
queue) doesn't exist and we want to ignore it as it's not critical in queue) doesn't exist and we want to ignore it as it's not critical in
......
...@@ -118,3 +118,21 @@ pgmqSimpleTests = parallel $ around withPGMQ $ describe "PGMQ Simple" $ do ...@@ -118,3 +118,21 @@ pgmqSimpleTests = parallel $ around withPGMQ $ describe "PGMQ Simple" $ do
msg1 `shouldBe` message1 msg1 `shouldBe` message1
msg2 `shouldBe` message2 msg2 `shouldBe` message2
) iter ) iter
it "can count messages properly, including vt (https://github.com/tembo-io/pgmq/issues/301)" $ \(TestEnv { conn, queue }) -> do
let message = "hello vt test" :: String
msgId <- PGMQ.sendMessage conn queue message 1
-- immediately after creating such message, we should get metrics.length = 1 and available length = 0
mMetrics <- PGMQ.getMetrics conn queue
let mQLen = PGMQ.queueLength <$> mMetrics
mQLen `shouldBe` (Just 1)
aLen <- PGMQ.queueAvailableLength conn queue
aLen `shouldBe` 0
-- Now reset the vt
PGMQ.setMessageVt conn queue msgId 0
mMetrics' <- PGMQ.getMetrics conn queue
let mQLen' = PGMQ.queueLength <$> mMetrics'
mQLen' `shouldBe` (Just 1)
aLen' <- PGMQ.queueAvailableLength conn queue
aLen' `shouldBe` 1
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment