[redis] implement message id using INCR counter

parent d72ceed5
Pipeline #6444 passed with stages
in 9 minutes and 57 seconds
...@@ -19,9 +19,11 @@ https://redis.io/glossary/redis-queue/ ...@@ -19,9 +19,11 @@ https://redis.io/glossary/redis-queue/
module Async.Worker.Broker.Redis module Async.Worker.Broker.Redis
( RedisBroker ( RedisBroker
, BrokerInitParams(..) ) , BrokerInitParams(..)
, RedisWithMsgId(..) )
where where
import Data.Aeson (FromJSON(..), ToJSON(..), (.:), (.=), withObject, object)
import Async.Worker.Broker.Types (HasBroker(..), SerializableMessage) import Async.Worker.Broker.Types (HasBroker(..), SerializableMessage)
import Control.Monad (void) import Control.Monad (void)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
...@@ -37,7 +39,8 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where ...@@ -37,7 +39,8 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where
RedisBroker' { RedisBroker' {
conn :: Redis.Connection conn :: Redis.Connection
} }
data BrokerMessage RedisBroker a = RedisBM a data BrokerMessage RedisBroker a =
RedisBM (RedisWithMsgId a)
deriving (Show) deriving (Show)
data Message RedisBroker a = RedisM a data Message RedisBroker a = RedisM a
data MessageId RedisBroker = RedisMid Int data MessageId RedisBroker = RedisMid Int
...@@ -46,8 +49,8 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where ...@@ -46,8 +49,8 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where
-- We're using simple QUEUE so we don't care about message id as we -- We're using simple QUEUE so we don't care about message id as we
-- won't be deleting/archiving the messages -- won't be deleting/archiving the messages
messageId _ = RedisMid 0 messageId (RedisBM (RedisWithMsgId { rmidId })) = RedisMid rmidId
getMessage (RedisBM msg) = RedisM msg getMessage (RedisBM (RedisWithMsgId { rmida })) = RedisM rmida
toMessage message = RedisM message toMessage message = RedisM message
toA (RedisM message) = message toA (RedisM message) = message
initBroker (RedisBrokerInitParams connInfo) = do initBroker (RedisBrokerInitParams connInfo) = do
...@@ -76,8 +79,14 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where ...@@ -76,8 +79,14 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where
Just dmsg -> return $ RedisBM dmsg Just dmsg -> return $ RedisBM dmsg
Nothing -> undefined Nothing -> undefined
sendMessage (RedisBroker' { conn }) queue (RedisM message) = sendMessage (RedisBroker' { conn }) queue (RedisM message) = do
void $ Redis.runRedis conn $ Redis.lpush (BS.pack queue) [BSL.toStrict $ Aeson.encode message] let key = "key-" <> queue
eId <- Redis.runRedis conn $ Redis.incr $ BS.pack key
case eId of
Left _err -> undefined
Right id' -> do
let m = RedisWithMsgId { rmidId = fromIntegral id', rmida = message }
void $ Redis.runRedis conn $ Redis.lpush (BS.pack queue) [BSL.toStrict $ Aeson.encode m]
-- deleteMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do -- deleteMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do
deleteMessage _broker _queue _msgId = do deleteMessage _broker _queue _msgId = do
...@@ -96,3 +105,20 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where ...@@ -96,3 +105,20 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where
Left _ -> return 0 Left _ -> return 0
-- | Helper datatype to store message with a unique id.
-- We fetch the id by using 'INCR'
-- https://redis.io/docs/latest/commands/incr/
data RedisWithMsgId a =
RedisWithMsgId { rmida :: a
, rmidId :: Int }
deriving (Show, Eq)
instance FromJSON a => FromJSON (RedisWithMsgId a) where
parseJSON = withObject "RedisWithMsgId" $ \o -> do
rmida <- o .: "rmida"
rmidId <- o .: "rmidId"
return $ RedisWithMsgId { rmida, rmidId }
instance ToJSON a => ToJSON (RedisWithMsgId a) where
toJSON (RedisWithMsgId { .. }) = toJSON $ object [
"rmida" .= rmida
, "rmidId" .= rmidId
]
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
module Main where module Main where
import Async.Worker.Broker.Redis qualified as R
import Async.Worker.Types qualified as WT import Async.Worker.Types qualified as WT
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Test.Tasty import Test.Tasty
...@@ -19,7 +20,8 @@ propertyTests = testGroup "Property tests" [aesonPropTests] ...@@ -19,7 +20,8 @@ propertyTests = testGroup "Property tests" [aesonPropTests]
aesonPropTests = testGroup "Aeson (de-)serialization property tests" $ aesonPropTests = testGroup "Aeson (de-)serialization property tests" $
[ aesonPropJobMetadataTests [ aesonPropJobMetadataTests
, aesonPropJobTests ] , aesonPropJobTests
, aesonPropRedisTests ]
instance QC.Arbitrary WT.ArchiveStrategy where instance QC.Arbitrary WT.ArchiveStrategy where
arbitrary = QC.elements [ WT.ASDelete, WT.ASArchive ] arbitrary = QC.elements [ WT.ASDelete, WT.ASArchive ]
...@@ -60,6 +62,19 @@ aesonPropJobTests = testGroup "Aeson WT.Job (de-)serialization tests" $ ...@@ -60,6 +62,19 @@ aesonPropJobTests = testGroup "Aeson WT.Job (de-)serialization tests" $
Aeson.decode (Aeson.encode (j :: WT.Job String)) == Just j Aeson.decode (Aeson.encode (j :: WT.Job String)) == Just j
] ]
instance QC.Arbitrary a => QC.Arbitrary (R.RedisWithMsgId a) where
arbitrary = do
rmidId <- arbitrary
rmida <- arbitrary
return $ R.RedisWithMsgId { rmida, rmidId }
aesonPropRedisTests = testGroup "Aeson RedisWithMsgId (de-)serialization tests" $
[ QC.testProperty "Aeson.decode . Aeson.encode == id" $
\j ->
Aeson.decode (Aeson.encode (j :: R.RedisWithMsgId String)) == Just j
]
unitTests = testGroup "Unit tests" [] unitTests = testGroup "Unit tests" []
-- [ testCase "List comparison (different length)" $ -- [ testCase "List comparison (different length)" $
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment