Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
haskell-gargantext
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
153
Issues
153
List
Board
Labels
Milestones
Merge Requests
9
Merge Requests
9
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
gargantext
haskell-gargantext
Commits
ecaeb913
Commit
ecaeb913
authored
Apr 20, 2023
by
Alexandre Delanoë
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/adinapoli/issue-198' into dev
parents
f2664236
862391be
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
53 additions
and
29 deletions
+53
-29
gargantext.cabal
gargantext.cabal
+1
-0
package.yaml
package.yaml
+1
-0
Map.hs
src/Gargantext/Utils/Jobs/Map.hs
+6
-6
Queue.hs
src/Gargantext/Utils/Jobs/Queue.hs
+11
-3
State.hs
src/Gargantext/Utils/Jobs/State.hs
+22
-9
Main.hs
tests/queue/Main.hs
+12
-11
No files found.
gargantext.cabal
View file @
ecaeb913
...
...
@@ -929,4 +929,5 @@ test-suite jobqueue-test
, servant-job
, stm
, text
, time
default-language: Haskell2010
package.yaml
View file @
ecaeb913
...
...
@@ -533,6 +533,7 @@ tests:
-
http-client-tls
-
servant-job
-
stm
-
time
# garg-doctest:
# main: Main.hs
# source-dirs: src-doctest
...
...
src/Gargantext/Utils/Jobs/Map.hs
View file @
ecaeb913
...
...
@@ -124,13 +124,13 @@ jobLog logvar = \w -> atomically $ modifyTVar' logvar (\old_w -> w <> old_w)
-- | Generating new 'JobEntry's.
addJobEntry
::
Ord
jid
=>
jid
=>
UTCTime
->
jid
->
a
->
(
jid
->
a
->
Logger
w
->
IO
r
)
->
JobMap
jid
w
r
->
IO
(
JobEntry
jid
w
r
)
addJobEntry
jid
input
f
(
JobMap
mvar
)
=
do
now
<-
getCurrentTime
->
STM
(
JobEntry
jid
w
r
)
addJobEntry
now
jid
input
f
(
JobMap
mvar
)
=
do
let
je
=
JobEntry
{
jID
=
jid
,
jTask
=
QueuedJ
(
QueuedJob
input
(
f
jid
))
...
...
@@ -139,8 +139,8 @@ addJobEntry jid input f (JobMap mvar) = do
,
jStarted
=
Nothing
,
jEnded
=
Nothing
}
atomically
$
modifyTVar'
mvar
(
Map
.
insert
jid
je
)
return
je
modifyTVar'
mvar
(
Map
.
insert
jid
je
)
pure
je
deleteJob
::
Ord
jid
=>
jid
->
JobMap
jid
w
a
->
STM
()
deleteJob
jid
(
JobMap
mvar
)
=
modifyTVar'
mvar
(
Map
.
delete
jid
)
...
...
src/Gargantext/Utils/Jobs/Queue.hs
View file @
ecaeb913
...
...
@@ -4,6 +4,7 @@ module Gargantext.Utils.Jobs.Queue where
import
Control.Concurrent
import
Control.Concurrent.STM
import
Control.Exception
import
Control.Monad
import
Data.Function
import
Data.List
import
Data.Ord
...
...
@@ -94,9 +95,9 @@ newQueue prios = do
return
$
Queue
vars
indices
prios
-- | Add a new element to the queue, with the given kind.
addQueue
::
Ord
t
=>
t
->
a
->
Queue
t
a
->
IO
()
addQueue
::
Ord
t
=>
t
->
a
->
Queue
t
a
->
STM
()
addQueue
jobkind
a
q
=
case
Map
.
lookup
jobkind
(
queueIndices
q
)
of
Just
i
->
atomically
$
modifyTVar
(
queueData
q
Vector
.!
i
)
(
snocQ
a
)
Just
i
->
modifyTVar
(
queueData
q
Vector
.!
i
)
(
snocQ
a
)
Nothing
->
error
"addQueue: couldn't find queue for given job kind"
deleteQueue
::
(
Eq
a
,
Ord
t
)
=>
t
->
a
->
Queue
t
a
->
STM
()
...
...
@@ -104,6 +105,13 @@ deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just
i
->
modifyTVar
(
queueData
q
Vector
.!
i
)
(
deleteQ
a
)
Nothing
->
error
"deleteQueue: queue type not found?!"
-- | Dump the contents of the queue, for debugging purposes.
debugDumpQueue
::
(
Enum
t
,
Bounded
t
,
Ord
t
)
=>
Queue
t
a
->
STM
[(
t
,
a
)]
debugDumpQueue
q
=
mconcat
<$>
(
forM
[
minBound
..
maxBound
]
$
\
t
->
do
readTVar
(
queueData
q
Vector
.!
(
i
t
))
>>=
debugDumpQ
t
)
where
i
t
=
fromJust
$
Map
.
lookup
t
(
queueIndices
q
)
debugDumpQ
t
(
Q
xs
ys
_
)
=
return
$
map
(
\
x
->
(
t
,
x
))
(
xs
++
reverse
ys
)
type
Picker
a
=
[(
a
,
STM
()
)]
->
STM
(
a
,
STM
()
)
...
...
@@ -125,7 +133,7 @@ popQueue picker q = atomically $ select prioLevels
mres
<-
selectLevel
level
case
mres
of
Nothing
->
select
levels
Just
res
->
return
(
Just
res
)
Just
res
->
pure
$
Just
res
selectLevel
::
[(
t
,
Prio
)]
->
STM
(
Maybe
a
)
selectLevel
xs
=
do
...
...
src/Gargantext/Utils/Jobs/State.hs
View file @
ecaeb913
...
...
@@ -29,11 +29,10 @@ data JobsState t w a = JobsState
,
jsRunners
::
[
Async
()
]
}
nextID
::
JobSettings
->
JobsState
t
w
a
->
IO
(
SJ
.
JobID
'S
J
.
Safe
)
nextID
js
st
=
do
now
<-
getCurrentTime
n
<-
atomically
$
stateTVar
(
jobsIdGen
st
)
$
\
i
->
(
i
,
i
+
1
)
return
$
SJ
.
newID
(
Proxy
::
Proxy
"job"
)
(
jsSecretKey
js
)
now
n
nextID
::
UTCTime
->
JobSettings
->
JobsState
t
w
a
->
STM
(
SJ
.
JobID
'S
J
.
Safe
)
nextID
now
js
st
=
do
n
<-
stateTVar
(
jobsIdGen
st
)
$
\
i
->
(
i
,
i
+
1
)
pure
$
SJ
.
newID
(
Proxy
::
Proxy
"job"
)
(
jsSecretKey
js
)
now
n
newJobsState
::
forall
t
w
a
.
...
...
@@ -72,6 +71,7 @@ newJobsState js prios = do
return
(
jid
,
popjid
)
_3
(
_
,
_
,
c
)
=
c
pushJob
::
Ord
t
=>
t
...
...
@@ -80,8 +80,21 @@ pushJob
->
JobSettings
->
JobsState
t
w
r
->
IO
(
SJ
.
JobID
'S
J
.
Safe
)
pushJob
jobkind
input
f
js
st
@
(
JobsState
jmap
jqueue
_idgen
_
_
)
=
do
jid
<-
nextID
js
st
_je
<-
addJobEntry
jid
input
f
jmap
pushJob
jobkind
input
f
js
st
=
do
now
<-
getCurrentTime
atomically
$
pushJobWithTime
now
jobkind
input
f
js
st
pushJobWithTime
::
Ord
t
=>
UTCTime
->
t
->
a
->
(
SJ
.
JobID
'S
J
.
Safe
->
a
->
Logger
w
->
IO
r
)
->
JobSettings
->
JobsState
t
w
r
->
STM
(
SJ
.
JobID
'S
J
.
Safe
)
pushJobWithTime
now
jobkind
input
f
js
st
@
(
JobsState
jmap
jqueue
_idgen
_
_
)
=
do
jid
<-
nextID
now
js
st
_je
<-
addJobEntry
now
jid
input
f
jmap
addQueue
jobkind
jid
jqueue
return
jid
pure
jid
tests/queue/Main.hs
View file @
ecaeb913
...
...
@@ -16,6 +16,7 @@ import Data.Maybe
import
Data.Either
import
Data.List
import
Data.Sequence
(
Seq
,
(
|>
),
fromList
)
import
Data.Time
import
GHC.Stack
import
Prelude
import
System.IO.Unsafe
...
...
@@ -92,28 +93,28 @@ testMaxRunners = do
testPrios
::
IO
()
testPrios
=
do
k
<-
genSecret
let
settings
=
defaultJobSettings
2
k
-- Use a single runner, so that we can check the order of execution
-- without worrying about the runners competing with each other.
let
settings
=
defaultJobSettings
1
k
prios
=
[(
B
,
10
),
(
C
,
1
),
(
D
,
5
)]
runningDelta
job
=
fromMaybe
0
(
lookup
job
prios
)
*
1000
st
::
JobsState
JobT
[
String
]
()
<-
newJobsState
settings
$
applyPrios
prios
defaultPrios
-- B has the highest priority
pickedSchedule
<-
newMVar
(
JobSchedule
mempty
)
let
j
jobt
_jHandle
_inp
_l
=
do
-- simulate the running time of a job, then add to the schedule.
-- The running time is proportional to the priority of the job,
-- to account for the fact that we are pushing jobs sequentially,
-- so we have to our account for the submission time.
threadDelay
$
jobDuration
-
runningDelta
jobt
addJobToSchedule
jobt
pickedSchedule
let
j
jobt
_jHandle
_inp
_l
=
addJobToSchedule
jobt
pickedSchedule
jobs
=
[
(
A
,
j
A
)
,
(
C
,
j
C
)
,
(
B
,
j
B
)
,
(
D
,
j
D
)
]
forM_
jobs
$
\
(
t
,
f
)
->
void
$
pushJob
t
()
f
settings
st
-- Push all the jobs in the same STM transaction, so that they are all stored in the queue by
-- the time 'popQueue' gets called.
now
<-
getCurrentTime
atomically
$
forM_
jobs
$
\
(
t
,
f
)
->
void
$
pushJobWithTime
now
t
()
f
settings
st
-- wait for the jobs to finish, waiting for more than the total duration,
-- so that we are sure that all jobs have finished, then check the schedule.
threadDelay
(
5
*
jobDuration
)
threadDelay
jobDuration
finalSchedule
<-
readMVar
pickedSchedule
finalSchedule
`
shouldBe
`
JobSchedule
(
fromList
[
B
,
D
,
C
,
A
])
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment