[redis] fix tests

parent 4a9c7096
Pipeline #7326 passed with stages
in 12 minutes and 39 seconds
...@@ -33,16 +33,19 @@ module Async.Worker.Broker.Redis ...@@ -33,16 +33,19 @@ module Async.Worker.Broker.Redis
, RedisWithMsgId(..) ) , RedisWithMsgId(..) )
where where
import Async.Worker.Broker.Types (MessageBroker(..), Queue, SerializableMessage, renderQueue) import Async.Worker.Broker.Types (MessageBroker(..), Queue, SerializableMessage, TimeoutS(..), renderQueue)
-- import Control.Concurrent (threadDelay) -- import Control.Concurrent (threadDelay)
import Control.Monad (void) import Control.Monad (void)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.Aeson (FromJSON(..), ToJSON(..), (.:), (.=), withObject, object, withScientific) import Data.Aeson (FromJSON(..), ToJSON(..), (.:), (.=), withObject, object, withScientific)
import Data.ByteString.Char8 qualified as BS import Data.ByteString.Char8 qualified as BSC
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Data.Maybe (catMaybes) import Data.Maybe (catMaybes, fromMaybe)
import Data.Scientific (floatingOrInteger) import Data.Scientific (floatingOrInteger)
import Data.UnixTime (getUnixTime, UnixTime(..))
import Database.Redis qualified as Redis import Database.Redis qualified as Redis
import Foreign.C.Types (CTime(..))
import Text.Read (readMaybe)
data RedisBroker data RedisBroker
...@@ -116,8 +119,14 @@ instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where ...@@ -116,8 +119,14 @@ instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where
-- Nothing -> undefined -- Nothing -> undefined
-- Just msg -> return msg -- Just msg -> return msg
setMessageTimeout _broker _queue _msgId _timeoutS = setMessageTimeout (RedisBroker' { conn }) queue msgId (TimeoutS timeoutS) = do
pure () ut <- getUnixTime
void $ Redis.runRedis conn $ do
let CTime t = utSeconds ut
let ms = fromIntegral (utMicroSeconds ut) :: Integer
Redis.hset queueK (idToBS msgId) (BSC.pack $ (show $ toInteger t + fromIntegral timeoutS) <> "." <> show ms)
where
queueK = messageTimeoutKey queue
sendMessage b@(RedisBroker' { conn }) queue (RedisM message) = do sendMessage b@(RedisBroker' { conn }) queue (RedisM message) = do
mId <- nextId b queue mId <- nextId b queue
...@@ -143,18 +152,22 @@ instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where ...@@ -143,18 +152,22 @@ instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where
deleteMessage (RedisBroker' { conn }) queue msgId = do deleteMessage (RedisBroker' { conn }) queue msgId = do
let queueK = queueKey queue let queueK = queueKey queue
let messageK = messageKey queue msgId let messageK = messageKey queue msgId
let timeoutK = messageTimeoutKey queue
-- void $ Redis.runRedis conn $ Redis.srem queueK [idToBS msgId] -- void $ Redis.runRedis conn $ Redis.srem queueK [idToBS msgId]
void $ Redis.runRedis conn $ do void $ Redis.runRedis conn $ do
_ <- Redis.lrem queueK 1 (idToBS msgId) void $ Redis.lrem queueK 1 (idToBS msgId)
Redis.del [messageK] void $ Redis.del [messageK]
Redis.hdel timeoutK [idToBS msgId]
-- archiveMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do -- archiveMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do
archiveMessage (RedisBroker' { conn }) queue msgId = do archiveMessage (RedisBroker' { conn }) queue msgId = do
let queueK = queueKey queue let queueK = queueKey queue
let archiveK = archiveKey queue let archiveK = archiveKey queue
let timeoutK = messageTimeoutKey queue
void $ Redis.runRedis conn $ do void $ Redis.runRedis conn $ do
_ <- Redis.lrem queueK 1 (idToBS msgId) void $ Redis.lrem queueK 1 (idToBS msgId)
Redis.sadd archiveK [idToBS msgId] void $ Redis.sadd archiveK [idToBS msgId]
Redis.hdel timeoutK [idToBS msgId]
-- eMove <- Redis.runRedis conn $ Redis.smove queueK archiveK (idToBS msgId) -- eMove <- Redis.runRedis conn $ Redis.smove queueK archiveK (idToBS msgId)
-- case eMove of -- case eMove of
-- Left _ -> undefined -- Left _ -> undefined
...@@ -164,13 +177,16 @@ instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where ...@@ -164,13 +177,16 @@ instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where
-- void $ Redis.runRedis conn $ Redis.sadd archiveK [idToBS msgId] -- void $ Redis.runRedis conn $ Redis.sadd archiveK [idToBS msgId]
-- TODO This is incorrect: we should include message timeout in this count -- TODO This is incorrect: we should include message timeout in this count
getQueueSize (RedisBroker' { conn }) queue = do -- getQueueSize (RedisBroker' { conn }) queue = do
let queueK = queueKey queue -- let queueK = queueKey queue
-- eLen <- Redis.runRedis conn $ Redis.scard queueK -- -- eLen <- Redis.runRedis conn $ Redis.scard queueK
eLen <- Redis.runRedis conn $ Redis.llen queueK -- eLen <- Redis.runRedis conn $ Redis.llen queueK
case eLen of -- case eLen of
Right len -> return $ fromIntegral len -- Right len -> return $ fromIntegral len
Left _ -> undefined -- Left _ -> undefined
getQueueSize b queue = do
msgIds <- listPendingMessageIds b queue
pure $ length msgIds
getArchivedMessage b@(RedisBroker' { conn }) queue msgId = do getArchivedMessage b@(RedisBroker' { conn }) queue msgId = do
let archiveK = archiveKey queue let archiveK = archiveKey queue
...@@ -180,23 +196,41 @@ instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where ...@@ -180,23 +196,41 @@ instance (SerializableMessage a, Show a) => MessageBroker RedisBroker a where
getRedisMessage b queue msgId getRedisMessage b queue msgId
_ -> return Nothing _ -> return Nothing
listPendingMessageIds (RedisBroker' { conn }) queue = do listPendingMessageIds b@(RedisBroker' { conn }) queue = do
let queueK = queueKey queue let queueK = queueKey queue
eMsgIds <- Redis.runRedis conn $ Redis.lrange queueK 0 (-1) eMsgIds <- Redis.runRedis conn $ Redis.lrange queueK 0 (-1)
case eMsgIds of case eMsgIds of
Left _ -> return [] Left _ -> return []
Right msgIds -> return $ catMaybes (bsToId <$> msgIds) Right msgIds -> do
let msgIds' = catMaybes (bsToId <$> msgIds)
ut <- getUnixTime
timeouts <- mapM (getMessageTimeout b queue) msgIds'
pure $ map fst $ filter (\(_msgId, ts) -> (fromMaybe ut ts) <= ut) $ zip msgIds' timeouts
getMessageById b queue msgId = do getMessageById b queue msgId = do
getRedisMessage b queue msgId getRedisMessage b queue msgId
getMessageTimeout :: Broker RedisBroker a -> Queue -> MessageId RedisBroker -> IO (Maybe UnixTime)
getMessageTimeout (RedisBroker' { conn }) queue msgId = do
eData <- Redis.runRedis conn $ Redis.hget queueK (idToBS msgId)
case eData of
Left _ -> undefined
Right Nothing -> pure Nothing
Right (Just timeoutBs) -> do
case BSC.break (== '.') timeoutBs of
(s, ms) -> case (readMaybe $ BSC.unpack s, readMaybe $ BSC.unpack $ BSC.drop 1 ms) of
(Just s', Just ms') -> pure $ Just $ UnixTime (CTime s') ms'
_ -> pure Nothing
where
queueK = messageTimeoutKey queue
-- Helper functions for getting redis keys -- Helper functions for getting redis keys
-- | Redis counter is an 'Int', while sets can only store strings -- | Redis counter is an 'Int', while sets can only store strings
idToBS :: MessageId RedisBroker -> BS.ByteString idToBS :: MessageId RedisBroker -> BSC.ByteString
idToBS (RedisMid msgId) = BSL.toStrict $ Aeson.encode msgId idToBS (RedisMid msgId) = BSL.toStrict $ Aeson.encode msgId
bsToId :: BS.ByteString -> Maybe (MessageId RedisBroker) bsToId :: BSC.ByteString -> Maybe (MessageId RedisBroker)
bsToId bs = RedisMid <$> Aeson.decode (BSL.fromStrict bs) bsToId bs = RedisMid <$> Aeson.decode (BSL.fromStrict bs)
-- | A global prefix used for all keys -- | A global prefix used for all keys
...@@ -204,8 +238,8 @@ beePrefix :: String ...@@ -204,8 +238,8 @@ beePrefix :: String
beePrefix = "bee-" beePrefix = "bee-"
-- | Redis counter that returns message ids -- | Redis counter that returns message ids
idKey :: Queue -> BS.ByteString idKey :: Queue -> BSC.ByteString
idKey queue = BS.pack $ beePrefix <> "sequence-" <> renderQueue queue idKey queue = BSC.pack $ beePrefix <> "sequence-" <> renderQueue queue
nextId :: Broker RedisBroker a -> Queue -> IO (Maybe Int) nextId :: Broker RedisBroker a -> Queue -> IO (Maybe Int)
nextId (RedisBroker' { conn }) queue = do nextId (RedisBroker' { conn }) queue = do
...@@ -216,17 +250,20 @@ nextId (RedisBroker' { conn }) queue = do ...@@ -216,17 +250,20 @@ nextId (RedisBroker' { conn }) queue = do
_ -> return Nothing _ -> return Nothing
-- | Key under which a message is stored -- | Key under which a message is stored
messageKey :: Queue -> MessageId RedisBroker -> BS.ByteString messageKey :: Queue -> MessageId RedisBroker -> BSC.ByteString
messageKey queue (RedisMid msgId) = queueKey queue <> BS.pack ("-message-" <> show msgId) messageKey queue (RedisMid msgId) = queueKey queue <> BSC.pack ("-message-" <> show msgId)
-- | Key for storing the set of message ids in queue -- | Key for storing the set of message ids in queue
queueKey :: Queue -> BS.ByteString queueKey :: Queue -> BSC.ByteString
queueKey queue = BS.pack $ beePrefix <> "queue-" <> renderQueue queue queueKey queue = BSC.pack $ beePrefix <> "queue-" <> renderQueue queue
-- | Key for storing the set of message ids in archive -- | Key for storing the set of message ids in archive
archiveKey :: Queue -> BS.ByteString archiveKey :: Queue -> BSC.ByteString
archiveKey queue = BS.pack $ beePrefix <> "archive-" <> renderQueue queue archiveKey queue = BSC.pack $ beePrefix <> "archive-" <> renderQueue queue
-- | Key for storing message timeouts
messageTimeoutKey :: Queue -> BSC.ByteString
messageTimeoutKey queue = BSC.pack $ beePrefix <> "timeout-" <> renderQueue queue
getRedisMessage :: FromJSON a getRedisMessage :: FromJSON a
=> Broker RedisBroker a => Broker RedisBroker a
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment