Commit c08b0410 authored by Alexandre Delanoë's avatar Alexandre Delanoë

Merge remote-tracking branch 'origin/adinapoli/issue-185-job-api' into dev-merge

parents c14de336 2faf39cc
......@@ -101,7 +101,7 @@ library
Gargantext.Database.Schema.Ngrams
Gargantext.Defaults
Gargantext.Utils.Jobs
Gargantext.Utils.Jobs.API
Gargantext.Utils.Jobs.Internal
Gargantext.Utils.Jobs.Map
Gargantext.Utils.Jobs.Monad
Gargantext.Utils.Jobs.Queue
......@@ -899,11 +899,17 @@ test-suite jobqueue-test
StrictData
ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N
build-depends:
async
aeson
, async
, base
, containers
, extra
, gargantext
, hspec
, http-client
, http-client-tls
, mtl
, servant-job
, stm
, text
default-language: Haskell2010
......@@ -126,7 +126,7 @@ library:
- Gargantext.Database.Schema.Ngrams
- Gargantext.Defaults
- Gargantext.Utils.Jobs
- Gargantext.Utils.Jobs.API
- Gargantext.Utils.Jobs.Internal
- Gargantext.Utils.Jobs.Map
- Gargantext.Utils.Jobs.Monad
- Gargantext.Utils.Jobs.Queue
......@@ -517,10 +517,16 @@ tests:
- -rtsopts
- -with-rtsopts=-N
dependencies:
- aeson
- async
- base
- containers
- gargantext
- mtl
- hspec
- async
- http-client
- http-client-tls
- servant-job
- stm
# garg-doctest:
# main: Main.hs
......
......@@ -63,7 +63,7 @@ import Gargantext.Database.Query.Tree.Root (getRoot)
import Gargantext.Database.Schema.Node (NodePoly(_node_id))
import Gargantext.Prelude hiding (reverse)
import Gargantext.Prelude.Crypto.Pass.User (gargPass)
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Servant
import Servant.Auth.Server
import qualified Data.Text as Text
......@@ -268,8 +268,8 @@ type ForgotPasswordAsyncAPI = Summary "Forgot password asnc"
forgotPasswordAsync :: ServerT ForgotPasswordAsyncAPI (GargM Env GargError)
forgotPasswordAsync =
serveJobsAPI ForgotPasswordJob $ \p log' ->
forgotPasswordAsync' p (liftBase . log')
serveJobsAPI ForgotPasswordJob $ \jHandle p ->
forgotPasswordAsync' p (jobHandleLogger jHandle)
forgotPasswordAsync' :: (FlowCmdM env err m)
=> ForgotPasswordAsyncParams
......
-- |
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
module Gargantext.API.Admin.EnvTypes where
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader
import Data.Monoid
import Data.Pool (Pool)
import Data.Sequence (Seq)
import Database.PostgreSQL.Simple (Connection)
import GHC.Generics (Generic)
import Network.HTTP.Client (Manager)
......@@ -57,7 +58,7 @@ data Env = Env
, _env_manager :: !Manager
, _env_self_url :: !BaseUrl
, _env_scrapers :: !ScrapersEnv
, _env_jobs :: !(Jobs.JobEnv GargJob (Dual [JobLog]) JobLog)
, _env_jobs :: !(Jobs.JobEnv GargJob (Seq JobLog) JobLog)
, _env_config :: !GargConfig
, _env_mail :: !MailConfig
, _env_nlp :: !NLPServerMap
......@@ -102,9 +103,14 @@ instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where
instance HasJobEnv Env JobLog JobLog where
job_env = env_scrapers
instance Jobs.MonadJob (ReaderT Env (ExceptT GargError IO)) GargJob (Dual [JobLog]) JobLog where
instance Jobs.MonadJob (ReaderT Env (ExceptT GargError IO)) GargJob (Seq JobLog) JobLog where
getJobEnv = asks (view env_jobs)
instance Jobs.MonadJobStatus (ReaderT Env (ExceptT GargError IO)) where
type JobType (ReaderT Env (ExceptT GargError IO)) = GargJob
type JobOutputType (ReaderT Env (ExceptT GargError IO)) = JobLog
type JobEventType (ReaderT Env (ExceptT GargError IO)) = JobLog
data MockEnv = MockEnv
{ _menv_firewall :: !FireWall
}
......
......@@ -121,7 +121,7 @@ import Gargantext.Prelude hiding (log)
import Gargantext.Prelude.Clock (hasTime, getTime)
import Prelude (error)
import Servant hiding (Patch)
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import System.IO (stderr)
import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary (Arbitrary, arbitrary)
......@@ -830,11 +830,11 @@ apiNgramsTableDoc dId = getTableNgramsDoc dId
apiNgramsAsync :: NodeId -> ServerT TableNgramsAsyncApi (GargM Env GargError)
apiNgramsAsync _dId =
serveJobsAPI TableNgramsJob $ \i log ->
serveJobsAPI TableNgramsJob $ \jHandle i ->
let
log' x = do
printDebug "tableNgramsPostChartsAsync" x
liftBase $ log x
jobHandleLogger jHandle x
in tableNgramsPostChartsAsync i log'
-- Did the given list of ngrams changed since the given version?
......
......@@ -47,7 +47,7 @@ import Gargantext.Database.Schema.Ngrams
import Gargantext.Database.Schema.Node (_node_parent_id)
import Gargantext.Database.Types (Indexed(..))
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Servant
-- import Servant.Job.Async
import qualified Data.ByteString.Lazy as BSL
......@@ -192,11 +192,11 @@ toIndexedNgrams m t = Indexed <$> i <*> n
------------------------------------------------------------------------
jsonPostAsync :: ServerT JSONAPI (GargM Env GargError)
jsonPostAsync lId =
serveJobsAPI UpdateNgramsListJobJSON $ \f log' ->
serveJobsAPI UpdateNgramsListJobJSON $ \jHandle f ->
let
log'' x = do
-- printDebug "postAsync ListId" x
liftBase $ log' x
jobHandleLogger jHandle x
in postAsync' lId f log''
postAsync' :: FlowCmdM env err m
......@@ -288,11 +288,11 @@ csvPost l m = do
------------------------------------------------------------------------
csvPostAsync :: ServerT CSVAPI (GargM Env GargError)
csvPostAsync lId =
serveJobsAPI UpdateNgramsListJobCSV $ \f@(WithTextFile _ft _ _n) log' -> do
serveJobsAPI UpdateNgramsListJobCSV $ \jHandle f@(WithTextFile _ft _ _n) -> do
let log'' x = do
-- printDebug "[csvPostAsync] filetype" ft
-- printDebug "[csvPostAsync] name" n
liftBase $ log' x
jobHandleLogger jHandle x
csvPostAsync' lId f log''
......
......@@ -46,9 +46,9 @@ import Gargantext.Database.Action.Flow.Types (FlowCmdM)
import Gargantext.Database.Admin.Types.Hyperdata (HyperdataAnnuaire(..), HyperdataContact)
import Gargantext.Database.Admin.Types.Hyperdata.Contact (hyperdataContact)
import Gargantext.Database.Admin.Types.Node
import Gargantext.Prelude (($), liftBase, (.), {-printDebug,-} pure)
import Gargantext.Prelude (($), {-printDebug,-} pure)
import qualified Gargantext.Utils.Aeson as GUA
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
------------------------------------------------------------------------
type API = "contact" :> Summary "Contact endpoint"
......@@ -73,12 +73,12 @@ data AddContactParams = AddContactParams { firstname :: !Text, lastname
----------------------------------------------------------------------
api_async :: User -> NodeId -> ServerT API_Async (GargM Env GargError)
api_async u nId =
serveJobsAPI AddContactJob $ \p log ->
serveJobsAPI AddContactJob $ \jHandle p ->
let
log' x = do
-- printDebug "addContact" x
liftBase $ log x
in addContact u nId p (liftBase . log')
jobHandleLogger jHandle x
in addContact u nId p log'
addContact :: (HasSettings env, FlowCmdM env err m)
=> User
......
......@@ -28,7 +28,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType')
import Gargantext.Prelude
import Gargantext.Database.Admin.Types.Hyperdata.Corpus (HyperdataCorpus(..))
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
data DocumentUpload = DocumentUpload
......@@ -69,8 +69,8 @@ type API = Summary " Document upload"
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadDocumentJob $ \q log' -> do
documentUploadAsync uId nId q (liftBase . log')
serveJobsAPI UploadDocumentJob $ \jHandle q -> do
documentUploadAsync uId nId q (jobHandleLogger jHandle)
documentUploadAsync :: (FlowCmdM env err m)
=> UserId
......
......@@ -43,7 +43,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Query.Table.Node (getChildrenByType, getClosestParentIdByType', getNodeWith)
import Gargantext.Database.Schema.Node (node_hyperdata, node_name, node_date)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Core.Text.Corpus.Parsers.Date (split')
import Servant
import Text.Read (readMaybe)
......@@ -70,11 +70,8 @@ instance ToSchema Params
------------------------------------------------------------------------
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI DocumentFromWriteNodeJob $ \p log'' ->
let
log' x = do
liftBase $ log'' x
in documentsFromWriteNodes uId nId p (liftBase . log')
serveJobsAPI DocumentFromWriteNodeJob $ \jHandle p ->
documentsFromWriteNodes uId nId p (jobHandleLogger jHandle)
documentsFromWriteNodes :: (HasSettings env, FlowCmdM env err m)
=> UserId
......
......@@ -31,7 +31,7 @@ import Gargantext.Database.Query.Table.Node (getNodeWith)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Data.Either
data RESPONSE deriving Typeable
......@@ -102,11 +102,11 @@ type FileAsyncApi = Summary "File Async Api"
fileAsyncApi :: UserId -> NodeId -> ServerT FileAsyncApi (GargM Env GargError)
fileAsyncApi uId nId =
serveJobsAPI AddFileJob $ \i l ->
serveJobsAPI AddFileJob $ \jHandle i ->
let
log' x = do
-- printDebug "addWithFile" x
liftBase $ l x
jobHandleLogger jHandle x
in addWithFile uId nId i log'
......
......@@ -32,7 +32,7 @@ import Gargantext.Database.Prelude (HasConfig)
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType, getNodeWith)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Core (Lang)
data FrameCalcUpload = FrameCalcUpload {
......@@ -54,8 +54,8 @@ type API = Summary " FrameCalc upload"
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadFrameCalcJob $ \p logs ->
frameCalcUploadAsync uId nId p (liftBase . logs) (jobLogInit 5)
serveJobsAPI UploadFrameCalcJob $ \jHandle p ->
frameCalcUploadAsync uId nId p (jobHandleLogger jHandle) (jobLogInit 5)
......
......@@ -41,7 +41,7 @@ import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Table.Node.User
import Gargantext.Database.Schema.Node
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
------------------------------------------------------------------------
data PostNode = PostNode { pn_name :: Text
......@@ -77,8 +77,8 @@ type PostNodeAsync = Summary "Post Node"
postNodeAsyncAPI
:: UserId -> NodeId -> ServerT PostNodeAsync (GargM Env GargError)
postNodeAsyncAPI uId nId =
serveJobsAPI NewNodeJob $ \p logs ->
postNodeAsync uId nId p (liftBase . logs)
serveJobsAPI NewNodeJob $ \jHandle p ->
postNodeAsync uId nId p (jobHandleLogger jHandle)
------------------------------------------------------------------------
postNodeAsync :: FlowCmdM env err m
......
......@@ -43,8 +43,8 @@ import Gargantext.Database.Query.Table.Node (defaultList, getNode)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Ngrams (NgramsType(NgramsTerms))
import Gargantext.Database.Schema.Node (node_parent_id)
import Gargantext.Prelude (Bool(..), Ord, Eq, (<$>), ($), liftBase, (.), {-printDebug,-} pure, show, cs, (<>), panic, (<*>))
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Prelude (Bool(..), Ord, Eq, (<$>), ($), {-printDebug,-} pure, show, cs, (<>), panic, (<*>))
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Prelude (Enum, Bounded, minBound, maxBound)
import Servant
import Test.QuickCheck (elements)
......@@ -94,12 +94,12 @@ data Charts = Sources | Authors | Institutes | Ngrams | All
------------------------------------------------------------------------
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UpdateNodeJob $ \p log'' ->
serveJobsAPI UpdateNodeJob $ \jHandle p ->
let
log' x = do
-- printDebug "updateNode" x
liftBase $ log'' x
in updateNode uId nId p (liftBase . log')
jobHandleLogger jHandle x
in updateNode uId nId p log'
updateNode :: (HasSettings env, FlowCmdM env err m)
=> UserId
......
......@@ -45,7 +45,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Prelude (HasConfig(..))
import Gargantext.Prelude
import Gargantext.Prelude.Config (gc_max_docs_scrapers)
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import qualified Gargantext.API.GraphQL as GraphQL
import qualified Gargantext.API.Ngrams.List as List
import qualified Gargantext.API.Node.Contact as Contact
......@@ -282,9 +282,9 @@ waitAPI n = do
addCorpusWithQuery :: User -> ServerT New.AddWithQuery (GargM Env GargError)
addCorpusWithQuery user cid =
serveJobsAPI AddCorpusQueryJob $ \q log' -> do
serveJobsAPI AddCorpusQueryJob $ \jHandle q -> do
limit <- view $ hasConfig . gc_max_docs_scrapers
New.addToCorpusWithQuery user cid q (Just limit) (liftBase . log')
New.addToCorpusWithQuery user cid q (Just limit) (jobHandleLogger jHandle)
{- let log' x = do
printDebug "addToCorpusWithQuery" x
liftBase $ log x
......@@ -292,23 +292,23 @@ addCorpusWithQuery user cid =
addCorpusWithForm :: User -> ServerT New.AddWithForm (GargM Env GargError)
addCorpusWithForm user cid =
serveJobsAPI AddCorpusFormJob $ \i log' ->
serveJobsAPI AddCorpusFormJob $ \jHandle i ->
let
log'' x = do
--printDebug "[addToCorpusWithForm] " x
liftBase $ log' x
jobHandleLogger jHandle x
in New.addToCorpusWithForm user cid i log'' (jobLogInit 3)
addCorpusWithFile :: User -> ServerT New.AddWithFile (GargM Env GargError)
addCorpusWithFile user cid =
serveJobsAPI AddCorpusFileJob $ \i log' ->
serveJobsAPI AddCorpusFileJob $ \jHandle i ->
let
log'' x = do
-- printDebug "[addToCorpusWithFile]" x
liftBase $ log' x
jobHandleLogger jHandle x
in New.addToCorpusWithFile user cid i log''
addAnnuaireWithForm :: ServerT Annuaire.AddWithForm (GargM Env GargError)
addAnnuaireWithForm cid =
serveJobsAPI AddAnnuaireFormJob $ \i log' ->
Annuaire.addToAnnuaireWithForm cid i (liftBase . log')
serveJobsAPI AddAnnuaireFormJob $ \jHandle i ->
Annuaire.addToAnnuaireWithForm cid i (jobHandleLogger jHandle)
......@@ -47,7 +47,7 @@ import Gargantext.Database.Query.Table.Node.User (getNodeUser)
import Gargantext.Database.Schema.Node
import Gargantext.Database.Schema.Ngrams
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Servant
import Servant.Job.Async (AsyncJobsAPI)
import Servant.XML
......@@ -257,8 +257,8 @@ type GraphAsyncAPI = Summary "Recompute graph"
graphAsync :: UserId -> NodeId -> ServerT GraphAsyncAPI (GargM Env GargError)
graphAsync u n =
serveJobsAPI RecomputeGraphJob $ \_ log' ->
graphRecompute u n (liftBase . log')
serveJobsAPI RecomputeGraphJob $ \jHandle _ ->
graphRecompute u n (jobHandleLogger jHandle)
--graphRecompute :: UserId
......
module Gargantext.Utils.Jobs where
{-# LANGUAGE TypeFamilies #-}
module Gargantext.Utils.Jobs (
-- * Serving the JOBS API
serveJobsAPI
-- * Parsing and reading @GargJob@s from disk
, readPrios
-- * Handy re-exports
, jobHandleLogger
) where
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson (ToJSON)
import Prelude
import System.Directory (doesFileExist)
import Text.Read (readMaybe)
import Gargantext.API.Admin.EnvTypes
import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.API.Prelude
import qualified Gargantext.Utils.Jobs.API as API
import Gargantext.Utils.Jobs.Map
import qualified Gargantext.Utils.Jobs.Internal as Internal
import Gargantext.Utils.Jobs.Monad
import qualified Servant.Job.Async as SJ
......@@ -20,17 +27,21 @@ jobErrorToGargError
jobErrorToGargError = GargJobError
serveJobsAPI
:: Foldable callbacks
=> GargJob
-> (input -> Logger JobLog -> GargM Env GargError JobLog)
-> JobsServerAPI ctI ctO callbacks input
serveJobsAPI t f = API.serveJobsAPI ask t jobErrorToGargError $ \env i l -> do
putStrLn ("Running job of type: " ++ show t)
runExceptT $ runReaderT (f i l) env
type JobsServerAPI ctI ctO callbacks input =
SJ.AsyncJobsServerT' ctI ctO callbacks JobLog input JobLog
(GargM Env GargError)
:: (
Foldable callbacks
, Ord (JobType m)
, Show (JobType m)
, ToJSON (JobEventType m)
, ToJSON (JobOutputType m)
, MonadJobStatus m
, m ~ (GargM env GargError)
)
=> JobType m
-> (JobHandle m (JobEventType m) -> input -> m (JobOutputType m))
-> SJ.AsyncJobsServerT' ctI ctO callbacks (JobEventType m) input (JobOutputType m) m
serveJobsAPI jobType f = Internal.serveJobsAPI ask jobType jobErrorToGargError $ \env jHandle i -> do
putStrLn ("Running job of type: " ++ show jobType)
runExceptT $ runReaderT (f jHandle i) env
parseGargJob :: String -> Maybe GargJob
parseGargJob s = case s of
......
{-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
module Gargantext.Utils.Jobs.API where
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}
module Gargantext.Utils.Jobs.Internal (
serveJobsAPI
-- * Internals for testing
, newJob
) where
import Control.Concurrent
import Control.Concurrent.Async
......@@ -8,8 +14,11 @@ import Control.Lens
import Control.Monad
import Control.Monad.Except
import Data.Aeson (ToJSON)
import Data.Foldable (toList)
import Data.Monoid
import Data.Kind (Type)
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Prelude
import Servant.API
......@@ -24,14 +33,14 @@ import qualified Servant.Job.Types as SJ
serveJobsAPI
:: ( Ord t, Exception e, MonadError e m
, MonadJob m t (Dual [event]) output
, MonadJob m t (Seq event) output
, ToJSON e, ToJSON event, ToJSON output
, Foldable callback
)
=> m env
-> t
-> (JobError -> e)
-> (env -> input -> Logger event -> IO (Either e output))
-> (env -> JobHandle m event -> input -> IO (Either e output))
-> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f
= newJob getenv t f (SJ.JobInput undefined Nothing)
......@@ -40,7 +49,7 @@ serveJobsAPI getenv t joberr f
serveJobAPI
:: forall (m :: Type -> Type) e t event output.
(Ord t, MonadError e m, MonadJob m t (Dual [event]) output)
(Ord t, MonadError e m, MonadJob m t (Seq event) output)
=> t
-> (JobError -> e)
-> SJ.JobID 'SJ.Unsafe
......@@ -51,7 +60,7 @@ serveJobAPI t joberr jid' = wrap' (killJob t)
where wrap
:: forall a.
(SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output -> m a)
(SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
-> m a
wrap g = do
jid <- handleIDError joberr (checkJID jid')
......@@ -61,13 +70,13 @@ serveJobAPI t joberr jid' = wrap' (killJob t)
wrap' g limit offset = wrap (g limit offset)
newJob
:: ( Ord t, Exception e, MonadJob m t (Dual [event]) output
:: ( Ord t, Exception e, MonadJob m t (Seq event) output
, ToJSON e, ToJSON event, ToJSON output
, Foldable callbacks
)
=> m env
-> t
-> (env -> input -> Logger event -> IO (Either e output))
-> (env -> JobHandle m event -> input -> IO (Either e output))
-> SJ.JobInput callbacks input
-> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
......@@ -77,12 +86,12 @@ newJob getenv jobkind f input = do
C.runClientM (SJ.clientMCallback m)
(C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
pushLog logF e = do
postCallback (SJ.mkChanEvent e)
logF e
pushLog logF = \w -> do
postCallback (SJ.mkChanEvent w)
logF w
f' inp logF = do
r <- f env inp (pushLog logF . Dual . (:[]))
f' jId inp logF = do
r <- f env (mkJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
case r of
Left e -> postCallback (SJ.mkChanError e) >> throwIO e
Right a -> postCallback (SJ.mkChanResult a) >> return a
......@@ -91,14 +100,14 @@ newJob getenv jobkind f input = do
return (SJ.JobStatus jid [] SJ.IsPending Nothing)
pollJob
:: MonadJob m t (Dual [event]) output
:: MonadJob m t (Seq event) output
=> Maybe SJ.Limit
-> Maybe SJ.Offset
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
-> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
-> m (SJ.JobStatus 'SJ.Safe event)
pollJob limit offset jid je = do
(Dual logs, status, merr) <- case jTask je of
(logs, status, merr) <- case jTask je of
QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
<*> pure SJ.IsRunning
......@@ -107,13 +116,13 @@ pollJob limit offset jid je = do
let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
me = either (Just . T.pack . show) (const Nothing) r
in pure (ls, st, me)
pure $ SJ.jobStatus jid limit offset logs status merr
pure $ SJ.jobStatus jid limit offset (toList logs) status merr
waitJob
:: (MonadError e m, MonadJob m t (Dual [event]) output)
:: (MonadError e m, MonadJob m t (Seq event) output)
=> (JobError -> e)
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
-> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
-> m (SJ.JobOutput output)
waitJob joberr jid je = do
r <- case jTask je of
......@@ -143,15 +152,15 @@ waitJob joberr jid je = do
DoneJ _ls res -> return (Left res)
killJob
:: (Ord t, MonadJob m t (Dual [event]) output)
:: (Ord t, MonadJob m t (Seq event) output)
=> t
-> Maybe SJ.Limit
-> Maybe SJ.Offset
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
-> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
-> m (SJ.JobStatus 'SJ.Safe event)
killJob t limit offset jid je = do
(Dual logs, status, merr) <- case jTask je of
(logs, status, merr) <- case jTask je of
QueuedJ _ -> do
removeJob True t jid
return (mempty, SJ.IsKilled, Nothing)
......@@ -165,4 +174,4 @@ killJob t limit offset jid je = do
me = either (Just . T.pack . show) (const Nothing) r
removeJob False t jid
pure (lgs, st, me)
pure $ SJ.jobStatus jid limit offset logs status merr
pure $ SJ.jobStatus jid limit offset (toList logs) status merr
{-# LANGUAGE GADTs #-}
module Gargantext.Utils.Jobs.Map where
module Gargantext.Utils.Jobs.Map (
-- * Types
JobMap(..)
, JobEntry(..)
, J(..)
, QueuedJob(..)
, RunningJob(..)
, LoggerM
, Logger
-- * Functions
, newJobMap
, lookupJob
, gcThread
, jobLog
, addJobEntry
, deleteJob
, runJob
, waitJobDone
, runJ
, waitJ
, pollJ
, killJ
) where
import Control.Concurrent
import Control.Concurrent.Async
......@@ -53,9 +76,12 @@ data RunningJob w a = RunningJob
, rjGetLog :: IO w
}
-- | Polymorphic logger over any monad @m@.
type LoggerM m w = w -> m ()
-- | A @'Logger' w@ is a function that can do something with "messages" of type
-- @w@ in IO.
type Logger w = w -> IO ()
type Logger w = LoggerM IO w
newJobMap :: IO (JobMap jid w a)
newJobMap = JobMap <$> newTVarIO Map.empty
......@@ -99,14 +125,14 @@ addJobEntry
:: Ord jid
=> jid
-> a
-> (a -> Logger w -> IO r)
-> (jid -> a -> Logger w -> IO r)
-> JobMap jid w r
-> IO (JobEntry jid w r)
addJobEntry jid input f (JobMap mvar) = do
now <- getCurrentTime
let je = JobEntry
{ jID = jid
, jTask = QueuedJ (QueuedJob input f)
, jTask = QueuedJ (QueuedJob input (f jid))
, jRegistered = now
, jTimeoutAfter = Nothing
, jStarted = Nothing
......
{-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses #-}
module Gargantext.Utils.Jobs.Monad where
{-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses, TypeFamilies #-}
module Gargantext.Utils.Jobs.Monad (
-- * Types and classes
JobEnv(..)
, NumRunners
, JobError(..)
, JobHandle -- opaque
, MonadJob(..)
-- * Tracking jobs status
, MonadJobStatus(..)
, getLatestJobStatus
, updateJobProgress
-- * Functions
, newJobEnv
, defaultJobSettings
, genSecret
, getJobsSettings
, getJobsState
, getJobsMap
, getJobsQueue
, queueJob
, findJob
, checkJID
, withJob
, handleIDError
, removeJob
, mkJobHandle
, jobHandleLogger
) where
import Gargantext.Utils.Jobs.Settings
import Gargantext.Utils.Jobs.Map
......@@ -9,7 +39,11 @@ import Gargantext.Utils.Jobs.State
import Control.Concurrent.STM
import Control.Exception
import Control.Monad.Except
import Control.Monad.Reader
import Data.Functor ((<&>))
import Data.Kind (Type)
import Data.Map.Strict (Map)
import Data.Sequence (Seq, viewr, ViewR(..))
import Data.Time.Clock
import Network.HTTP.Client (Manager)
import Prelude
......@@ -48,6 +82,9 @@ genSecret = SJ.generateSecretKey
class MonadIO m => MonadJob m t w a | m -> t w a where
getJobEnv :: m (JobEnv t w a)
instance MonadIO m => MonadJob (ReaderT (JobEnv t w a) m) t w a where
getJobEnv = ask
getJobsSettings :: MonadJob m t w a => m JobSettings
getJobsSettings = jeSettings <$> getJobEnv
......@@ -64,7 +101,7 @@ queueJob
:: (MonadJob m t w a, Ord t)
=> t
-> i
-> (i -> Logger w -> IO a)
-> (SJ.JobID 'SJ.Safe -> i -> Logger w -> IO a)
-> m (SJ.JobID 'SJ.Safe)
queueJob jobkind input f = do
js <- getJobsSettings
......@@ -136,3 +173,65 @@ removeJob queued t jid = do
when queued $
deleteQueue t jid q
deleteJob jid m
--
-- Tracking jobs status
--
-- | An opaque handle that abstracts over the concrete identifier for
-- a job. The constructor for this type is deliberately not exported.
data JobHandle m event = JobHandle {
_jh_id :: !(SJ.JobID 'SJ.Safe)
, _jh_logger :: LoggerM m event
}
-- | Creates a new 'JobHandle', given its underlying 'JobID' and the logging function to
-- be used to report the status.
mkJobHandle :: SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m event
mkJobHandle jId = JobHandle jId
jobHandleLogger :: JobHandle m event -> LoggerM m event
jobHandleLogger (JobHandle _ lgr) = lgr
-- | A monad to query for the status of a particular job /and/ submit updates for in-progress jobs.
class MonadJob m (JobType m) (Seq (JobEventType m)) (JobOutputType m) => MonadJobStatus m where
type JobType m :: Type
type JobOutputType m :: Type
type JobEventType m :: Type
--
-- Tracking jobs status API
--
-- | Retrevies the latest 'JobEventType' from the underlying monad. It can be
-- used to query the latest status for a particular job, given its 'JobHandle' as input.
getLatestJobStatus :: MonadJobStatus m => JobHandle m (JobEventType m) -> m (Maybe (JobEventType m))
getLatestJobStatus (JobHandle jId _) = do
mb_jb <- findJob jId
case mb_jb of
Nothing -> pure Nothing
Just j -> case jTask j of
QueuedJ _ -> pure Nothing
RunningJ rj -> liftIO (rjGetLog rj) <&>
\lgs -> case viewr lgs of
EmptyR -> Nothing
_ :> l -> Just l
DoneJ lgs _ -> pure $ case viewr lgs of
EmptyR -> Nothing
_ :> l -> Just l
updateJobProgress :: (Monoid (JobEventType m), MonadJobStatus m)
=> JobHandle m (JobEventType m)
-- ^ The handle that uniquely identifies this job.
-> (JobEventType m -> JobEventType m)
-- ^ A /pure/ function to update the 'JobEventType'. The input
-- is the /latest/ event, i.e. the current progress status. If
-- this is the first time we report progress and therefore there
-- is no previous progress status, this function will be applied
-- over 'mempty', thus the 'Monoid' constraint.
-> m ()
updateJobProgress hdl@(JobHandle _jId logStatus) updateJobStatus = do
latestStatus <- getLatestJobStatus hdl
case latestStatus of
Nothing -> logStatus (updateJobStatus mempty)
Just s -> logStatus (updateJobStatus s)
......@@ -76,7 +76,7 @@ pushJob
:: Ord t
=> t
-> a
-> (a -> Logger w -> IO r)
-> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
-> JobSettings
-> JobsState t w r
-> IO (SJ.JobID 'SJ.Safe)
......
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE NumericUnderscores #-}
module Main where
import Control.Concurrent
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Reader
import Data.Aeson
import Data.Either
import Data.List
import Data.Sequence (Seq)
import GHC.Generics
import GHC.Stack
import Prelude
import System.IO.Unsafe
import Network.HTTP.Client.TLS (newTlsManager)
import Network.HTTP.Client (Manager)
import Test.Hspec
import qualified Servant.Job.Types as SJ
import qualified Servant.Job.Core as SJ
import Gargantext.Utils.Jobs.Internal (newJob)
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad
import Gargantext.Utils.Jobs.Monad hiding (withJob)
import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
import Gargantext.Utils.Jobs.State
......@@ -36,7 +53,7 @@ testMaxRunners = do
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
runningJs <- newTVarIO []
let j num _inp _l = do
let j num _jHandle _inp _l = do
atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
threadDelay jobDuration
atomically $ modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
......@@ -59,7 +76,7 @@ testPrios = do
st :: JobsState JobT [String] () <- newJobsState settings $
applyPrios [(B, 10)] defaultPrios -- B has higher priority
runningJs <- newTVarIO (Counts 0 0)
let j jobt _inp _l = do
let j jobt _jHandle _inp _l = do
atomically $ modifyTVar runningJs (inc jobt)
threadDelay jobDuration
atomically $ modifyTVar runningJs (dec jobt)
......@@ -86,7 +103,7 @@ testExceptions = do
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
jid <- pushJob A ()
(\_inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
(\_jHandle _inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
settings st
threadDelay initialDelay
mjob <- lookupJob jid (jobsData st)
......@@ -103,7 +120,7 @@ testFairness = do
let settings = defaultJobSettings 2 k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
runningJs <- newTVarIO (Counts 0 0)
let j jobt _inp _l = do
let j jobt _jHandle _inp _l = do
atomically $ modifyTVar runningJs (inc jobt)
threadDelay jobDuration
atomically $ modifyTVar runningJs (dec jobt)
......@@ -130,6 +147,123 @@ testFairness = do
r4 <- readTVarIO runningJs
r4 `shouldBe` (Counts 0 0)
data MyDummyJob
= MyDummyJob
deriving (Show, Eq, Ord, Enum, Bounded)
data MyDummyError
= SomethingWentWrong JobError
deriving (Show)
instance Exception MyDummyError where
toException _ = toException (userError "SomethingWentWrong")
instance ToJSON MyDummyError where
toJSON (SomethingWentWrong _) = String "SomethingWentWrong"
type Progress = Int
data MyDummyLog =
Step_0 !Progress
| Step_1 !Progress
deriving (Show, Eq, Ord, Generic)
instance Monoid MyDummyLog where
mempty = Step_0 0
instance Semigroup MyDummyLog where
_ <> _ = error "not needed"
instance ToJSON MyDummyLog
newtype MyDummyEnv = MyDummyEnv { _MyDummyEnv :: JobEnv MyDummyJob (Seq MyDummyLog) () }
newtype MyDummyMonad a =
MyDummyMonad { _MyDummyMonad :: ReaderT MyDummyEnv IO a }
deriving (Functor, Applicative, Monad, MonadIO, MonadReader MyDummyEnv)
runMyDummyMonad :: MyDummyEnv -> MyDummyMonad a -> IO a
runMyDummyMonad env = flip runReaderT env . _MyDummyMonad
instance MonadJob MyDummyMonad MyDummyJob (Seq MyDummyLog) () where
getJobEnv = asks _MyDummyEnv
instance MonadJobStatus MyDummyMonad where
type JobType MyDummyMonad = MyDummyJob
type JobOutputType MyDummyMonad = ()
type JobEventType MyDummyMonad = MyDummyLog
testTlsManager :: Manager
testTlsManager = unsafePerformIO newTlsManager
{-# NOINLINE testTlsManager #-}
shouldBeE :: (MonadIO m, HasCallStack, Show a, Eq a) => a -> a -> m ()
shouldBeE a b = liftIO (shouldBe a b)
type TheEnv = JobEnv MyDummyJob (Seq MyDummyLog) ()
withJob :: TheEnv
-> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
-> IO (SJ.JobStatus 'SJ.Safe MyDummyLog)
withJob myEnv f = runMyDummyMonad (MyDummyEnv myEnv) $
newJob @_ @MyDummyError getJobEnv MyDummyJob (\env hdl input ->
runMyDummyMonad (MyDummyEnv myEnv) $ f env hdl input) (SJ.JobInput () Nothing)
withJob_ :: TheEnv
-> (TheEnv -> JobHandle MyDummyMonad MyDummyLog -> () -> MyDummyMonad (Either MyDummyError ()))
-> IO ()
withJob_ env f = void (withJob env f)
testFetchJobStatus :: IO ()
testFetchJobStatus = do
k <- genSecret
let settings = defaultJobSettings 2 k
myEnv <- newJobEnv settings defaultPrios testTlsManager
evts <- newMVar []
withJob_ myEnv $ \_ hdl _input -> do
mb_status <- getLatestJobStatus hdl
-- now let's log something
updateJobProgress hdl (const $ Step_0 20)
mb_status' <- getLatestJobStatus hdl
updateJobProgress hdl (\(Step_0 x) -> Step_0 (x + 5))
mb_status'' <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts (\xs -> pure $ mb_status : mb_status' : mb_status'' : xs)
pure $ Right ()
threadDelay 500_000
-- Check the events
readMVar evts >>= \expected -> expected `shouldBe` [Nothing, Just (Step_0 20), Just (Step_0 25)]
testFetchJobStatusNoContention :: IO ()
testFetchJobStatusNoContention = do
k <- genSecret
let settings = defaultJobSettings 2 k
myEnv <- newJobEnv settings defaultPrios testTlsManager
evts1 <- newMVar []
evts2 <- newMVar []
let job1 = \() -> withJob_ myEnv $ \_ hdl _input -> do
updateJobProgress hdl (const $ Step_1 100)
mb_status <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts1 (\xs -> pure $ mb_status : xs)
pure $ Right ()
let job2 = \() -> withJob_ myEnv $ \_ hdl _input -> do
updateJobProgress hdl (const $ Step_0 50)
mb_status <- getLatestJobStatus hdl
liftIO $ modifyMVar_ evts2 (\xs -> pure $ mb_status : xs)
pure $ Right ()
Async.forConcurrently_ [job1, job2] ($ ())
threadDelay 500_000
-- Check the events
readMVar evts1 >>= \expected -> expected `shouldBe` [Just (Step_1 100)]
readMVar evts2 >>= \expected -> expected `shouldBe` [Just (Step_0 50)]
main :: IO ()
main = hspec $ do
describe "job queue" $ do
......@@ -141,3 +275,8 @@ main = hspec $ do
testExceptions
it "fairly picks equal-priority-but-different-kind jobs" $
testFairness
describe "job status update and tracking" $ do
it "can fetch the latest job status" $
testFetchJobStatus
it "can spin two separate jobs and track their status separately" $
testFetchJobStatusNoContention
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment