Abstract broker compiles now, with pgmq implementation and a simple test

parent 04a62e78
Pipeline #6399 failed with stages
......@@ -220,6 +220,7 @@ test-suite test-pgmq-integration
, postgresql-simple >= 0.6 && < 0.8
, tasty >= 1.5 && < 1.6
, tasty-hspec >= 1.2.0 && < 2
, text >= 1.2 && < 2.2
, haskell-pgmq
......
......@@ -8,17 +8,19 @@ Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Async.Worker.Broker.PGMQ
( PGMQBroker(..) )
where
import Async.Worker.Broker (Broker(..), Queue, HasMessageId(..))
import Async.Worker.Types (Job, WorkerBroker)
import Async.Worker.Broker.Types (HasBroker(..), Queue)
import Async.Worker.Types (Job(..), defaultMetadata)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PGMQ.Simple qualified as PGMQ
import Database.PGMQ.Types qualified as PGMQ
......@@ -27,25 +29,49 @@ import Database.PGMQ.Types qualified as PGMQ
type ConnParams = PSQL.ConnectInfo
type Timeout = Int
-- newtype PGMQMessage a = PGMQMessage (PGMQ.Message (Job a))
data PGMQBroker =
PGMQBroker {
conn :: PSQL.Connection
}
data PGMQBroker = PGMQBroker PSQL.Connection
-- instance ( PGMQ.SerializableMessage a )
-- => Broker PGMQBroker (PGMQ.Message (Job a)) (Job a) Int where
instance HasBroker PGMQBroker a where
data Broker PGMQBroker a =
PGMQBroker' {
conn :: PSQL.Connection
}
data BrokerMessage PGMQBroker a = PGMQMessage (PGMQ.Message (Job a))
data Message PGMQBroker a = PGMQJob (Job a)
type MessageId PGMQBroker = Int
type BrokerInitParams PGMQBroker = PGMQBroker
instance (PGMQ.MessageClass a) => Broker PGMQBroker (PGMQ.Message (Job a)) (Job a) Int where
createQueue (PGMQBroker { conn }) queue = do
messageId (PGMQMessage (PGMQ.Message { msgId })) = msgId
getMessage (PGMQMessage (PGMQ.Message { message })) = PGMQJob message
toMessage job = PGMQJob (Job { job, metadata = defaultMetadata })
toA (PGMQJob (Job { job })) = job
initBroker (PGMQBroker conn) = PGMQBroker' { conn }
createQueue (PGMQBroker' { conn }) queue = do
PGMQ.initialize conn
PGMQ.createQueue conn queue
readMessageWaiting q@(PGMQBroker { conn }) queue = do
mMsg <- PGMQ.readMessageWithPoll conn queue 10 5 100
case mMsg of
Nothing -> readMessageWaiting q queue
Just msg -> return msg
dropQueue (PGMQBroker' { conn }) queue = do
PGMQ.dropQueue conn queue
sendMessage (PGMQBroker { conn }) queue msg =
PGMQ.sendMessage conn queue msg 0
-- readMessageWaiting :: PGMQ.SerializableMessage a
-- => Broker PGMQBroker -> Queue -> IO (BrokerMessage PGMQBroker a)
readMessageWaiting q@(PGMQBroker' { conn }) queue = loop
where
-- loop :: PGMQ.SerializableMessage a => IO (BrokerMessage PGMQBroker' a)
loop = do
mMsg <- PGMQ.readMessageWithPoll conn queue 10 5 100
case mMsg of
Nothing -> readMessageWaiting q queue
Just msg -> return $ PGMQMessage msg
sendMessage (PGMQBroker' { conn }) queue (PGMQJob job) =
PGMQ.sendMessage conn queue job 0
-- -- TODO
-- ackMessage :: (PGMQQueue a) -> QueueName -> (Job a) -> IO ()
......@@ -61,3 +87,4 @@ instance (PGMQ.MessageClass a) => Broker PGMQBroker (PGMQ.Message (Job a)) (Job
-- handleJob :: (PGMQQueue a) -> (Job a) -> IO ()
-- handleJob
......@@ -9,16 +9,26 @@ Portability : POSIX
-}
{-
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FunctionalDependencies #-}
-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Async.Worker.Broker.Types
( Queue
, HasMessageId(..)
, BrokerMessage(..)
, Broker(..) )
-- , HasMessageId(..)
, HasBroker(..)
)
where
import Data.Aeson (FromJSON, ToJSON)
import Data.Kind (Type)
import Data.Typeable (Typeable)
type Queue = String
......@@ -26,48 +36,85 @@ type Queue = String
{-| A message in the queue system must have some properties. In
particular, it must have some sort of 'id'.
-}
class HasMessageId message msgId where
messageId :: message -> msgId
-- TODO There are 3 types of messages here:
-- - 'a' the underlying, user-defined message
-- - 'Job a' worker definition, containing message metadata
-- - "a broker message", i.e. for PGMQ, it returns things like: msgId, readCt, enqueuedAt, vt
-- - 'a' is read-write
-- - 'Job a' is read-write
-- - "broker message" is read-only (i.e. we can't save it to
-- broker. Instead, we save 'Job a' and get "broker message" when
-- reading. In this sense, read and send are not symmetrical.)
-- class HasMessageId message msgId where
-- class HasMessageId m where
-- data family Message m :: Type
-- type family MessageId m :: Type
-- messageId :: Message m -> MessageId m
{- NOTE There are 3 types of messages here:
- 'a' the underlying, user-defined message
- 'Job a' worker definition, containing message metadata
- 'BrokerMessage message m', i.e. for PGMQ, it returns things like:
msgId, readCt, enqueuedAt, vt
- 'a' is read-write
- 'Job a' is read-write
- 'BrokerMessage' is read-only, i.e. we can't save it to broker and
it doesn't make sense to construct it on Haskell side. Instead, we
save 'Job a' and get 'BrokerMessage' when reading. In this sense,
read and send are not symmetrical (similarly, Opaleye has Read and
Write tables).
-}
-- | So this is the broker message that contains our message inside
class BrokerMessage message m where
getMessage :: message -> m
-- class BrokerMessage brokerMessage message where
-- getMessage :: brokerMessage -> message
type SerializableMessage a = ( FromJSON a
, ToJSON a
-- NOTE This shouldn't be necessary
, Typeable a )
{-|
This is an interface for basic broker functionality.
-}
class Broker broker brokerMessage message msgId where
-- class Broker broker brokerMessage message msgId | brokerMessage -> message, brokerMessage -> msgId where
class HasBroker b a where
-- | Data representing the broker
data family Broker b a :: Type
-- | Data represenging message that is returned by broker
data family BrokerMessage b a :: Type
-- | Data that we serialize into broker
data family Message b a :: Type
-- | How to get the message id (needed for delete/archive operations)
type family MessageId b :: Type
type family BrokerInitParams b :: Type
-- | Operation for getting the message id from BrokerMessage
messageId :: BrokerMessage b a -> MessageId b
-- | BrokerMessage contains Message inside
getMessage :: BrokerMessage b a -> Message b a
-- | Convert 'a' to 'Message b a'
toMessage :: a -> Message b a
-- | Conver 'Message b a' to 'a'
toA :: Message b a -> a
-- | Initialize broker
initBroker :: BrokerInitParams b -> Broker b a
{-| Create new queue with given name. Optionally any other
initializations can be added here. -}
createQueue :: broker -> Queue -> IO ()
createQueue :: Broker b a -> Queue -> IO ()
{-| Drop queue -}
dropQueue :: broker -> Queue -> IO ()
dropQueue :: Broker b a -> Queue -> IO ()
{-| Read message, waiting for it if not present -}
readMessageWaiting :: broker -> Queue -> IO brokerMessage
readMessageWaiting :: SerializableMessage a => Broker b a -> Queue -> IO (BrokerMessage b a)
{-| Send message -}
sendMessage :: broker -> Queue -> message -> IO ()
sendMessage :: SerializableMessage a => Broker b a -> Queue -> Message b a -> IO ()
{-| Delete message -}
deleteMessage :: broker -> Queue -> msgId -> IO ()
deleteMessage :: Broker b a -> Queue -> MessageId b -> IO ()
{-| Archive message -}
archiveMessage :: broker -> Queue -> msgId -> IO ()
archiveMessage :: Broker b a -> Queue -> MessageId b -> IO ()
-- NOTE This is worker-specific
......
......@@ -17,16 +17,17 @@ module Async.Worker.Types
, ErrorStrategy(..)
, TimeoutStrategy(..)
, JobMetadata(..)
, defaultMetadata
, Job(..)
, getJob
, jobTimeout
, JobMessage
, WorkerBroker
, State(..)
, PerformAction )
-- , JobMessage
-- , WorkerBroker
, State(..) )
-- , PerformAction )
where
import Async.Worker.Broker.Types (HasMessageId, Broker, Queue)
import Async.Worker.Broker.Types (Broker, BrokerMessage, Queue)
import Control.Applicative ((<|>))
import Data.Aeson (FromJSON(..), ToJSON(..), object, (.=), (.:), withObject, withText)
import Data.Text qualified as T
......@@ -139,6 +140,12 @@ instance FromJSON JobMetadata where
timeoutStrategy <- o .: "tstrat"
timeout <- o .: "timeout"
return $ JobMetadata { .. }
defaultMetadata :: JobMetadata
defaultMetadata =
JobMetadata { archiveStrategy = ASArchive
, errorStrategy = ESArchive
, timeoutStrategy = TSArchive
, timeout = 10 }
-- | Worker has specific message type, because each message carries
-- | around some metadata for the worker itself
......@@ -165,24 +172,25 @@ jobTimeout (Job { metadata }) = timeout metadata
-- | The worker job, as it is serialized in a queue
type JobMessage a msgId = HasMessageId (Job a) msgId
-- type JobMessage a msgId = HasMessageId (Job a) msgId
-- | Broker associated with the abstract worker
type WorkerBroker b brokerMessage a msgId = Broker b brokerMessage (Job a) msgId
-- type WorkerBroker b brokerMessage a msgId = Broker b brokerMessage (Job a) msgId
-- | Main state for a running worker
data State broker brokerMessage a msgId =
State { broker :: broker
-- | Main state for a running worker ('b' is broker, 'a' is the underlying message)
data State b a =
State { broker :: Broker b a
, queueName :: Queue -- name of queue
-- custom name for this worker
, name :: String
, performAction :: PerformAction broker brokerMessage a msgId }
, performAction :: State b a -> BrokerMessage b a -> IO () }
-- , performAction :: PerformAction broker brokerMessage a msgId }
-- | Callback definition (what to execute when a message arrives)
type PerformAction broker brokerMessage a msgId =
(WorkerBroker broker brokerMessage a msgId, JobMessage a msgId)
=> State broker brokerMessage a msgId -> brokerMessage -> IO ()
-- type PerformAction broker brokerMessage a msgId =
-- (WorkerBroker broker brokerMessage a msgId, JobMessage a msgId)
-- => State broker brokerMessage a msgId -> brokerMessage -> IO ()
-- -- | Thrown when job times out
......
......@@ -45,7 +45,7 @@ import Database.PostgreSQL.Simple qualified as PSQL
import Database.PostgreSQL.Simple.Newtypes qualified as PSQL (Aeson(..))
import Database.PostgreSQL.Simple.SqlQQ (sql)
import Database.PostgreSQL.Simple.Types qualified as PSQL (PGArray(..))
import Database.PGMQ.Types (Delay, MaxPollSeconds, Message, MessageCount, MessageId, Metrics, MessageClass, PollIntervalMs, Queue, QueueInfo, VisibilityTimeout)
import Database.PGMQ.Types (Delay, MaxPollSeconds, Message, MessageCount, MessageId, Metrics, SerializableMessage, PollIntervalMs, Queue, QueueInfo, VisibilityTimeout)
import Safe (headMay)
......@@ -142,7 +142,7 @@ listQueues conn =
-- | Read a message and immediately delete it from the queue. Returns `None` if the queue is empty.
-- https://tembo.io/pgmq/api/sql/functions/#pop
popMessage :: (MessageClass a)
popMessage :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> IO (Maybe (Message a))
popMessage conn queue = do
PSQL.query conn [sql| SELECT * FROM pgmq.pop(?) |] (PSQL.Only queue) >>= return . headMay
......@@ -155,7 +155,7 @@ purgeQueue conn queue = do
-- | Read a message from given queue, with given visibility timeout (in seconds)
-- https://tembo.io/pgmq/api/sql/functions/#read
readMessage :: (MessageClass a)
readMessage :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> VisibilityTimeout -> IO (Maybe (Message a))
readMessage conn queue vt =
readMessages conn queue vt 1 >>= return . headMay
......@@ -163,7 +163,7 @@ readMessage conn queue vt =
{-| Reads given number of messages from given queue
https://tembo.io/pgmq/api/sql/functions/#read -}
readMessages :: (MessageClass a)
readMessages :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> VisibilityTimeout -> MessageCount -> IO [Message a]
readMessages conn queue vt count =
PSQL.query conn [sql| SELECT * FROM pgmq.read(?, ?, ?) |] (queue, vt, count)
......@@ -174,7 +174,7 @@ readMessages conn queue vt count =
NOTE This is a blocking operation.
https://tembo.io/pgmq/api/sql/functions/#read_with_poll -}
readMessageWithPoll :: (MessageClass a)
readMessageWithPoll :: (SerializableMessage a)
=> PSQL.Connection
-> Queue
-> VisibilityTimeout
......@@ -188,7 +188,7 @@ readMessageWithPoll conn queue vt maxPollSeconds pollIntervalMs =
-- queue is empty.
-- NOTE This is a blocking operation.
-- https://tembo.io/pgmq/api/sql/functions/#read_with_poll
readMessagesWithPoll :: (MessageClass a)
readMessagesWithPoll :: (SerializableMessage a)
=> PSQL.Connection
-> Queue
-> VisibilityTimeout
......@@ -202,14 +202,14 @@ readMessagesWithPoll conn queue vt count maxPollSeconds pollIntervalMs =
-- | Sends one message to a queue
-- https://tembo.io/pgmq/api/sql/functions/#send
sendMessage :: (MessageClass a)
sendMessage :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> a -> Delay -> IO ()
sendMessage conn queue msg delay =
void (PSQL.query conn [sql| SELECT pgmq.send(?, ?::jsonb, ?) |] (queue, PSQL.Aeson msg, delay) :: IO [PSQL.Only Int])
-- | Sends a batch of messages
-- https://tembo.io/pgmq/api/sql/functions/#send_batch
sendMessages :: (MessageClass a)
sendMessages :: (SerializableMessage a)
=> PSQL.Connection -> Queue -> [a] -> Delay -> IO ()
sendMessages conn queue msgs delay =
void (PSQL.query conn [sql| SELECT pgmq.send_batch(?, ?::jsonb[], ?) |] (queue, PSQL.PGArray (PSQL.Aeson <$> msgs), delay) :: IO [PSQL.Only Int])
......
......@@ -15,7 +15,6 @@ Portability : POSIX
module Database.PGMQ.Types
where
import Async.Worker.Broker.Types (HasMessageId(..), BrokerMessage(..))
import Data.Aeson (FromJSON, ToJSON)
import Data.Time.LocalTime (ZonedTime, zonedTimeToUTC)
import Data.Typeable (Typeable)
......@@ -51,10 +50,10 @@ type PollIntervalMs = Int
-- | Basic message typeclass for PGMQ: it has to be jsonb-serializable
type MessageClass a = ( FromJSON a
, ToJSON a
-- NOTE This shouldn't be necessary
, Typeable a )
type SerializableMessage a = ( FromJSON a
, ToJSON a
-- NOTE This shouldn't be necessary
, Typeable a )
-- | Message, as returned by the 'pgmq.read' function
data Message a =
......@@ -70,7 +69,7 @@ instance Eq a => Eq (Message a) where
(readCt msg1) == (readCt msg2) &&
(zonedTimeToUTC $ vt msg1) == (zonedTimeToUTC $ vt msg2) &&
(message msg1) == (message msg2)
instance MessageClass a => PSQL.FromRow (Message a) where
instance SerializableMessage a => PSQL.FromRow (Message a) where
fromRow = do
msgId <- PSQL.field
readCt <- PSQL.field
......@@ -79,10 +78,6 @@ instance MessageClass a => PSQL.FromRow (Message a) where
messageA <- PSQL.field
return $ Message { message = PSQL.getAeson messageA, .. }
instance HasMessageId (Message a) Int where
messageId (Message { msgId }) = msgId
instance BrokerMessage (Message a) a where
getMessage (Message { message }) = message
-- | Returned by pgmq.list_queues
data QueueInfo =
......
......@@ -2,12 +2,22 @@
Generic Broker tests. All brokers should satisfy them.
-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.Integration.Broker
( brokerTests )
( brokerTests
, initPGMQBroker )
where
import Async.Worker.Broker.PGMQ (PGMQBroker(..))
import Async.Worker.Broker.Types qualified as BT
import Database.PostgreSQL.Simple qualified as PSQL
import Control.Exception (bracket)
import Data.Aeson (ToJSON(..), FromJSON(..), withText)
import Data.Text qualified as T
import Test.Hspec
data TestEnv b =
......@@ -20,24 +30,48 @@ testQueue = "test"
data Message =
Message { text :: String }
deriving (Show, Eq)
instance ToJSON Message where
toJSON (Message { text }) = toJSON text
instance FromJSON Message where
parseJSON = withText "Message" $ \text -> do
pure $ Message { text = T.unpack text }
initPGMQBroker :: IO (BT.Broker PGMQBroker Message)
initPGMQBroker = do
let connInfo = PSQL.defaultConnectInfo { PSQL.connectUser = "postgres"
, PSQL.connectDatabase = "postgres" }
conn <- PSQL.connect connInfo
pure $ BT.initBroker (PGMQBroker conn)
withBroker :: BT.Broker b => b -> (TestEnv b -> IO ()) -> IO ()
withBroker b = bracket (setUpBroker b) (tearDownBroker b)
where
setUpBroker b = do
BT.createQueue b testQueue
-- withBroker :: (BT.Broker b bm m msgId) => b -> (TestEnv b -> IO ()) -> IO ()
-- withBroker b = bracket (setUpBroker b) tearDownBroker
-- where
-- setUpBroker :: (BT.Broker b bm m msgId) => b -> IO (TestEnv b)
-- setUpBroker b = do
-- BT.createQueue b testQueue
return $ TestEnv { broker = b
, queue = testQueue }
-- return $ TestEnv { broker = b
-- , queue = testQueue }
tearDownBroker b = do
BT.dropQueue b testQueue
-- tearDownBroker b = do
-- BT.dropQueue b testQueue
brokerTests :: BT.Broker broker => broker -> Spec
brokerTests b = sequential $ aroundAll (withBroker b) $ describe "Broker tests" $ do
it "can send and receive a message" $ \(TestEnv { broker, queue }) -> do
-- brokerTests :: BT.Broker broker brokerMessage message msgId => broker -> Spec
-- brokerTests b = sequential $ aroundAll (withBroker b) $ describe "Broker tests" $ do
-- brokerTests :: ( BT.BrokerMessage brokerMessage Message
-- , BT.Broker broker brokerMessage Message msgId )
-- => broker -> Spec
brokerTests :: BT.HasBroker b Message => BT.Broker b Message -> Spec
brokerTests b = describe "Broker tests" $ do
-- it "can send and receive a message" $ \(TestEnv { broker, queue }) -> do
it "can send and receive a message" $ do
BT.dropQueue b testQueue
BT.createQueue b testQueue
let msg = Message { text = "test" }
BT.sendMessage b testQueue (BT.toMessage msg)
msg2 <- BT.readMessageWaiting b testQueue
msg `shouldBe` (BT.toA $ BT.getMessage msg2)
module Main where
import Test.Integration.Broker (brokerTests)
import Test.Integration.Broker (brokerTests, initPGMQBroker)
import Test.Integration.PGMQ.Simple (pgmqSimpleTests)
import Test.Tasty
import Test.Tasty.Hspec
......@@ -9,7 +9,8 @@ import Test.Tasty.Hspec
main :: IO ()
main = do
brokerSpec <- testSpec "brokerTests" brokerTests
pgmqBroker <- initPGMQBroker
brokerSpec <- testSpec "brokerTests" (brokerTests pgmqBroker)
pgmqSimpleSpec <- testSpec "pgmqSimpleTests" pgmqSimpleTests
defaultMain $ testGroup "integration tests"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment