Commit ea44909e authored by Alp Mestanogullari's avatar Alp Mestanogullari

improve fairness of job queue when faced with equal priority items

parent 0a919635
Pipeline #3295 passed with stage
in 92 minutes and 40 seconds
{-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE ConstraintKinds, TypeFamilies, ScopedTypeVariables #-}
module Gargantext.Utils.Jobs.Queue where module Gargantext.Utils.Jobs.Queue where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception import Control.Exception
import Data.Function
import Data.List import Data.List
import Data.Ord import Data.Ord
import Data.Maybe import Data.Maybe
...@@ -51,6 +52,17 @@ popQ q@(Q as bs sz) = case as of ...@@ -51,6 +52,17 @@ popQ q@(Q as bs sz) = case as of
sizeQ :: Q a -> Int sizeQ :: Q a -> Int
sizeQ (Q _ _ sz) = sz sizeQ (Q _ _ sz) = sz
peekQ :: Q a -> Maybe a
peekQ (Q _ _ 0) = Nothing
peekQ q = case normalizeQ q of
Q (x:_) _ _ -> Just x
_ -> Nothing
dropQ :: Q a -> Q a
dropQ (Q [] [] _) = Q [] [] 0
dropQ (Q (_x:xs) ys sz) = Q xs ys (sz-1)
dropQ q@(Q [] _ _) = dropQ (normalizeQ q)
-- | A priority is just a number. The greater, the earlier the job will get picked. -- | A priority is just a number. The greater, the earlier the job will get picked.
type Prio = Int type Prio = Int
...@@ -92,35 +104,55 @@ deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of ...@@ -92,35 +104,55 @@ deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a) Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a)
Nothing -> error "deleteQueue: queue type not found?!" Nothing -> error "deleteQueue: queue type not found?!"
-- | Try to pop the highest priority item off of the queue, per the priorities
-- defined by the @'Map.Map' t 'Prio'@ argument to 'newQueue'. type Picker a = [(a, STM ())] -> STM (a, STM ())
popQueue :: Ord t => Queue t a -> IO (Maybe a)
popQueue q = go queues -- | Figure out the candidates for being popped from the various queues.
-- We always look at highest priority queues first, and will pick between
where prios = sortOn (Down . snd) $ Map.toList (queuePrios q) -- equal priority items of different queues (candidates, elements of the
indices = flip map prios $ \(t, _prio) -> -- returned lists) by choosing the one that was queued first.
case Map.lookup t (queueIndices q) of popQueue :: forall a t. Ord t => Picker a -> Queue t a -> IO (Maybe a)
Just i -> i popQueue picker q = atomically $ select prioLevels
Nothing -> error "popQueue: couldn't find queue index for given job kind"
queues = [ queueData q Vector.! i | i <- indices ] where -- TODO: cache this in the 'Queue' data structure?
go [] = return Nothing prioLevels :: [[(t, Prio)]]
go (q1:qs) = do prioLevels = groupBy ((==) `on` snd) . sortOn (Down . snd) $
mitem <- atomically $ do Map.toList (queuePrios q)
qa <- readTVar q1
case popQ qa of select :: [[(t, Prio)]] -> STM (Maybe a)
Just (a, qa') -> writeTVar q1 qa' >> return (Just a) select [] = return Nothing
Nothing -> return Nothing select (level:levels) = do
case mitem of mres <- selectLevel level
Nothing -> go qs case mres of
a -> return a Nothing -> select levels
Just res -> return (Just res)
selectLevel :: [(t, Prio)] -> STM (Maybe a)
selectLevel xs = do
let indices = catMaybes $ map (flip Map.lookup (queueIndices q) . fst) xs
queues = map (queueData q Vector.!) indices
go qvar = readTVar qvar >>= \qu ->
return (peekQ qu, modifyTVar' qvar dropQ)
mtopItems <- catMaybesFst <$> traverse go queues
case mtopItems of
Nothing -> return Nothing
Just [] -> return Nothing
Just topItems -> do
(earliestItem, popItem) <- picker topItems
popItem
return (Just earliestItem)
catMaybesFst ((Nothing, _b) : xs) = catMaybesFst xs
catMaybesFst ((Just a, b) : xs) = ((a, b) :) <$> catMaybesFst xs
catMaybesFst [] = Just []
-- | A ready-to-use runner that pops the highest priority item off the queue -- | A ready-to-use runner that pops the highest priority item off the queue
-- and processes it using the given function. -- and processes it using the given function.
queueRunner :: Ord t => (a -> IO ()) -> Queue t a -> IO () queueRunner :: Ord t => Picker a -> (a -> IO ()) -> Queue t a -> IO ()
queueRunner f q = go queueRunner picker f q = go
where go = do where go = do
mres <- popQueue q mres <- popQueue picker q
case mres of case mres of
Just a -> f a `catch` exc Just a -> f a `catch` exc
Nothing -> return () Nothing -> return ()
...@@ -136,9 +168,10 @@ newQueueWithRunners ...@@ -136,9 +168,10 @@ newQueueWithRunners
:: EnumBounded t :: EnumBounded t
=> Int -- ^ number of runners => Int -- ^ number of runners
-> Map.Map t Prio -- ^ priorities -> Map.Map t Prio -- ^ priorities
-> Picker a -- ^ how to pick between equal priority items
-> (a -> IO ()) -- ^ what to do with each item -> (a -> IO ()) -- ^ what to do with each item
-> IO (Queue t a, [IO ()]) -> IO (Queue t a, [IO ()])
newQueueWithRunners n prios f = do newQueueWithRunners n prios picker f = do
q <- newQueue prios q <- newQueue prios
let runners = replicate n (queueRunner f q) let runners = replicate n (queueRunner picker f q)
return (q, runners) return (q, runners)
...@@ -6,12 +6,16 @@ import Gargantext.Utils.Jobs.Settings ...@@ -6,12 +6,16 @@ import Gargantext.Utils.Jobs.Settings
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Monad
import Data.List
import Data.Map (Map) import Data.Map (Map)
import Data.Maybe
import Data.Ord
import Data.Proxy import Data.Proxy
import Data.Time.Clock import Data.Time.Clock
import Prelude import Prelude
-- import qualified Data.Map as Map import qualified Data.Map as Map
import qualified Servant.Job.Core as SJ import qualified Servant.Job.Core as SJ
import qualified Servant.Job.Types as SJ import qualified Servant.Job.Types as SJ
...@@ -32,14 +36,15 @@ nextID js st = do ...@@ -32,14 +36,15 @@ nextID js st = do
return $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n return $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
newJobsState newJobsState
:: (EnumBounded t, Monoid w) :: forall t w a.
(EnumBounded t, Monoid w)
=> JobSettings => JobSettings
-> Map t Prio -> Map t Prio
-> IO (JobsState t w a) -> IO (JobsState t w a)
newJobsState js prios = do newJobsState js prios = do
jmap <- newJobMap jmap <- newJobMap
idgen <- newTVarIO 0 idgen <- newTVarIO 0
(q, runners) <- newQueueWithRunners (jsNumRunners js) prios $ \jid -> do (q, runners) <- newQueueWithRunners (jsNumRunners js) prios (picker jmap) $ \jid -> do
mje <- lookupJob jid jmap mje <- lookupJob jid jmap
case mje of case mje of
Nothing -> return () Nothing -> return ()
...@@ -54,6 +59,19 @@ newJobsState js prios = do ...@@ -54,6 +59,19 @@ newJobsState js prios = do
runnersAsyncs <- traverse async runners runnersAsyncs <- traverse async runners
return (JobsState jmap q idgen gcAsync runnersAsyncs) return (JobsState jmap q idgen gcAsync runnersAsyncs)
where picker
:: JobMap (SJ.JobID 'SJ.Safe) w a
-> Picker (SJ.JobID 'SJ.Safe)
picker (JobMap jmap) xs = do
jinfos <- fmap catMaybes . forM xs $ \(jid, popjid) -> do
mje <- Map.lookup jid <$> readTVar jmap
case mje of
Nothing -> return Nothing
Just je -> return $ Just (jid, popjid, jRegistered je)
let (jid, popjid, _) = minimumBy (comparing _3) jinfos
return (jid, popjid)
_3 (_, _, c) = c
pushJob pushJob
:: Ord t :: Ord t
=> t => t
......
...@@ -26,8 +26,8 @@ dec A cs = cs { countAs = countAs cs - 1 } ...@@ -26,8 +26,8 @@ dec A cs = cs { countAs = countAs cs - 1 }
dec B cs = cs { countBs = countBs cs - 1 } dec B cs = cs { countBs = countBs cs - 1 }
jobDuration, initialDelay :: Int jobDuration, initialDelay :: Int
jobDuration = 100000 -- 100ms jobDuration = 100000
initialDelay = 30000 -- 10ms initialDelay = 20000
testMaxRunners :: IO () testMaxRunners :: IO ()
testMaxRunners = do testMaxRunners = do
...@@ -100,7 +100,7 @@ testExceptions = do ...@@ -100,7 +100,7 @@ testExceptions = do
testFairness :: IO () testFairness :: IO ()
testFairness = do testFairness = do
k <- genSecret k <- genSecret
let settings = defaultJobSettings k let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
runningJs <- newTVarIO (Counts 0 0) runningJs <- newTVarIO (Counts 0 0)
let j jobt _inp _l = do let j jobt _inp _l = do
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment