1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}
module Gargantext.Utils.Jobs.Internal (
serveJobsAPI
-- * Internals for testing
, newJob
) where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Lens
import Control.Monad
import Control.Monad.Except
import Data.Aeson (ToJSON)
import Data.Foldable (toList)
import Data.Monoid
import Data.Kind (Type)
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Prelude
import Servant.API.Alternative
import Servant.API.ContentTypes
import Gargantext.API.Errors.Types (BackendInternalError)
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad
import qualified Data.Text as T
import qualified Servant.Client as C
import qualified Servant.Job.Async as SJ
import qualified Servant.Job.Client as SJ
import qualified Servant.Job.Core as SJ
import qualified Servant.Job.Types as SJ
-- | Assemble the asynchronous jobs server out of three handlers, combined
-- with ':<|>' in this order:
--
--   1. a job-creation handler that takes no request payload,
--   2. a job-creation handler that receives a 'SJ.JobInput',
--   3. the per-job handlers produced by 'serveJobAPI'.
--
-- Arguments, in order: a builder turning a job id and a logger into a
-- 'JobHandle'; an action producing the environment given to the job body;
-- the job kind used when queueing; a converter from 'JobError' to
-- 'BackendInternalError'; and the job body itself.
serveJobsAPI
  :: ( Ord t
     , MonadError BackendInternalError m
     , MonadJob m t (Seq event) output
     , ToJSON event
     , ToJSON output
     , MimeRender JSON output
     , Foldable callback
     )
  => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
  -> m env
  -> t
  -> (JobError -> BackendInternalError)
  -> (env -> JobHandle m -> input -> IO (Either BackendInternalError output))
  -> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI newJobHandle getenv t joberr f =
  createWithoutInput :<|> createWithInput :<|> perJob
  where
    -- NOTE(review): the 'SJ.JobInput' here is built from 'undefined' and
    -- 'Nothing'. Presumably the first field is the job payload, in which
    -- case a job body that forces its input would crash when started
    -- through this handler -- confirm against the servant-job API type.
    createWithoutInput =
      newJob newJobHandle getenv t f (SJ.JobInput undefined Nothing)

    -- Deliberately not shared with 'createWithoutInput': the two uses of
    -- 'newJob' may be instantiated at different @callbacks@ types.
    createWithInput = newJob newJobHandle getenv t f

    perJob = serveJobAPI t joberr
-- | Handlers for one existing job, combined with ':<|>' in this order:
-- kill ('killJob'), poll ('pollJob') and wait ('waitJob').
--
-- The job id arrives as 'SJ.Unsafe'. Before any handler runs, the id is
-- passed through 'checkJID' (failures go through 'handleIDError' with
-- @joberr@) and the job is looked up with 'findJob'. If the lookup yields
-- nothing, @joberr (UnknownJob ..)@ is thrown via 'throwError'.
serveJobAPI
  :: forall (m :: Type -> Type) t event output.
     ( Ord t
     , MonadError BackendInternalError m
     , MonadJob m t (Seq event) output
     , MimeRender JSON output
     )
  => t
  -> (JobError -> BackendInternalError)
  -> SJ.JobID 'SJ.Unsafe
  -> SJ.AsyncJobServerT event output m
serveJobAPI t joberr jid' =
  paged (killJob t) :<|> paged pollJob :<|> withJob (waitJob joberr)
  where
    -- Resolve the raw id to a checked id plus its job entry, then hand both
    -- to the given handler.
    withJob
      :: forall a.
         (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
      -> m a
    withJob handler = do
      safeId <- handleIDError joberr (checkJID jid')
      found <- findJob safeId
      case found of
        Nothing -> throwError . joberr $ UnknownJob (SJ._id_number safeId)
        Just entry -> handler safeId entry

    -- Same as 'withJob', for handlers that first take a limit and an offset.
    paged handler limit offset = withJob (handler limit offset)
-- | Queue a new job of kind @jobkind@ and report it as pending.
--
-- The job body @f@ is wrapped so that, for every callback found in
-- @input ^. SJ.job_callback@, a message is sent with 'SJ.clientMCallback'
--
--   * for each logged event (before the event is handed to the job's own
--     log function),
--   * when the body returns 'Left' (the error is then rethrown with
--     'throwIO'),
--   * when the body returns 'Right'.
--
-- The result of each callback request is discarded, so a failing callback
-- does not affect the job.
--
-- Returns a 'SJ.JobStatus' carrying the new job id, an empty log list,
-- 'SJ.IsPending' and no error.
newJob
  :: ( Ord t
     , MonadJob m t (Seq event) output
     , ToJSON event
     , ToJSON output
     , MimeRender JSON output
     , Foldable callbacks
     )
  => (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
  -> m env
  -> t
  -> (env -> JobHandle m -> input -> IO (Either BackendInternalError output))
  -> SJ.JobInput callbacks input
  -> m (SJ.JobStatus 'SJ.Safe event)
newJob newJobHandle getenv jobkind f input = do
  je <- getJobEnv
  env <- getenv
  let -- Send one message to every registered callback.
      notify msg =
        forM_ (input ^. SJ.job_callback) $ \url ->
          C.runClientM
            (SJ.clientMCallback msg)
            (C.mkClientEnv (jeManager je) (url ^. SJ.base_url))

      -- Notify the callbacks first, then record the events in the job log.
      logAndNotify sink events = do
        notify (SJ.mkChanEvent events)
        sink events

      -- The action actually queued: run the body with a handle whose logger
      -- goes through 'logAndNotify', then publish the outcome.
      runJob jobId payload sink = do
        outcome <-
          f env
            (newJobHandle jobId (liftIO . logAndNotify sink . Seq.singleton))
            payload
        case outcome of
          Left err -> do
            notify (SJ.mkChanError err)
            throwIO err
          Right result -> do
            notify (SJ.mkChanResult result)
            pure result
  jid <- queueJob jobkind (input ^. SJ.job_input) runJob
  pure (SJ.JobStatus jid [] SJ.IsPending Nothing)
-- | Report the current state of a job without waiting for it.
--
--   * queued job: no logs, 'SJ.IsPending', no error;
--   * running job: the logs currently returned by 'rjGetLog',
--     'SJ.IsRunning', no error;
--   * finished job: its stored logs, 'SJ.IsFailure' or 'SJ.IsFinished'
--     depending on the outcome, and the 'show'n error for a failure.
--
-- /NOTE/: log ordering matters. The logs are expected newest-first, because
-- the API applies @limit@ from the front of the list to show only the most
-- recent entries. No 'Ord' constraint is imposed on @event@, and reversing
-- the list here would be wasteful, so the concrete 'rjGetLog'
-- implementation is responsible for returning logs in that order.
pollJob
  :: MonadJob m t (Seq event) output
  => Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
  -> m (SJ.JobStatus 'SJ.Safe event)
pollJob limit offset jid je = do
  (logs, status, merr) <- case jTask je of
    QueuedJ _ ->
      pure (mempty, SJ.IsPending, Nothing)
    RunningJ rj -> do
      soFar <- liftIO (rjGetLog rj)
      pure (soFar, SJ.IsRunning, Nothing)
    DoneJ finalLogs outcome ->
      pure (finalLogs, outcomeStatus outcome, outcomeError outcome)
  pure $ SJ.jobStatus jid limit offset (toList logs) status merr
  where
    -- Map the stored outcome of a finished job to its reported state.
    outcomeStatus outcome = case outcome of
      Left _ -> SJ.IsFailure
      Right _ -> SJ.IsFinished

    -- Render the failure, if any, as text.
    outcomeError outcome = case outcome of
      Left err -> Just (T.pack (show err))
      Right _ -> Nothing
-- | Block until the given job has finished and return its output.
--
--   * finished job: its stored result is used directly;
--   * running job: waits with 'waitJobDone';
--   * queued job: polls 'findJob' every 50ms until the job is running or
--     finished, then proceeds as above.
--
-- A failed job is reported as @joberr (JobException ..)@ through
-- 'throwError'. If the job can no longer be found while it is being polled,
-- @joberr (UnknownJob ..)@ is thrown instead of crashing with a pure
-- 'error' call; presumably this happens when the job is removed
-- concurrently (for instance 'killJob' calls 'removeJob' on queued jobs) --
-- confirm against 'removeJob'.
waitJob
  :: (MonadError e m, MonadJob m t (Seq event) output)
  => (JobError -> e)
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
  -> m (SJ.JobOutput output)
waitJob joberr jid je = do
  r <- case jTask je of
    QueuedJ _qj -> do
      -- The jobs map is fetched before polling, as in the original code.
      m <- getJobsMap
      waitTilRunning >>= either pure (awaitDone m)
    RunningJ rj -> do
      m <- getJobsMap
      awaitDone m rj
    DoneJ _ls res -> pure res
  either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
  where
    -- Wait for a running job to complete; its logs are not needed here.
    awaitDone m rj = do
      (res, _logs) <- liftIO (waitJobDone jid rj m)
      pure res

    -- Poll until the job has left the queue. Yields 'Right' with the
    -- running job, or 'Left' with the result if it already finished.
    waitTilRunning =
      findJob jid >>= \mjob -> case mjob of
        Nothing -> throwError . joberr $ UnknownJob (SJ._id_number jid)
        Just je' -> case jTask je' of
          QueuedJ _qj -> do
            liftIO $ threadDelay 50000 -- wait 50ms
            waitTilRunning
          RunningJ rj -> pure (Right rj)
          DoneJ _ls res -> pure (Left res)
-- | Stop a job (if it has not finished yet), remove it with 'removeJob' and
-- report its final status.
--
--   * queued job: removed; reported with no logs and 'SJ.IsKilled';
--   * running job: its async is cancelled with 'cancel', the logs are read
--     with 'rjGetLog', then the job is removed; reported as 'SJ.IsKilled';
--   * finished job: removed; reported with its stored logs and
--     'SJ.IsFailure' or 'SJ.IsFinished' (plus the 'show'n error on failure).
--
-- /NOTE/: as in 'pollJob', the logs are expected newest-first, since
-- @limit@ is applied from the front of the list; the 'rjGetLog'
-- implementation is responsible for that ordering.
killJob
  :: (Ord t, MonadJob m t (Seq event) output)
  => t
  -> Maybe SJ.Limit
  -> Maybe SJ.Offset
  -> SJ.JobID 'SJ.Safe
  -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
  -> m (SJ.JobStatus 'SJ.Safe event)
killJob t limit offset jid je = do
  (logs, status, merr) <- case jTask je of
    QueuedJ _ -> do
      forget True
      pure (mempty, SJ.IsKilled, Nothing)
    RunningJ rj -> do
      -- Cancel first, then read whatever was logged up to that point.
      collected <- liftIO $ do
        cancel (rjAsync rj)
        rjGetLog rj
      forget False
      pure (collected, SJ.IsKilled, Nothing)
    DoneJ finalLogs outcome -> do
      forget False
      pure (finalLogs, outcomeStatus outcome, outcomeError outcome)
  pure $ SJ.jobStatus jid limit offset (toList logs) status merr
  where
    -- The flag is 'True' only for jobs that are still queued; presumably it
    -- tells 'removeJob' to also drop the job from the queue -- TODO confirm.
    forget stillQueued = removeJob stillQueued t jid

    -- Map the stored outcome of a finished job to its reported state.
    outcomeStatus outcome = case outcome of
      Left _ -> SJ.IsFailure
      Right _ -> SJ.IsFinished

    -- Render the failure, if any, as text.
    outcomeError outcome = case outcome of
      Left err -> Just (T.pack (show err))
      Right _ -> Nothing