Commit 2faf39cc authored by Alfredo Di Napoli's avatar Alfredo Di Napoli

Remove need for liftBase when using the jobs api

parent 80dda00e
......@@ -269,7 +269,7 @@ type ForgotPasswordAsyncAPI = Summary "Forgot password asnc"
forgotPasswordAsync :: ServerT ForgotPasswordAsyncAPI (GargM Env GargError)
forgotPasswordAsync =
serveJobsAPI ForgotPasswordJob $ \jHandle p ->
forgotPasswordAsync' p (liftBase . jobHandleLogger jHandle)
forgotPasswordAsync' p (jobHandleLogger jHandle)
forgotPasswordAsync' :: (FlowCmdM env err m)
=> ForgotPasswordAsyncParams
......
......@@ -834,7 +834,7 @@ apiNgramsAsync _dId =
let
log' x = do
printDebug "tableNgramsPostChartsAsync" x
liftBase $ (jobHandleLogger jHandle) x
jobHandleLogger jHandle x
in tableNgramsPostChartsAsync i log'
-- Did the given list of ngrams changed since the given version?
......
......@@ -196,7 +196,7 @@ jsonPostAsync lId =
let
log'' x = do
-- printDebug "postAsync ListId" x
liftBase $ (jobHandleLogger jHandle) x
jobHandleLogger jHandle x
in postAsync' lId f log''
postAsync' :: FlowCmdM env err m
......@@ -292,7 +292,7 @@ csvPostAsync lId =
let log'' x = do
-- printDebug "[csvPostAsync] filetype" ft
-- printDebug "[csvPostAsync] name" n
liftBase $ (jobHandleLogger jHandle) x
jobHandleLogger jHandle x
csvPostAsync' lId f log''
......
......@@ -46,7 +46,7 @@ import Gargantext.Database.Action.Flow.Types (FlowCmdM)
import Gargantext.Database.Admin.Types.Hyperdata (HyperdataAnnuaire(..), HyperdataContact)
import Gargantext.Database.Admin.Types.Hyperdata.Contact (hyperdataContact)
import Gargantext.Database.Admin.Types.Node
import Gargantext.Prelude (($), liftBase, (.), {-printDebug,-} pure)
import Gargantext.Prelude (($), {-printDebug,-} pure)
import qualified Gargantext.Utils.Aeson as GUA
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
......@@ -77,8 +77,8 @@ api_async u nId =
let
log' x = do
-- printDebug "addContact" x
liftBase $ (jobHandleLogger jHandle) x
in addContact u nId p (liftBase . log')
jobHandleLogger jHandle x
in addContact u nId p log'
addContact :: (HasSettings env, FlowCmdM env err m)
=> User
......
......@@ -70,7 +70,7 @@ type API = Summary " Document upload"
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadDocumentJob $ \jHandle q -> do
documentUploadAsync uId nId q (liftBase . jobHandleLogger jHandle)
documentUploadAsync uId nId q (jobHandleLogger jHandle)
documentUploadAsync :: (FlowCmdM env err m)
=> UserId
......
......@@ -71,10 +71,7 @@ instance ToSchema Params
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI DocumentFromWriteNodeJob $ \jHandle p ->
let
log' x = do
liftBase $ (jobHandleLogger jHandle) x
in documentsFromWriteNodes uId nId p (liftBase . log')
documentsFromWriteNodes uId nId p (jobHandleLogger jHandle)
documentsFromWriteNodes :: (HasSettings env, FlowCmdM env err m)
=> UserId
......
......@@ -106,7 +106,7 @@ fileAsyncApi uId nId =
let
log' x = do
-- printDebug "addWithFile" x
liftBase $ (jobHandleLogger jHandle) x
jobHandleLogger jHandle x
in addWithFile uId nId i log'
......
......@@ -55,7 +55,7 @@ type API = Summary " FrameCalc upload"
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadFrameCalcJob $ \jHandle p ->
frameCalcUploadAsync uId nId p (liftBase . jobHandleLogger jHandle) (jobLogInit 5)
frameCalcUploadAsync uId nId p (jobHandleLogger jHandle) (jobLogInit 5)
......
......@@ -78,7 +78,7 @@ postNodeAsyncAPI
:: UserId -> NodeId -> ServerT PostNodeAsync (GargM Env GargError)
postNodeAsyncAPI uId nId =
serveJobsAPI NewNodeJob $ \jHandle p ->
postNodeAsync uId nId p (liftBase . jobHandleLogger jHandle)
postNodeAsync uId nId p (jobHandleLogger jHandle)
------------------------------------------------------------------------
postNodeAsync :: FlowCmdM env err m
......
......@@ -43,7 +43,7 @@ import Gargantext.Database.Query.Table.Node (defaultList, getNode)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Ngrams (NgramsType(NgramsTerms))
import Gargantext.Database.Schema.Node (node_parent_id)
import Gargantext.Prelude (Bool(..), Ord, Eq, (<$>), ($), liftBase, (.), {-printDebug,-} pure, show, cs, (<>), panic, (<*>))
import Gargantext.Prelude (Bool(..), Ord, Eq, (<$>), ($), {-printDebug,-} pure, show, cs, (<>), panic, (<*>))
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Prelude (Enum, Bounded, minBound, maxBound)
import Servant
......@@ -98,8 +98,8 @@ api uId nId =
let
log' x = do
-- printDebug "updateNode" x
liftBase $ (jobHandleLogger jHandle) x
in updateNode uId nId p (liftBase . log')
jobHandleLogger jHandle x
in updateNode uId nId p log'
updateNode :: (HasSettings env, FlowCmdM env err m)
=> UserId
......
......@@ -284,7 +284,7 @@ addCorpusWithQuery :: User -> ServerT New.AddWithQuery (GargM Env GargError)
addCorpusWithQuery user cid =
serveJobsAPI AddCorpusQueryJob $ \jHandle q -> do
limit <- view $ hasConfig . gc_max_docs_scrapers
New.addToCorpusWithQuery user cid q (Just limit) (liftBase . jobHandleLogger jHandle)
New.addToCorpusWithQuery user cid q (Just limit) (jobHandleLogger jHandle)
{- let log' x = do
printDebug "addToCorpusWithQuery" x
liftBase $ log x
......@@ -296,7 +296,7 @@ addCorpusWithForm user cid =
let
log'' x = do
--printDebug "[addToCorpusWithForm] " x
liftBase $ (jobHandleLogger jHandle) x
jobHandleLogger jHandle x
in New.addToCorpusWithForm user cid i log'' (jobLogInit 3)
addCorpusWithFile :: User -> ServerT New.AddWithFile (GargM Env GargError)
......@@ -305,10 +305,10 @@ addCorpusWithFile user cid =
let
log'' x = do
-- printDebug "[addToCorpusWithFile]" x
liftBase $ (jobHandleLogger jHandle) x
jobHandleLogger jHandle x
in New.addToCorpusWithFile user cid i log''
addAnnuaireWithForm :: ServerT Annuaire.AddWithForm (GargM Env GargError)
addAnnuaireWithForm cid =
serveJobsAPI AddAnnuaireFormJob $ \jHandle i ->
Annuaire.addToAnnuaireWithForm cid i (liftBase . jobHandleLogger jHandle)
Annuaire.addToAnnuaireWithForm cid i (jobHandleLogger jHandle)
......@@ -258,7 +258,7 @@ type GraphAsyncAPI = Summary "Recompute graph"
graphAsync :: UserId -> NodeId -> ServerT GraphAsyncAPI (GargM Env GargError)
graphAsync u n =
serveJobsAPI RecomputeGraphJob $ \jHandle _ ->
graphRecompute u n (liftBase . jobHandleLogger jHandle)
graphRecompute u n (jobHandleLogger jHandle)
--graphRecompute :: UserId
......
......@@ -37,7 +37,7 @@ serveJobsAPI
, m ~ (GargM env GargError)
)
=> JobType m
-> (JobHandle (JobEventType m) -> input -> m (JobOutputType m))
-> (JobHandle m (JobEventType m) -> input -> m (JobOutputType m))
-> SJ.AsyncJobsServerT' ctI ctO callbacks (JobEventType m) input (JobOutputType m) m
serveJobsAPI jobType f = Internal.serveJobsAPI ask jobType jobErrorToGargError $ \env jHandle i -> do
putStrLn ("Running job of type: " ++ show jobType)
......
......@@ -40,7 +40,7 @@ serveJobsAPI
=> m env
-> t
-> (JobError -> e)
-> (env -> JobHandle event -> input -> IO (Either e output))
-> (env -> JobHandle m event -> input -> IO (Either e output))
-> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f
= newJob getenv t f (SJ.JobInput undefined Nothing)
......@@ -76,7 +76,7 @@ newJob
)
=> m env
-> t
-> (env -> JobHandle event -> input -> IO (Either e output))
-> (env -> JobHandle m event -> input -> IO (Either e output))
-> SJ.JobInput callbacks input
-> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
......@@ -91,7 +91,7 @@ newJob getenv jobkind f input = do
logF w
f' jId inp logF = do
r <- f env (mkJobHandle jId (pushLog logF . Seq.singleton)) inp
r <- f env (mkJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
case r of
Left e -> postCallback (SJ.mkChanError e) >> throwIO e
Right a -> postCallback (SJ.mkChanResult a) >> return a
......
......@@ -6,6 +6,7 @@ module Gargantext.Utils.Jobs.Map (
, J(..)
, QueuedJob(..)
, RunningJob(..)
, LoggerM
, Logger
-- * Functions
......@@ -75,9 +76,12 @@ data RunningJob w a = RunningJob
, rjGetLog :: IO w
}
-- | Polymorphic logger over any monad @m@.
type LoggerM m w = w -> m ()
-- | A @'Logger' w@ is a function that can do something with "messages" of type
-- @w@ in IO.
type Logger w = w -> IO ()
type Logger w = LoggerM IO w
newJobMap :: IO (JobMap jid w a)
newJobMap = JobMap <$> newTVarIO Map.empty
......
......@@ -11,6 +11,7 @@ module Gargantext.Utils.Jobs.Monad (
-- * Tracking jobs status
, MonadJobStatus(..)
, getLatestJobStatus
, updateJobProgress
-- * Functions
, newJobEnv
......@@ -179,17 +180,17 @@ removeJob queued t jid = do
-- | An opaque handle that abstracts over the concrete identifier for
-- a job. The constructor for this type is deliberately not exported.
data JobHandle event = JobHandle {
data JobHandle m event = JobHandle {
_jh_id :: !(SJ.JobID 'SJ.Safe)
, _jh_logger :: Logger event
, _jh_logger :: LoggerM m event
}
-- | Creates a new 'JobHandle', given its underlying 'JobID' and the logging function to
-- be used to report the status.
mkJobHandle :: SJ.JobID 'SJ.Safe -> Logger event -> JobHandle event
mkJobHandle :: SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m event
mkJobHandle jId = JobHandle jId
jobHandleLogger :: JobHandle event -> Logger event
jobHandleLogger :: JobHandle m event -> LoggerM m event
jobHandleLogger (JobHandle _ lgr) = lgr
-- | A monad to query for the status of a particular job /and/ submit updates for in-progress jobs.
......@@ -198,19 +199,13 @@ class MonadJob m (JobType m) (Seq (JobEventType m)) (JobOutputType m) => MonadJo
type JobOutputType m :: Type
type JobEventType m :: Type
instance MonadIO m => MonadJobStatus (ReaderT (JobEnv t (Seq event) a) m) where
type JobType (ReaderT (JobEnv t (Seq event) a) m) = t
type JobOutputType (ReaderT (JobEnv t (Seq event) a) m) = a
type JobEventType (ReaderT (JobEnv t (Seq event) a) m) = event
--
-- Tracking jobs status API
--
-- | Retrevies the latest 'JobEventType' from the underlying monad. It can be
-- used to query the latest status for a particular job, given its 'JobHandle' as input.
getLatestJobStatus :: MonadJobStatus m => JobHandle (JobEventType m) -> m (Maybe (JobEventType m))
getLatestJobStatus :: MonadJobStatus m => JobHandle m (JobEventType m) -> m (Maybe (JobEventType m))
getLatestJobStatus (JobHandle jId _) = do
mb_jb <- findJob jId
case mb_jb of
......@@ -224,3 +219,19 @@ getLatestJobStatus (JobHandle jId _) = do
DoneJ lgs _ -> pure $ case viewr lgs of
EmptyR -> Nothing
_ :> l -> Just l
updateJobProgress :: (Monoid (JobEventType m), MonadJobStatus m)
=> JobHandle m (JobEventType m)
-- ^ The handle that uniquely identifies this job.
-> (JobEventType m -> JobEventType m)
-- ^ A /pure/ function to update the 'JobEventType'. The input
-- is the /latest/ event, i.e. the current progress status. If
-- this is the first time we report progress and therefore there
-- is no previous progress status, this function will be applied
-- over 'mempty', thus the 'Monoid' constraint.
-> m ()
updateJobProgress hdl@(JobHandle _jId logStatus) updateJobStatus = do
latestStatus <- getLatestJobStatus hdl
case latestStatus of
Nothing -> logStatus (updateJobStatus mempty)
Just s -> logStatus (updateJobStatus s)
......@@ -11,7 +11,6 @@ import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Reader
import Control.Monad.Except
import Data.Aeson
import Data.Either
import Data.List
......@@ -162,18 +161,29 @@ instance Exception MyDummyError where
instance ToJSON MyDummyError where
toJSON (SomethingWentWrong _) = String "SomethingWentWrong"
type Progress = Int
data MyDummyLog =
Step_0
| Step_1
Step_0 !Progress
| Step_1 !Progress
deriving (Show, Eq, Ord, Generic)
instance Monoid MyDummyLog where
mempty = Step_0 0
instance Semigroup MyDummyLog where
_ <> _ = error "not needed"
instance ToJSON MyDummyLog
newtype MyDummyEnv = MyDummyEnv { _MyDummyEnv :: JobEnv MyDummyJob (Seq MyDummyLog) () }
newtype MyDummyMonad a =
MyDummyMonad { _MyDummyMonad :: ReaderT MyDummyEnv (ExceptT MyDummyError IO) a }
deriving (Functor, Applicative, Monad, MonadIO, MonadReader MyDummyEnv, MonadError MyDummyError)
MyDummyMonad { _MyDummyMonad :: ReaderT MyDummyEnv IO a }
deriving (Functor, Applicative, Monad, MonadIO, MonadReader MyDummyEnv)
runMyDummyMonad :: MyDummyEnv -> MyDummyMonad a -> IO a
runMyDummyMonad env = flip runReaderT env . _MyDummyMonad
instance MonadJob MyDummyMonad MyDummyJob (Seq MyDummyLog) () where
getJobEnv = asks _MyDummyEnv
......@@ -193,14 +203,15 @@ shouldBeE a b = liftIO (shouldBe a b)
type TheEnv = JobEnv MyDummyJob (Seq MyDummyLog) ()
withJob :: TheEnv
-> (TheEnv -> JobHandle MyDummyLog -> () -> IO (Either MyDummyError ()))
-> IO (Either MyDummyError (SJ.JobStatus 'SJ.Safe MyDummyLog))
withJob myEnv f = do
runExceptT $ flip runReaderT (MyDummyEnv myEnv) $ _MyDummyMonad $ do
newJob @_ @MyDummyError getJobEnv MyDummyJob (\env hdl input ->
f env hdl input) (SJ.JobInput () Nothing)
withJob_ :: TheEnv -> (TheEnv -> JobHandle MyDummyLog -> () -> IO (Either MyDummyError ())) -> IO ()
-> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
-> IO (SJ.JobStatus 'SJ.Safe MyDummyLog)
withJob myEnv f = runMyDummyMonad (MyDummyEnv myEnv) $
newJob @_ @MyDummyError getJobEnv MyDummyJob (\env hdl input ->
runMyDummyMonad (MyDummyEnv myEnv) $ f env hdl input) (SJ.JobInput () Nothing)
withJob_ :: TheEnv
-> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
-> IO ()
withJob_ env f = void (withJob env f)
testFetchJobStatus :: IO ()
......@@ -210,19 +221,21 @@ testFetchJobStatus = do
myEnv <- newJobEnv settings defaultPrios testTlsManager
evts <- newMVar []
withJob_ myEnv $ \env hdl _input -> do
mb_status <- runReaderT (getLatestJobStatus hdl) env
withJob_ myEnv $ \_ hdl _input -> do
mb_status <- getLatestJobStatus hdl
-- now let's log something
jobHandleLogger hdl Step_0
mb_status' <- runReaderT (getLatestJobStatus hdl) env
updateJobProgress hdl (const $ Step_0 20)
mb_status' <- getLatestJobStatus hdl
updateJobProgress hdl (\(Step_0 x) -> Step_0 (x + 5))
mb_status'' <- getLatestJobStatus hdl
modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : xs)
liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
pure $ Right ()
threadDelay 500_000
-- Check the events
readMVar evts >>= \expected -> expected `shouldBe` [Nothing, Just Step_0]
readMVar evts >>= \expected -> expected `shouldBe` [Nothing, Just (Step_0 20), Just (Step_0 25)]
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
......@@ -233,23 +246,23 @@ testFetchJobStatusNoContention = do
evts1 <- newMVar []
evts2 <- newMVar []
let job1 = \() -> withJob_ myEnv $ \env hdl _input -> do
jobHandleLogger hdl Step_1
mb_status <- runReaderT (getLatestJobStatus hdl) env
modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
let job1 = \() -> withJob_ myEnv $ \_ hdl _input -> do
updateJobProgress hdl (const $ Step_1 100)
mb_status <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
pure $ Right ()
let job2 = \() -> withJob_ myEnv $ \env hdl _input -> do
jobHandleLogger hdl Step_0
mb_status <- runReaderT (getLatestJobStatus hdl) env
modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
let job2 = \() -> withJob_ myEnv $ \_ hdl _input -> do
updateJobProgress hdl (const $ Step_0 50)
mb_status <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
pure $ Right ()
Async.forConcurrently_ [job1, job2] ($ ())
threadDelay 500_000
-- Check the events
readMVar evts1 >>= \expected -> expected `shouldBe` [Just Step_1]
readMVar evts2 >>= \expected -> expected `shouldBe` [Just Step_0]
readMVar evts1 >>= \expected -> expected `shouldBe` [Just (Step_1 100)]
readMVar evts2 >>= \expected -> expected `shouldBe` [Just (Step_0 50)]
main :: IO ()
main = hspec $ do
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment