[redis] improve the redis queue

parent 0d008286
Pipeline #6462 passed with stages
in 10 minutes and 27 seconds
......@@ -8,6 +8,6 @@ packages:
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-pgmq
tag: 1bfa37fd1714bff74cf5a8f256a847ca52447dfc
tag: 268398735f61008af099918a24b3fb57f9533ba3
tests: true
......@@ -81,7 +81,6 @@ handleMessage state@(State { .. }) brokerMessage = do
ASArchive -> do
-- putStrLn $ formatStr state $ "archiving completed job " <> show msgId <> " (strategy: " <> show archiveStrategy <> ")"
archiveMessage broker queueName msgId
case mTimeout of
Just _ -> archiveHandler
......
......@@ -77,3 +77,7 @@ instance (SerializableMessage a, Show a) => HasBroker PGMQBroker a where
case mMetrics of
Nothing -> return 0
Just (PGMQ.Metrics { queueLength }) -> return queueLength
getArchivedMessage (PGMQBroker' { conn }) queue (PGMQMid msgId) = do
mMsg <- PGMQ.readMessageFromArchive conn queue msgId
pure $ PGMQBM <$> mMsg
......@@ -9,6 +9,13 @@ Portability : POSIX
Based on lists:
https://redis.io/glossary/redis-queue/
The design is as follows:
- for each queue we have an 'id counter'
- each queue is represented as a list of message ids
- each message is stored under unique key, derived from its id
- the above allows us to have an archive with messages
- deleting a message means removing it's unique key from Redis
-}
......@@ -23,10 +30,11 @@ module Async.Worker.Broker.Redis
, RedisWithMsgId(..) )
where
import Data.Aeson (FromJSON(..), ToJSON(..), (.:), (.=), withObject, object)
import Async.Worker.Broker.Types (HasBroker(..), SerializableMessage)
import Async.Worker.Broker.Types (HasBroker(..), Queue, SerializableMessage)
import Control.Concurrent (threadDelay)
import Control.Monad (void)
import Data.Aeson qualified as Aeson
import Data.Aeson (FromJSON(..), ToJSON(..), (.:), (.=), withObject, object)
import Data.ByteString.Char8 qualified as BS
import Data.ByteString.Lazy qualified as BSL
import Database.Redis qualified as Redis
......@@ -64,47 +72,130 @@ instance (SerializableMessage a, Show a) => HasBroker RedisBroker a where
return ()
-- dropQueue (RedisBroker' { conn }) queue = do
dropQueue _broker _queue = do
-- We don't care about this
return ()
dropQueue (RedisBroker' { conn }) queue = do
let queueK = queueKey queue
void $ Redis.runRedis conn $ Redis.del [queueK]
readMessageWaiting q@(RedisBroker' { conn }) queue = loop
readMessageWaiting b@(RedisBroker' { conn }) queue = loop
where
queueK = queueKey queue
loop = do
eMsg <- Redis.runRedis conn $ Redis.blpop [BS.pack queue] 10
case eMsg of
eMsgId <- Redis.runRedis conn $ Redis.spop queueK
case eMsgId of
Left _ -> undefined
Right Nothing -> readMessageWaiting q queue
Right (Just (_queue, msg)) -> case Aeson.decode (BSL.fromStrict msg) of
Just dmsg -> return $ RedisBM dmsg
Right Nothing -> do
threadDelay 100
readMessageWaiting b queue
Right (Just msgIdBS) -> case bsToId msgIdBS of
Nothing -> undefined
sendMessage (RedisBroker' { conn }) queue (RedisM message) = do
let key = "key-" <> queue
eId <- Redis.runRedis conn $ Redis.incr $ BS.pack key
case eId of
Left _err -> undefined
Right id' -> do
let m = RedisWithMsgId { rmidId = fromIntegral id', rmida = message }
void $ Redis.runRedis conn $ Redis.lpush (BS.pack queue) [BSL.toStrict $ Aeson.encode m]
Just msgId -> do
mMsg <- getRedisMessage b queue msgId
case mMsg of
Nothing -> undefined
Just msg -> return msg
sendMessage b@(RedisBroker' { conn }) queue (RedisM message) = do
mId <- nextId b queue
case mId of
Nothing -> undefined
Just id' -> do
let msgId = RedisMid id'
let m = RedisWithMsgId { rmidId = id', rmida = message }
let msgK = messageKey queue msgId
let queueK = queueKey queue
void $ Redis.runRedis conn $ do
_ <- Redis.set msgK (BSL.toStrict $ Aeson.encode m)
Redis.sadd queueK [idToBS msgId]
-- deleteMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do
deleteMessage _broker _queue _msgId = do
-- Nothing
return ()
deleteMessage (RedisBroker' { conn }) queue msgId = do
let queueK = queueKey queue
void $ Redis.runRedis conn $ Redis.srem queueK [idToBS msgId]
let messageK = messageKey queue msgId
void $ Redis.runRedis conn $ Redis.del [messageK]
-- archiveMessage (RedisBroker' { conn }) queue (RedisMid msgId) = do
archiveMessage _broker _queue _msgId = do
-- Nothing
return ()
archiveMessage (RedisBroker' { conn }) queue msgId = do
let queueK = queueKey queue
let archiveK = archiveKey queue
eMove <- Redis.runRedis conn $ Redis.smove queueK archiveK (idToBS msgId)
case eMove of
Left _ -> undefined
Right True -> return ()
Right False -> do
-- OK so the queue might not have the id, we just add it to archive to make sure
void $ Redis.runRedis conn $ Redis.sadd archiveK [idToBS msgId]
getQueueSize (RedisBroker' { conn }) queue = do
eLen <- Redis.runRedis conn $ Redis.llen (BS.pack queue)
let queueK = queueKey queue
eLen <- Redis.runRedis conn $ Redis.scard queueK
case eLen of
Right len -> return $ fromIntegral len
Left _ -> return 0
getArchivedMessage b@(RedisBroker' { conn }) queue msgId = do
let archiveK = archiveKey queue
eIsMember <- Redis.runRedis conn $ Redis.sismember archiveK (idToBS msgId)
case eIsMember of
Right True -> do
getRedisMessage b queue msgId
_ -> return Nothing
-- Helper functions for getting redis keys
-- | Redis counter is an 'Int', while sets can only store strings
idToBS :: MessageId RedisBroker -> BS.ByteString
idToBS (RedisMid msgId) = BSL.toStrict $ Aeson.encode msgId
bsToId :: BS.ByteString -> Maybe (MessageId RedisBroker)
bsToId bs = RedisMid <$> Aeson.decode (BSL.fromStrict bs)
-- | A global prefix used for all keys
beePrefix :: String
beePrefix = "bee-"
-- | Redis counter that returns message ids
idKey :: Queue -> BS.ByteString
idKey queue = BS.pack $ beePrefix <> "key-" <> queue
nextId :: Broker RedisBroker a -> Queue -> IO (Maybe Int)
nextId (RedisBroker' { conn }) queue = do
let key = idKey queue
eId <- Redis.runRedis conn $ Redis.incr key
case eId of
Right id' -> return (Just $ fromInteger id')
_ -> return Nothing
-- | Key under which a message is stored
messageKey :: Queue -> MessageId RedisBroker -> BS.ByteString
messageKey queue (RedisMid msgId) = BS.pack $ beePrefix <> "queue-" <> queue <> "-message-" <> show msgId
getRedisMessage :: FromJSON a
=> Broker RedisBroker a
-> Queue
-> MessageId RedisBroker
-> IO (Maybe (BrokerMessage RedisBroker a))
getRedisMessage (RedisBroker' { conn }) queue msgId = do
let msgKey = messageKey queue msgId
eMsg <- Redis.runRedis conn $ Redis.get msgKey
case eMsg of
Left _ -> return Nothing
Right Nothing -> return Nothing
Right (Just msg) ->
case Aeson.decode (BSL.fromStrict msg) of
Just dmsg -> return $ Just $ RedisBM dmsg
Nothing -> return Nothing
-- | Key for storing the set of message ids in queue
queueKey :: Queue -> BS.ByteString
queueKey queue = BS.pack $ beePrefix <> "queue-" <> queue
-- | Key for storing the set of message ids in archive
archiveKey :: Queue -> BS.ByteString
archiveKey queue = BS.pack $ beePrefix <> "archive-" <> queue
-- | Helper datatype to store message with a unique id.
-- We fetch the id by using 'INCR'
-- https://redis.io/docs/latest/commands/incr/
......
......@@ -131,3 +131,6 @@ class (
{-| Queue size -}
getQueueSize :: Broker b a -> Queue -> IO Int
{-| Read archived message -}
getArchivedMessage :: Broker b a -> Queue -> MessageId b -> IO (Maybe (BrokerMessage b a))
......@@ -17,9 +17,10 @@ import Async.Worker.Broker.Redis qualified as Redis
import Async.Worker.Broker.Types qualified as BT
import Control.Exception (bracket)
import Data.Aeson (ToJSON(..), FromJSON(..), withText)
import Data.Maybe (isJust)
import Data.Text qualified as T
import Test.Hspec
import Test.Integration.Utils (getPSQLEnvConnectInfo, getRedisEnvConnectInfo, randomQueueName)
import Test.Integration.Utils (getPSQLEnvConnectInfo, getRedisEnvConnectInfo, randomQueueName, waitUntil)
data TestEnv b =
......@@ -70,15 +71,25 @@ brokerTests :: (BT.HasBroker b Message)
brokerTests bInitParams =
parallel $ around (withBroker bInitParams) $ describe "Broker tests" $ do
it "can send and receive a message" $ \(TestEnv { broker, queue }) -> do
BT.dropQueue broker queue
BT.createQueue broker queue
let msg = Message { text = "test" }
BT.sendMessage broker queue (BT.toMessage msg)
msg2 <- BT.readMessageWaiting broker queue
putStrLn $ "[messageId] " <> show (BT.messageId msg2)
msg `shouldBe` (BT.toA $ BT.getMessage msg2)
-- putStrLn $ "[messageId] " <> show (BT.messageId msg2)
msg `shouldBe` BT.toA (BT.getMessage msg2)
it "can send, archive and read message from archive" $ \(TestEnv { broker, queue }) -> do
let msg = Message { text = "test" }
BT.sendMessage broker queue (BT.toMessage msg)
msg2 <- BT.readMessageWaiting broker queue
let msgId = BT.messageId msg2
BT.archiveMessage broker queue msgId
putStrLn $ "Reading msg " <> show msgId <> " from archive queue " <> queue
-- It might take some time to archive a message so we wait a bit
waitUntil (isJust <$> BT.getArchivedMessage broker queue msgId) 200
msgArchive <- BT.getArchivedMessage broker queue msgId
let msgIdArchive = BT.messageId <$> msgArchive
msgIdArchive `shouldBe` Just msgId
pgmqBrokerInitParams :: IO (BT.BrokerInitParams PGMQ.PGMQBroker Message)
pgmqBrokerInitParams = do
......
......@@ -17,7 +17,7 @@ import Database.Redis qualified as Redis
import System.Environment (lookupEnv)
import System.Timeout qualified as Timeout
import Test.Hspec (expectationFailure, shouldBe, shouldSatisfy, Expectation, HasCallStack)
import Test.RandomStrings (randomASCII, randomString, onlyAlphaNum)
import Test.RandomStrings (randomASCII, randomString, onlyLower)
-- | PSQL connect info that is fetched from env
......@@ -43,7 +43,7 @@ getRedisEnvConnectInfo = do
-- | Given a queue prefix, add a random suffix to create a queue name
randomQueueName :: B.Queue -> IO B.Queue
randomQueueName prefix = do
postfix <- randomString (onlyAlphaNum randomASCII) 10
postfix <- randomString (onlyLower randomASCII) 10
return $ prefix <> "_" <> postfix
-- | Given a predicate IO action, test it for given number of
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment