From ea44909ee1d1076ad6c975d0c22c2c7238e90e15 Mon Sep 17 00:00:00 2001
From: Alp Mestanogullari <alp@well-typed.com>
Date: Tue, 18 Oct 2022 19:09:00 +0200
Subject: [PATCH] improve fairness of job queue when faced with equal priority
 items

---
 src/Gargantext/Utils/Jobs/Queue.hs | 87 ++++++++++++++++++++----------
 src/Gargantext/Utils/Jobs/State.hs | 24 +++++++--
 tests/queue/Main.hs                |  6 +--
 3 files changed, 84 insertions(+), 33 deletions(-)

diff --git a/src/Gargantext/Utils/Jobs/Queue.hs b/src/Gargantext/Utils/Jobs/Queue.hs
index 3526a43a..c4847317 100644
--- a/src/Gargantext/Utils/Jobs/Queue.hs
+++ b/src/Gargantext/Utils/Jobs/Queue.hs
@@ -1,9 +1,10 @@
-{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE ConstraintKinds, TypeFamilies, ScopedTypeVariables #-}
 module Gargantext.Utils.Jobs.Queue where
 
 import Control.Concurrent
 import Control.Concurrent.STM
 import Control.Exception
+import Data.Function
 import Data.List
 import Data.Ord
 import Data.Maybe
@@ -51,6 +52,17 @@ popQ q@(Q as bs sz) = case as of
 sizeQ :: Q a -> Int
 sizeQ (Q _ _ sz) = sz
 
+-- | Front item of the queue, left in place; 'Nothing' for an empty queue.
+--   Relies on 'normalizeQ' to expose the next item at the head (assumed).
+peekQ :: Q a -> Maybe a
+peekQ (Q _ _ 0) = Nothing
+peekQ q = case normalizeQ q of { Q (x:_) _ _ -> Just x; _ -> Nothing }
+
+-- | Queue without its front item; an empty queue comes back with size 0.
+dropQ :: Q a -> Q a
+dropQ (Q (_:rest) back n) = Q rest back (n - 1)
+dropQ q@(Q [] back _) = if null back then Q [] [] 0 else dropQ (normalizeQ q)
+
 -- | A priority is just a number. The greater, the earlier the job will get picked.
 type Prio = Int
 
@@ -92,35 +104,55 @@ deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
   Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a)
   Nothing -> error "deleteQueue: queue type not found?!"
 
--- | Try to pop the highest priority item off of the queue, per the priorities
---   defined by the @'Map.Map' t 'Prio'@ argument to 'newQueue'.
-popQueue :: Ord t => Queue t a -> IO (Maybe a)
-popQueue q = go queues
-
-  where prios = sortOn (Down . snd) $ Map.toList (queuePrios q)
-        indices = flip map prios $ \(t, _prio) ->
-          case Map.lookup t (queueIndices q) of
-            Just i -> i
-            Nothing -> error "popQueue: couldn't find queue index for given job kind"
-        queues = [ queueData q Vector.! i | i <- indices ]
-        go [] = return Nothing
-        go (q1:qs) = do
-          mitem <- atomically $ do
-            qa <- readTVar q1
-            case popQ qa of
-              Just (a, qa') -> writeTVar q1 qa' >> return (Just a)
-              Nothing       -> return Nothing
-          case mitem of
-            Nothing -> go qs
-            a  -> return a
+-- | Chooses one candidate; each pairs an item with the action that pops it.
+type Picker a = [(a, STM ())] -> STM (a, STM ())
+
+-- | Pop one item off the queue, looking at the highest priority level first.
+--   The first level with at least one non-empty queue offers the head of each
+--   of its non-empty queues to the 'Picker', which chooses the item to pop.
+--   'Nothing' when no level has a candidate. Runs as a single transaction.
+popQueue :: forall a t. Ord t => Picker a -> Queue t a -> IO (Maybe a)
+popQueue picker q = atomically $ select prioLevels
+
+  where -- TODO: cache this in the 'Queue' data structure?
+        -- Job kinds grouped by priority, highest priority group first.
+        prioLevels :: [[(t, Prio)]]
+        prioLevels = groupBy ((==) `on` snd) . sortOn (Down . snd) $
+          Map.toList (queuePrios q)
+
+        -- Try each priority level in turn until one yields an item.
+        select :: [[(t, Prio)]] -> STM (Maybe a)
+        select [] = return Nothing
+        select (level:levels) =
+          selectLevel level >>= maybe (select levels) (return . Just)
+
+        -- Collect the head of every non-empty queue of this level and let
+        -- the picker decide which one gets popped.
+        selectLevel :: [(t, Prio)] -> STM (Maybe a)
+        selectLevel xs = do
+          let queues = [ queueData q Vector.! i
+                       | (t, _) <- xs
+                       , Just i <- [Map.lookup t (queueIndices q)] ]
+          candidates <- catMaybes <$> traverse candidate queues
+          case candidates of
+            [] -> return Nothing
+            _  -> do
+              (item, popItem) <- picker candidates
+              popItem
+              return (Just item)
+
+        -- Head of the queue (if any) paired with the action that drops it.
+        candidate :: TVar (Q a) -> STM (Maybe (a, STM ()))
+        candidate qvar =
+          fmap (\x -> (x, modifyTVar' qvar dropQ)) . peekQ <$> readTVar qvar
 
 -- | A ready-to-use runner that pops the highest priority item off the queue
 --   and processes it using the given function.
-queueRunner :: Ord t => (a -> IO ()) -> Queue t a -> IO ()
-queueRunner f q = go
+queueRunner :: Ord t => Picker a -> (a -> IO ()) -> Queue t a -> IO ()
+queueRunner picker f q = go
 
   where go = do
-          mres <- popQueue q
+          mres <- popQueue picker q
           case mres of
             Just a -> f a `catch` exc
             Nothing -> return ()
@@ -136,9 +168,10 @@ newQueueWithRunners
   :: EnumBounded t
   => Int -- ^ number of runners
   -> Map.Map t Prio -- ^ priorities
+  -> Picker a  -- ^ how to pick between equal priority items
   -> (a -> IO ()) -- ^ what to do with each item
   -> IO (Queue t a, [IO ()])
-newQueueWithRunners n prios f = do
+newQueueWithRunners n prios picker f = do
   q <- newQueue prios
-  let runners = replicate n (queueRunner f q)
+  let runners = replicate n (queueRunner picker f q)
   return (q, runners)
diff --git a/src/Gargantext/Utils/Jobs/State.hs b/src/Gargantext/Utils/Jobs/State.hs
index 29d337c0..586cd023 100644
--- a/src/Gargantext/Utils/Jobs/State.hs
+++ b/src/Gargantext/Utils/Jobs/State.hs
@@ -6,12 +6,16 @@ import Gargantext.Utils.Jobs.Settings
 
 import Control.Concurrent.Async
 import Control.Concurrent.STM
+import Control.Monad
+import Data.List
 import Data.Map (Map)
+import Data.Maybe
+import Data.Ord
 import Data.Proxy
 import Data.Time.Clock
 import Prelude
 
--- import qualified Data.Map as Map
+import qualified Data.Map as Map
 import qualified Servant.Job.Core as SJ
 import qualified Servant.Job.Types as SJ
 
@@ -32,14 +36,15 @@ nextID js st = do
   return $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
 
 newJobsState
-  :: (EnumBounded t, Monoid w)
+  :: forall t w a.
+     (EnumBounded t, Monoid w)
   => JobSettings
   -> Map t Prio
   -> IO (JobsState t w a)
 newJobsState js prios = do
   jmap <- newJobMap
   idgen <- newTVarIO 0
-  (q, runners) <- newQueueWithRunners (jsNumRunners js) prios $ \jid -> do
+  (q, runners) <- newQueueWithRunners (jsNumRunners js) prios (picker jmap) $ \jid -> do
     mje <- lookupJob jid jmap
     case mje of
       Nothing -> return ()
@@ -54,6 +59,19 @@ newJobsState js prios = do
   runnersAsyncs <- traverse async runners
   return (JobsState jmap q idgen gcAsync runnersAsyncs)
 
+  where -- Oldest registration wins. A candidate with no entry in the job
+        -- map is picked first: the runner ignores such ids, so stale entries
+        -- get popped instead of crashing 'minimumBy' or blocking their queue.
+        picker :: JobMap (SJ.JobID 'SJ.Safe) w a -> Picker (SJ.JobID 'SJ.Safe)
+        picker (JobMap jmap) xs = do
+          jinfos <- forM xs $ \(jid, popjid) -> do
+            mje <- Map.lookup jid <$> readTVar jmap
+            return (jid, popjid, jRegistered <$> mje)
+          let (jid, popjid, _) = fromMaybe (minimumBy (comparing _3) jinfos)
+                                           (find (isNothing . _3) jinfos)
+          return (jid, popjid)
+
+        _3 (_, _, c) = c
 pushJob
   :: Ord t
   => t
diff --git a/tests/queue/Main.hs b/tests/queue/Main.hs
index c48027a6..f2d3d3af 100644
--- a/tests/queue/Main.hs
+++ b/tests/queue/Main.hs
@@ -26,8 +26,8 @@ dec A cs = cs { countAs = countAs cs - 1 }
 dec B cs = cs { countBs = countBs cs - 1 }
 
 jobDuration, initialDelay :: Int
-jobDuration = 100000 -- 100ms
-initialDelay = 30000 -- 10ms
+jobDuration = 100000 -- 100ms
+initialDelay = 20000 -- 20ms
 
 testMaxRunners :: IO ()
 testMaxRunners = do
@@ -100,7 +100,7 @@ testExceptions = do
 testFairness :: IO ()
 testFairness = do
   k <- genSecret
-  let settings = defaultJobSettings k
+  let settings = defaultJobSettings 2 k
   st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
   runningJs <- newTVarIO (Counts 0 0)
   let j jobt _inp _l = do
-- 
2.21.0