1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
module Gargantext.Utils.Jobs.Map (
-- * Types
JobMap(..)
, JobEntry(..)
, J(..)
, QueuedJob(..)
, RunningJob(..)
, LoggerM
, Logger
-- * Functions
, newJobMap
, lookupJob
, gcThread
, addJobEntry
, deleteJob
, runJob
, waitJobDone
, runJ
, waitJ
, pollJ
, killJ
) where
import Control.Concurrent
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Map.Strict (Map)
import Data.Time.Clock
import Prelude
import qualified Data.Map.Strict as Map
import Gargantext.Utils.Jobs.Settings
-- | (Mutable) 'Map' containing job id -> job info mapping.
--
-- The map is held in a 'TVar' so it can be read and updated atomically from
-- 'STM' transactions (see 'addJobEntry' and 'deleteJob').
newtype JobMap jid w a = JobMap
  { jobMap :: TVar (Map jid (JobEntry jid w a))
    -- ^ The shared job id -> entry table.
  }
-- | Information associated to a job ID.
data JobEntry jid w a = JobEntry
  { jID           :: jid
    -- ^ The job's identifier ('addJobEntry' also uses it as the map key).
  , jTask         :: J w a
    -- ^ Current state of the job: queued, running or done.
  , jTimeoutAfter :: Maybe UTCTime
    -- ^ Deadline set by 'runJob'; 'gcThread' kills running jobs past it.
  , jRegistered   :: UTCTime
    -- ^ Time at which 'addJobEntry' created the entry.
  , jStarted      :: Maybe UTCTime
    -- ^ Time at which 'runJob' started the job ('Nothing' until then).
  , jEnded        :: Maybe UTCTime
    -- ^ Time at which 'waitJobDone' saw the job finish ('Nothing' until then).
  }
-- | A job computation, which has a different representation depending on the
-- status of the job.
--
-- A queued job consists of the input to the computation and the computation.
-- A running job consists of an 'Async' as well as an action to get the current logs.
-- A done job consists of the result of the computation and the final logs.
data J w a
  = QueuedJ (QueuedJob w a)
    -- ^ Registered but not started yet.
  | RunningJ (RunningJob w a)
    -- ^ Started; the computation is in flight.
  | DoneJ w (Either SomeException a)
    -- ^ Finished: the final logs, and either the exception the computation
    -- ended with or its result.
-- | An unexecuted job is an input paired with a computation
-- to run with it. Input type is "hidden" (existentially quantified) to
-- be able to store different job types together.
--
-- Besides its input, the computation receives a 'Logger' through which it
-- can emit messages of type @w@ (wired up by 'runJ').
data QueuedJob w r where
  QueuedJob :: a -> (a -> Logger w -> IO r) -> QueuedJob w r
-- | A running job points to the async computation for the job and provides a
-- function to peek at the current logs.
data RunningJob w a = RunningJob
  { rjAsync  :: Async.Async a
    -- ^ Handle on the thread running the job's computation.
  , rjGetLog :: IO w
    -- ^ Reads the logs accumulated so far.
  }
-- | Polymorphic logger over any monad @m@: a function that consumes one
-- message of type @w@ and performs an effect in @m@.
type LoggerM m w = w -> m ()
-- | A @'Logger' w@ is a function that can do something with "messages" of type
-- @w@ in IO, i.e. 'LoggerM' specialised to 'IO'.
type Logger w = LoggerM IO w
-- | Create a fresh 'JobMap' with no entries.
newJobMap :: IO (JobMap jid w a)
newJobMap = do
  table <- newTVarIO Map.empty
  pure (JobMap table)
-- | Look up a job by its ID.
--
-- Reads a snapshot of the map outside of any transaction and returns
-- 'Nothing' when no entry is registered under the given ID.
lookupJob
  :: Ord jid
  => jid
  -> JobMap jid w a
  -> IO (Maybe (JobEntry jid w a))
lookupJob key jm = do
  entries <- readTVarIO (jobMap jm)
  pure (Map.lookup key entries)
-- | Ready to use GC thread: every 'jsGcPeriod' seconds, kill and remove the
-- running jobs whose 'jTimeoutAfter' deadline has passed.
--
-- Queued and done entries are left in the map. This action never returns.
--
-- Candidates are picked from a snapshot of the map, so each one is looked up
-- again inside the transaction that removes it. Between the snapshot and that
-- transaction, the job may have finished ('waitJobDone' turns it into a
-- 'DoneJ') or been deleted. Such entries must not be dropped, otherwise the
-- result of a completed job would be lost.
gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
gcThread js (JobMap mvar) = go
  where
    go = do
      -- jsGcPeriod is multiplied into microseconds for 'threadDelay'.
      threadDelay (jsGcPeriod js * 1000000)
      now <- getCurrentTime
      candidates <- Map.keys . Map.filter (expired now) <$> readTVarIO mvar
      forM_ candidates $ \jid -> do
        mrunningjob <- atomically $ reapIfStillExpired now jid
        -- Cancel outside the transaction: 'killJ' waits for the job's
        -- thread to terminate.
        mapM_ killJ mrunningjob
      go

    -- Inspect the live entry (not the stale snapshot) and remove it only if
    -- it is still a running job past its deadline.
    reapIfStillExpired now jid = do
      mentry <- Map.lookup jid <$> readTVar mvar
      case mentry of
        Just je
          | expired now je
          , RunningJ rj <- jTask je -> do
              modifyTVar' mvar (Map.delete jid)
              pure (Just rj)
        _ -> pure Nothing

    expired now jobentry = case jTimeoutAfter jobentry of
      Just t -> now >= t
      Nothing -> False
-- | Make a 'Logger' that 'mappend's monoidal values in a 'TVar'.
--
-- /IMPORTANT/: The new value is appended in front. After logging @w1@ and
-- then @w2@, the variable holds @w2 <> w1 <> old@. The ordering is important
-- later on when consuming logs from the API (see for example 'pollJob').
jobLog :: Semigroup w => TVar w -> Logger w -- w -> IO ()
jobLog logvar msg = atomically (modifyTVar' logvar (msg <>))
-- | Register a new queued job under the given ID and return its entry.
--
-- The computation is partially applied to the job ID before being stored.
-- Any existing entry under the same ID is replaced. Only 'jRegistered' is
-- set; the other timing fields start out as 'Nothing'.
addJobEntry
  :: Ord jid
  => UTCTime
  -> jid
  -> a
  -> (jid -> a -> Logger w -> IO r)
  -> JobMap jid w r
  -> STM (JobEntry jid w r)
addJobEntry now jid input f jm =
  entry <$ modifyTVar' (jobMap jm) (Map.insert jid entry)
  where
    entry = JobEntry
      { jID           = jid
      , jTask         = QueuedJ (QueuedJob input (f jid))
      , jRegistered   = now
      , jTimeoutAfter = Nothing
      , jStarted      = Nothing
      , jEnded        = Nothing
      }
-- | Remove the entry for the given job ID, if there is one.
--
-- Only the map entry is dropped; a running computation is not cancelled.
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob key jm = modifyTVar' (jobMap jm) (Map.delete key)
-- | Start a queued job and record it in the map as running.
--
-- The job is launched with 'runJ', and its entry is then updated with:
--
-- * the 'RunningJ' task;
-- * the start time;
-- * a deadline 'jsJobTimeout' seconds after the start, which 'gcThread' uses.
--
-- If no entry exists for the ID, the job still runs but nothing is recorded,
-- because 'Map.adjust' leaves the map unchanged.
runJob
  :: (Ord jid, Monoid w)
  => jid
  -> QueuedJob w a
  -> JobMap jid w a
  -> JobSettings
  -> IO (RunningJob w a)
runJob jid qj jm js = do
  running <- runJ qj
  startedAt <- getCurrentTime
  let deadline = addUTCTime (fromIntegral (jsJobTimeout js)) startedAt
      markRunning je = je
        { jTask         = RunningJ running
        , jStarted      = Just startedAt
        , jTimeoutAfter = Just deadline
        }
  atomically (modifyTVar' (jobMap jm) (Map.adjust markRunning jid))
  pure running
-- | Block until a running job finishes, then mark its entry as done.
--
-- The job's outcome comes from 'waitJ', so an exception is returned as 'Left'
-- rather than rethrown. It is returned together with the logs read right
-- after completion. The entry gets its 'jEnded' time and a 'DoneJ' task.
-- Nothing is recorded if the entry is gone.
waitJobDone
  :: Ord jid
  => jid
  -> RunningJob w a
  -> JobMap jid w a
  -> IO (Either SomeException a, w)
waitJobDone jid rj jm = do
  outcome <- waitJ rj
  endedAt <- getCurrentTime
  finalLogs <- rjGetLog rj
  let markDone je = je { jEnded = Just endedAt, jTask = DoneJ finalLogs outcome }
  atomically (modifyTVar' (jobMap jm) (Map.adjust markDone jid))
  pure (outcome, finalLogs)
-- | Turn a queued job into a running job by setting up the logging of @w@s and
-- firing up the async action.
--
-- Logs start at 'mempty' in a fresh 'TVar'. The computation appends to that
-- variable through 'jobLog', and 'rjGetLog' reads it.
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob input job) = do
  logVar <- newTVarIO mempty
  worker <- Async.async (job input (jobLog logVar))
  pure RunningJob { rjAsync = worker, rjGetLog = readTVarIO logVar }
-- | Wait for a running job to return (blocking).
--
-- An exception raised by the job, including a cancellation, is returned as
-- 'Left' instead of being rethrown ('Async.waitCatch').
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ = Async.waitCatch . rjAsync
-- | Poll a running job to see if it's done, without blocking.
--
-- Yields 'Nothing' while the job is still running, otherwise its outcome
-- ('Async.poll').
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ = Async.poll . rjAsync
-- | Kill a running job by cancelling the action.
--
-- 'Async.cancel' waits for the job's thread to terminate before returning.
killJ :: RunningJob w a -> IO ()
killJ = Async.cancel . rjAsync