Commit 29418bb5 authored by Alfredo Di Napoli

Pass a JobHandle to the serveJobsAPI continuation

parent af381f0a
......@@ -268,7 +268,7 @@ type ForgotPasswordAsyncAPI = Summary "Forgot password asnc"
forgotPasswordAsync :: ServerT ForgotPasswordAsyncAPI (GargM Env GargError)
forgotPasswordAsync =
serveJobsAPI ForgotPasswordJob $ \p log' ->
serveJobsAPI ForgotPasswordJob $ \_jHandle p log' ->
forgotPasswordAsync' p (liftBase . log')
forgotPasswordAsync' :: (FlowCmdM env err m)
-- |
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
module Gargantext.API.Admin.EnvTypes where
......@@ -105,6 +106,12 @@ instance HasJobEnv Env JobLog JobLog where
instance Jobs.MonadJob (ReaderT Env (ExceptT GargError IO)) GargJob (Dual [JobLog]) JobLog where
getJobEnv = asks (view env_jobs)
instance Jobs.MonadJobStatus (ReaderT Env (ExceptT GargError IO)) Dual where
type JobType (ReaderT Env (ExceptT GargError IO)) = GargJob
type JobOutputType (ReaderT Env (ExceptT GargError IO)) = JobLog
type JobEventType (ReaderT Env (ExceptT GargError IO)) = JobLog
type JobErrorType (ReaderT Env (ExceptT GargError IO)) = GargError
data MockEnv = MockEnv
{ _menv_firewall :: !FireWall
......@@ -830,7 +830,7 @@ apiNgramsTableDoc dId = getTableNgramsDoc dId
apiNgramsAsync :: NodeId -> ServerT TableNgramsAsyncApi (GargM Env GargError)
apiNgramsAsync _dId =
serveJobsAPI TableNgramsJob $ \i log ->
serveJobsAPI TableNgramsJob $ \_jHandle i log ->
log' x = do
printDebug "tableNgramsPostChartsAsync" x
......@@ -192,7 +192,7 @@ toIndexedNgrams m t = Indexed <$> i <*> n
jsonPostAsync :: ServerT JSONAPI (GargM Env GargError)
jsonPostAsync lId =
serveJobsAPI UpdateNgramsListJobJSON $ \f log' ->
serveJobsAPI UpdateNgramsListJobJSON $ \_jHandle f log' ->
log'' x = do
-- printDebug "postAsync ListId" x
......@@ -288,7 +288,7 @@ csvPost l m = do
csvPostAsync :: ServerT CSVAPI (GargM Env GargError)
csvPostAsync lId =
serveJobsAPI UpdateNgramsListJobCSV $ \f@(WithTextFile _ft _ _n) log' -> do
serveJobsAPI UpdateNgramsListJobCSV $ \_jHandle f@(WithTextFile _ft _ _n) log' -> do
let log'' x = do
-- printDebug "[csvPostAsync] filetype" ft
-- printDebug "[csvPostAsync] name" n
......@@ -73,7 +73,7 @@ data AddContactParams = AddContactParams { firstname :: !Text, lastname
api_async :: User -> NodeId -> ServerT API_Async (GargM Env GargError)
api_async u nId =
serveJobsAPI AddContactJob $ \p log ->
serveJobsAPI AddContactJob $ \_jHandle p log ->
log' x = do
-- printDebug "addContact" x
......@@ -69,7 +69,7 @@ type API = Summary " Document upload"
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadDocumentJob $ \q log' -> do
serveJobsAPI UploadDocumentJob $ \_jHandle q log' -> do
documentUploadAsync uId nId q (liftBase . log')
documentUploadAsync :: (FlowCmdM env err m)
......@@ -70,7 +70,7 @@ instance ToSchema Params
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI DocumentFromWriteNodeJob $ \p log'' ->
serveJobsAPI DocumentFromWriteNodeJob $ \_jHandle p log'' ->
log' x = do
liftBase $ log'' x
......@@ -102,7 +102,7 @@ type FileAsyncApi = Summary "File Async Api"
fileAsyncApi :: UserId -> NodeId -> ServerT FileAsyncApi (GargM Env GargError)
fileAsyncApi uId nId =
serveJobsAPI AddFileJob $ \i l ->
serveJobsAPI AddFileJob $ \_jHandle i l ->
log' x = do
-- printDebug "addWithFile" x
......@@ -54,7 +54,7 @@ type API = Summary " FrameCalc upload"
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadFrameCalcJob $ \p logs ->
serveJobsAPI UploadFrameCalcJob $ \_jHandle p logs ->
frameCalcUploadAsync uId nId p (liftBase . logs) (jobLogInit 5)
......@@ -77,7 +77,7 @@ type PostNodeAsync = Summary "Post Node"
:: UserId -> NodeId -> ServerT PostNodeAsync (GargM Env GargError)
postNodeAsyncAPI uId nId =
serveJobsAPI NewNodeJob $ \p logs ->
serveJobsAPI NewNodeJob $ \_jHandle p logs ->
postNodeAsync uId nId p (liftBase . logs)
......@@ -94,7 +94,7 @@ data Charts = Sources | Authors | Institutes | Ngrams | All
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UpdateNodeJob $ \p log'' ->
serveJobsAPI UpdateNodeJob $ \_jHandle p log'' ->
log' x = do
-- printDebug "updateNode" x
......@@ -282,7 +282,7 @@ waitAPI n = do
addCorpusWithQuery :: User -> ServerT New.AddWithQuery (GargM Env GargError)
addCorpusWithQuery user cid =
serveJobsAPI AddCorpusQueryJob $ \q log' -> do
serveJobsAPI AddCorpusQueryJob $ \_jHandle q log' -> do
limit <- view $ hasConfig . gc_max_docs_scrapers
New.addToCorpusWithQuery user cid q (Just limit) (liftBase . log')
{- let log' x = do
......@@ -292,7 +292,7 @@ addCorpusWithQuery user cid =
addCorpusWithForm :: User -> ServerT New.AddWithForm (GargM Env GargError)
addCorpusWithForm user cid =
serveJobsAPI AddCorpusFormJob $ \i log' ->
serveJobsAPI AddCorpusFormJob $ \_jHandle i log' ->
log'' x = do
--printDebug "[addToCorpusWithForm] " x
......@@ -301,7 +301,7 @@ addCorpusWithForm user cid =
addCorpusWithFile :: User -> ServerT New.AddWithFile (GargM Env GargError)
addCorpusWithFile user cid =
serveJobsAPI AddCorpusFileJob $ \i log' ->
serveJobsAPI AddCorpusFileJob $ \_jHandle i log' ->
log'' x = do
-- printDebug "[addToCorpusWithFile]" x
......@@ -310,5 +310,5 @@ addCorpusWithFile user cid =
addAnnuaireWithForm :: ServerT Annuaire.AddWithForm (GargM Env GargError)
addAnnuaireWithForm cid =
serveJobsAPI AddAnnuaireFormJob $ \i log' ->
serveJobsAPI AddAnnuaireFormJob $ \_jHandle i log' ->
Annuaire.addToAnnuaireWithForm cid i (liftBase . log')
......@@ -257,7 +257,7 @@ type GraphAsyncAPI = Summary "Recompute graph"
graphAsync :: UserId -> NodeId -> ServerT GraphAsyncAPI (GargM Env GargError)
graphAsync u n =
serveJobsAPI RecomputeGraphJob $ \_ log' ->
serveJobsAPI RecomputeGraphJob $ \_ _jHandle log' ->
graphRecompute u n (liftBase . log')
......@@ -29,19 +29,19 @@ jobErrorToGargError = GargJobError
:: (
Foldable callbacks
, Ord jobType
, Show jobType
, ToJSON event
, ToJSON result
, MonadJob m jobType (Dual [event]) result
, Ord (JobType m)
, Show (JobType m)
, ToJSON (JobEventType m)
, ToJSON (JobOutputType m)
, MonadJobStatus m Dual
, m ~ (GargM env GargError)
=> jobType
-> (input -> Logger event -> m result)
-> SJ.AsyncJobsServerT' ctI ctO callbacks event input result m
serveJobsAPI jobType f = Internal.serveJobsAPI ask jobType jobErrorToGargError $ \env i l -> do
=> JobType m
-> (Internal.JobHandle -> input -> Logger (JobEventType m) -> m (JobOutputType m))
-> SJ.AsyncJobsServerT' ctI ctO callbacks (JobEventType m) input (JobOutputType m) m
serveJobsAPI jobType f = Internal.serveJobsAPI ask jobType jobErrorToGargError $ \env jHandle i l -> do
putStrLn ("Running job of type: " ++ show jobType)
runExceptT $ runReaderT (f i l) env
runExceptT $ runReaderT (f jHandle i l) env
parseGargJob :: String -> Maybe GargJob
parseGargJob s = case s of
{-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
module Gargantext.Utils.Jobs.Internal (serveJobsAPI) where
module Gargantext.Utils.Jobs.Internal (
, JobHandle -- opaque
) where
import Control.Concurrent
import Control.Concurrent.Async
......@@ -22,6 +25,12 @@ import qualified Servant.Job.Async as SJ
import qualified Servant.Job.Client as SJ
import qualified Servant.Job.Types as SJ
-- | An opaque handle that abstracts over the concrete identifier for
-- a job. The constructor for this type is deliberately not exported.
newtype JobHandle =
JobHandle { _jh_id :: SJ.JobID 'SJ.Safe }
deriving (Eq, Ord)
:: ( Ord t, Exception e, MonadError e m
, MonadJob m t (Dual [event]) output
......@@ -31,7 +40,7 @@ serveJobsAPI
=> m env
-> t
-> (JobError -> e)
-> (env -> input -> Logger event -> IO (Either e output))
-> (env -> JobHandle -> input -> Logger event -> IO (Either e output))
-> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f
= newJob getenv t f (SJ.JobInput undefined Nothing)
......@@ -67,7 +76,7 @@ newJob
=> m env
-> t
-> (env -> input -> Logger event -> IO (Either e output))
-> (env -> JobHandle -> input -> Logger event -> IO (Either e output))
-> SJ.JobInput callbacks input
-> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
......@@ -81,8 +90,8 @@ newJob getenv jobkind f input = do
postCallback (SJ.mkChanEvent e)
logF e
f' inp logF = do
r <- f env inp (pushLog logF . Dual . (:[]))
f' jId inp logF = do
r <- f env (JobHandle jId) inp (pushLog logF . Dual . (:[]))
case r of
Left e -> postCallback (SJ.mkChanError e) >> throwIO e
Right a -> postCallback (SJ.mkChanResult a) >> return a
module Gargantext.Utils.Jobs.Map where
module Gargantext.Utils.Jobs.Map (
-- * Types
, JobEntry(..)
, J(..)
, QueuedJob(..)
, RunningJob(..)
, Logger
-- * Functions
, newJobMap
, lookupJob
, gcThread
, jobLog
, addJobEntry
, deleteJob
, runJob
, waitJobDone
, runJ
, waitJ
, pollJ
, killJ
) where
import Control.Concurrent
import Control.Concurrent.Async
......@@ -99,14 +121,14 @@ addJobEntry
:: Ord jid
=> jid
-> a
-> (a -> Logger w -> IO r)
-> (jid -> a -> Logger w -> IO r)
-> JobMap jid w r
-> IO (JobEntry jid w r)
addJobEntry jid input f (JobMap mvar) = do
now <- getCurrentTime
let je = JobEntry
{ jID = jid
, jTask = QueuedJ (QueuedJob input f)
, jTask = QueuedJ (QueuedJob input (f jid))
, jRegistered = now
, jTimeoutAfter = Nothing
, jStarted = Nothing
{-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses #-}
module Gargantext.Utils.Jobs.Monad where
{-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses, TypeFamilies #-}
module Gargantext.Utils.Jobs.Monad (
-- * Types and classes
, NumRunners
, JobError(..)
, MonadJob(..)
, MonadJobStatus(..)
-- * Functions
, newJobEnv
, defaultJobSettings
, genSecret
, getJobsSettings
, getJobsState
, getJobsMap
, getJobsQueue
, queueJob
, findJob
, checkJID
, withJob
, handleIDError
, removeJob
) where
import Gargantext.Utils.Jobs.Settings
import Gargantext.Utils.Jobs.Map
......@@ -9,6 +32,7 @@ import Gargantext.Utils.Jobs.State
import Control.Concurrent.STM
import Control.Exception
import Control.Monad.Except
import Data.Kind (Type)
import Data.Map.Strict (Map)
import Data.Time.Clock
import Network.HTTP.Client (Manager)
......@@ -64,7 +88,7 @@ queueJob
:: (MonadJob m t w a, Ord t)
=> t
-> i
-> (i -> Logger w -> IO a)
-> (SJ.JobID 'SJ.Safe -> i -> Logger w -> IO a)
-> m (SJ.JobID 'SJ.Safe)
queueJob jobkind input f = do
js <- getJobsSettings
......@@ -136,3 +160,14 @@ removeJob queued t jid = do
when queued $
deleteQueue t jid q
deleteJob jid m
-- Tracking jobs status
-- | A monad to query for the status of a particular job /and/ submit updates for in-progress jobs.
class MonadJob m (JobType m) (t [JobEventType m]) (JobOutputType m) => MonadJobStatus m t where
type JobType m :: Type
type JobOutputType m :: Type
type JobEventType m :: Type
type JobErrorType m :: Type
......@@ -76,7 +76,7 @@ pushJob
:: Ord t
=> t
-> a
-> (a -> Logger w -> IO r)
-> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
-> JobSettings
-> JobsState t w r
-> IO (SJ.JobID 'SJ.Safe)
......@@ -36,7 +36,7 @@ testMaxRunners = do
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
runningJs <- newTVarIO []
let j num _inp _l = do
let j num _jHandle _inp _l = do
atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
threadDelay jobDuration
atomically $ modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
......@@ -59,7 +59,7 @@ testPrios = do
st :: JobsState JobT [String] () <- newJobsState settings $
applyPrios [(B, 10)] defaultPrios -- B has higher priority
runningJs <- newTVarIO (Counts 0 0)
let j jobt _inp _l = do
let j jobt _jHandle _inp _l = do
atomically $ modifyTVar runningJs (inc jobt)
threadDelay jobDuration
atomically $ modifyTVar runningJs (dec jobt)
......@@ -86,7 +86,7 @@ testExceptions = do
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
jid <- pushJob A ()
(\_inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
(\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
settings st
threadDelay initialDelay
mjob <- lookupJob jid (jobsData st)
......@@ -103,7 +103,7 @@ testFairness = do
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
runningJs <- newTVarIO (Counts 0 0)
let j jobt _inp _l = do
let j jobt _jHandle _inp _l = do
atomically $ modifyTVar runningJs (inc jobt)
threadDelay jobDuration
atomically $ modifyTVar runningJs (dec jobt)
