diff --git a/gargantext.cabal b/gargantext.cabal index a890ac1ac5a5621206bfa9ebdc8e265e88a3f841..4812a5c95af2a25601d7d5e393cb34a907406932 100644 --- a/gargantext.cabal +++ b/gargantext.cabal @@ -929,4 +929,5 @@ test-suite jobqueue-test , servant-job , stm , text + , time default-language: Haskell2010 diff --git a/package.yaml b/package.yaml index 6bd18a3e117cff71e8355302db883d4b1c43b708..cfa93677230849cc2b9e185e607a5ab1934295eb 100644 --- a/package.yaml +++ b/package.yaml @@ -533,6 +533,7 @@ tests: - http-client-tls - servant-job - stm + - time # garg-doctest: # main: Main.hs # source-dirs: src-doctest diff --git a/src/Gargantext/Utils/Jobs/Map.hs b/src/Gargantext/Utils/Jobs/Map.hs index d2d4417c7816767e6bf599ec1ad97075ca519284..9ee65212dbe5d00eb2b4e4369b26a8deb5030e43 100644 --- a/src/Gargantext/Utils/Jobs/Map.hs +++ b/src/Gargantext/Utils/Jobs/Map.hs @@ -124,13 +124,13 @@ jobLog logvar = \w -> atomically $ modifyTVar' logvar (\old_w -> w <> old_w) -- | Generating new 'JobEntry's. addJobEntry :: Ord jid - => jid + => UTCTime + -> jid -> a -> (jid -> a -> Logger w -> IO r) -> JobMap jid w r - -> IO (JobEntry jid w r) -addJobEntry jid input f (JobMap mvar) = do - now <- getCurrentTime + -> STM (JobEntry jid w r) +addJobEntry now jid input f (JobMap mvar) = do let je = JobEntry { jID = jid , jTask = QueuedJ (QueuedJob input (f jid)) @@ -139,8 +139,8 @@ addJobEntry jid input f (JobMap mvar) = do , jStarted = Nothing , jEnded = Nothing } - atomically $ modifyTVar' mvar (Map.insert jid je) - return je + modifyTVar' mvar (Map.insert jid je) + pure je deleteJob :: Ord jid => jid -> JobMap jid w a -> STM () deleteJob jid (JobMap mvar) = modifyTVar' mvar (Map.delete jid) diff --git a/src/Gargantext/Utils/Jobs/Queue.hs b/src/Gargantext/Utils/Jobs/Queue.hs index c4847317e62fc453c6f5d5c300fb5c5aca4440e6..18390ebf3c4f170177b5e1f07b3795fe7c3a4395 100644 --- a/src/Gargantext/Utils/Jobs/Queue.hs +++ b/src/Gargantext/Utils/Jobs/Queue.hs @@ -4,6 +4,7 @@ module Gargantext.Utils.Jobs.Queue where import Control.Concurrent import Control.Concurrent.STM import Control.Exception +import Control.Monad import Data.Function import Data.List import Data.Ord @@ -94,9 +95,9 @@ newQueue prios = do return $ Queue vars indices prios -- | Add a new element to the queue, with the given kind. -addQueue :: Ord t => t -> a -> Queue t a -> IO () +addQueue :: Ord t => t -> a -> Queue t a -> STM () addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of - Just i -> atomically $ modifyTVar (queueData q Vector.! i) (snocQ a) + Just i -> modifyTVar (queueData q Vector.! i) (snocQ a) Nothing -> error "addQueue: couldn't find queue for given job kind" deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM () @@ -104,6 +105,13 @@ deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a) Nothing -> error "deleteQueue: queue type not found?!" +-- | Dump the contents of the queue, for debugging purposes. +debugDumpQueue :: (Enum t, Bounded t, Ord t) => Queue t a -> STM [(t, a)] +debugDumpQueue q = mconcat <$> (forM [minBound..maxBound] $ \t -> do + readTVar (queueData q Vector.! (i t)) >>= debugDumpQ t) + where + i t = fromJust $ Map.lookup t (queueIndices q) + debugDumpQ t (Q xs ys _) = return $ map (\x -> (t, x)) (xs ++ reverse ys) type Picker a = [(a, STM ())] -> STM (a, STM ()) @@ -125,7 +133,7 @@ popQueue picker q = atomically $ select prioLevels mres <- selectLevel level case mres of Nothing -> select levels - Just res -> return (Just res) + Just res -> pure $ Just res selectLevel :: [(t, Prio)] -> STM (Maybe a) selectLevel xs = do diff --git a/src/Gargantext/Utils/Jobs/State.hs b/src/Gargantext/Utils/Jobs/State.hs index b7c4f337fb8d3c5312f20e750e76e69616b9a28c..ebba3a589d38e73026eb9c900f5f3dbd69ca67e1 100644 --- a/src/Gargantext/Utils/Jobs/State.hs +++ b/src/Gargantext/Utils/Jobs/State.hs @@ -29,11 +29,10 @@ data JobsState t w a = JobsState , jsRunners :: [Async ()] } -nextID :: JobSettings -> JobsState t w a -> IO (SJ.JobID 'SJ.Safe) -nextID js st = do - now <- getCurrentTime - n <- atomically $ stateTVar (jobsIdGen st) $ \i -> (i, i+1) - return $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n +nextID :: UTCTime -> JobSettings -> JobsState t w a -> STM (SJ.JobID 'SJ.Safe) +nextID now js st = do + n <- stateTVar (jobsIdGen st) $ \i -> (i, i+1) + pure $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n newJobsState :: forall t w a. @@ -72,6 +71,7 @@ newJobsState js prios = do return (jid, popjid) _3 (_, _, c) = c + pushJob :: Ord t => t @@ -80,8 +80,21 @@ pushJob -> JobSettings -> JobsState t w r -> IO (SJ.JobID 'SJ.Safe) -pushJob jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do - jid <- nextID js st - _je <- addJobEntry jid input f jmap +pushJob jobkind input f js st = do + now <- getCurrentTime + atomically $ pushJobWithTime now jobkind input f js st + +pushJobWithTime + :: Ord t + => UTCTime + -> t + -> a + -> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r) + -> JobSettings + -> JobsState t w r + -> STM (SJ.JobID 'SJ.Safe) +pushJobWithTime now jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do + jid <- nextID now js st + _je <- addJobEntry now jid input f jmap addQueue jobkind jid jqueue - return jid + pure jid diff --git a/tests/queue/Main.hs b/tests/queue/Main.hs index 9c78a42bb8fc71ff27cf1ff7f868cde699bdce92..c8fd782eda6b697358b131621d050eaabb298094 100644 --- a/tests/queue/Main.hs +++ b/tests/queue/Main.hs @@ -16,6 +16,7 @@ import Data.Maybe import Data.Either import Data.List import Data.Sequence (Seq, (|>), fromList) +import Data.Time import GHC.Stack import Prelude import System.IO.Unsafe @@ -92,28 +93,28 @@ testMaxRunners = do testPrios :: IO () testPrios = do k <- genSecret - let settings = defaultJobSettings 2 k + -- Use a single runner, so that we can check the order of execution + -- without worrying about the runners competing with each other. + let settings = defaultJobSettings 1 k prios = [(B, 10), (C, 1), (D, 5)] - runningDelta job = fromMaybe 0 (lookup job prios) * 1000 st :: JobsState JobT [String] () <- newJobsState settings $ applyPrios prios defaultPrios -- B has the highest priority pickedSchedule <- newMVar (JobSchedule mempty) - let j jobt _jHandle _inp _l = do - -- simulate the running time of a job, then add to the schedule. - -- The running time is proportional to the priority of the job, - -- to account for the fact that we are pushing jobs sequentially, - -- so we have to our account for the submission time. - threadDelay $ jobDuration - runningDelta jobt - addJobToSchedule jobt pickedSchedule + let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule jobs = [ (A, j A) , (C, j C) , (B, j B) , (D, j D) ] - forM_ jobs $ \(t, f) -> void $ pushJob t () f settings st + + -- Push all the jobs in the same STM transaction, so that they are all stored in the queue by + -- the time 'popQueue' gets called. + now <- getCurrentTime + atomically $ forM_ jobs $ \(t, f) -> void $ pushJobWithTime now t () f settings st + -- wait for the jobs to finish, waiting for more than the total duration, -- so that we are sure that all jobs have finished, then check the schedule. - threadDelay (5 * jobDuration) + threadDelay jobDuration finalSchedule <- readMVar pickedSchedule finalSchedule `shouldBe` JobSchedule (fromList [B, D, C, A])