Commit 80dda00e authored by Alfredo Di Napoli's avatar Alfredo Di Napoli

Move logging function inside JobHandle

parent 91831d90
Pipeline #3809 failed with stage
in 31 minutes and 57 seconds
......@@ -63,7 +63,7 @@ import Gargantext.Database.Query.Tree.Root (getRoot)
import Gargantext.Database.Schema.Node (NodePoly(_node_id))
import Gargantext.Prelude hiding (reverse)
import Gargantext.Prelude.Crypto.Pass.User (gargPass)
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Servant
import Servant.Auth.Server
import qualified Data.Text as Text
......@@ -268,8 +268,8 @@ type ForgotPasswordAsyncAPI = Summary "Forgot password asnc"
forgotPasswordAsync :: ServerT ForgotPasswordAsyncAPI (GargM Env GargError)
forgotPasswordAsync =
serveJobsAPI ForgotPasswordJob $ \_jHandle p log' ->
forgotPasswordAsync' p (liftBase . log')
serveJobsAPI ForgotPasswordJob $ \jHandle p ->
forgotPasswordAsync' p (liftBase . jobHandleLogger jHandle)
forgotPasswordAsync' :: (FlowCmdM env err m)
=> ForgotPasswordAsyncParams
......
......@@ -121,7 +121,7 @@ import Gargantext.Prelude hiding (log)
import Gargantext.Prelude.Clock (hasTime, getTime)
import Prelude (error)
import Servant hiding (Patch)
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import System.IO (stderr)
import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary (Arbitrary, arbitrary)
......@@ -830,11 +830,11 @@ apiNgramsTableDoc dId = getTableNgramsDoc dId
apiNgramsAsync :: NodeId -> ServerT TableNgramsAsyncApi (GargM Env GargError)
apiNgramsAsync _dId =
serveJobsAPI TableNgramsJob $ \_jHandle i log ->
serveJobsAPI TableNgramsJob $ \jHandle i ->
let
log' x = do
printDebug "tableNgramsPostChartsAsync" x
liftBase $ log x
liftBase $ (jobHandleLogger jHandle) x
in tableNgramsPostChartsAsync i log'
-- Did the given list of ngrams changed since the given version?
......
......@@ -47,7 +47,7 @@ import Gargantext.Database.Schema.Ngrams
import Gargantext.Database.Schema.Node (_node_parent_id)
import Gargantext.Database.Types (Indexed(..))
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Servant
-- import Servant.Job.Async
import qualified Data.ByteString.Lazy as BSL
......@@ -192,11 +192,11 @@ toIndexedNgrams m t = Indexed <$> i <*> n
------------------------------------------------------------------------
jsonPostAsync :: ServerT JSONAPI (GargM Env GargError)
jsonPostAsync lId =
serveJobsAPI UpdateNgramsListJobJSON $ \_jHandle f log' ->
serveJobsAPI UpdateNgramsListJobJSON $ \jHandle f ->
let
log'' x = do
-- printDebug "postAsync ListId" x
liftBase $ log' x
liftBase $ (jobHandleLogger jHandle) x
in postAsync' lId f log''
postAsync' :: FlowCmdM env err m
......@@ -288,11 +288,11 @@ csvPost l m = do
------------------------------------------------------------------------
csvPostAsync :: ServerT CSVAPI (GargM Env GargError)
csvPostAsync lId =
serveJobsAPI UpdateNgramsListJobCSV $ \_jHandle f@(WithTextFile _ft _ _n) log' -> do
serveJobsAPI UpdateNgramsListJobCSV $ \jHandle f@(WithTextFile _ft _ _n) -> do
let log'' x = do
-- printDebug "[csvPostAsync] filetype" ft
-- printDebug "[csvPostAsync] name" n
liftBase $ log' x
liftBase $ (jobHandleLogger jHandle) x
csvPostAsync' lId f log''
......
......@@ -48,7 +48,7 @@ import Gargantext.Database.Admin.Types.Hyperdata.Contact (hyperdataContact)
import Gargantext.Database.Admin.Types.Node
import Gargantext.Prelude (($), liftBase, (.), {-printDebug,-} pure)
import qualified Gargantext.Utils.Aeson as GUA
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
------------------------------------------------------------------------
type API = "contact" :> Summary "Contact endpoint"
......@@ -73,11 +73,11 @@ data AddContactParams = AddContactParams { firstname :: !Text, lastname
----------------------------------------------------------------------
api_async :: User -> NodeId -> ServerT API_Async (GargM Env GargError)
api_async u nId =
serveJobsAPI AddContactJob $ \_jHandle p log ->
serveJobsAPI AddContactJob $ \jHandle p ->
let
log' x = do
-- printDebug "addContact" x
liftBase $ log x
liftBase $ (jobHandleLogger jHandle) x
in addContact u nId p (liftBase . log')
addContact :: (HasSettings env, FlowCmdM env err m)
......
......@@ -28,7 +28,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType')
import Gargantext.Prelude
import Gargantext.Database.Admin.Types.Hyperdata.Corpus (HyperdataCorpus(..))
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
data DocumentUpload = DocumentUpload
......@@ -69,8 +69,8 @@ type API = Summary " Document upload"
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadDocumentJob $ \_jHandle q log' -> do
documentUploadAsync uId nId q (liftBase . log')
serveJobsAPI UploadDocumentJob $ \jHandle q -> do
documentUploadAsync uId nId q (liftBase . jobHandleLogger jHandle)
documentUploadAsync :: (FlowCmdM env err m)
=> UserId
......
......@@ -43,7 +43,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Query.Table.Node (getChildrenByType, getClosestParentIdByType', getNodeWith)
import Gargantext.Database.Schema.Node (node_hyperdata, node_name, node_date)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Core.Text.Corpus.Parsers.Date (split')
import Servant
import Text.Read (readMaybe)
......@@ -70,10 +70,10 @@ instance ToSchema Params
------------------------------------------------------------------------
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI DocumentFromWriteNodeJob $ \_jHandle p log'' ->
serveJobsAPI DocumentFromWriteNodeJob $ \jHandle p ->
let
log' x = do
liftBase $ log'' x
liftBase $ (jobHandleLogger jHandle) x
in documentsFromWriteNodes uId nId p (liftBase . log')
documentsFromWriteNodes :: (HasSettings env, FlowCmdM env err m)
......
......@@ -31,7 +31,7 @@ import Gargantext.Database.Query.Table.Node (getNodeWith)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Data.Either
data RESPONSE deriving Typeable
......@@ -102,11 +102,11 @@ type FileAsyncApi = Summary "File Async Api"
fileAsyncApi :: UserId -> NodeId -> ServerT FileAsyncApi (GargM Env GargError)
fileAsyncApi uId nId =
serveJobsAPI AddFileJob $ \_jHandle i l ->
serveJobsAPI AddFileJob $ \jHandle i ->
let
log' x = do
-- printDebug "addWithFile" x
liftBase $ l x
liftBase $ (jobHandleLogger jHandle) x
in addWithFile uId nId i log'
......
......@@ -32,7 +32,7 @@ import Gargantext.Database.Prelude (HasConfig)
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType, getNodeWith)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Core (Lang)
data FrameCalcUpload = FrameCalcUpload {
......@@ -54,8 +54,8 @@ type API = Summary " FrameCalc upload"
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadFrameCalcJob $ \_jHandle p logs ->
frameCalcUploadAsync uId nId p (liftBase . logs) (jobLogInit 5)
serveJobsAPI UploadFrameCalcJob $ \jHandle p ->
frameCalcUploadAsync uId nId p (liftBase . jobHandleLogger jHandle) (jobLogInit 5)
......
......@@ -41,7 +41,7 @@ import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Table.Node.User
import Gargantext.Database.Schema.Node
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
------------------------------------------------------------------------
data PostNode = PostNode { pn_name :: Text
......@@ -77,8 +77,8 @@ type PostNodeAsync = Summary "Post Node"
postNodeAsyncAPI
:: UserId -> NodeId -> ServerT PostNodeAsync (GargM Env GargError)
postNodeAsyncAPI uId nId =
serveJobsAPI NewNodeJob $ \_jHandle p logs ->
postNodeAsync uId nId p (liftBase . logs)
serveJobsAPI NewNodeJob $ \jHandle p ->
postNodeAsync uId nId p (liftBase . jobHandleLogger jHandle)
------------------------------------------------------------------------
postNodeAsync :: FlowCmdM env err m
......
......@@ -44,7 +44,7 @@ import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Ngrams (NgramsType(NgramsTerms))
import Gargantext.Database.Schema.Node (node_parent_id)
import Gargantext.Prelude (Bool(..), Ord, Eq, (<$>), ($), liftBase, (.), {-printDebug,-} pure, show, cs, (<>), panic, (<*>))
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Prelude (Enum, Bounded, minBound, maxBound)
import Servant
import Test.QuickCheck (elements)
......@@ -94,11 +94,11 @@ data Charts = Sources | Authors | Institutes | Ngrams | All
------------------------------------------------------------------------
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UpdateNodeJob $ \_jHandle p log'' ->
serveJobsAPI UpdateNodeJob $ \jHandle p ->
let
log' x = do
-- printDebug "updateNode" x
liftBase $ log'' x
liftBase $ (jobHandleLogger jHandle) x
in updateNode uId nId p (liftBase . log')
updateNode :: (HasSettings env, FlowCmdM env err m)
......
......@@ -45,7 +45,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Prelude (HasConfig(..))
import Gargantext.Prelude
import Gargantext.Prelude.Config (gc_max_docs_scrapers)
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import qualified Gargantext.API.GraphQL as GraphQL
import qualified Gargantext.API.Ngrams.List as List
import qualified Gargantext.API.Node.Contact as Contact
......@@ -282,9 +282,9 @@ waitAPI n = do
addCorpusWithQuery :: User -> ServerT New.AddWithQuery (GargM Env GargError)
addCorpusWithQuery user cid =
serveJobsAPI AddCorpusQueryJob $ \_jHandle q log' -> do
serveJobsAPI AddCorpusQueryJob $ \jHandle q -> do
limit <- view $ hasConfig . gc_max_docs_scrapers
New.addToCorpusWithQuery user cid q (Just limit) (liftBase . log')
New.addToCorpusWithQuery user cid q (Just limit) (liftBase . jobHandleLogger jHandle)
{- let log' x = do
printDebug "addToCorpusWithQuery" x
liftBase $ log x
......@@ -292,23 +292,23 @@ addCorpusWithQuery user cid =
addCorpusWithForm :: User -> ServerT New.AddWithForm (GargM Env GargError)
addCorpusWithForm user cid =
serveJobsAPI AddCorpusFormJob $ \_jHandle i log' ->
serveJobsAPI AddCorpusFormJob $ \jHandle i ->
let
log'' x = do
--printDebug "[addToCorpusWithForm] " x
liftBase $ log' x
liftBase $ (jobHandleLogger jHandle) x
in New.addToCorpusWithForm user cid i log'' (jobLogInit 3)
addCorpusWithFile :: User -> ServerT New.AddWithFile (GargM Env GargError)
addCorpusWithFile user cid =
serveJobsAPI AddCorpusFileJob $ \_jHandle i log' ->
serveJobsAPI AddCorpusFileJob $ \jHandle i ->
let
log'' x = do
-- printDebug "[addToCorpusWithFile]" x
liftBase $ log' x
liftBase $ (jobHandleLogger jHandle) x
in New.addToCorpusWithFile user cid i log''
addAnnuaireWithForm :: ServerT Annuaire.AddWithForm (GargM Env GargError)
addAnnuaireWithForm cid =
serveJobsAPI AddAnnuaireFormJob $ \_jHandle i log' ->
Annuaire.addToAnnuaireWithForm cid i (liftBase . log')
serveJobsAPI AddAnnuaireFormJob $ \jHandle i ->
Annuaire.addToAnnuaireWithForm cid i (liftBase . jobHandleLogger jHandle)
......@@ -47,7 +47,7 @@ import Gargantext.Database.Query.Table.Node.User (getNodeUser)
import Gargantext.Database.Schema.Node
import Gargantext.Database.Schema.Ngrams
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Servant
import Servant.Job.Async (AsyncJobsAPI)
import Servant.XML
......@@ -257,8 +257,8 @@ type GraphAsyncAPI = Summary "Recompute graph"
graphAsync :: UserId -> NodeId -> ServerT GraphAsyncAPI (GargM Env GargError)
graphAsync u n =
serveJobsAPI RecomputeGraphJob $ \_ _jHandle log' ->
graphRecompute u n (liftBase . log')
serveJobsAPI RecomputeGraphJob $ \jHandle _ ->
graphRecompute u n (liftBase . jobHandleLogger jHandle)
--graphRecompute :: UserId
......
......@@ -4,6 +4,8 @@ module Gargantext.Utils.Jobs (
serveJobsAPI
-- * Parsing and reading @GargJob@s from disk
, readPrios
-- * Handy re-exports
, jobHandleLogger
) where
import Control.Monad.Except
......@@ -16,7 +18,6 @@ import Text.Read (readMaybe)
import Gargantext.API.Admin.EnvTypes
import Gargantext.API.Prelude
import qualified Gargantext.Utils.Jobs.Internal as Internal
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad
import qualified Servant.Job.Async as SJ
......@@ -36,11 +37,11 @@ serveJobsAPI
, m ~ (GargM env GargError)
)
=> JobType m
-> (JobHandle -> input -> Logger (JobEventType m) -> m (JobOutputType m))
-> (JobHandle (JobEventType m) -> input -> m (JobOutputType m))
-> SJ.AsyncJobsServerT' ctI ctO callbacks (JobEventType m) input (JobOutputType m) m
serveJobsAPI jobType f = Internal.serveJobsAPI ask jobType jobErrorToGargError $ \env jHandle i l -> do
serveJobsAPI jobType f = Internal.serveJobsAPI ask jobType jobErrorToGargError $ \env jHandle i -> do
putStrLn ("Running job of type: " ++ show jobType)
runExceptT $ runReaderT (f jHandle i l) env
runExceptT $ runReaderT (f jHandle i) env
parseGargJob :: String -> Maybe GargJob
parseGargJob s = case s of
......
{-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}
module Gargantext.Utils.Jobs.Internal (
serveJobsAPI
-- * Internals for testing
......@@ -38,7 +40,7 @@ serveJobsAPI
=> m env
-> t
-> (JobError -> e)
-> (env -> JobHandle -> input -> Logger event -> IO (Either e output))
-> (env -> JobHandle event -> input -> IO (Either e output))
-> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f
= newJob getenv t f (SJ.JobInput undefined Nothing)
......@@ -74,7 +76,7 @@ newJob
)
=> m env
-> t
-> (env -> JobHandle -> input -> Logger event -> IO (Either e output))
-> (env -> JobHandle event -> input -> IO (Either e output))
-> SJ.JobInput callbacks input
-> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
......@@ -84,12 +86,12 @@ newJob getenv jobkind f input = do
C.runClientM (SJ.clientMCallback m)
(C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
pushLog logF e = do
postCallback (SJ.mkChanEvent e)
logF e
pushLog logF = \w -> do
postCallback (SJ.mkChanEvent w)
logF w
f' jId inp logF = do
r <- f env (unsafeMkJobHandle jId) inp (pushLog logF . Seq.singleton)
r <- f env (mkJobHandle jId (pushLog logF . Seq.singleton)) inp
case r of
Left e -> postCallback (SJ.mkChanError e) >> throwIO e
Right a -> postCallback (SJ.mkChanResult a) >> return a
......
......@@ -26,7 +26,8 @@ module Gargantext.Utils.Jobs.Monad (
, withJob
, handleIDError
, removeJob
, unsafeMkJobHandle
, mkJobHandle
, jobHandleLogger
) where
import Gargantext.Utils.Jobs.Settings
......@@ -178,12 +179,18 @@ removeJob queued t jid = do
-- | An opaque handle that abstracts over the concrete identifier for
-- a job. The constructor for this type is deliberately not exported.
newtype JobHandle =
JobHandle { _jh_id :: SJ.JobID 'SJ.Safe }
deriving (Eq, Ord)
data JobHandle event = JobHandle {
_jh_id :: !(SJ.JobID 'SJ.Safe)
, _jh_logger :: Logger event
}
-- | Creates a new 'JobHandle', given its underlying 'JobID' and the logging function to
-- be used to report the status.
mkJobHandle :: SJ.JobID 'SJ.Safe -> Logger event -> JobHandle event
mkJobHandle jId = JobHandle jId
unsafeMkJobHandle :: SJ.JobID 'SJ.Safe -> JobHandle
unsafeMkJobHandle = JobHandle
jobHandleLogger :: JobHandle event -> Logger event
jobHandleLogger (JobHandle _ lgr) = lgr
-- | A monad to query for the status of a particular job /and/ submit updates for in-progress jobs.
class MonadJob m (JobType m) (Seq (JobEventType m)) (JobOutputType m) => MonadJobStatus m where
......@@ -203,8 +210,8 @@ instance MonadIO m => MonadJobStatus (ReaderT (JobEnv t (Seq event) a) m) where
-- | Retrevies the latest 'JobEventType' from the underlying monad. It can be
-- used to query the latest status for a particular job, given its 'JobHandle' as input.
getLatestJobStatus :: MonadJobStatus m => JobHandle -> m (Maybe (JobEventType m))
getLatestJobStatus (JobHandle jId) = do
getLatestJobStatus :: MonadJobStatus m => JobHandle (JobEventType m) -> m (Maybe (JobEventType m))
getLatestJobStatus (JobHandle jId _) = do
mb_jb <- findJob jId
case mb_jb of
Nothing -> pure Nothing
......
......@@ -6,6 +6,7 @@
module Main where
import Control.Concurrent
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
......@@ -192,14 +193,14 @@ shouldBeE a b = liftIO (shouldBe a b)
type TheEnv = JobEnv MyDummyJob (Seq MyDummyLog) ()
withJob :: TheEnv
-> (TheEnv -> JobHandle -> () -> Logger MyDummyLog -> IO (Either MyDummyError ()))
-> (TheEnv -> JobHandle MyDummyLog -> () -> IO (Either MyDummyError ()))
-> IO (Either MyDummyError (SJ.JobStatus 'SJ.Safe MyDummyLog))
withJob myEnv f = do
runExceptT $ flip runReaderT (MyDummyEnv myEnv) $ _MyDummyMonad $ do
newJob @_ @MyDummyError getJobEnv MyDummyJob (\env hdl input logStatus ->
f env hdl input logStatus) (SJ.JobInput () Nothing)
newJob @_ @MyDummyError getJobEnv MyDummyJob (\env hdl input ->
f env hdl input) (SJ.JobInput () Nothing)
withJob_ :: TheEnv -> (TheEnv -> JobHandle -> () -> Logger MyDummyLog -> IO (Either MyDummyError ())) -> IO ()
withJob_ :: TheEnv -> (TheEnv -> JobHandle MyDummyLog -> () -> IO (Either MyDummyError ())) -> IO ()
withJob_ env f = void (withJob env f)
testFetchJobStatus :: IO ()
......@@ -209,11 +210,11 @@ testFetchJobStatus = do
myEnv <- newJobEnv settings defaultPrios testTlsManager
evts <- newMVar []
withJob_ myEnv $ \env hdl _input logStatus -> do
withJob_ myEnv $ \env hdl _input -> do
mb_status <- runReaderT (getLatestJobStatus hdl) env
-- now let's log something
logStatus Step_0
jobHandleLogger hdl Step_0
mb_status' <- runReaderT (getLatestJobStatus hdl) env
modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : xs)
......@@ -232,18 +233,19 @@ testFetchJobStatusNoContention = do
evts1 <- newMVar []
evts2 <- newMVar []
withJob_ myEnv $ \env hdl _input logStatus -> do
logStatus Step_1
let job1 = \() -> withJob_ myEnv $ \env hdl _input -> do
jobHandleLogger hdl Step_1
mb_status <- runReaderT (getLatestJobStatus hdl) env
modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
pure $ Right ()
withJob_ myEnv $ \env hdl _input logStatus -> do
logStatus Step_0
let job2 = \() -> withJob_ myEnv $ \env hdl _input -> do
jobHandleLogger hdl Step_0
mb_status <- runReaderT (getLatestJobStatus hdl) env
modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
pure $ Right ()
Async.forConcurrently_ [job1, job2] ($ ())
threadDelay 500_000
-- Check the events
readMVar evts1 >>= \expected -> expected `shouldBe` [Just Step_1]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment