Commit 97e04297 authored by Alfredo Di Napoli's avatar Alfredo Di Napoli

Cleaner Jobs API

This large commit reworks the jobs API so that callers can now
completely abstract over the underlying JobLog.
parent cf360b1b
......@@ -23,8 +23,9 @@ import System.Environment (getArgs)
import qualified Data.Text as Text
import Text.Read (readMaybe)
import Gargantext.API.Dev (withDevEnv, runCmdDev)
import Gargantext.API.Admin.EnvTypes (DevEnv(..))
import Gargantext.API.Dev (withDevEnv, runCmdGargDev)
import Gargantext.API.Admin.EnvTypes (DevEnv(..), DevJobHandle(..))
import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.API.Node () -- instances
import Gargantext.API.Prelude (GargError)
import Gargantext.Core (Lang(..))
......@@ -36,6 +37,7 @@ import Gargantext.Database.Admin.Types.Node (CorpusId)
import Gargantext.Database.Prelude (Cmd)
import Gargantext.Prelude
import Gargantext.Core.Text.Corpus.Parsers (FileFormat(..), FileType(..))
import Gargantext.Utils.Jobs (MonadJobStatus, JobHandle)
main :: IO ()
main = do
......@@ -50,14 +52,14 @@ main = do
limit' = case (readMaybe limit :: Maybe Int) of
Nothing -> panic $ "Cannot read limit: " <> (Text.pack limit)
Just l -> l
corpus :: forall m. FlowCmdM DevEnv GargError m => m CorpusId
corpus = flowCorpusFile (UserName $ cs user) (Left (cs name :: Text)) limit' tt format Plain corpusPath Nothing (\_ -> pure ())
corpus :: forall m. (FlowCmdM DevEnv GargError m, MonadJobStatus m, JobHandle m ~ DevJobHandle) => m CorpusId
corpus = flowCorpusFile (UserName $ cs user) (Left (cs name :: Text)) limit' tt format Plain corpusPath Nothing DevJobHandle
corpusCsvHal :: forall m. FlowCmdM DevEnv GargError m => m CorpusId
corpusCsvHal = flowCorpusFile (UserName $ cs user) (Left (cs name :: Text)) limit' tt CsvHal Plain corpusPath Nothing (\_ -> pure ())
corpusCsvHal :: forall m. (FlowCmdM DevEnv GargError m, MonadJobStatus m, JobHandle m ~ DevJobHandle) => m CorpusId
corpusCsvHal = flowCorpusFile (UserName $ cs user) (Left (cs name :: Text)) limit' tt CsvHal Plain corpusPath Nothing DevJobHandle
annuaire :: forall m. FlowCmdM DevEnv GargError m => m CorpusId
annuaire = flowAnnuaire (UserName $ cs user) (Left "Annuaire") (Multi EN) corpusPath (\_ -> pure ())
annuaire :: forall m. (FlowCmdM DevEnv GargError m, MonadJobStatus m, JobHandle m ~ DevJobHandle) => m CorpusId
annuaire = flowAnnuaire (UserName $ cs user) (Left "Annuaire") (Multi EN) corpusPath DevJobHandle
{-
let debatCorpus :: forall m. FlowCmdM DevEnv GargError m => m CorpusId
......@@ -72,15 +74,15 @@ main = do
withDevEnv iniPath $ \env -> do
_ <- if fun == "corpus"
then runCmdDev env corpus
then runCmdGargDev env corpus
else pure 0 --(cs "false")
_ <- if fun == "corpusCsvHal"
then runCmdDev env corpusCsvHal
then runCmdGargDev env corpusCsvHal
else pure 0 --(cs "false")
_ <- if fun == "annuaire"
then runCmdDev env annuaire
then runCmdGargDev env annuaire
else pure 0
{-
_ <- if corpusType == "csv"
......
......@@ -33,6 +33,7 @@ library
Gargantext.API.Admin.Auth.Types
Gargantext.API.Admin.EnvTypes
Gargantext.API.Admin.Settings
Gargantext.API.Admin.Orchestrator.Types
Gargantext.API.Admin.Types
Gargantext.API.Dev
Gargantext.API.HashedResponse
......@@ -114,7 +115,6 @@ library
Gargantext.API.Admin.FrontEnd
Gargantext.API.Admin.Orchestrator
Gargantext.API.Admin.Orchestrator.Scrapy.Schedule
Gargantext.API.Admin.Orchestrator.Types
Gargantext.API.Admin.Utils
Gargantext.API.Context
Gargantext.API.Count
......@@ -340,6 +340,7 @@ library
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -538,6 +539,7 @@ executable gargantext-admin
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -566,6 +568,7 @@ executable gargantext-cbor2json
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -598,6 +601,7 @@ executable gargantext-cli
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -637,6 +641,7 @@ executable gargantext-import
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -666,6 +671,7 @@ executable gargantext-init
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -694,6 +700,7 @@ executable gargantext-invitations
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -722,6 +729,7 @@ executable gargantext-phylo
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -764,6 +772,7 @@ executable gargantext-server
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -799,6 +808,7 @@ executable gargantext-upgrade
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -844,6 +854,7 @@ test-suite garg-test
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......@@ -889,6 +900,7 @@ test-suite jobqueue-test
DeriveGeneric
FlexibleContexts
FlexibleInstances
GADTs
GeneralizedNewtypeDeriving
MultiParamTypeClasses
NamedFieldPuns
......
......@@ -25,6 +25,7 @@ default-extensions:
- DeriveGeneric
- FlexibleContexts
- FlexibleInstances
- GADTs
- GeneralizedNewtypeDeriving
- MultiParamTypeClasses
- NamedFieldPuns
......@@ -58,6 +59,7 @@ library:
- Gargantext.API.Admin.Auth.Types
- Gargantext.API.Admin.EnvTypes
- Gargantext.API.Admin.Settings
- Gargantext.API.Admin.Orchestrator.Types
- Gargantext.API.Admin.Types
- Gargantext.API.Dev
- Gargantext.API.HashedResponse
......
......@@ -49,7 +49,6 @@ import Gargantext.API.Admin.Auth.Types
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types
import Gargantext.API.Job (jobLogSuccess)
import Gargantext.API.Prelude (HasJoseError(..), joseError, HasServerError, GargServerC, GargServer, _ServerError, GargM, GargError)
import Gargantext.Core.Mail (MailModel(..), mail)
import Gargantext.Core.Mail.Types (mailSettings)
......@@ -64,7 +63,7 @@ import Gargantext.Database.Action.User.New (guessUserName)
import Gargantext.Database.Schema.Node (NodePoly(_node_id))
import Gargantext.Prelude hiding (reverse)
import Gargantext.Prelude.Crypto.Pass.User (gargPass)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Servant
import Servant.Auth.Server
import qualified Data.Text as Text
......@@ -275,23 +274,19 @@ type ForgotPasswordAsyncAPI = Summary "Forgot password asnc"
forgotPasswordAsync :: ServerT ForgotPasswordAsyncAPI (GargM Env GargError)
forgotPasswordAsync =
serveJobsAPI ForgotPasswordJob $ \jHandle p ->
forgotPasswordAsync' p (jobHandleLogger jHandle)
serveJobsAPI ForgotPasswordJob $ \jHandle p -> forgotPasswordAsync' p jHandle
forgotPasswordAsync' :: (FlowCmdM env err m)
forgotPasswordAsync' :: (FlowCmdM env err m, MonadJobStatus m)
=> ForgotPasswordAsyncParams
-> (JobLog -> m ())
-> m JobLog
forgotPasswordAsync' (ForgotPasswordAsyncParams { email }) logStatus = do
let jobLog = JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
logStatus jobLog
-> JobHandle m
-> m ()
forgotPasswordAsync' (ForgotPasswordAsyncParams { email }) jobHandle = do
markStarted 2 jobHandle
markProgress 1 jobHandle
-- printDebug "[forgotPasswordAsync'] email" email
_ <- forgotPasswordPost $ ForgotPasswordRequest { _fpReq_email = email }
pure $ jobLogSuccess jobLog
markComplete jobHandle
......@@ -3,24 +3,39 @@
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
module Gargantext.API.Admin.EnvTypes where
import Control.Lens
module Gargantext.API.Admin.EnvTypes (
GargJob(..)
, Env(..)
, mkJobHandle
, env_logger
, env_manager
, env_self_url
, menv_firewall
, MockEnv(..)
, DevEnv(..)
, DevJobHandle(..)
, ConcreteJobHandle -- opaque
) where
import Control.Lens hiding ((:>))
import Control.Monad.Except
import Control.Monad.Reader
import Data.Pool (Pool)
import Data.Sequence (Seq)
import Data.Sequence (Seq, ViewR(..), viewr)
import Database.PostgreSQL.Simple (Connection)
import GHC.Generics (Generic)
import Network.HTTP.Client (Manager)
import Servant.Client (BaseUrl)
import Servant.Job.Async (HasJobEnv(..), Job)
import qualified Servant.Job.Async as SJ
import System.Log.FastLogger
import qualified Servant.Job.Core
import Gargantext.API.Admin.Types
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Prelude (GargError)
import Gargantext.API.Job
import Gargantext.API.Prelude (GargM)
import Gargantext.Core.NodeStory
import Gargantext.Core.Mail.Types (HasMail, mailSettings)
import Gargantext.Core.NLP (NLPServerMap, HasNLPServer(..))
......@@ -30,6 +45,7 @@ import Gargantext.Prelude.Config (GargConfig(..))
import Gargantext.Prelude.Mail.Types (MailConfig)
import qualified Gargantext.Utils.Jobs.Monad as Jobs
import Gargantext.Utils.Jobs.Map (LoggerM, J(..), jTask, rjGetLog)
data GargJob
= TableNgramsJob
......@@ -50,18 +66,22 @@ data GargJob
| RecomputeGraphJob
deriving (Show, Eq, Ord, Enum, Bounded)
-- Do /not/ make the fields of this type strict, because it's convenient
-- to be able to partially initialise things like an 'Env' during tests, without
-- having to specify /everything/. This means that when we /construct/ an 'Env',
-- we need to remember to force the fields to WHNF at that point.
data Env = Env
{ _env_settings :: !Settings
, _env_logger :: !LoggerSet
, _env_pool :: !(Pool Connection)
, _env_nodeStory :: !NodeStoryEnv
, _env_manager :: !Manager
, _env_self_url :: !BaseUrl
, _env_scrapers :: !ScrapersEnv
, _env_jobs :: !(Jobs.JobEnv GargJob (Seq JobLog) JobLog)
, _env_config :: !GargConfig
, _env_mail :: !MailConfig
, _env_nlp :: !NLPServerMap
{ _env_settings :: ~Settings
, _env_logger :: ~LoggerSet
, _env_pool :: ~(Pool Connection)
, _env_nodeStory :: ~NodeStoryEnv
, _env_manager :: ~Manager
, _env_self_url :: ~BaseUrl
, _env_scrapers :: ~ScrapersEnv
, _env_jobs :: ~(Jobs.JobEnv GargJob (Seq JobLog) JobLog)
, _env_config :: ~GargConfig
, _env_mail :: ~MailConfig
, _env_nlp :: ~NLPServerMap
}
deriving (Generic)
......@@ -103,13 +123,68 @@ instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where
instance HasJobEnv Env JobLog JobLog where
job_env = env_scrapers
instance Jobs.MonadJob (ReaderT Env (ExceptT GargError IO)) GargJob (Seq JobLog) JobLog where
instance Jobs.MonadJob (GargM Env err) GargJob (Seq JobLog) JobLog where
getJobEnv = asks (view env_jobs)
instance Jobs.MonadJobStatus (ReaderT Env (ExceptT GargError IO)) where
type JobType (ReaderT Env (ExceptT GargError IO)) = GargJob
type JobOutputType (ReaderT Env (ExceptT GargError IO)) = JobLog
type JobEventType (ReaderT Env (ExceptT GargError IO)) = JobLog
-- | The /concrete/ 'JobHandle' in use with our 'GargM' (production) monad. Its
-- constructor is not exported, so that internal details of its implementation
-- do not leak; use 'mkJobHandle' to build one.
data ConcreteJobHandle err = JobHandle {
_jh_id :: !(SJ.JobID 'SJ.Safe)
-- ^ Identifier of the job this handle refers to.
, _jh_logger :: LoggerM (GargM Env err) JobLog
-- ^ Logging function used to report the job status.
}
-- | Smart constructor for the /concrete/ 'JobHandle': pairs the 'JobID' of a
-- job with the logging function that will be used to report its status.
mkJobHandle :: SJ.JobID 'SJ.Safe
            -> LoggerM (GargM Env err) JobLog
            -> ConcreteJobHandle err
mkJobHandle jobId statusLogger =
  JobHandle { _jh_id     = jobId
            , _jh_logger = statusLogger
            }
-- | Reads the latest status of the job behind the handle, applies the given
-- transformation to it and reports the result through the handle's logger.
updateJobProgress :: ConcreteJobHandle err -> (JobLog -> JobLog) -> GargM Env err ()
updateJobProgress hdl@(JobHandle _ logStatus) updateJobStatus = do
  latest <- Jobs.getLatestJobStatus hdl
  logStatus (updateJobStatus latest)
-- | Job-status reporting for the production monad, backed by a
-- 'ConcreteJobHandle'. Every @mark*@ method goes through 'updateJobProgress'.
instance Jobs.MonadJobStatus (GargM Env err) where

  type JobHandle      (GargM Env err) = ConcreteJobHandle err
  type JobType        (GargM Env err) = GargJob
  type JobOutputType  (GargM Env err) = JobLog
  type JobEventType   (GargM Env err) = JobLog

  -- Yields the most recent log entry of the job, or 'noJobLog' when the job
  -- cannot be found, is still queued, or has not logged anything yet.
  getLatestJobStatus (JobHandle jId _) =
    Jobs.findJob jId >>= \mb_jb -> case mb_jb of
      Nothing -> pure noJobLog
      Just j  -> case jTask j of
        QueuedJ _   -> pure noJobLog
        RunningJ rj -> liftIO (rjGetLog rj) <&> lastLogOrEmpty
        DoneJ lgs _ -> pure $ lastLogOrEmpty lgs
    where
      -- Rightmost (latest) element of the log sequence, if there is one.
      lastLogOrEmpty logs = case viewr logs of
        EmptyR      -> noJobLog
        _ :> newest -> newest

  -- Runs the continuation with a handle whose logger first calls the
  -- original logger and then the extra one.
  withTracer extraLogger (JobHandle jId logger) n = n tracedHandle
    where
      tracedHandle = JobHandle jId (\w -> logger w >> liftIO (extraLogger w))

  markStarted n jh = updateJobProgress jh (\_ -> jobLogStart (RemainingSteps n))

  markProgress steps jh = updateJobProgress jh $ jobLogProgress steps

  -- Records @steps@ failures, attaching an error event when a message is given.
  markFailure steps mb_msg jh = updateJobProgress jh $ \latest ->
    let failed = jobLogFailures steps latest
    in case mb_msg of
         Nothing  -> failed
         Just msg -> addErrorEvent msg failed

  markComplete jh = jh `updateJobProgress` jobLogComplete

  -- Fails the whole job, optionally with an error message.
  markFailed mb_msg jh = updateJobProgress jh $ \latest ->
    case mb_msg of
      Nothing  -> jobLogFailTotal latest
      Just msg -> jobLogFailTotalWithMessage msg latest
data MockEnv = MockEnv
{ _menv_firewall :: !FireWall
......@@ -118,8 +193,6 @@ data MockEnv = MockEnv
makeLenses ''MockEnv
data DevEnv = DevEnv
{ _dev_env_settings :: !Settings
, _dev_env_config :: !GargConfig
......@@ -127,10 +200,39 @@ data DevEnv = DevEnv
, _dev_env_nodeStory :: !NodeStoryEnv
, _dev_env_mail :: !MailConfig
, _dev_env_nlp :: !NLPServerMap
, _dev_env_jobs :: !(Jobs.JobEnv GargJob (Seq JobLog) JobLog)
}
makeLenses ''DevEnv
-- | Our /mock/ job handle. It carries no data: the 'Jobs.MonadJobStatus'
-- instance for @GargM DevEnv err@ answers every status query with 'noJobLog'
-- and turns every status update into a no-op.
data DevJobHandle = DevJobHandle
-- | The dev monad obtains its job environment from the '_dev_env_jobs' field
-- of the 'DevEnv' it runs in.
instance Jobs.MonadJob (GargM DevEnv err) GargJob (Seq JobLog) JobLog where
  getJobEnv = view dev_env_jobs
-- | Job-status reporting for the dev monad. Nothing is tracked: the handle is
-- the unit-like 'DevJobHandle' and every update is discarded.
instance Jobs.MonadJobStatus (GargM DevEnv err) where
type JobHandle (GargM DevEnv err) = DevJobHandle
type JobType (GargM DevEnv err) = GargJob
type JobOutputType (GargM DevEnv err) = JobLog
type JobEventType (GargM DevEnv err) = JobLog
-- There is no real job behind the handle, so the status is always the empty log.
getLatestJobStatus DevJobHandle = pure noJobLog
-- The extra logger is ignored; the continuation runs with the same mock handle.
withTracer _ DevJobHandle n = n DevJobHandle
-- All the @mark*@ operations below are no-ops.
markStarted _ _ = pure ()
markProgress _ _ = pure ()
markFailure _ _ _ = pure ()
markComplete _ = pure ()
markFailed _ _ = pure ()
instance HasConfig DevEnv where
hasConfig = dev_env_config
......
......@@ -101,7 +101,7 @@ data ScraperEvent = ScraperEvent
, _scev_level :: !(Maybe Text)
, _scev_date :: !(Maybe Text)
}
deriving (Show, Generic)
deriving (Show, Generic, Eq)
instance Arbitrary ScraperEvent where
arbitrary = ScraperEvent <$> elements [Nothing, Just "test message"]
......@@ -122,10 +122,13 @@ data JobLog = JobLog
, _scst_remaining :: !(Maybe Int)
, _scst_events :: !(Maybe [ScraperEvent])
}
deriving (Show, Generic)
deriving (Show, Generic, Eq)
makeLenses ''JobLog
-- | The empty 'JobLog': every counter and the event list are 'Nothing'.
noJobLog :: JobLog
noJobLog = JobLog
  { _scst_succeeded = Nothing
  , _scst_failed    = Nothing
  , _scst_remaining = Nothing
  , _scst_events    = Nothing
  }
instance Arbitrary JobLog where
arbitrary = JobLog
<$> arbitrary
......
......@@ -12,6 +12,7 @@ TODO-SECURITY: Critical
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
......@@ -177,32 +178,34 @@ devJwkFile = "dev.jwk"
newEnv :: PortNumber -> FilePath -> IO Env
newEnv port file = do
manager_env <- newTlsManager
settings' <- devSettings devJwkFile <&> appPort .~ port -- TODO read from 'file'
!manager_env <- newTlsManager
!settings' <- devSettings devJwkFile <&> appPort .~ port -- TODO read from 'file'
when (port /= settings' ^. appPort) $
panic "TODO: conflicting settings of port"
config_env <- readConfig file
!config_env <- readConfig file
prios <- Jobs.readPrios (file <> ".jobs")
let prios' = Jobs.applyPrios prios Jobs.defaultPrios
putStrLn $ "Overrides: " <> show prios
putStrLn $ "New priorities: " <> show prios'
self_url_env <- parseBaseUrl $ "http://0.0.0.0:" <> show port
dbParam <- databaseParameters file
pool <- newPool dbParam
!self_url_env <- parseBaseUrl $ "http://0.0.0.0:" <> show port
dbParam <- databaseParameters file
!pool <- newPool dbParam
--nodeStory_env <- readNodeStoryEnv (_gc_repofilepath config_env)
nodeStory_env <- readNodeStoryEnv pool
scrapers_env <- newJobEnv defaultSettings manager_env
!nodeStory_env <- readNodeStoryEnv pool
!scrapers_env <- newJobEnv defaultSettings manager_env
secret <- Jobs.genSecret
let jobs_settings = (Jobs.defaultJobSettings 1 secret)
& Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_js_job_timeout)
& Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_js_id_timeout)
jobs_env <- Jobs.newJobEnv jobs_settings prios' manager_env
logger <- newStderrLoggerSet defaultBufSize
config_mail <- Mail.readConfig file
config_nlp <- NLP.readConfig file
!jobs_env <- Jobs.newJobEnv jobs_settings prios' manager_env
!logger <- newStderrLoggerSet defaultBufSize
!config_mail <- Mail.readConfig file
!nlp_env <- nlpServerMap <$> NLP.readConfig file
-- NOTE: An 'Env' by default doesn't have strict fields, so when constructing one in production
-- we force them to WHNF (via the bang patterns above) to avoid accumulating unnecessary thunks.
pure $ Env
{ _env_settings = settings'
, _env_logger = logger
......@@ -214,7 +217,7 @@ newEnv port file = do
, _env_self_url = self_url_env
, _env_config = config_env
, _env_mail = config_mail
, _env_nlp = nlpServerMap config_nlp
, _env_nlp = nlp_env
}
newPool :: ConnectInfo -> IO (Pool Connection)
......
......@@ -9,12 +9,15 @@ Portability : POSIX
-}
{-# LANGUAGE ScopedTypeVariables #-}
-- Use only for dev/repl
module Gargantext.API.Dev where
import Control.Exception (finally)
import Control.Monad (fail)
import Control.Monad.Reader (runReaderT)
import Control.Monad.Except (runExceptT)
import Gargantext.API.Admin.EnvTypes
import Gargantext.API.Admin.Settings
import Gargantext.API.Ngrams (saveNodeStoryImmediate)
......@@ -24,8 +27,11 @@ import Gargantext.Core.NodeStory
import Gargantext.Database.Prelude
import Gargantext.Prelude
import Gargantext.Prelude.Config (readConfig)
import Network.HTTP.Client.TLS (newTlsManager)
import qualified Gargantext.Prelude.Mail as Mail
import qualified Gargantext.Prelude.NLP as NLP
import qualified Gargantext.Utils.Jobs.Monad as Jobs
import qualified Gargantext.Utils.Jobs.Queue as Jobs
import Servant
import System.IO (FilePath)
......@@ -46,6 +52,10 @@ withDevEnv iniPath k = do
setts <- devSettings devJwkFile
mail <- Mail.readConfig iniPath
nlp_config <- NLP.readConfig iniPath
secret <- Jobs.genSecret
let jobs_settings = Jobs.defaultJobSettings 1 secret
manager_env <- newTlsManager
jobs_env <- Jobs.newJobEnv jobs_settings Jobs.defaultPrios manager_env
pure $ DevEnv
{ _dev_env_pool = pool
, _dev_env_nodeStory = nodeStory_env
......@@ -53,8 +63,14 @@ withDevEnv iniPath k = do
, _dev_env_config = cfg
, _dev_env_mail = mail
, _dev_env_nlp = nlpServerMap nlp_config
, _dev_env_jobs = jobs_env
}
-- | A command that can run in /any/ monad @m@ satisfying both @CmdM''@ and
-- 'Jobs.MonadJobStatus'. This is the argument type accepted by 'runCmdDev'.
type DevCmd env err a = forall m. (
CmdM'' env err m
, Jobs.MonadJobStatus m
) => m a
-- | REPL (GHCi) convenience: builds a 'DevEnv' from @gargantext.ini@ and runs
-- the given command in it through 'runCmdDev'.
runCmdRepl :: Show err => Cmd'' DevEnv err a -> IO a
runCmdRepl cmd =
  withDevEnv "gargantext.ini" (\devEnv -> runCmdDev devEnv cmd)
......@@ -66,9 +82,15 @@ runCmdReplServantErr = runCmdRepl
-- the command.
-- This function is constrained to the DevEnv rather than
-- using HasConnectionPool and HasRepoVar.
runCmdDev :: Show err => DevEnv -> Cmd'' DevEnv err a -> IO a
runCmdDev env f =
(either (fail . show) pure =<< runCmd env f)
runCmdDev :: Show err => DevEnv -> DevCmd DevEnv err a -> IO a
runCmdDev env cmd =
(either (fail . show) pure =<< runExceptT (runReaderT cmd env))
`finally`
runReaderT saveNodeStoryImmediate env
-- | Runs a 'GargM' action against the given 'DevEnv'. A 'Left' result is
-- turned into a 'fail' carrying the shown error. Whether or not the action
-- succeeds, 'saveNodeStoryImmediate' is run afterwards.
runCmdGargDev :: DevEnv -> GargM DevEnv GargError a -> IO a
runCmdGargDev env cmd = runAndReport `finally` persistNodeStory
  where
    -- Unwrap the reader and except layers, then surface any error via 'fail'.
    runAndReport     = either (fail . show) pure =<< runExceptT (runReaderT cmd env)
    -- Cleanup step executed even if the command throws.
    persistNodeStory = runReaderT saveNodeStoryImmediate env
......
module Gargantext.API.Job where
import Control.Lens (over, _Just)
import Data.IORef
import Data.Maybe
import qualified Data.Text as T
......@@ -9,11 +8,13 @@ import Gargantext.Prelude
import Gargantext.API.Admin.Orchestrator.Types
-- | Newtype wrapper around the 'Int' number of steps a job has left.
-- 'jobLogStart' unwraps it to initialise '_scst_remaining'. The derived 'Num'
-- instance lets numeric literals be used where a 'RemainingSteps' is expected.
newtype RemainingSteps = RemainingSteps { _RemainingSteps :: Int }
deriving (Show, Eq, Num)
jobLogInit :: Int -> JobLog
jobLogInit rem =
jobLogStart :: RemainingSteps -> JobLog
jobLogStart rem =
JobLog { _scst_succeeded = Just 0
, _scst_remaining = Just rem
, _scst_remaining = Just (_RemainingSteps rem)
, _scst_failed = Just 0
, _scst_events = Just [] }
......@@ -25,13 +26,24 @@ addEvent level message (JobLog { _scst_events = mEvts, .. }) = JobLog { _scst_ev
, _scev_level = Just level
, _scev_date = Nothing }
jobLogSuccess :: JobLog -> JobLog
jobLogSuccess jl = over (scst_succeeded . _Just) (+ 1) $
over (scst_remaining . _Just) (\x -> x - 1) jl
-- | Appends an event with level @\"ERROR\"@ and the given message to a 'JobLog'.
addErrorEvent :: T.Text -> JobLog -> JobLog
addErrorEvent = addEvent "ERROR"
jobLogFail :: JobLog -> JobLog
jobLogFail jl = over (scst_failed . _Just) (+ 1) $
over (scst_remaining . _Just) (\x -> x - 1) jl
-- | Records @n@ successfully completed steps: @n@ is moved from the
-- \"remaining\" counter to the \"succeeded\" counter. A counter that is
-- 'Nothing' is left untouched.
jobLogProgress :: Int -> JobLog -> JobLog
jobLogProgress n jl =
  jl & over (scst_remaining . _Just) (\left -> left - n)
     & over (scst_succeeded . _Just) (\done -> done + n)
-- | Marks a job as completely done by folding whatever is still \"remaining\"
-- into \"succeeded\". Afterwards '_scst_remaining' is @Just 0@ and
-- '_scst_succeeded' is the old value (or 0 if it was 'Nothing') plus the old
-- remaining count (or 0 if it was 'Nothing').
jobLogComplete :: JobLog -> JobLog
jobLogComplete jl =
  jl { _scst_succeeded = Just (stillRemaining + alreadySucceeded)
     , _scst_remaining = Just 0
     }
  where
    stillRemaining   = fromMaybe 0 (_scst_remaining jl)
    alreadySucceeded = fromMaybe 0 (_scst_succeeded jl)
-- | Records @n@ failed steps: @n@ is moved from the \"remaining\" counter to
-- the \"failed\" counter. A counter that is 'Nothing' is left untouched.
jobLogFailures :: Int -> JobLog -> JobLog
jobLogFailures n jl =
  jl & over (scst_remaining . _Just) (\left -> left - n)
     & over (scst_failed . _Just) (\failed -> failed + n)
jobLogFailTotal :: JobLog -> JobLog
jobLogFailTotal (JobLog { _scst_succeeded = mSucc
......@@ -48,25 +60,7 @@ jobLogFailTotal (JobLog { _scst_succeeded = mSucc
Just rem -> (Just 0, (+ rem) <$> mFail)
jobLogFailTotalWithMessage :: T.Text -> JobLog -> JobLog
jobLogFailTotalWithMessage message jl = addEvent "ERROR" message $ jobLogFailTotal jl
jobLogFailTotalWithMessage message jl = addErrorEvent message $ jobLogFailTotal jl
-- | Prepends an event to the event list of a 'JobLog'. A log whose event list
-- is 'Nothing' is returned unchanged.
jobLogEvt :: JobLog -> ScraperEvent -> JobLog
jobLogEvt jl evt = jl & over (scst_events . _Just) (evt :)
runJobLog :: MonadBase IO m => Int -> (JobLog -> m ()) -> m (m (), m (), m JobLog)
runJobLog num logStatus = do
jlRef <- liftBase $ newIORef $ jobLogInit num
return (logRefF jlRef, logRefSuccessF jlRef, getRefF jlRef)
where
logRefF ref = do
jl <- liftBase $ readIORef ref
logStatus jl
logRefSuccessF ref = do
jl <- liftBase $ readIORef ref
let jl' = jobLogSuccess jl
liftBase $ writeIORef ref jl'
logStatus jl'
getRefF ref = do
liftBase $ readIORef ref
......@@ -101,7 +101,6 @@ import GHC.Generics (Generic)
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Job
import Gargantext.API.Ngrams.Types
import Gargantext.API.Prelude
import Gargantext.Core.NodeStory
......@@ -121,7 +120,7 @@ import Gargantext.Prelude hiding (log)
import Gargantext.Prelude.Clock (hasTime, getTime)
import Prelude (error)
import Servant hiding (Patch)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import System.IO (stderr)
import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary (Arbitrary, arbitrary)
......@@ -423,11 +422,12 @@ tableNgramsPostChartsAsync :: ( HasNodeStory env err m
, FlowCmdM env err m
, HasNodeError err
, HasSettings env
, MonadJobStatus m
)
=> UpdateTableNgramsCharts
-> (JobLog -> m ())
-> m JobLog
tableNgramsPostChartsAsync utn logStatus = do
-> JobHandle m
-> m ()
tableNgramsPostChartsAsync utn jobHandle = do
let tabType = utn ^. utn_tab_type
let listId = utn ^. utn_list_id
......@@ -442,44 +442,35 @@ tableNgramsPostChartsAsync utn logStatus = do
case mCId of
Nothing -> do
-- printDebug "[tableNgramsPostChartsAsync] can't update charts, no parent, nId" nId
pure $ jobLogFail $ jobLogInit 1
markStarted 1 jobHandle
markFailed Nothing jobHandle
Just cId -> do
case tabType of
Authors -> do
-- printDebug "[tableNgramsPostChartsAsync] Authors, updating Pie, cId" cId
(logRef, logRefSuccess, getRef) <- runJobLog 1 logStatus
logRef
markStarted 1 jobHandle
_ <- Metrics.updatePie cId (Just listId) tabType Nothing
logRefSuccess
getRef
markComplete jobHandle
Institutes -> do
-- printDebug "[tableNgramsPostChartsAsync] Institutes, updating Tree, cId" cId
-- printDebug "[tableNgramsPostChartsAsync] updating tree StopTerm, cId" cId
(logRef, logRefSuccess, getRef) <- runJobLog 3 logStatus
logRef
markStarted 3 jobHandle
_ <- Metrics.updateTree cId (Just listId) tabType StopTerm
-- printDebug "[tableNgramsPostChartsAsync] updating tree CandidateTerm, cId" cId
logRefSuccess
markProgress 1 jobHandle
_ <- Metrics.updateTree cId (Just listId) tabType CandidateTerm
-- printDebug "[tableNgramsPostChartsAsync] updating tree MapTerm, cId" cId
logRefSuccess
markProgress 1 jobHandle
_ <- Metrics.updateTree cId (Just listId) tabType MapTerm
logRefSuccess
getRef
markComplete jobHandle
Sources -> do
-- printDebug "[tableNgramsPostChartsAsync] Sources, updating chart, cId" cId
(logRef, logRefSuccess, getRef) <- runJobLog 1 logStatus
logRef
markStarted 1 jobHandle
_ <- Metrics.updatePie cId (Just listId) tabType Nothing
logRefSuccess
getRef
markComplete jobHandle
Terms -> do
-- printDebug "[tableNgramsPostChartsAsync] Terms, updating Metrics (Histo), cId" cId
(logRef, logRefSuccess, getRef) <- runJobLog 6 logStatus
logRef
markStarted 6 jobHandle
{-
_ <- Metrics.updateChart cId (Just listId) tabType Nothing
logRefSuccess
......@@ -493,12 +484,11 @@ tableNgramsPostChartsAsync utn logStatus = do
logRefSuccess
_ <- Metrics.updateTree cId (Just listId) tabType MapTerm
-}
logRefSuccess
getRef
markComplete jobHandle
_ -> do
-- printDebug "[tableNgramsPostChartsAsync] no update for tabType = " tabType
pure $ jobLogFail $ jobLogInit 1
markStarted 1 jobHandle
markFailed Nothing jobHandle
{-
{ _ne_list :: ListType
......@@ -830,12 +820,8 @@ apiNgramsTableDoc dId = getTableNgramsDoc dId
apiNgramsAsync :: NodeId -> ServerT TableNgramsAsyncApi (GargM Env GargError)
apiNgramsAsync _dId =
serveJobsAPI TableNgramsJob $ \jHandle i ->
let
log' x = do
printDebug "tableNgramsPostChartsAsync" x
jobHandleLogger jHandle x
in tableNgramsPostChartsAsync i log'
serveJobsAPI TableNgramsJob $ \jHandle i -> withTracer (printDebug "tableNgramsPostChartsAsync") jHandle $
\jHandle' -> tableNgramsPostChartsAsync i jHandle'
-- Did the given list of ngrams change since the given version?
-- The returned value is versioned boolean value, meaning that one always retrieve the
......
......@@ -25,7 +25,6 @@ import Data.Text (Text, concat, pack, splitOn)
import Data.Vector (Vector)
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Job (jobLogFailTotalWithMessage, jobLogSuccess)
import Gargantext.API.Ngrams (setListNgrams)
import Gargantext.API.Ngrams.List.Types
import Gargantext.API.Ngrams.Prelude (getNgramsList)
......@@ -48,7 +47,7 @@ import Gargantext.Database.Schema.Ngrams
import Gargantext.Database.Schema.Node (_node_parent_id)
import Gargantext.Database.Types (Indexed(..))
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Servant
-- import Servant.Job.Async
import qualified Data.ByteString.Lazy as BSL
......@@ -194,45 +193,27 @@ toIndexedNgrams m t = Indexed <$> i <*> n
jsonPostAsync :: ServerT JSONAPI (GargM Env GargError)
jsonPostAsync lId =
serveJobsAPI UpdateNgramsListJobJSON $ \jHandle f ->
let
log'' x = do
-- printDebug "postAsync ListId" x
jobHandleLogger jHandle x
in postAsync' lId f log''
postAsync' lId f jHandle
postAsync' :: FlowCmdM env err m
postAsync' :: (FlowCmdM env err m, MonadJobStatus m)
=> ListId
-> WithJsonFile
-> (JobLog -> m ())
-> m JobLog
postAsync' l (WithJsonFile m _) logStatus = do
logStatus JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
}
-> JobHandle m
-> m ()
postAsync' l (WithJsonFile m _) jobHandle = do
markStarted 2 jobHandle
-- printDebug "New list as file" l
_ <- setList l m
-- printDebug "Done" r
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
markProgress 1 jobHandle
corpus_node <- getNode l -- (Proxy :: Proxy HyperdataList)
let corpus_id = fromMaybe (panic "") (_node_parent_id corpus_node)
_ <- reIndexWith corpus_id l NgramsTerms (Set.fromList [MapTerm, CandidateTerm])
pure JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
------------------------------------------------------------------------
......@@ -293,23 +274,13 @@ csvPost l m = do
csvPostAsync :: ServerT CSVAPI (GargM Env GargError)
csvPostAsync lId =
serveJobsAPI UpdateNgramsListJobCSV $ \jHandle f -> do
let log'' x = do
-- printDebug "[csvPostAsync] filetype" (_wtf_filetype f)
-- printDebug "[csvPostAsync] name" (_wtf_name f)
jobHandleLogger jHandle x
let jl = JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
log'' jl
markStarted 1 jHandle
ePost <- csvPost lId (_wtf_data f)
let jlNew = case ePost of
Left err -> jobLogFailTotalWithMessage err jl
Right () -> jobLogSuccess jl
printDebug "[csvPostAsync] job ended with joblog: " jlNew
log'' jlNew
pure jlNew
case ePost of
Left err -> markFailed (Just err) jHandle
Right () -> markComplete jHandle
getLatestJobStatus jHandle >>= printDebug "[csvPostAsync] job ended with joblog: "
------------------------------------------------------------------------
......
......@@ -46,9 +46,9 @@ import Gargantext.Database.Action.Flow.Types (FlowCmdM)
import Gargantext.Database.Admin.Types.Hyperdata (HyperdataAnnuaire(..), HyperdataContact)
import Gargantext.Database.Admin.Types.Hyperdata.Contact (hyperdataContact)
import Gargantext.Database.Admin.Types.Node
import Gargantext.Prelude (($), {-printDebug,-} pure)
import Gargantext.Prelude (($), {-printDebug,-})
import qualified Gargantext.Utils.Aeson as GUA
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
------------------------------------------------------------------------
type API = "contact" :> Summary "Contact endpoint"
......@@ -74,34 +74,22 @@ data AddContactParams = AddContactParams { firstname :: !Text, lastname
api_async :: User -> NodeId -> ServerT API_Async (GargM Env GargError)
api_async u nId =
serveJobsAPI AddContactJob $ \jHandle p ->
let
log' x = do
-- printDebug "addContact" x
jobHandleLogger jHandle x
in addContact u nId p log'
addContact u nId p jHandle
addContact :: (HasSettings env, FlowCmdM env err m)
addContact :: (HasSettings env, FlowCmdM env err m, MonadJobStatus m)
=> User
-> NodeId
-> AddContactParams
-> (JobLog -> m ())
-> m JobLog
addContact u nId (AddContactParams fn ln) logStatus = do
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
_ <- flow (Nothing :: Maybe HyperdataAnnuaire) u (Right [nId]) (Multi EN) Nothing (Just 1, yield $ hyperdataContact fn ln) logStatus
pure JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
addContact _uId _nId _p logStatus = do
simuLogs logStatus 10
-> JobHandle m
-> m ()
addContact u nId (AddContactParams fn ln) jobHandle = do
markStarted 2 jobHandle
_ <- flow (Nothing :: Maybe HyperdataAnnuaire) u (Right [nId]) (Multi EN) Nothing (Just 1, yield $ hyperdataContact fn ln) jobHandle
markComplete jobHandle
addContact _uId _nId _p jobHandle = do
simuLogs jobHandle 10
------------------------------------------------------------------------
-- TODO unPrefix "pn_" FromJSON, ToJSON, ToSchema, adapt frontend.
......
......@@ -30,6 +30,7 @@ import Gargantext.Core.Utils.Prefix (unPrefixSwagger)
import Gargantext.Database.Action.Flow.Types (FlowCmdM) -- flowAnnuaire
import Gargantext.Database.Admin.Types.Node (AnnuaireId)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (MonadJobStatus(..))
type Api = Summary "New Annuaire endpoint"
......@@ -64,22 +65,15 @@ type AddWithForm = Summary "Add with FormUrlEncoded to annuaire endpoint"
:> AsyncJobs JobLog '[FormUrlEncoded] AnnuaireWithForm JobLog
------------------------------------------------------------------------
-- | Job body for the "add with form" annuaire endpoint.
-- As written it only reports job status: neither the annuaire id nor the
-- form's @_wf_filetype@ is used outside the commented-out debug line.
-- NOTE(review): two signatures and two clause heads are interleaved here (a
-- @JobLog -> m ()@ callback variant and a 'JobHandle' variant); presumably
-- the before/after of one change — confirm.
addToAnnuaireWithForm :: FlowCmdM env err m
addToAnnuaireWithForm :: (FlowCmdM env err m, MonadJobStatus m)
=> AnnuaireId
-> AnnuaireWithForm
-> (JobLog -> m ())
-> m JobLog
addToAnnuaireWithForm _cid (AnnuaireWithForm { _wf_filetype }) logStatus = do
-> JobHandle m
-> m ()
addToAnnuaireWithForm _cid (AnnuaireWithForm { _wf_filetype }) jobHandle = do
-- printDebug "ft" _wf_filetype
-- Callback variant: log 1 succeeded / 1 remaining, then return 2 / 0.
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
pure JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
-- Handle variant: start with 3 steps, record 1 step of progress, complete.
-- NOTE(review): the callback variant totals 2 steps (1 + 1) while this one
-- starts with 3 — confirm the intended step count.
markStarted 3 jobHandle
markProgress 1 jobHandle
markComplete jobHandle
......@@ -37,9 +37,8 @@ import qualified Data.Text.Encoding as TE
-- import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs, ScraperEvent(..), scst_events)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Job (addEvent, jobLogSuccess, jobLogFailTotal)
import Gargantext.API.Node.Corpus.New.Types
import Gargantext.API.Node.Corpus.Searx
import Gargantext.API.Node.Corpus.Types
......@@ -61,6 +60,7 @@ import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Prelude.Config (gc_max_docs_parsers)
import Gargantext.Utils.Jobs (JobHandle, MonadJobStatus(..))
import qualified Gargantext.Core.Text.Corpus.API as API
import qualified Gargantext.Core.Text.Corpus.Parsers as Parser (FileType(..), parseFormatC)
import qualified Gargantext.Database.GargDB as GargDB
......@@ -180,24 +180,20 @@ type AddWithFile = Summary "Add with MultipartData to corpus endpoint"
-- TODO WithQuery also has a corpus id
addToCorpusWithQuery :: FlowCmdM env err m
addToCorpusWithQuery :: (FlowCmdM env err m, MonadJobStatus m)
=> User
-> CorpusId
-> WithQuery
-> Maybe Integer
-> (JobLog -> m ())
-> m JobLog
-> JobHandle m
-> m ()
addToCorpusWithQuery user cid (WithQuery { _wq_query = q
, _wq_databases = dbs
, _wq_datafield = datafield
, _wq_lang = l
, _wq_flowListWith = flw }) maybeLimit logStatus = do
, _wq_flowListWith = flw }) maybeLimit jobHandle = do
-- TODO ...
logStatus JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 3
, _scst_events = Just []
}
markStarted 3 jobHandle
-- printDebug "[addToCorpusWithQuery] (cid, dbs)" (cid, dbs)
-- printDebug "[addToCorpusWithQuery] datafield" datafield
-- printDebug "[addToCorpusWithQuery] flowListWith" flw
......@@ -206,13 +202,9 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
Just Web -> do
-- printDebug "[addToCorpusWithQuery] processing web request" datafield
_ <- triggerSearxSearch user cid q l logStatus
_ <- triggerSearxSearch user cid q l jobHandle
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
_ -> do
-- TODO add cid
......@@ -229,35 +221,30 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
[] -> do
let txts = rights eTxts
-- TODO Sum lengths of each txt element
logStatus $ JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just $ 1 + length txts
, _scst_events = Just []
}
-- NOTE(adinapoli) Some other weird arithmetic to have the
-- following 'JobLog' as output:
-- JobLog
-- { _scst_succeeded = Just 2
-- , _scst_failed = Just 0
-- , _scst_remaining = Just $ 1 + length txts
-- , _scst_events = Just []
-- }
markStarted (3 + length txts) jobHandle
markProgress 2 jobHandle
_cids <- mapM (\txt -> do
flowDataText user txt (Multi l) cid (Just flw) logStatus) txts
flowDataText user txt (Multi l) cid (Just flw) jobHandle) txts
-- printDebug "corpus id" cids
-- printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
sendMail user
-- TODO ...
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
(err:_) -> do
-- printDebug "Error: " err
let jl = addEvent "ERROR" (T.pack $ show err) $
JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 1
, _scst_remaining = Just 0
, _scst_events = Just []
}
logStatus jl
pure jl
markFailure 1 (Just $ T.pack (show err)) jobHandle
type AddWithForm = Summary "Add with FormUrlEncoded to corpus endpoint"
:> "corpus"
......@@ -267,18 +254,16 @@ type AddWithForm = Summary "Add with FormUrlEncoded to corpus endpoint"
:> "async"
:> AsyncJobs JobLog '[FormUrlEncoded] NewWithForm JobLog
addToCorpusWithForm :: (FlowCmdM env err m)
addToCorpusWithForm :: (FlowCmdM env err m, MonadJobStatus m)
=> User
-> CorpusId
-> NewWithForm
-> (JobLog -> m ())
-> JobLog
-> m JobLog
addToCorpusWithForm user cid (NewWithForm ft ff d l _n sel) logStatus jobLog = do
-> JobHandle m
-> m ()
addToCorpusWithForm user cid (NewWithForm ft ff d l _n sel) jobHandle = do
-- printDebug "[addToCorpusWithForm] Parsing corpus: " cid
-- printDebug "[addToCorpusWithForm] fileType" ft
-- printDebug "[addToCorpusWithForm] fileFormat" ff
logStatus jobLog
limit' <- view $ hasConfig . gc_max_docs_parsers
let limit = fromIntegral limit' :: Integer
let
......@@ -329,28 +314,17 @@ addToCorpusWithForm user cid (NewWithForm ft ff d l _n sel) logStatus jobLog = d
--(Just $ fromIntegral $ length docs, docsC')
(mCount, transPipe liftBase docsC') -- TODO fix number of docs
--(map (map toHyperdataDocument) docs)
logStatus
jobHandle
-- printDebug "Extraction finished : " cid
-- printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
-- TODO uncomment this
--sendMail user
logStatus jobLog3
pure jobLog3
markComplete jobHandle
Left e -> do
printDebug "[addToCorpusWithForm] parse error" e
let evt = ScraperEvent { _scev_message = Just $ T.pack e
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
logStatus $ over (scst_events . _Just) (\evt' -> evt' <> [evt]) jobLogE
pure jobLogE
where
jobLog2 = jobLogSuccess jobLog
jobLog3 = jobLogSuccess jobLog2
jobLogE = jobLogFailTotal jobLog
markFailed (Just $ T.pack e) jobHandle
{-
addToCorpusWithFile :: FlowCmdM env err m
......@@ -385,20 +359,16 @@ type AddWithFile = Summary "Add with FileUrlEncoded to corpus endpoint"
:> "async"
:> AsyncJobs JobLog '[FormUrlEncoded] NewWithFile JobLog
addToCorpusWithFile :: (HasSettings env, FlowCmdM env err m)
addToCorpusWithFile :: (HasSettings env, FlowCmdM env err m, MonadJobStatus m)
=> User
-> CorpusId
-> NewWithFile
-> (JobLog -> m ())
-> m JobLog
addToCorpusWithFile user cid nwf@(NewWithFile _d _l fName) logStatus = do
-> JobHandle m
-> m ()
addToCorpusWithFile user cid nwf@(NewWithFile _d _l fName) jobHandle = do
printDebug "[addToCorpusWithFile] Uploading file to corpus: " cid
logStatus JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
markStarted 1 jobHandle
fPath <- GargDB.writeFile nwf
printDebug "[addToCorpusWithFile] File saved as: " fPath
......@@ -421,8 +391,4 @@ addToCorpusWithFile user cid nwf@(NewWithFile _d _l fName) logStatus = do
printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
sendMail user
pure $ JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
......@@ -22,8 +22,6 @@ import Protolude (catMaybes, encodeUtf8, rightToMaybe, Text)
import Gargantext.Prelude
import Gargantext.Prelude.Config
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
import Gargantext.API.Job (jobLogSuccess)
import Gargantext.Core (Lang(..))
import Gargantext.Core.NLP (nlpServerGet)
import qualified Gargantext.Core.Text.Corpus.API as API
......@@ -43,6 +41,7 @@ import Gargantext.Database.Admin.Types.Node (CorpusId, ListId)
import Gargantext.Database.Prelude (hasConfig)
import Gargantext.Database.Query.Table.Node (defaultListMaybe, getOrMkList)
import Gargantext.Database.Query.Tree.Root (getOrMk_RootWithCorpus)
import Gargantext.Utils.Jobs (JobHandle, MonadJobStatus(..))
import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
langToSearx :: Lang -> Text
......@@ -149,21 +148,16 @@ insertSearxResponse user cId listId l (Right (SearxResponse { _srs_results })) =
pure ()
-- TODO Make an async task out of this?
triggerSearxSearch :: (MonadBase IO m, FlowCmdM env err m)
triggerSearxSearch :: (MonadBase IO m, FlowCmdM env err m, MonadJobStatus m)
=> User
-> CorpusId
-> API.Query
-> Lang
-> (JobLog -> m ())
-> m JobLog
triggerSearxSearch user cId q l logStatus = do
-> JobHandle m
-> m ()
triggerSearxSearch user cId q l jobHandle = do
let numPages = 100
let jobLog = JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just numPages
, _scst_events = Just []
}
logStatus jobLog
markStarted numPages jobHandle
-- printDebug "[triggerSearxSearch] cId" cId
-- printDebug "[triggerSearxSearch] q" q
......@@ -190,15 +184,11 @@ triggerSearxSearch user cId q l logStatus = do
, _fsp_url = surl }
insertSearxResponse user cId listId l res
markProgress page jobHandle
logStatus $ JobLog { _scst_succeeded = Just page
, _scst_failed = Just 0
, _scst_remaining = Just (numPages - page)
, _scst_events = Just [] }
) [1..numPages]
--printDebug "[triggerSearxSearch] res" res
pure $ jobLogSuccess jobLog
markComplete jobHandle
hyperdataDocumentFromSearxResult :: Lang -> SearxResult -> Either T.Text HyperdataDocument
hyperdataDocumentFromSearxResult l (SearxResult { _sr_content, _sr_engine, _sr_pubdate, _sr_title }) = do
......
......@@ -14,7 +14,6 @@ import qualified Data.Text as T
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Job (jobLogSuccess)
import Gargantext.API.Prelude
import Gargantext.Core (Lang(..))
import Gargantext.Core.Text.Corpus.Parsers.Date (dateSplit)
......@@ -28,7 +27,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType')
import Gargantext.Prelude
import Gargantext.Database.Admin.Types.Hyperdata.Corpus (HyperdataCorpus(..))
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
data DocumentUpload = DocumentUpload
......@@ -70,25 +69,19 @@ type API = Summary " Document upload"
-- | Document-upload endpoint: registers an 'UploadDocumentJob' through
-- 'serveJobsAPI' and runs 'documentUploadAsync' on the request payload.
-- NOTE(review): two call lines follow — one passing @jobHandleLogger jHandle@
-- and one passing @jHandle@ itself; presumably the before/after of one
-- change — confirm which one is live.
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadDocumentJob $ \jHandle q -> do
documentUploadAsync uId nId q (jobHandleLogger jHandle)
documentUploadAsync uId nId q jHandle
-- | Job body for uploading one document: reports a single-step job around a
-- call to 'documentUpload' on the target node. The user id is not used.
-- NOTE(review): two signatures and two clause heads are interleaved here (a
-- @JobLog -> m ()@ callback variant returning 'JobLog' and a 'JobHandle'
-- variant returning unit); presumably the before/after of one change — confirm.
documentUploadAsync :: (FlowCmdM env err m)
documentUploadAsync :: (FlowCmdM env err m, MonadJobStatus m)
=> UserId
-> NodeId
-> DocumentUpload
-> (JobLog -> m ())
-> m JobLog
-- Callback variant: log 0 succeeded / 1 remaining before the upload.
documentUploadAsync _uId nId doc logStatus = do
let jl = JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just [] }
logStatus jl
-> JobHandle m
-> m ()
-- Handle variant: start a 1-step job before the upload.
documentUploadAsync _uId nId doc jobHandle = do
markStarted 1 jobHandle
-- The ids returned by the upload are deliberately discarded.
_docIds <- documentUpload nId doc
-- printDebug "documentUploadAsync" docIds
-- Callback variant result: the initial log passed through 'jobLogSuccess'.
pure $ jobLogSuccess jl
-- Handle variant result: mark the job complete.
markComplete jobHandle
documentUpload :: (FlowCmdM env err m)
=> NodeId
......
......@@ -28,7 +28,6 @@ import GHC.Generics (Generic)
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Job (jobLogSuccess, jobLogFailTotalWithMessage)
import Gargantext.API.Prelude (GargM, GargError)
import Gargantext.Core (Lang(..))
import Gargantext.Core.Text.Corpus.Parsers.FrameWrite
......@@ -43,7 +42,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Query.Table.Node (getChildrenByType, getClosestParentIdByType', getNodeWith)
import Gargantext.Database.Schema.Node (node_hyperdata, node_name, node_date)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Core.Text.Corpus.Parsers.Date (split')
import Servant
import Text.Read (readMaybe)
......@@ -55,8 +54,8 @@ import qualified Data.Text as T
type API = Summary " Documents from Write nodes."
:> AsyncJobs JobLog '[JSON] Params JobLog
------------------------------------------------------------------------
data Params = Params
{ id :: Int
data Params = Params
{ id :: Int
, paragraphs :: Text
, lang :: Lang
, selection :: FlowSocialListWith
......@@ -71,28 +70,24 @@ instance ToSchema Params
-- | "Documents from Write nodes" endpoint: registers a
-- 'DocumentFromWriteNodeJob' through 'serveJobsAPI' and runs
-- 'documentsFromWriteNodes' on the request parameters.
-- NOTE(review): two call lines follow — one passing @jobHandleLogger jHandle@
-- and one passing @jHandle@ itself; presumably the before/after of one
-- change — confirm which one is live.
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI DocumentFromWriteNodeJob $ \jHandle p ->
documentsFromWriteNodes uId nId p (jobHandleLogger jHandle)
documentsFromWriteNodes uId nId p jHandle
documentsFromWriteNodes :: (HasSettings env, FlowCmdM env err m)
documentsFromWriteNodes :: (HasSettings env, FlowCmdM env err m, MonadJobStatus m)
=> UserId
-> NodeId
-> Params
-> (JobLog -> m ())
-> m JobLog
documentsFromWriteNodes uId nId Params { selection, lang, paragraphs } logStatus = do
let jobLog = JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
logStatus jobLog
-> JobHandle m
-> m ()
documentsFromWriteNodes uId nId Params { selection, lang, paragraphs } jobHandle = do
markStarted 2 jobHandle
markProgress 1 jobHandle
mcId <- getClosestParentIdByType' nId NodeCorpus
cId <- case mcId of
Just cId -> pure cId
Nothing -> do
let msg = T.pack $ "[G.A.N.DFWN] Node has no corpus parent: " <> show nId
logStatus $ jobLogFailTotalWithMessage msg jobLog
markFailed (Just msg) jobHandle
panic msg
frameWriteIds <- getChildrenByType nId NodeFrameWrite
......@@ -105,7 +100,7 @@ documentsFromWriteNodes uId nId Params { selection, lang, paragraphs } logStatus
contents <- getHyperdataFrameContents (node ^. node_hyperdata)
pure (node, contents)
) frameWrites
let paragraphs' = fromMaybe (7 :: Int) $ (readMaybe $ T.unpack paragraphs)
let parsedE = (\(node, contents)
-> hyperdataDocumentFromFrameWrite lang paragraphs' (node, contents)) <$> frameWritesWithContents
......@@ -116,9 +111,9 @@ documentsFromWriteNodes uId nId Params { selection, lang, paragraphs } logStatus
(Multi lang)
cId
(Just selection)
logStatus
jobHandle
pure $ jobLogSuccess jobLog
markProgress 1 jobHandle
------------------------------------------------------------------------
hyperdataDocumentFromFrameWrite :: Lang -> Int -> (Node HyperdataFrame, T.Text) -> Either T.Text [HyperdataDocument]
......
......@@ -31,7 +31,7 @@ import Gargantext.Database.Query.Table.Node (getNodeWith)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Data.Either
data RESPONSE deriving Typeable
......@@ -103,27 +103,19 @@ type FileAsyncApi = Summary "File Async Api"
-- | Async file endpoint: registers an 'AddFileJob' through 'serveJobsAPI'
-- and runs 'addWithFile' on the uploaded payload.
-- NOTE(review): two variants of the job body appear below — one wrapping the
-- handle with 'jobHandleLogger' into a @log'@ callback, one passing @jHandle@
-- to 'addWithFile' directly. Presumably the before/after of one change —
-- confirm which one is live.
fileAsyncApi :: UserId -> NodeId -> ServerT FileAsyncApi (GargM Env GargError)
fileAsyncApi uId nId =
serveJobsAPI AddFileJob $ \jHandle i ->
-- Callback variant: adapt the job handle into a @JobLog@ logger.
let
log' x = do
-- printDebug "addWithFile" x
jobHandleLogger jHandle x
in addWithFile uId nId i log'
-- Handle variant: hand the job handle straight to the worker.
addWithFile uId nId i jHandle
addWithFile :: (HasSettings env, FlowCmdM env err m)
addWithFile :: (HasSettings env, FlowCmdM env err m, MonadJobStatus m)
=> UserId
-> NodeId
-> NewWithFile
-> (JobLog -> m ())
-> m JobLog
addWithFile uId nId nwf@(NewWithFile _d _l fName) logStatus = do
-> JobHandle m
-> m ()
addWithFile uId nId nwf@(NewWithFile _d _l fName) jobHandle = do
-- printDebug "[addWithFile] Uploading file: " nId
logStatus JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
markStarted 1 jobHandle
fPath <- GargDB.writeFile nwf
-- printDebug "[addWithFile] File saved as: " fPath
......@@ -142,8 +134,4 @@ addWithFile uId nId nwf@(NewWithFile _d _l fName) logStatus = do
_ -> pure ()
-- printDebug "[addWithFile] File upload finished: " nId
pure $ JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
......@@ -18,7 +18,6 @@ import Web.FormUrlEncoded (FromForm)
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Job (jobLogInit, jobLogSuccess, jobLogFail)
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm)
import Gargantext.API.Node.Corpus.New.Types (FileFormat(..), FileType(..))
import Gargantext.API.Node.Types (NewWithForm(..))
......@@ -32,7 +31,7 @@ import Gargantext.Database.Prelude (HasConfig)
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType, getNodeWith)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Core (Lang)
data FrameCalcUpload = FrameCalcUpload {
......@@ -55,19 +54,18 @@ type API = Summary " FrameCalc upload"
-- | FrameCalc upload endpoint: registers an 'UploadFrameCalcJob' through
-- 'serveJobsAPI' and runs 'frameCalcUploadAsync' on the request parameters.
-- NOTE(review): two call lines follow — one passing @jobHandleLogger jHandle@
-- together with an initial @jobLogInit 5@, one passing only @jHandle@;
-- presumably the before/after of one change — confirm which one is live.
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UploadFrameCalcJob $ \jHandle p ->
frameCalcUploadAsync uId nId p (jobHandleLogger jHandle) (jobLogInit 5)
frameCalcUploadAsync uId nId p jHandle
frameCalcUploadAsync :: (HasConfig env, FlowCmdM env err m)
frameCalcUploadAsync :: (HasConfig env, FlowCmdM env err m, MonadJobStatus m)
=> UserId
-> NodeId
-> FrameCalcUpload
-> (JobLog -> m ())
-> JobLog
-> m JobLog
frameCalcUploadAsync uId nId (FrameCalcUpload _wf_lang _wf_selection) logStatus jobLog = do
logStatus jobLog
-> JobHandle m
-> m ()
frameCalcUploadAsync uId nId (FrameCalcUpload _wf_lang _wf_selection) jobHandle = do
markStarted 5 jobHandle
-- printDebug "[frameCalcUploadAsync] uId" uId
-- printDebug "[frameCalcUploadAsync] nId" nId
......@@ -89,9 +87,9 @@ frameCalcUploadAsync uId nId (FrameCalcUpload _wf_lang _wf_selection) logStatus
mCId <- getClosestParentIdByType nId NodeCorpus
-- printDebug "[frameCalcUploadAsync] mCId" mCId
jobLog2 <- case mCId of
Nothing -> pure $ jobLogFail jobLog
case mCId of
Nothing -> markFailure 1 Nothing jobHandle
Just cId ->
addToCorpusWithForm (RootId (NodeId uId)) cId (NewWithForm CSV Plain body _wf_lang "calc-upload.csv" _wf_selection) logStatus jobLog
addToCorpusWithForm (RootId (NodeId uId)) cId (NewWithForm CSV Plain body _wf_lang "calc-upload.csv" _wf_selection) jobHandle
pure $ jobLogSuccess jobLog2
markComplete jobHandle
......@@ -41,7 +41,7 @@ import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Table.Node.User
import Gargantext.Database.Schema.Node
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
------------------------------------------------------------------------
data PostNode = PostNode { pn_name :: Text
......@@ -77,39 +77,27 @@ type PostNodeAsync = Summary "Post Node"
-- | Async "post node" endpoint: registers a 'NewNodeJob' through
-- 'serveJobsAPI' and runs 'postNodeAsync' on the request payload.
-- NOTE(review): the job body appears twice below — once split over two lines
-- passing @jobHandleLogger jHandle@, once on a single line passing @jHandle@;
-- presumably the before/after of one change — confirm which one is live.
postNodeAsyncAPI
:: UserId -> NodeId -> ServerT PostNodeAsync (GargM Env GargError)
postNodeAsyncAPI uId nId =
serveJobsAPI NewNodeJob $ \jHandle p ->
postNodeAsync uId nId p (jobHandleLogger jHandle)
serveJobsAPI NewNodeJob $ \jHandle p -> postNodeAsync uId nId p jHandle
------------------------------------------------------------------------
-- | Job body that creates a child node: looks up the user's node, reads its
-- @node_user_id@, and calls 'mkNodeWithParent' with the requested node type
-- and name under the given parent node, reporting job status along the way.
-- NOTE(review): two signatures and two clause heads are interleaved here (a
-- @JobLog -> m ()@ callback variant returning 'JobLog' and a 'JobHandle'
-- variant returning unit); presumably the before/after of one change — confirm.
postNodeAsync :: FlowCmdM env err m
postNodeAsync :: (FlowCmdM env err m, MonadJobStatus m)
=> UserId
-> NodeId
-> PostNode
-> (JobLog -> m ())
-> m JobLog
postNodeAsync uId nId (PostNode nodeName tn) logStatus = do
-> JobHandle m
-> m ()
postNodeAsync uId nId (PostNode nodeName tn) jobHandle = do
-- printDebug "postNodeAsync" nId
-- Callback variant: initial snapshot, 1 succeeded / 2 remaining.
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
}
-- Handle variant: start a 3-step job and record the first step.
markStarted 3 jobHandle
markProgress 1 jobHandle
nodeUser <- getNodeUser (NodeId uId)
-- _ <- threadDelay 1000
-- NOTE(review): this second callback snapshot repeats 1 succeeded /
-- 2 remaining, whereas the handle variant records another step here — confirm.
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
}
markProgress 1 jobHandle
let uId' = nodeUser ^. node_user_id
-- The result of node creation is discarded.
_ <- mkNodeWithParent tn (Just nId) uId' nodeName
-- Callback variant result: 3 succeeded / 0 remaining.
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
-- Handle variant result: mark the job complete.
markComplete jobHandle
......@@ -44,7 +44,7 @@ import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Ngrams (NgramsType(NgramsTerms))
import Gargantext.Database.Schema.Node (node_parent_id)
import Gargantext.Prelude (Bool(..), Ord, Eq, (<$>), ($), {-printDebug,-} pure, show, cs, (<>), panic, (<*>))
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Prelude (Enum, Bounded, minBound, maxBound)
import Servant
import Test.QuickCheck (elements)
......@@ -95,67 +95,38 @@ data Charts = Sources | Authors | Institutes | Ngrams | All
-- | Node-update endpoint: registers an 'UpdateNodeJob' through
-- 'serveJobsAPI' and runs 'updateNode' on the request parameters.
-- NOTE(review): two variants of the job body appear below — one wrapping the
-- handle with 'jobHandleLogger' into a @log'@ callback, one passing @jHandle@
-- to 'updateNode' directly. Presumably the before/after of one change —
-- confirm which one is live.
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI UpdateNodeJob $ \jHandle p ->
-- Callback variant: adapt the job handle into a @JobLog@ logger.
let
log' x = do
-- printDebug "updateNode" x
jobHandleLogger jHandle x
in updateNode uId nId p log'
-- Handle variant: hand the job handle straight to the worker.
updateNode uId nId p jHandle
updateNode :: (HasSettings env, FlowCmdM env err m)
updateNode :: (HasSettings env, FlowCmdM env err m, MonadJobStatus m)
=> UserId
-> NodeId
-> UpdateNodeParams
-> (JobLog -> m ())
-> m JobLog
updateNode uId nId (UpdateNodeParamsGraph metric partitionMethod bridgeMethod strength nt1 nt2) logStatus = do
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
-> JobHandle m
-> m ()
updateNode uId nId (UpdateNodeParamsGraph metric partitionMethod bridgeMethod strength nt1 nt2) jobHandle = do
markStarted 2 jobHandle
-- printDebug "Computing graph: " method
_ <- recomputeGraph uId nId partitionMethod bridgeMethod (Just metric) (Just strength) nt1 nt2 True
-- printDebug "Graph computed: " method
markComplete jobHandle
pure JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
updateNode _uId nid1 (LinkNodeReq nt nid2) logStatus = do
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
updateNode _uId nid1 (LinkNodeReq nt nid2) jobHandle = do
markStarted 2 jobHandle
_ <- case nt of
NodeAnnuaire -> pairing nid2 nid1 Nothing -- defaultList
NodeCorpus -> pairing nid1 nid2 Nothing -- defaultList
_ -> panic $ "[G.API.N.Update.updateNode] NodeType not implemented"
<> cs (show nt <> " nid1: " <> show nid1 <> " nid2: " <> show nid2)
pure JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
-- | `Advanced` to update graphs
updateNode _uId lId (UpdateNodeParamsList Advanced) logStatus = do
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
}
updateNode _uId lId (UpdateNodeParamsList Advanced) jobHandle = do
markStarted 3 jobHandle
corpusId <- view node_parent_id <$> getNode lId
logStatus JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
markProgress 1 jobHandle
_ <- case corpusId of
Just cId -> do
......@@ -165,25 +136,13 @@ updateNode _uId lId (UpdateNodeParamsList Advanced) logStatus = do
pure ()
Nothing -> pure ()
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
updateNode _uId lId (UpdateNodeParamsList _mode) logStatus = do
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
}
markComplete jobHandle
updateNode _uId lId (UpdateNodeParamsList _mode) jobHandle = do
markStarted 3 jobHandle
corpusId <- view node_parent_id <$> getNode lId
logStatus JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
markProgress 1 jobHandle
_ <- case corpusId of
Just cId -> do
......@@ -192,18 +151,10 @@ updateNode _uId lId (UpdateNodeParamsList _mode) logStatus = do
pure ()
Nothing -> pure ()
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
updateNode _userId phyloId (UpdateNodePhylo config) logStatus = do
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
}
updateNode _userId phyloId (UpdateNodePhylo config) jobHandle = do
markStarted 3 jobHandle
corpusId' <- view node_parent_id <$> getNode phyloId
......@@ -211,35 +162,19 @@ updateNode _userId phyloId (UpdateNodePhylo config) logStatus = do
phy <- flowPhyloAPI (subConfig2config config) corpusId
logStatus JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
markProgress 1 jobHandle
_ <- updateHyperdata phyloId (HyperdataPhylo Nothing (Just phy))
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
updateNode _uId tId (UpdateNodeParamsTexts _mode) jobHandle = do
markStarted 3 jobHandle
updateNode _uId tId (UpdateNodeParamsTexts _mode) logStatus = do
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
}
corpusId <- view node_parent_id <$> getNode tId
lId <- defaultList $ fromMaybe (panic "[G.A.N.Update] updateNode/UpdateNodeParamsTexts: no defaultList") corpusId
logStatus JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
markProgress 1 jobHandle
_ <- case corpusId of
Just cId -> do
......@@ -251,18 +186,11 @@ updateNode _uId tId (UpdateNodeParamsTexts _mode) logStatus = do
pure ()
Nothing -> pure ()
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
updateNode _uId _nId _p logStatus = do
simuLogs logStatus 10
updateNode _uId _nId _p jobHandle = do
simuLogs jobHandle 10
------------------------------------------------------------------------
-- TODO unPrefix "pn_" FromJSON, ToJSON, ToSchema, adapt frontend.
......
......@@ -24,6 +24,7 @@ import Control.Concurrent (threadDelay)
import Control.Exception (Exception)
import Control.Lens (Prism', (#))
import Control.Lens.TH (makePrisms)
import Control.Monad (mapM_)
import Control.Monad.Except (ExceptT)
import Control.Monad.Reader (ReaderT)
import Control.Monad.Error.Class (MonadError(..))
......@@ -42,6 +43,7 @@ import Gargantext.Database.Prelude
import Gargantext.Database.Query.Table.Node.Error (NodeError(..), HasNodeError(..))
import Gargantext.Database.Query.Tree
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..), JobHandle)
import qualified Gargantext.Utils.Jobs.Monad as Jobs
import Servant
import Servant.Job.Async
......@@ -140,31 +142,13 @@ instance HasJoseError GargError where
------------------------------------------------------------------------
-- | Utils
-- | Simulate logs
-- | Simulate a job of @t@ tasks, reporting through a 'JobLog' callback.
--
-- Runs 'simuTask' once for each index @0 .. t - 1@ (no task at all when @t@
-- is not positive) and then returns a final log with @t@ succeeded, nothing
-- failed, nothing remaining and an empty event list.
simuLogs :: MonadBase IO m
         => (JobLog -> m ())
         -> Int
         -> m JobLog
simuLogs logStatus t = do
  _ <- mapM runTask taskIndices
  pure finalLog
  where
    -- Indices of the simulated tasks, counted from zero.
    taskIndices = take t [0 ..]
    -- One simulated task: delegates the delay and the snapshot to 'simuTask'.
    runTask ix = simuTask logStatus ix t
    -- Log returned once every task has run.
    finalLog = JobLog { _scst_succeeded = Just t
                      , _scst_failed    = Just 0
                      , _scst_remaining = Just 0
                      , _scst_events    = Just []
                      }
-- | Simulate one task of a job: pause, then push a progress snapshot to the
-- logging callback.
--
-- The snapshot reports @cur@ as succeeded, zero failed, @total - cur@ as
-- remaining and no events.
simuTask :: MonadBase IO m
         => (JobLog -> m ())
         -> Int
         -> Int
         -> m ()
simuTask logStatus cur total = do
  liftBase $ threadDelay pauseMicros
  -- printDebug "status" snapshot
  logStatus snapshot
  where
    -- 5 * 10^6; 'threadDelay' takes microseconds, so this is five seconds.
    pauseMicros :: Int
    pauseMicros = 5 * (10 :: Int) ^ (6 :: Int)
    snapshot = JobLog { _scst_succeeded = Just cur
                      , _scst_failed    = Just 0
                      , _scst_remaining = Just (total - cur)
                      , _scst_events    = Just []
                      }
-- | Simulate a job of @t@ steps, reporting through a 'JobHandle'.
--
-- Starts the job with @t@ steps, then @t@ times (not at all when @t@ is not
-- positive) pauses and records one step of progress, and finally marks the
-- job complete.
simuLogs :: (MonadBase IO m, MonadJobStatus m) => JobHandle m -> Int -> m ()
simuLogs jobHandle t = do
  markStarted t jobHandle
  mapM_ (\_ -> simulateStep) [1 .. t]
  markComplete jobHandle
  where
    -- 5 * 10^6; 'threadDelay' takes microseconds, so this is five seconds.
    pauseMicros :: Int
    pauseMicros = 5 * (10 :: Int) ^ (6 :: Int)
    -- One simulated step: wait, then report a single unit of progress.
    simulateStep = do
      liftBase $ threadDelay pauseMicros
      markProgress 1 jobHandle
......@@ -34,7 +34,6 @@ import Gargantext.API.Admin.FrontEnd (FrontEndAPI)
import Gargantext.API.Context
import Gargantext.API.Count (CountAPI, count, Query)
import Gargantext.API.Members (MembersAPI, members)
import Gargantext.API.Job (jobLogInit)
import Gargantext.API.Ngrams (TableNgramsApi, apiNgramsTableDoc)
import Gargantext.API.Node
import Gargantext.API.Prelude
......@@ -45,7 +44,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Prelude (HasConfig(..))
import Gargantext.Prelude
import Gargantext.Prelude.Config (gc_max_docs_scrapers)
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import qualified Gargantext.API.GraphQL as GraphQL
import qualified Gargantext.API.Ngrams.List as List
import qualified Gargantext.API.Node.Contact as Contact
......@@ -284,7 +283,7 @@ addCorpusWithQuery :: User -> ServerT New.AddWithQuery (GargM Env GargError)
addCorpusWithQuery user cid =
serveJobsAPI AddCorpusQueryJob $ \jHandle q -> do
limit <- view $ hasConfig . gc_max_docs_scrapers
New.addToCorpusWithQuery user cid q (Just limit) (jobHandleLogger jHandle)
New.addToCorpusWithQuery user cid q (Just limit) jHandle
{- let log' x = do
printDebug "addToCorpusWithQuery" x
liftBase $ log x
......@@ -292,23 +291,18 @@ addCorpusWithQuery user cid =
-- | "Add corpus with form" endpoint: registers an 'AddCorpusFormJob' through
-- 'serveJobsAPI' and runs 'New.addToCorpusWithForm' on the submitted form.
-- NOTE(review): two variants of the job registration appear below — one
-- building a @log''@ callback from 'jobHandleLogger' and seeding the worker
-- with @jobLogInit 3@, one calling @markStarted 3@ itself and passing the
-- handle. Presumably the before/after of one change — confirm which is live.
addCorpusWithForm :: User -> ServerT New.AddWithForm (GargM Env GargError)
addCorpusWithForm user cid =
serveJobsAPI AddCorpusFormJob $ \jHandle i ->
let
log'' x = do
--printDebug "[addToCorpusWithForm] " x
jobHandleLogger jHandle x
in New.addToCorpusWithForm user cid i log'' (jobLogInit 3)
serveJobsAPI AddCorpusFormJob $ \jHandle i -> do
-- /NOTE(adinapoli)/ Track the initial steps outside 'addToCorpusWithForm', because it's
-- called in a few places, and the job status might be different between invocations.
markStarted 3 jHandle
New.addToCorpusWithForm user cid i jHandle
-- | "Add corpus with file" endpoint: registers an 'AddCorpusFileJob' through
-- 'serveJobsAPI' and runs 'New.addToCorpusWithFile' on the uploaded payload.
-- NOTE(review): two variants of the job body appear below — one wrapping the
-- handle with 'jobHandleLogger' into a @log''@ callback, one passing
-- @jHandle@ directly. Presumably the before/after of one change — confirm
-- which one is live.
addCorpusWithFile :: User -> ServerT New.AddWithFile (GargM Env GargError)
addCorpusWithFile user cid =
serveJobsAPI AddCorpusFileJob $ \jHandle i ->
-- Callback variant: adapt the job handle into a @JobLog@ logger.
let
log'' x = do
-- printDebug "[addToCorpusWithFile]" x
jobHandleLogger jHandle x
in New.addToCorpusWithFile user cid i log''
-- Handle variant: hand the job handle straight to the worker.
New.addToCorpusWithFile user cid i jHandle
-- | "Add annuaire with form" endpoint: registers an 'AddAnnuaireFormJob'
-- through 'serveJobsAPI' and runs 'Annuaire.addToAnnuaireWithForm' on the
-- submitted form.
-- NOTE(review): two call lines follow — one passing @jobHandleLogger jHandle@
-- and one passing @jHandle@ itself; presumably the before/after of one
-- change — confirm which one is live.
addAnnuaireWithForm :: ServerT Annuaire.AddWithForm (GargM Env GargError)
addAnnuaireWithForm cid =
serveJobsAPI AddAnnuaireFormJob $ \jHandle i ->
Annuaire.addToAnnuaireWithForm cid i (jobHandleLogger jHandle)
Annuaire.addToAnnuaireWithForm cid i jHandle
......@@ -47,7 +47,7 @@ import Gargantext.Database.Query.Table.Node.User (getNodeUser)
import Gargantext.Database.Schema.Node
import Gargantext.Database.Schema.Ngrams
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, jobHandleLogger)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Servant
import Servant.Job.Async (AsyncJobsAPI)
import Servant.XML
......@@ -257,8 +257,7 @@ type GraphAsyncAPI = Summary "Recompute graph"
graphAsync :: UserId -> NodeId -> ServerT GraphAsyncAPI (GargM Env GargError)
graphAsync u n =
serveJobsAPI RecomputeGraphJob $ \jHandle _ ->
graphRecompute u n (jobHandleLogger jHandle)
serveJobsAPI RecomputeGraphJob $ \jHandle _ -> graphRecompute u n jHandle
--graphRecompute :: UserId
......@@ -266,23 +265,15 @@ graphAsync u n =
-- -> (JobLog -> GargNoServer ())
-- -> GargNoServer JobLog
-- TODO get Graph Metadata to recompute
graphRecompute :: FlowCmdM env err m
graphRecompute :: (FlowCmdM env err m, MonadJobStatus m)
=> UserId
-> NodeId
-> (JobLog -> m ())
-> m JobLog
graphRecompute u n logStatus = do
logStatus JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
-> JobHandle m
-> m ()
graphRecompute u n jobHandle = do
markStarted 1 jobHandle
_g <- recomputeGraph u n Spinglass BridgenessMethod_Basic Nothing Nothing NgramsTerms NgramsTerms False
pure JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
markComplete jobHandle
------------------------------------------------------------
type GraphVersionsAPI = Summary "Graph versions"
......
......@@ -73,7 +73,6 @@ import qualified Data.Map.Strict as Map
import qualified Data.Conduit.List as CL
import qualified Data.Conduit as C
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
import Gargantext.Core (Lang(..), PosTagAlgo(..))
-- import Gargantext.Core.Ext.IMT (toSchoolName)
import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
......@@ -111,6 +110,7 @@ import Gargantext.Database.Schema.Node (NodePoly(..), node_id)
import Gargantext.Database.Types
import Gargantext.Prelude
import Gargantext.Prelude.Crypto.Hash (Hash)
import Gargantext.Utils.Jobs (JobHandle, MonadJobStatus(..))
import qualified Gargantext.Core.Text.Corpus.API as API
import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
--import qualified Prelude
......@@ -187,13 +187,14 @@ getDataText_Debug a l q li = do
-------------------------------------------------------------------------------
flowDataText :: forall env err m.
( FlowCmdM env err m
, MonadJobStatus m
)
=> User
-> DataText
-> TermType Lang
-> CorpusId
-> Maybe FlowSocialListWith
-> (JobLog -> m ())
-> JobHandle m
-> m CorpusId
flowDataText u (DataOld ids) tt cid mfslw _ = do
(_userId, userCorpusId, listId) <- createNodes u (Right [cid]) corpusType
......@@ -201,25 +202,25 @@ flowDataText u (DataOld ids) tt cid mfslw _ = do
flowCorpusUser (_tt_lang tt) u userCorpusId listId corpusType mfslw
where
corpusType = (Nothing :: Maybe HyperdataCorpus)
flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) logStatus
flowDataText u (DataNew (mLen, txtC)) tt cid mfslw jobHandle =
flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) jobHandle
------------------------------------------------------------------------
-- TODO use proxy
flowAnnuaire :: (FlowCmdM env err m)
flowAnnuaire :: (FlowCmdM env err m, MonadJobStatus m)
=> User
-> Either CorpusName [CorpusId]
-> (TermType Lang)
-> FilePath
-> (JobLog -> m ())
-> JobHandle m
-> m AnnuaireId
flowAnnuaire u n l filePath logStatus = do
flowAnnuaire u n l filePath jobHandle = do
-- TODO Conduit for file
docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) jobHandle
------------------------------------------------------------------------
flowCorpusFile :: (FlowCmdM env err m)
flowCorpusFile :: (FlowCmdM env err m, MonadJobStatus m)
=> User
-> Either CorpusName [CorpusId]
-> Limit -- Limit the number of docs (for dev purpose)
......@@ -228,13 +229,13 @@ flowCorpusFile :: (FlowCmdM env err m)
-> FileFormat
-> FilePath
-> Maybe FlowSocialListWith
-> (JobLog -> m ())
-> JobHandle m
-> m CorpusId
flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
flowCorpusFile u n _l la ft ff fp mfslw jobHandle = do
eParsed <- liftBase $ parseFile ft ff fp
case eParsed of
Right parsed -> do
flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) jobHandle
--let docs = splitEvery 500 $ take l parsed
--flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
Left e -> panic $ "Error: " <> T.pack e
......@@ -242,13 +243,13 @@ flowCorpusFile u n _l la ft ff fp mfslw logStatus = do
------------------------------------------------------------------------
-- | TODO improve the needed type to create/update a corpus
-- (For now, Either is enough)
flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
flowCorpus :: (FlowCmdM env err m, FlowCorpus a, MonadJobStatus m)
=> User
-> Either CorpusName [CorpusId]
-> TermType Lang
-> Maybe FlowSocialListWith
-> (Maybe Integer, ConduitT () a m ())
-> (JobLog -> m ())
-> JobHandle m
-> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
......@@ -257,6 +258,7 @@ flow :: forall env err m a c.
( FlowCmdM env err m
, FlowCorpus a
, MkCorpus c
, MonadJobStatus m
)
=> Maybe c
-> User
......@@ -264,9 +266,9 @@ flow :: forall env err m a c.
-> TermType Lang
-> Maybe FlowSocialListWith
-> (Maybe Integer, ConduitT () a m ())
-> (JobLog -> m ())
-> JobHandle m
-> m CorpusId
flow c u cn la mfslw (mLength, docsC) logStatus = do
flow c u cn la mfslw (mLength, docsC) jobHandle = do
(_userId, userCorpusId, listId) <- createNodes u cn c
-- TODO if public insertMasterDocs else insertUserDocs
_ <- runConduit $ zipSources (yieldMany [1..]) docsC
......@@ -302,11 +304,22 @@ flow c u cn la mfslw (mLength, docsC) logStatus = do
case mLength of
Nothing -> pure ()
Just len -> do
logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + maxIdx
, _scst_failed = Just 0
, _scst_remaining = Just $ fromIntegral $ len - maxIdx
, _scst_events = Just []
}
let succeeded = fromIntegral (1 + maxIdx)
let remaining = fromIntegral (len - maxIdx)
-- Reconstruct the correct update state by using 'markStarted' and the other primitives.
-- We do this slightly awkward arithmetic so that when we call 'markProgress' we reduce
-- the number of 'remaining' steps by exactly '1 + maxIdx', and we will end up with a 'JobLog'
-- looking like this:
-- JobLog
-- { _scst_succeeded = Just $ fromIntegral $ 1 + maxIdx
-- , _scst_failed = Just 0
-- , _scst_remaining = Just $ fromIntegral $ len - maxIdx
-- , _scst_events = Just []
-- }
markStarted (remaining + succeeded) jobHandle
markProgress succeeded jobHandle
pure ids
......
......@@ -5,7 +5,7 @@ module Gargantext.Utils.Jobs (
-- * Parsing and reading @GargJob@s from disk
, readPrios
-- * Handy re-exports
, jobHandleLogger
, MonadJobStatus(..)
) where
import Control.Monad.Except
......@@ -34,14 +34,15 @@ serveJobsAPI
, ToJSON (JobEventType m)
, ToJSON (JobOutputType m)
, MonadJobStatus m
, m ~ (GargM env GargError)
, m ~ (GargM Env GargError)
, JobEventType m ~ JobOutputType m
)
=> JobType m
-> (JobHandle m (JobEventType m) -> input -> m (JobOutputType m))
-> (JobHandle m -> input -> m ())
-> SJ.AsyncJobsServerT' ctI ctO callbacks (JobEventType m) input (JobOutputType m) m
serveJobsAPI jobType f = Internal.serveJobsAPI ask jobType jobErrorToGargError $ \env jHandle i -> do
serveJobsAPI jobType f = Internal.serveJobsAPI mkJobHandle ask jobType jobErrorToGargError $ \env jHandle i -> do
putStrLn ("Running job of type: " ++ show jobType)
runExceptT $ runReaderT (f jHandle i) env
runExceptT $ runReaderT (f jHandle i >> getLatestJobStatus jHandle) env
parseGargJob :: String -> Maybe GargJob
parseGargJob s = case s of
......
......@@ -37,14 +37,15 @@ serveJobsAPI
, ToJSON e, ToJSON event, ToJSON output
, Foldable callback
)
=> m env
=> (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
-> m env
-> t
-> (JobError -> e)
-> (env -> JobHandle m event -> input -> IO (Either e output))
-> (env -> JobHandle m -> input -> IO (Either e output))
-> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f
= newJob getenv t f (SJ.JobInput undefined Nothing)
:<|> newJob getenv t f
serveJobsAPI newJobHandle getenv t joberr f
= newJob newJobHandle getenv t f (SJ.JobInput undefined Nothing)
:<|> newJob newJobHandle getenv t f
:<|> serveJobAPI t joberr
serveJobAPI
......@@ -74,12 +75,13 @@ newJob
, ToJSON e, ToJSON event, ToJSON output
, Foldable callbacks
)
=> m env
=> (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
-> m env
-> t
-> (env -> JobHandle m event -> input -> IO (Either e output))
-> (env -> JobHandle m -> input -> IO (Either e output))
-> SJ.JobInput callbacks input
-> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
newJob newJobHandle getenv jobkind f input = do
je <- getJobEnv
env <- getenv
let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
......@@ -91,7 +93,7 @@ newJob getenv jobkind f input = do
logF w
f' jId inp logF = do
r <- f env (mkJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
r <- f env (newJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
case r of
Left e -> postCallback (SJ.mkChanError e) >> throwIO e
Right a -> postCallback (SJ.mkChanResult a) >> return a
......
{-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses, TypeFamilies #-}
{-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses, TypeFamilies, TypeFamilyDependencies #-}
module Gargantext.Utils.Jobs.Monad (
-- * Types and classes
JobEnv(..)
, NumRunners
, JobError(..)
, JobHandle -- opaque
, MonadJob(..)
-- * Tracking jobs status
, MonadJobStatus(..)
, getLatestJobStatus
, updateJobProgress
-- * Functions
, newJobEnv
......@@ -27,8 +24,6 @@ module Gargantext.Utils.Jobs.Monad (
, withJob
, handleIDError
, removeJob
, mkJobHandle
, jobHandleLogger
) where
import Gargantext.Utils.Jobs.Settings
......@@ -40,11 +35,11 @@ import Control.Concurrent.STM
import Control.Exception
import Control.Monad.Except
import Control.Monad.Reader
import Data.Functor ((<&>))
import Data.Kind (Type)
import Data.Map.Strict (Map)
import Data.Sequence (Seq, viewr, ViewR(..))
import Data.Sequence (Seq)
import Data.Time.Clock
import qualified Data.Text as T
import Network.HTTP.Client (Manager)
import Prelude
......@@ -178,60 +173,42 @@ removeJob queued t jid = do
-- Tracking jobs status
--
-- | An opaque handle that abstracts over the concrete identifier for
-- a job. The constructor for this type is deliberately not exported.
data JobHandle m event = JobHandle {
_jh_id :: !(SJ.JobID 'SJ.Safe)
, _jh_logger :: LoggerM m event
}
-- | Creates a new 'JobHandle', given its underlying 'JobID' and the logging function to
-- be used to report the status.
mkJobHandle :: SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m event
mkJobHandle jId = JobHandle jId
jobHandleLogger :: JobHandle m event -> LoggerM m event
jobHandleLogger (JobHandle _ lgr) = lgr
-- | A monad to query for the status of a particular job /and/ submit updates for in-progress jobs.
class MonadJob m (JobType m) (Seq (JobEventType m)) (JobOutputType m) => MonadJobStatus m where
-- | This is the type family for the concrete 'JobHandle' that is associated with
-- a job when it starts, and it can be used to query for its completion status. Different
-- environments can decide what this will look like.
type JobHandle m :: Type
type JobType m :: Type
type JobOutputType m :: Type
type JobEventType m :: Type
--
-- Tracking jobs status API
--
-- | Retrieves the latest 'JobEventType' from the underlying monad. It can be
-- used to query the latest status for a particular job, given its 'JobHandle' as input.
getLatestJobStatus :: MonadJobStatus m => JobHandle m -> m (JobEventType m)
-- | Adds an extra \"tracer\" that logs events to the passed action. Produces
-- a new 'JobHandle'.
withTracer :: Logger (JobEventType m) -> JobHandle m -> (JobHandle m -> m a) -> m a
-- Creating events
-- | Start tracking a new 'JobEventType' with 'n' remaining steps.
markStarted :: Int -> JobHandle m -> m ()
-- | Mark 'n' steps of the job as succeeded, while simultaneously subtracting this number
-- from the remaining steps.
markProgress :: Int -> JobHandle m -> m ()
-- | Mark 'n' steps of the job as failed, while simultaneously subtracting this number
-- from the remaining steps. Attach an optional error message to the failure.
markFailure :: Int -> Maybe T.Text -> JobHandle m -> m ()
-- | Finish tracking a job by marking all the remaining steps as succeeded.
markComplete :: JobHandle m -> m ()
-- | Retrieves the latest 'JobEventType' from the underlying monad. It can be
-- used to query the latest status for a particular job, given its 'JobHandle' as input.
getLatestJobStatus :: MonadJobStatus m => JobHandle m (JobEventType m) -> m (Maybe (JobEventType m))
getLatestJobStatus (JobHandle jId _) = do
mb_jb <- findJob jId
case mb_jb of
Nothing -> pure Nothing
Just j -> case jTask j of
QueuedJ _ -> pure Nothing
RunningJ rj -> liftIO (rjGetLog rj) <&>
\lgs -> case viewr lgs of
EmptyR -> Nothing
_ :> l -> Just l
DoneJ lgs _ -> pure $ case viewr lgs of
EmptyR -> Nothing
_ :> l -> Just l
updateJobProgress :: (Monoid (JobEventType m), MonadJobStatus m)
=> JobHandle m (JobEventType m)
-- ^ The handle that uniquely identifies this job.
-> (JobEventType m -> JobEventType m)
-- ^ A /pure/ function to update the 'JobEventType'. The input
-- is the /latest/ event, i.e. the current progress status. If
-- this is the first time we report progress and therefore there
-- is no previous progress status, this function will be applied
-- over 'mempty', thus the 'Monoid' constraint.
-> m ()
updateJobProgress hdl@(JobHandle _jId logStatus) updateJobStatus = do
latestStatus <- getLatestJobStatus hdl
case latestStatus of
Nothing -> logStatus (updateJobStatus mempty)
Just s -> logStatus (updateJobStatus s)
-- | Finish tracking a job by marking all the remaining steps as failed. Attach an optional
-- message to the failure.
markFailed :: Maybe T.Text -> JobHandle m -> m ()
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment