Commit 862391be authored by Alfredo Di Napoli's avatar Alfredo Di Napoli

Add pushJobWithTime

parent d721af08
Pipeline #3897 failed with stage
in 31 minutes and 34 seconds
...@@ -5,7 +5,7 @@ cabal-version: 1.12 ...@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack -- see: https://github.com/sol/hpack
name: gargantext name: gargantext
version: 0.0.6.9.8.7 version: 0.0.6.9.8.7
synopsis: Search, map, share synopsis: Search, map, share
description: Please see README.md description: Please see README.md
category: Data category: Data
...@@ -929,4 +929,5 @@ test-suite jobqueue-test ...@@ -929,4 +929,5 @@ test-suite jobqueue-test
, servant-job , servant-job
, stm , stm
, text , text
, time
default-language: Haskell2010 default-language: Haskell2010
...@@ -533,6 +533,7 @@ tests: ...@@ -533,6 +533,7 @@ tests:
- http-client-tls - http-client-tls
- servant-job - servant-job
- stm - stm
- time
# garg-doctest: # garg-doctest:
# main: Main.hs # main: Main.hs
# source-dirs: src-doctest # source-dirs: src-doctest
......
...@@ -124,13 +124,13 @@ jobLog logvar = \w -> atomically $ modifyTVar' logvar (\old_w -> w <> old_w) ...@@ -124,13 +124,13 @@ jobLog logvar = \w -> atomically $ modifyTVar' logvar (\old_w -> w <> old_w)
-- | Generating new 'JobEntry's. -- | Generating new 'JobEntry's.
addJobEntry addJobEntry
:: Ord jid :: Ord jid
=> jid => UTCTime
-> jid
-> a -> a
-> (jid -> a -> Logger w -> IO r) -> (jid -> a -> Logger w -> IO r)
-> JobMap jid w r -> JobMap jid w r
-> IO (JobEntry jid w r) -> STM (JobEntry jid w r)
addJobEntry jid input f (JobMap mvar) = do addJobEntry now jid input f (JobMap mvar) = do
now <- getCurrentTime
let je = JobEntry let je = JobEntry
{ jID = jid { jID = jid
, jTask = QueuedJ (QueuedJob input (f jid)) , jTask = QueuedJ (QueuedJob input (f jid))
...@@ -139,8 +139,8 @@ addJobEntry jid input f (JobMap mvar) = do ...@@ -139,8 +139,8 @@ addJobEntry jid input f (JobMap mvar) = do
, jStarted = Nothing , jStarted = Nothing
, jEnded = Nothing , jEnded = Nothing
} }
atomically $ modifyTVar' mvar (Map.insert jid je) modifyTVar' mvar (Map.insert jid je)
return je pure je
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM () deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob jid (JobMap mvar) = modifyTVar' mvar (Map.delete jid) deleteJob jid (JobMap mvar) = modifyTVar' mvar (Map.delete jid)
......
...@@ -4,6 +4,7 @@ module Gargantext.Utils.Jobs.Queue where ...@@ -4,6 +4,7 @@ module Gargantext.Utils.Jobs.Queue where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception import Control.Exception
import Control.Monad
import Data.Function import Data.Function
import Data.List import Data.List
import Data.Ord import Data.Ord
...@@ -94,9 +95,9 @@ newQueue prios = do ...@@ -94,9 +95,9 @@ newQueue prios = do
return $ Queue vars indices prios return $ Queue vars indices prios
-- | Add a new element to the queue, with the given kind. -- | Add a new element to the queue, with the given kind.
addQueue :: Ord t => t -> a -> Queue t a -> IO () addQueue :: Ord t => t -> a -> Queue t a -> STM ()
addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just i -> atomically $ modifyTVar (queueData q Vector.! i) (snocQ a) Just i -> modifyTVar (queueData q Vector.! i) (snocQ a)
Nothing -> error "addQueue: couldn't find queue for given job kind" Nothing -> error "addQueue: couldn't find queue for given job kind"
deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM () deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM ()
...@@ -104,6 +105,13 @@ deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of ...@@ -104,6 +105,13 @@ deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a) Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a)
Nothing -> error "deleteQueue: queue type not found?!" Nothing -> error "deleteQueue: queue type not found?!"
-- | Dump the contents of the queue, for debugging purposes.
debugDumpQueue :: (Enum t, Bounded t, Ord t) => Queue t a -> STM [(t, a)]
debugDumpQueue q = mconcat <$> (forM [minBound..maxBound] $ \t -> do
readTVar (queueData q Vector.! (i t)) >>= debugDumpQ t)
where
i t = fromJust $ Map.lookup t (queueIndices q)
debugDumpQ t (Q xs ys _) = return $ map (\x -> (t, x)) (xs ++ reverse ys)
type Picker a = [(a, STM ())] -> STM (a, STM ()) type Picker a = [(a, STM ())] -> STM (a, STM ())
...@@ -125,7 +133,7 @@ popQueue picker q = atomically $ select prioLevels ...@@ -125,7 +133,7 @@ popQueue picker q = atomically $ select prioLevels
mres <- selectLevel level mres <- selectLevel level
case mres of case mres of
Nothing -> select levels Nothing -> select levels
Just res -> return (Just res) Just res -> pure $ Just res
selectLevel :: [(t, Prio)] -> STM (Maybe a) selectLevel :: [(t, Prio)] -> STM (Maybe a)
selectLevel xs = do selectLevel xs = do
......
...@@ -29,11 +29,10 @@ data JobsState t w a = JobsState ...@@ -29,11 +29,10 @@ data JobsState t w a = JobsState
, jsRunners :: [Async ()] , jsRunners :: [Async ()]
} }
nextID :: JobSettings -> JobsState t w a -> IO (SJ.JobID 'SJ.Safe) nextID :: UTCTime -> JobSettings -> JobsState t w a -> STM (SJ.JobID 'SJ.Safe)
nextID js st = do nextID now js st = do
now <- getCurrentTime n <- stateTVar (jobsIdGen st) $ \i -> (i, i+1)
n <- atomically $ stateTVar (jobsIdGen st) $ \i -> (i, i+1) pure $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
return $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
newJobsState newJobsState
:: forall t w a. :: forall t w a.
...@@ -72,6 +71,7 @@ newJobsState js prios = do ...@@ -72,6 +71,7 @@ newJobsState js prios = do
return (jid, popjid) return (jid, popjid)
_3 (_, _, c) = c _3 (_, _, c) = c
pushJob pushJob
:: Ord t :: Ord t
=> t => t
...@@ -80,8 +80,21 @@ pushJob ...@@ -80,8 +80,21 @@ pushJob
-> JobSettings -> JobSettings
-> JobsState t w r -> JobsState t w r
-> IO (SJ.JobID 'SJ.Safe) -> IO (SJ.JobID 'SJ.Safe)
pushJob jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do pushJob jobkind input f js st = do
jid <- nextID js st now <- getCurrentTime
_je <- addJobEntry jid input f jmap atomically $ pushJobWithTime now jobkind input f js st
pushJobWithTime
:: Ord t
=> UTCTime
-> t
-> a
-> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
-> JobSettings
-> JobsState t w r
-> STM (SJ.JobID 'SJ.Safe)
pushJobWithTime now jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do
jid <- nextID now js st
_je <- addJobEntry now jid input f jmap
addQueue jobkind jid jqueue addQueue jobkind jid jqueue
return jid pure jid
...@@ -16,6 +16,7 @@ import Data.Maybe ...@@ -16,6 +16,7 @@ import Data.Maybe
import Data.Either import Data.Either
import Data.List import Data.List
import Data.Sequence (Seq, (|>), fromList) import Data.Sequence (Seq, (|>), fromList)
import Data.Time
import GHC.Stack import GHC.Stack
import Prelude import Prelude
import System.IO.Unsafe import System.IO.Unsafe
...@@ -92,28 +93,28 @@ testMaxRunners = do ...@@ -92,28 +93,28 @@ testMaxRunners = do
testPrios :: IO () testPrios :: IO ()
testPrios = do testPrios = do
k <- genSecret k <- genSecret
let settings = defaultJobSettings 2 k -- Use a single runner, so that we can check the order of execution
-- without worrying about the runners competing with each other.
let settings = defaultJobSettings 1 k
prios = [(B, 10), (C, 1), (D, 5)] prios = [(B, 10), (C, 1), (D, 5)]
runningDelta job = fromMaybe 0 (lookup job prios) * 1000
st :: JobsState JobT [String] () <- newJobsState settings $ st :: JobsState JobT [String] () <- newJobsState settings $
applyPrios prios defaultPrios -- B has the highest priority applyPrios prios defaultPrios -- B has the highest priority
pickedSchedule <- newMVar (JobSchedule mempty) pickedSchedule <- newMVar (JobSchedule mempty)
let j jobt _jHandle _inp _l = do let j jobt _jHandle _inp _l = addJobToSchedule jobt pickedSchedule
-- simulate the running time of a job, then add to the schedule.
-- The running time is proportional to the priority of the job,
-- to account for the fact that we are pushing jobs sequentially,
-- so we have to our account for the submission time.
threadDelay $ jobDuration - runningDelta jobt
addJobToSchedule jobt pickedSchedule
jobs = [ (A, j A) jobs = [ (A, j A)
, (C, j C) , (C, j C)
, (B, j B) , (B, j B)
, (D, j D) , (D, j D)
] ]
forM_ jobs $ \(t, f) -> void $ pushJob t () f settings st
-- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
-- the time 'popQueue' gets called.
now <- getCurrentTime
atomically $ forM_ jobs $ \(t, f) -> void $ pushJobWithTime now t () f settings st
-- wait for the jobs to finish, waiting for more than the total duration, -- wait for the jobs to finish, waiting for more than the total duration,
-- so that we are sure that all jobs have finished, then check the schedule. -- so that we are sure that all jobs have finished, then check the schedule.
threadDelay (5 * jobDuration) threadDelay jobDuration
finalSchedule <- readMVar pickedSchedule finalSchedule <- readMVar pickedSchedule
finalSchedule `shouldBe` JobSchedule (fromList [B, D, C, A]) finalSchedule `shouldBe` JobSchedule (fromList [B, D, C, A])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment