Commit f2338519 authored by Alexandre Delanoë's avatar Alexandre Delanoë

Merge remote-tracking branch 'origin/adinapoli/issue-198-part-2' into dev-merge

parents edfce2a2 8664117b
...@@ -1211,7 +1211,7 @@ constraints: any.AC-Angle ==1.0, ...@@ -1211,7 +1211,7 @@ constraints: any.AC-Angle ==1.0,
any.hspec-contrib ==0.5.1, any.hspec-contrib ==0.5.1,
any.hspec-core ==2.7.10, any.hspec-core ==2.7.10,
any.hspec-discover ==2.7.10, any.hspec-discover ==2.7.10,
any.hspec-expectations ==0.8.2, any.hspec-expectations ==0.8.3,
any.hspec-expectations-json ==1.0.0.4, any.hspec-expectations-json ==1.0.0.4,
any.hspec-expectations-lifted ==0.10.0, any.hspec-expectations-lifted ==0.10.0,
any.hspec-expectations-pretty-diff ==0.7.2.6, any.hspec-expectations-pretty-diff ==0.7.2.6,
...@@ -2019,6 +2019,7 @@ constraints: any.AC-Angle ==1.0, ...@@ -2019,6 +2019,7 @@ constraints: any.AC-Angle ==1.0,
any.record-hasfield ==1.0, any.record-hasfield ==1.0,
any.record-wrangler ==0.1.1.0, any.record-wrangler ==0.1.1.0,
any.records-sop ==0.1.1.0, any.records-sop ==0.1.1.0,
any.recover-rtti ==0.4.3,
any.recursion-schemes ==5.2.2.2, any.recursion-schemes ==5.2.2.2,
any.reducers ==3.12.4, any.reducers ==3.12.4,
any.ref-fd ==0.5, any.ref-fd ==0.5,
......
...@@ -868,11 +868,12 @@ test-suite garg-test ...@@ -868,11 +868,12 @@ test-suite garg-test
Parsers.Date Parsers.Date
Parsers.Types Parsers.Types
Parsers.WOS Parsers.WOS
Utils
Utils.Crypto Utils.Crypto
Utils.Jobs Utils.Jobs
Paths_gargantext Paths_gargantext
hs-source-dirs: hs-source-dirs:
src-test test
default-extensions: default-extensions:
DataKinds DataKinds
DeriveGeneric DeriveGeneric
...@@ -912,6 +913,7 @@ test-suite garg-test ...@@ -912,6 +913,7 @@ test-suite garg-test
, gargantext , gargantext
, gargantext-prelude , gargantext-prelude
, hspec , hspec
, hspec-expectations >= 0.8.3
, http-client , http-client
, http-client-tls , http-client-tls
, mtl , mtl
...@@ -920,6 +922,7 @@ test-suite garg-test ...@@ -920,6 +922,7 @@ test-suite garg-test
, patches-map , patches-map
, quickcheck-instances , quickcheck-instances
, raw-strings-qq , raw-strings-qq
, recover-rtti
, servant-job , servant-job
, stm , stm
, tasty , tasty
......
...@@ -505,7 +505,7 @@ executables: ...@@ -505,7 +505,7 @@ executables:
tests: tests:
garg-test: garg-test:
main: Main.hs main: Main.hs
source-dirs: src-test source-dirs: test
default-extensions: default-extensions:
- DataKinds - DataKinds
- DeriveGeneric - DeriveGeneric
......
...@@ -97,6 +97,7 @@ lookupJob jid (JobMap mvar) = Map.lookup jid <$> readTVarIO mvar ...@@ -97,6 +97,7 @@ lookupJob jid (JobMap mvar) = Map.lookup jid <$> readTVarIO mvar
gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO () gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
gcThread js (JobMap mvar) = go gcThread js (JobMap mvar) = go
where go = do where go = do
threadDelay (jsGcPeriod js * 1000000)
now <- getCurrentTime now <- getCurrentTime
candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
forM_ candidateEntries $ \je -> do forM_ candidateEntries $ \je -> do
...@@ -108,7 +109,6 @@ gcThread js (JobMap mvar) = go ...@@ -108,7 +109,6 @@ gcThread js (JobMap mvar) = go
case mrunningjob of case mrunningjob of
Nothing -> return () Nothing -> return ()
Just a -> killJ a Just a -> killJ a
threadDelay (jsGcPeriod js * 1000000)
go go
expired now jobentry = case jTimeoutAfter jobentry of expired now jobentry = case jTimeoutAfter jobentry of
......
...@@ -116,6 +116,7 @@ extra-deps: ...@@ -116,6 +116,7 @@ extra-deps:
- hgal-2.0.0.2@sha256:13d58afd0668b9cb881c612eff8488a0e289edd4bbffa893df4beee60cfeb73b,653 - hgal-2.0.0.2@sha256:13d58afd0668b9cb881c612eff8488a0e289edd4bbffa893df4beee60cfeb73b,653
- hsparql-0.3.8 - hsparql-0.3.8
- hstatistics-0.3.1 - hstatistics-0.3.1
- hspec-expectations-0.8.3
- json-stream-0.4.2.4@sha256:8b7f17d54a6e1e6311756270f8bcf51e91bab4300945400de66118470dcf51b9,4716 - json-stream-0.4.2.4@sha256:8b7f17d54a6e1e6311756270f8bcf51e91bab4300945400de66118470dcf51b9,4716
- located-base-0.1.1.1@sha256:7c6395f2b6fbf2d5f76c3514f774423838c0ea94e1c6a5530dd3c94b30c9d1c8,1904 - located-base-0.1.1.1@sha256:7c6395f2b6fbf2d5f76c3514f774423838c0ea94e1c6a5530dd3c94b30c9d1c8,1904
- logging-effect-1.3.12@sha256:72d168dd09887649ba9501627219b6027cbec2d5541931555b7885b133785ce3,1679 - logging-effect-1.3.12@sha256:72d168dd09887649ba9501627219b6027cbec2d5541931555b7885b133785ce3,1679
...@@ -123,6 +124,7 @@ extra-deps: ...@@ -123,6 +124,7 @@ extra-deps:
- monoid-extras-0.5.1@sha256:438dbfd7b4dce47d8f0ca577f56caf94bd1e21391afa545cad09fe7cf2e5793d,2333 - monoid-extras-0.5.1@sha256:438dbfd7b4dce47d8f0ca577f56caf94bd1e21391afa545cad09fe7cf2e5793d,2333
- rake-0.0.1@sha256:3380f6567fb17505d1095b7f32222c0b631fa04126ad39726c84262da99c08b3,2025 - rake-0.0.1@sha256:3380f6567fb17505d1095b7f32222c0b631fa04126ad39726c84262da99c08b3,2025
- random-1.2.1 - random-1.2.1
- recover-rtti-0.4.3
- servant-cassava-0.10.1@sha256:07e7b6ca67cf57dcb4a0041a399a25d058844505837c6479e01d62be59d01fdf,1665 - servant-cassava-0.10.1@sha256:07e7b6ca67cf57dcb4a0041a399a25d058844505837c6479e01d62be59d01fdf,1665
- servant-ekg-0.3.1@sha256:19bd9dc3943983da8e79d6f607614c68faea4054fb889d508c8a2b67b6bdd448,2203 - servant-ekg-0.3.1@sha256:19bd9dc3943983da8e79d6f607614c68faea4054fb889d508c8a2b67b6bdd448,2203
- servant-flatten-0.2@sha256:276896f7c5cdec5b8f8493f6205fded0cc602d050b58fdb09a6d7c85c3bb0837,1234 - servant-flatten-0.2@sha256:276896f7c5cdec5b8f8493f6205fded0cc602d050b58fdb09a6d7c85c3bb0837,1234
......
{-# LANGUAGE ScopedTypeVariables #-}
module Utils where
import Prelude
import Control.Exception
import Test.Tasty.HUnit
-- | Marks the input 'Assertion' as pending, by ignoring any exception
-- thrown by it.
pending :: String -> Assertion -> Assertion
pending reason act = act `catch` (\(e :: SomeException) -> do
putStrLn $ "PENDING: " <> reason
putStrLn (displayException e))
...@@ -17,11 +17,13 @@ import Data.Either ...@@ -17,11 +17,13 @@ import Data.Either
import Data.List import Data.List
import Data.Sequence (Seq, (|>), fromList) import Data.Sequence (Seq, (|>), fromList)
import Data.Time import Data.Time
import Debug.RecoverRTTI (anythingToString)
import Prelude import Prelude
import System.IO.Unsafe import System.IO.Unsafe
import Network.HTTP.Client.TLS (newTlsManager) import Network.HTTP.Client.TLS (newTlsManager)
import Network.HTTP.Client (Manager) import Network.HTTP.Client (Manager)
import Test.Hspec hiding (pending) import Test.Hspec
import Test.Hspec.Expectations.Contrib (annotate)
import qualified Servant.Job.Types as SJ import qualified Servant.Job.Types as SJ
import qualified Servant.Job.Core as SJ import qualified Servant.Job.Core as SJ
...@@ -33,6 +35,7 @@ import Gargantext.Utils.Jobs.State ...@@ -33,6 +35,7 @@ import Gargantext.Utils.Jobs.State
import Gargantext.API.Prelude import Gargantext.API.Prelude
import Gargantext.API.Admin.EnvTypes as EnvTypes import Gargantext.API.Admin.EnvTypes as EnvTypes
import Gargantext.API.Admin.Orchestrator.Types import Gargantext.API.Admin.Orchestrator.Types
import Control.Concurrent.Async
data JobT = A data JobT = A
...@@ -53,18 +56,29 @@ addJobToSchedule jobt mvar = do ...@@ -53,18 +56,29 @@ addJobToSchedule jobt mvar = do
data Counts = Counts { countAs :: Int, countBs :: Int } data Counts = Counts { countAs :: Int, countBs :: Int }
deriving (Eq, Show) deriving (Eq, Show)
jobDuration, initialDelay :: Int jobDuration :: Int
jobDuration = 100000 jobDuration = 100000
initialDelay = 20000
type Timer = TVar Bool
-- | Use in conjuction with 'registerDelay' to create an 'STM' transaction -- | Use in conjuction with 'registerDelay' to create an 'STM' transaction
-- that will simulate the duration of a job by waiting the timeout registered -- that will simulate the duration of a job by waiting the timeout registered
-- by 'registerDelay' before continuing. -- by 'registerDelay' before continuing.
waitJobSTM :: TVar Bool -> STM () waitTimerSTM :: Timer -> STM ()
waitJobSTM tv = do waitTimerSTM tv = do
v <- readTVar tv v <- readTVar tv
check v check v
-- | Samples the running jobs from the first 'TVar' and write them
-- in the queue.
sampleRunningJobs :: Timer -> TVar [String] -> TQueue [String]-> STM ()
sampleRunningJobs timer runningJs samples = do
waitTimerSTM timer
runningNow <- readTVar runningJs
case runningNow of
[] -> pure () -- ignore empty runs, when the system is kickstarting.
xs -> writeTQueue samples xs
-- | The aim of this test is to ensure that the \"max runners\" setting is -- | The aim of this test is to ensure that the \"max runners\" setting is
-- respected, i.e. we have no more than \"N\" jobs running at the same time. -- respected, i.e. we have no more than \"N\" jobs running at the same time.
testMaxRunners :: IO () testMaxRunners :: IO ()
...@@ -76,13 +90,27 @@ testMaxRunners = do ...@@ -76,13 +90,27 @@ testMaxRunners = do
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
now <- getCurrentTime now <- getCurrentTime
runningJs <- newTVarIO [] runningJs <- newTVarIO []
samples <- newTQueueIO
remainingJs <- newTVarIO num_jobs remainingJs <- newTVarIO num_jobs
-- Not the most elegant solution, but in order to test the \"max runners\"
-- parameter we start an asynchronous computation that continuously reads the content
-- of 'runningJs' and at the end ensures that this value was
-- always <= \"max_runners" (but crucially not 0).
asyncReader <- async $ forever $ do
samplingFrequency <- registerDelay 100_000
atomically $ sampleRunningJobs samplingFrequency runningJs samples
let duration = 1_000_000 let duration = 1_000_000
j num _jHandle _inp _l = do j num _jHandle _inp _l = do
durationTimer <- registerDelay duration durationTimer <- registerDelay duration
-- NOTE: We do the modification of the 'runningJs' and the rest
-- in two transactions on purpose, to give a chance to the async
-- sampler to sample the status of the world.
atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
atomically $ do atomically $ do
modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs) waitTimerSTM durationTimer
waitJobSTM durationTimer
modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs) modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
modifyTVar remainingJs pred modifyTVar remainingJs pred
jobs = [ (A, j n) | n <- [1..num_jobs::Int] ] jobs = [ (A, j n) | n <- [1..num_jobs::Int] ]
...@@ -94,16 +122,19 @@ testMaxRunners = do ...@@ -94,16 +122,19 @@ testMaxRunners = do
x <- readTVar remainingJs x <- readTVar remainingJs
check (x == 0) check (x == 0)
-- Wait for the jobs to finish, then stop the sampler.
waitFinished waitFinished
cancel asyncReader
r1 <- readTVarIO runningJs -- Check that we got /some/ samples and for each of them,
sort r1 `shouldBe` ["Job #1", "Job #2"] -- let's check only two runners at max were alive.
threadDelay jobDuration allSamples <- atomically $ flushTQueue samples
r2 <- readTVarIO runningJs length allSamples `shouldSatisfy` (> 0)
sort r2 `shouldBe` ["Job #3", "Job #4"]
threadDelay jobDuration forM_ allSamples $ \runLog -> do
r3 <- readTVarIO runningJs annotate "predicate to satisfy: (x `isInfixOf` [\"Job #1\", \"Job #2\"] || x `isInfixOf` [\"Job #3\", \"Job #4\"]" $
r3 `shouldBe` [] shouldSatisfy (sort runLog)
(\x -> x `isInfixOf` ["Job #1", "Job #2"] || x `isInfixOf` ["Job #3", "Job #4"])
testPrios :: IO () testPrios :: IO ()
testPrios = do testPrios = do
...@@ -136,18 +167,19 @@ testPrios = do ...@@ -136,18 +167,19 @@ testPrios = do
testExceptions :: IO () testExceptions :: IO ()
testExceptions = do testExceptions = do
k <- genSecret k <- genSecret
let settings = defaultJobSettings 2 k let settings = defaultJobSettings 1 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
jid <- pushJob A () jid <- pushJob A ()
(\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn) (\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
settings st settings st
threadDelay initialDelay -- Wait 1 second to make sure the job is finished.
threadDelay $ 1_000_000
mjob <- lookupJob jid (jobsData st) mjob <- lookupJob jid (jobsData st)
case mjob of case mjob of
Nothing -> error "boo" Nothing -> fail "lookupJob failed, job not found!"
Just je -> case jTask je of Just je -> case jTask je of
DoneJ _ r -> isLeft r `shouldBe` True DoneJ _ r -> isLeft r `shouldBe` True
_ -> error "boo2" unexpected -> fail $ "Expected job to be done, but got: " <> anythingToString unexpected
return () return ()
testFairness :: IO () testFairness :: IO ()
...@@ -373,26 +405,20 @@ testMarkProgress = do ...@@ -373,26 +405,20 @@ testMarkProgress = do
] ]
} }
pending :: String -> IO () -> IO ()
pending reason act = act `catch` (\(e :: SomeException) -> do
putStrLn $ "PENDING: " <> reason
putStrLn (displayException e))
test :: Spec test :: Spec
test = do test = do
describe "job queue" $ do describe "job queue" $ do
it "respects max runners limit" $ it "respects max runners limit" $
pending "Ticket #198" testMaxRunners testMaxRunners
it "respects priorities" $ it "respects priorities" $
testPrios testPrios
it "can handle exceptions" $ it "can handle exceptions" $
pending "Ticket #198" testExceptions testExceptions
it "fairly picks equal-priority-but-different-kind jobs" $ it "fairly picks equal-priority-but-different-kind jobs" $
testFairness testFairness
describe "job status update and tracking" $ do describe "job status update and tracking" $ do
it "can fetch the latest job status" $ it "can fetch the latest job status" $
pending "Ticket #198" testFetchJobStatus testFetchJobStatus
it "can spin two separate jobs and track their status separately" $ it "can spin two separate jobs and track their status separately" $
testFetchJobStatusNoContention testFetchJobStatusNoContention
it "marking stuff behaves as expected" $ it "marking stuff behaves as expected" $
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment