1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
{-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses, TypeFamilies #-}
module Gargantext.Utils.Jobs.Monad (
-- * Types and classes
JobEnv(..)
, NumRunners
, JobError(..)
, MonadJob(..)
-- * Tracking jobs status
, MonadJobStatus(..)
-- * Functions
, newJobEnv
, defaultJobSettings
, genSecret
, getJobsSettings
, getJobsState
, getJobsMap
, getJobsQueue
, queueJob
, findJob
, checkJID
, withJob
, handleIDError
, removeJob
) where
import Gargantext.Utils.Jobs.Settings
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Queue
import Gargantext.Utils.Jobs.State
import Control.Concurrent.STM
import Control.Exception
import Control.Monad.Except
import Control.Monad.Reader
import Data.Kind (Type)
import Data.Map.Strict (Map)
import Data.Time.Clock
import qualified Data.Text as T
import Network.HTTP.Client (Manager)
import Prelude
import qualified Servant.Job.Core as SJ
import qualified Servant.Job.Types as SJ
-- | The environment a job-aware monad carries around.
--
-- Judging from 'queueJob', @t@ is the kind of job, @w@ the type of values
-- handed to a job's 'Logger', and @a@ the result a job produces.
data JobEnv t w a = JobEnv
  { jeSettings :: JobSettings
    -- ^ Static configuration (timeouts, number of runners, secret key).
  , jeState :: JobsState t w a
    -- ^ The mutable jobs state, as created by 'newJobsState'.
  , jeManager :: Manager
    -- ^ HTTP client 'Manager'; not used by anything in this module.
  }
-- | Build a fresh 'JobEnv'.
--
-- Creates a new jobs state with 'newJobsState' from the given settings and
-- priority map, then bundles it with those settings and the HTTP 'Manager'.
newJobEnv
  :: (EnumBounded t, Monoid w)
  => JobSettings
  -> Map t Prio
  -> Manager
  -> IO (JobEnv t w a)
newJobEnv js prios mgr = do
  st <- newJobsState js prios
  pure (JobEnv js st mgr)
-- | Number of job runners; this is what 'defaultJobSettings' stores in
-- 'jsNumRunners'. A plain alias for 'Int', so it adds readability only,
-- not type safety.
type NumRunners = Int
-- | Default 'JobSettings' for the given number of runners and secret key.
--
-- Job timeout and ID timeout are both 30 minutes and the GC period is
-- 1 minute. The values are written as a number of minutes times 60, i.e.
-- they are expressed in seconds ('checkJID' feeds 'jsIDTimeout' to
-- 'addUTCTime', which counts seconds).
defaultJobSettings :: NumRunners -> SJ.SecretKey -> JobSettings
defaultJobSettings numRunners k =
  JobSettings
    { jsNumRunners = numRunners
    , jsJobTimeout = minutes 30
    , jsIDTimeout = minutes 30
    , jsGcPeriod = minutes 1
    , jsSecretKey = k
    }
  where
    -- Explicitly polymorphic so it works whatever numeric type each field has.
    minutes :: Num n => n -> n
    minutes n = n * 60
-- | Generate a new secret key by delegating to 'SJ.generateSecretKey'.
-- The result is what 'defaultJobSettings' expects as its second argument.
genSecret :: IO SJ.SecretKey
genSecret = SJ.generateSecretKey
-- | Monads that can hand out a 'JobEnv'.
--
-- The functional dependency @m -> t w a@ means the monad alone determines
-- the job kind, log and result types, so the accessors below never need
-- type annotations at their call sites.
class MonadIO m => MonadJob m t w a | m -> t w a where
  -- | Fetch the job environment from the underlying monad.
  getJobEnv :: m (JobEnv t w a)
-- | A 'ReaderT' whose environment is exactly a 'JobEnv' is a 'MonadJob':
-- the environment is simply read back with 'ask'.
instance MonadIO m => MonadJob (ReaderT (JobEnv t w a) m) t w a where
  getJobEnv = ask
-- | The 'JobSettings' stored in the current 'JobEnv'.
getJobsSettings :: MonadJob m t w a => m JobSettings
getJobsSettings = fmap jeSettings getJobEnv
-- | The 'JobsState' stored in the current 'JobEnv'.
getJobsState :: MonadJob m t w a => m (JobsState t w a)
getJobsState = fmap jeState getJobEnv
-- | The job map (the 'jobsData' component of the current 'JobsState'),
-- keyed by validated job IDs.
getJobsMap :: MonadJob m t w a => m (JobMap (SJ.JobID 'SJ.Safe) w a)
getJobsMap = fmap jobsData getJobsState
-- | The job queue (the 'jobsQ' component of the current 'JobsState'),
-- holding validated job IDs indexed by job kind.
getJobsQueue :: MonadJob m t w a => m (Queue t (SJ.JobID 'SJ.Safe))
getJobsQueue = fmap jobsQ getJobsState
-- | Submit a new job and return the ID it was given.
--
-- Reads the current settings and jobs state, then hands the job kind, the
-- input and the job action over to 'pushJob'.
queueJob
  :: (MonadJob m t w a, Ord t)
  => t
  -- ^ kind of the job
  -> i
  -- ^ input that will be passed to the job action
  -> (SJ.JobID 'SJ.Safe -> i -> Logger w -> IO a)
  -- ^ the job action itself
  -> m (SJ.JobID 'SJ.Safe)
queueJob jobkind input f = do
  settings <- getJobsSettings
  state <- getJobsState
  liftIO $ pushJob jobkind input f settings state
-- | Look a job up in the job map by its (already validated) ID.
-- Yields 'Nothing' when 'lookupJob' finds no entry for that ID.
findJob
  :: MonadJob m t w a
  => SJ.JobID 'SJ.Safe
  -> m (Maybe (JobEntry (SJ.JobID 'SJ.Safe) w a))
findJob jid = getJobsMap >>= liftIO . lookupJob jid
-- | Ways in which handling a job (ID) can fail.
data JobError
  = InvalidIDType
    -- ^ The type tag of the ID is not @"job"@ (see 'checkJID').
  | IDExpired
    -- ^ The ID is older than 'jsIDTimeout' allows (see 'checkJID').
  | InvalidMacID
    -- ^ The MAC carried by the ID does not match the one recomputed with
    -- our secret key (see 'checkJID').
  | UnknownJob
    -- ^ Not produced anywhere in this module; presumably raised by callers
    -- when an ID is valid but no job is registered for it.
  | JobException SomeException
    -- ^ Wraps an exception; not produced anywhere in this module.
  deriving Show
-- | Validate an untrusted job ID, turning it into a trusted one.
--
-- The checks run in this order, and the first one that fails decides the
-- error:
--
--   1. the type tag must be @"job"@, otherwise 'InvalidIDType';
--   2. the ID's timestamp plus 'jsIDTimeout' seconds must not lie in the
--      past, otherwise 'IDExpired';
--   3. the MAC stored in the ID must equal the one recomputed from the
--      secret key, otherwise 'InvalidMacID'.
--
-- On success the same four fields are repackaged as a 'SJ.Safe' ID.
checkJID
  :: MonadJob m t w a
  => SJ.JobID 'SJ.Unsafe
  -> m (Either JobError (SJ.JobID 'SJ.Safe))
checkJID (SJ.PrivateID tn n t d) = do
  now <- liftIO getCurrentTime
  js <- getJobsSettings
  pure (validate now js)
  where
    -- Pure part of the check: only needs the current time and the settings.
    validate :: UTCTime -> JobSettings -> Either JobError (SJ.JobID 'SJ.Safe)
    validate now js
      | tn /= "job" = Left InvalidIDType
      | now > expiry = Left IDExpired
      | d /= expectedMac = Left InvalidMacID
      | otherwise = Right (SJ.PrivateID tn n t d)
      where
        expiry = addUTCTime (fromIntegral $ jsIDTimeout js) t
        expectedMac = SJ.macID tn (jsSecretKey js) t n
-- | Validate an untrusted job ID and, if a job is registered under it, run
-- the given action on the validated ID and the job entry.
--
-- * @Left err@ when 'checkJID' rejects the ID;
-- * @Right Nothing@ when the ID is valid but 'findJob' finds no job;
-- * @Right (Just r)@ with the action's result otherwise.
withJob
  :: MonadJob m t w a
  => SJ.JobID 'SJ.Unsafe
  -> (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) w a -> m r)
  -> m (Either JobError (Maybe r))
withJob jid f = checkJID jid >>= either (pure . Left) onValidID
  where
    -- 'traverse' over 'Maybe' runs the action only when a job was found.
    onValidID safeJid = Right <$> (findJob safeJid >>= traverse (f safeJid))
-- | Run an action that may report a 'JobError' and turn that error into a
-- 'throwError' in the surrounding monad, using the given conversion.
-- A 'Right' result is returned unchanged.
handleIDError
  :: MonadError e m
  => (JobError -> e)
  -> m (Either JobError a)
  -> m a
handleIDError toE act = act >>= either (throwError . toE) pure
-- | Remove a job from the job map and, when it is still queued, from the
-- queue as well. Both deletions happen in a single STM transaction.
removeJob
  :: (Ord t, MonadJob m t w a)
  => Bool
  -- ^ is the job queued? (if so its ID is also removed from the queue)
  -> t
  -- ^ kind of the job, needed to find it in the queue
  -> SJ.JobID 'SJ.Safe
  -- ^ ID of the job to remove
  -> m ()
removeJob queued t jid = do
  queue <- getJobsQueue
  jobs <- getJobsMap
  liftIO $ atomically $ do
    when queued (deleteQueue t jid queue)
    deleteJob jid jobs
--
-- Tracking jobs status
--
-- | A monad in which one can query the status of a particular job /and/
-- submit updates for jobs that are in progress.
class MonadJobStatus m where
  -- | The concrete handle associated with a job when it starts; it can be
  -- used to query the job's completion status. Each environment is free to
  -- decide what this handle looks like.
  type JobHandle m :: Type

  -- | Associated type; no method of this class mentions it.
  type JobType m :: Type

  -- | Associated type; no method of this class mentions it.
  type JobOutputType m :: Type

  -- | The type of status events, as returned by 'getLatestJobStatus' and
  -- consumed by the 'Logger' passed to 'withTracer'.
  type JobEventType m :: Type

  -- | Retrieves the latest 'JobEventType' from the underlying monad. Use it
  -- to query the latest status of a particular job, given its 'JobHandle'.
  getLatestJobStatus :: JobHandle m -> m (JobEventType m)

  -- | Adds an extra \"tracer\" that logs events to the passed action.
  -- Produces a new 'JobHandle'.
  withTracer :: Logger (JobEventType m) -> JobHandle m -> (JobHandle m -> m a) -> m a

  -- Creating events

  -- | Start tracking a new 'JobEventType' with @n@ remaining steps.
  markStarted :: Int -> JobHandle m -> m ()

  -- | Mark @n@ steps of the job as succeeded, while simultaneously
  -- subtracting this number from the remaining steps.
  markProgress :: Int -> JobHandle m -> m ()

  -- | Mark @n@ steps of the job as failed, while simultaneously subtracting
  -- this number from the remaining steps. Attach an optional error message
  -- to the failure.
  markFailure :: Int -> Maybe T.Text -> JobHandle m -> m ()

  -- | Finish tracking a job by marking all the remaining steps as succeeded.
  markComplete :: JobHandle m -> m ()

  -- | Finish tracking a job by marking all the remaining steps as failed.
  -- Attach an optional message to the failure.
  markFailed :: Maybe T.Text -> JobHandle m -> m ()