Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
H
haskell-pgmq
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
gargantext
haskell-pgmq
Commits
3d9a80d4
Verified
Commit
3d9a80d4
authored
Jun 16, 2025
by
Przemyslaw Kaminski
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[refactor] <&> refactoring
parent
1dd92f0a
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
16 additions
and
15 deletions
+16
-15
Simple.hs
src/Database/PGMQ/Simple.hs
+16
-15
No files found.
src/Database/PGMQ/Simple.hs
View file @
3d9a80d4
...
@@ -11,7 +11,7 @@ https://tembo.io/pgmq/api/sql/functions/
...
@@ -11,7 +11,7 @@ https://tembo.io/pgmq/api/sql/functions/
-}
-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE QuasiQuotes #-}
module
Database.PGMQ.Simple
module
Database.PGMQ.Simple
(
initialize
(
initialize
,
archiveMessage
,
archiveMessage
...
@@ -45,6 +45,7 @@ where
...
@@ -45,6 +45,7 @@ where
-- import Control.Exception (Exception, SomeException(..), catch, fromException, throwIO, toException)
-- import Control.Exception (Exception, SomeException(..), catch, fromException, throwIO, toException)
import
Control.Monad
(
void
)
import
Control.Monad
(
void
)
import
Data.Functor
((
<&>
))
import
Data.String
(
fromString
)
import
Data.String
(
fromString
)
import
Data.Text
qualified
as
T
import
Data.Text
qualified
as
T
import
Database.PGMQ.Schema
(
pgmqSchema
)
import
Database.PGMQ.Schema
(
pgmqSchema
)
...
@@ -109,7 +110,7 @@ createQueue conn queue =
...
@@ -109,7 +110,7 @@ createQueue conn queue =
deleteMessage
::
PSQL
.
Connection
->
Queue
->
MessageId
->
IO
()
deleteMessage
::
PSQL
.
Connection
->
Queue
->
MessageId
->
IO
()
deleteMessage
conn
queue
msgId
=
deleteMessage
conn
queue
msgId
=
void
(
PSQL
.
query
conn
[
sql
|
SELECT pgmq.delete(?, ?)
|]
(
queue
,
msgId
)
::
IO
[
PSQL
.
Only
Bool
])
void
(
PSQL
.
query
conn
[
sql
|
SELECT pgmq.delete(?, ?)
|]
(
queue
,
msgId
)
::
IO
[
PSQL
.
Only
Bool
])
{-| Deletes given messages from given queue
{-| Deletes given messages from given queue
https://tembo.io/pgmq/api/sql/functions/#delete-batch -}
https://tembo.io/pgmq/api/sql/functions/#delete-batch -}
...
@@ -129,7 +130,7 @@ dropQueue conn queue =
...
@@ -129,7 +130,7 @@ dropQueue conn queue =
https://tembo.io/pgmq/api/sql/functions/#metrics -}
https://tembo.io/pgmq/api/sql/functions/#metrics -}
getMetrics
::
PSQL
.
Connection
->
Queue
->
IO
(
Maybe
Metrics
)
getMetrics
::
PSQL
.
Connection
->
Queue
->
IO
(
Maybe
Metrics
)
getMetrics
conn
queue
=
getMetrics
conn
queue
=
PSQL
.
query
conn
[
sql
|
SELECT * FROM pgmq.metrics(?)
|]
(
PSQL
.
Only
queue
)
>>=
return
.
headMay
PSQL
.
query
conn
[
sql
|
SELECT * FROM pgmq.metrics(?)
|]
(
PSQL
.
Only
queue
)
<&>
headMay
-- catch
-- catch
-- (PSQL.query conn [sql| SELECT * FROM pgmq.metrics(?) |] (PSQL.Only queue) >>= return . headMay)
-- (PSQL.query conn [sql| SELECT * FROM pgmq.metrics(?) |] (PSQL.Only queue) >>= return . headMay)
-- handleError
-- handleError
...
@@ -153,7 +154,7 @@ getMetrics conn queue =
...
@@ -153,7 +154,7 @@ getMetrics conn queue =
-- Just (PSQL.SqlError { sqlState = "42P01" }) -> return Nothing
-- Just (PSQL.SqlError { sqlState = "42P01" }) -> return Nothing
-- -- re-raise other errors
-- -- re-raise other errors
-- _ -> throwIO err
-- _ -> throwIO err
-- | Read metrics for all queues
-- | Read metrics for all queues
-- https://tembo.io/pgmq/api/sql/functions/#metrics_all
-- https://tembo.io/pgmq/api/sql/functions/#metrics_all
...
@@ -166,7 +167,7 @@ getMetricsAll conn =
...
@@ -166,7 +167,7 @@ getMetricsAll conn =
listQueues
::
PSQL
.
Connection
->
IO
[
QueueInfo
]
listQueues
::
PSQL
.
Connection
->
IO
[
QueueInfo
]
listQueues
conn
=
listQueues
conn
=
PSQL
.
query_
conn
[
sql
|
SELECT * FROM pgmq.list_queues()
|]
PSQL
.
query_
conn
[
sql
|
SELECT * FROM pgmq.list_queues()
|]
-- | Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.
-- | Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.
-- https://tembo.io/pgmq/api/sql/functions/#pop
-- https://tembo.io/pgmq/api/sql/functions/#pop
popMessage
::
(
SerializableMessage
a
)
popMessage
::
(
SerializableMessage
a
)
...
@@ -183,13 +184,13 @@ popMessage conn queue = do
...
@@ -183,13 +184,13 @@ popMessage conn queue = do
purgeQueue
::
PSQL
.
Connection
->
Queue
->
IO
()
purgeQueue
::
PSQL
.
Connection
->
Queue
->
IO
()
purgeQueue
conn
queue
=
do
purgeQueue
conn
queue
=
do
void
(
PSQL
.
query
conn
[
sql
|
SELECT * FROM pgmq.purge_queue(?)
|]
(
PSQL
.
Only
queue
)
::
IO
[
PSQL
.
Only
Int
])
void
(
PSQL
.
query
conn
[
sql
|
SELECT * FROM pgmq.purge_queue(?)
|]
(
PSQL
.
Only
queue
)
::
IO
[
PSQL
.
Only
Int
])
-- | Read a message from given queue, with given visibility timeout (in seconds)
-- | Read a message from given queue, with given visibility timeout (in seconds)
-- https://tembo.io/pgmq/api/sql/functions/#read
-- https://tembo.io/pgmq/api/sql/functions/#read
readMessage
::
(
SerializableMessage
a
)
readMessage
::
(
SerializableMessage
a
)
=>
PSQL
.
Connection
->
Queue
->
VisibilityTimeout
->
IO
(
Maybe
(
Message
a
))
=>
PSQL
.
Connection
->
Queue
->
VisibilityTimeout
->
IO
(
Maybe
(
Message
a
))
readMessage
conn
queue
vt
=
readMessage
conn
queue
vt
=
readMessages
conn
queue
vt
1
>>=
return
.
headMay
readMessages
conn
queue
vt
1
<&>
headMay
-- | Read a message from given archive
-- | Read a message from given archive
readMessageFromArchive
::
(
SerializableMessage
a
)
readMessageFromArchive
::
(
SerializableMessage
a
)
...
@@ -198,7 +199,7 @@ readMessageFromArchive conn queue msgId = do
...
@@ -198,7 +199,7 @@ readMessageFromArchive conn queue msgId = do
let
archiveTable
=
"a_"
<>
queue
let
archiveTable
=
"a_"
<>
queue
PSQL
.
query
conn
PSQL
.
query
conn
[
sql
|
SELECT msg_id, read_ct, enqueued_at, archived_at, vt, message FROM ? WHERE msg_id = ?
|]
[
sql
|
SELECT msg_id, read_ct, enqueued_at, archived_at, vt, message FROM ? WHERE msg_id = ?
|]
(
PSQL
.
QualifiedIdentifier
(
Just
"pgmq"
)
$
T
.
pack
archiveTable
,
msgId
)
>>=
return
.
headMay
(
PSQL
.
QualifiedIdentifier
(
Just
"pgmq"
)
$
T
.
pack
archiveTable
,
msgId
)
<&>
headMay
-- | Read a message from queue with given ID (for querying purposes, doesn't pop message from queue)
-- | Read a message from queue with given ID (for querying purposes, doesn't pop message from queue)
readMessageById
::
(
SerializableMessage
a
)
readMessageById
::
(
SerializableMessage
a
)
...
@@ -207,7 +208,7 @@ readMessageById conn queue msgId = do
...
@@ -207,7 +208,7 @@ readMessageById conn queue msgId = do
let
queueTable
=
"q_"
<>
queue
let
queueTable
=
"q_"
<>
queue
PSQL
.
query
conn
PSQL
.
query
conn
[
sql
|
SELECT msg_id, read_ct, enqueued_at, NULL, vt, message FROM ? WHERE msg_id = ?
|]
[
sql
|
SELECT msg_id, read_ct, enqueued_at, NULL, vt, message FROM ? WHERE msg_id = ?
|]
(
PSQL
.
QualifiedIdentifier
(
Just
"pgmq"
)
$
T
.
pack
queueTable
,
msgId
)
>>=
return
.
headMay
(
PSQL
.
QualifiedIdentifier
(
Just
"pgmq"
)
$
T
.
pack
queueTable
,
msgId
)
<&>
headMay
{-| Reads given number of messages from given queue
{-| Reads given number of messages from given queue
...
@@ -233,7 +234,7 @@ readMessageWithPoll :: (SerializableMessage a)
...
@@ -233,7 +234,7 @@ readMessageWithPoll :: (SerializableMessage a)
->
PollIntervalMs
->
PollIntervalMs
->
IO
(
Maybe
(
Message
a
))
->
IO
(
Maybe
(
Message
a
))
readMessageWithPoll
conn
queue
vt
maxPollSeconds
pollIntervalMs
=
readMessageWithPoll
conn
queue
vt
maxPollSeconds
pollIntervalMs
=
readMessagesWithPoll
conn
queue
vt
1
maxPollSeconds
pollIntervalMs
>>=
return
.
headMay
readMessagesWithPoll
conn
queue
vt
1
maxPollSeconds
pollIntervalMs
<&>
headMay
-- | Reads given number of messages, polling for given duration if the
-- | Reads given number of messages, polling for given duration if the
-- queue is empty.
-- queue is empty.
...
@@ -251,7 +252,7 @@ readMessagesWithPoll conn queue vt count maxPollSeconds pollIntervalMs =
...
@@ -251,7 +252,7 @@ readMessagesWithPoll conn queue vt count maxPollSeconds pollIntervalMs =
PSQL
.
query
conn
PSQL
.
query
conn
[
sql
|
SELECT msg_id, read_ct, enqueued_at, NULL, vt, message FROM pgmq.read_with_poll(?, ?, ?, ?, ?)
|]
[
sql
|
SELECT msg_id, read_ct, enqueued_at, NULL, vt, message FROM pgmq.read_with_poll(?, ?, ?, ?, ?)
|]
(
queue
,
vt
,
count
,
maxPollSeconds
,
pollIntervalMs
)
(
queue
,
vt
,
count
,
maxPollSeconds
,
pollIntervalMs
)
-- | Sends one message to a queue
-- | Sends one message to a queue
-- https://tembo.io/pgmq/api/sql/functions/#send
-- https://tembo.io/pgmq/api/sql/functions/#send
sendMessage
::
(
SerializableMessage
a
)
sendMessage
::
(
SerializableMessage
a
)
...
@@ -260,8 +261,8 @@ sendMessage conn queue msg delay = do
...
@@ -260,8 +261,8 @@ sendMessage conn queue msg delay = do
[
PSQL
.
Only
msgId
]
<-
PSQL
.
query
conn
[
sql
|
SELECT pgmq.send(?, ?::jsonb, ?)
|]
(
queue
,
PSQL
.
Aeson
msg
,
delay
)
[
PSQL
.
Only
msgId
]
<-
PSQL
.
query
conn
[
sql
|
SELECT pgmq.send(?, ?::jsonb, ?)
|]
(
queue
,
PSQL
.
Aeson
msg
,
delay
)
return
msgId
return
msgId
-- | Sends a batch of messages
-- | Sends a batch of messages
-- https://tembo.io/pgmq/api/sql/functions/#send_batch
-- https://tembo.io/pgmq/api/sql/functions/#send_batch
sendMessages
::
(
SerializableMessage
a
)
sendMessages
::
(
SerializableMessage
a
)
...
@@ -269,8 +270,8 @@ sendMessages :: (SerializableMessage a)
...
@@ -269,8 +270,8 @@ sendMessages :: (SerializableMessage a)
sendMessages
conn
queue
msgs
delay
=
do
sendMessages
conn
queue
msgs
delay
=
do
ret
<-
PSQL
.
query
conn
[
sql
|
SELECT pgmq.send_batch(?, ?::jsonb[], ?)
|]
(
queue
,
PSQL
.
PGArray
(
PSQL
.
Aeson
<$>
msgs
),
delay
)
ret
<-
PSQL
.
query
conn
[
sql
|
SELECT pgmq.send_batch(?, ?::jsonb[], ?)
|]
(
queue
,
PSQL
.
PGArray
(
PSQL
.
Aeson
<$>
msgs
),
delay
)
return
$
(
\
(
PSQL
.
Only
id'
)
->
id'
)
<$>
ret
return
$
(
\
(
PSQL
.
Only
id'
)
->
id'
)
<$>
ret
-- | Sets the visibility timeout of a message for X seconds from now
-- | Sets the visibility timeout of a message for X seconds from now
-- https://tembo.io/pgmq/api/sql/functions/#set_vt
-- https://tembo.io/pgmq/api/sql/functions/#set_vt
setMessageVt
::
PSQL
.
Connection
->
Queue
->
MessageId
->
VisibilityTimeout
->
IO
()
setMessageVt
::
PSQL
.
Connection
->
Queue
->
MessageId
->
VisibilityTimeout
->
IO
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment