Commit 8390c939 authored by Przemyslaw Kaminski's avatar Przemyslaw Kaminski

[async jobs] try to implement push events for async job

parent 67db03cc
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Job where module Gargantext.API.Job where
import Control.Lens (over, _Just) import Data.Swagger
import Data.IORef import Servant
import Data.Maybe import Servant.Job.Async
import qualified Data.Text as T
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Prelude (GargServer)
import Gargantext.API.Utils.Job
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.API.Admin.Orchestrator.Types type API = Summary "Job API (for testing)"
:> "jobs"
:> AsyncJobs JobLog '[JSON] () JobLog
jobLogInit :: Int -> JobLog api :: GargServer API
jobLogInit rem = api =
JobLog { _scst_succeeded = Just 0 serveJobsAPI $ fromJobFunctionS (jobLogInit 0) $ JobFunctionS $ \input -> do
, _scst_remaining = Just rem pushEvent (addRem 2)
, _scst_failed = Just 0 pure $ jobLogInit 0
, _scst_events = Just [] }
\ No newline at end of file
addEvent :: T.Text -> T.Text -> JobLog -> JobLog
addEvent level message (JobLog { _scst_events = mEvts, .. }) = JobLog { _scst_events = Just (evts <> [ newEvt ]), .. }
where
evts = fromMaybe [] mEvts
newEvt = ScraperEvent { _scev_message = Just message
, _scev_level = Just level
, _scev_date = Nothing }
jobLogSuccess :: JobLog -> JobLog
jobLogSuccess jl = over (scst_succeeded . _Just) (+ 1) $
over (scst_remaining . _Just) (\x -> x - 1) jl
jobLogFail :: JobLog -> JobLog
jobLogFail jl = over (scst_failed . _Just) (+ 1) $
over (scst_remaining . _Just) (\x -> x - 1) jl
jobLogFailTotal :: JobLog -> JobLog
jobLogFailTotal (JobLog { _scst_succeeded = mSucc
, _scst_remaining = mRem
, _scst_failed = mFail
, _scst_events = evt }) =
JobLog { _scst_succeeded = mSucc
, _scst_remaining = newRem
, _scst_failed = newFail
, _scst_events = evt }
where
(newRem, newFail) = case mRem of
Nothing -> (Nothing, mFail)
Just rem -> (Just 0, (+ rem) <$> mFail)
jobLogFailTotalWithMessage :: T.Text -> JobLog -> JobLog
jobLogFailTotalWithMessage message jl = addEvent "ERROR" message $ jobLogFailTotal jl
jobLogEvt :: JobLog -> ScraperEvent -> JobLog
jobLogEvt jl evt = over (scst_events . _Just) (\evts -> (evt:evts)) jl
runJobLog :: MonadBase IO m => Int -> (JobLog -> m ()) -> m (m (), m (), m JobLog)
runJobLog num logStatus = do
jlRef <- liftBase $ newIORef $ jobLogInit num
return (logRefF jlRef, logRefSuccessF jlRef, getRefF jlRef)
where
logRefF ref = do
jl <- liftBase $ readIORef ref
logStatus jl
logRefSuccessF ref = do
jl <- liftBase $ readIORef ref
let jl' = jobLogSuccess jl
liftBase $ writeIORef ref jl'
logStatus jl'
getRefF ref = do
liftBase $ readIORef ref
...@@ -95,7 +95,7 @@ import Formatting (hprint, int, (%)) ...@@ -95,7 +95,7 @@ import Formatting (hprint, int, (%))
import GHC.Generics (Generic) import GHC.Generics (Generic)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs) import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types (HasSettings) import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Job import Gargantext.API.Utils.Job
import Gargantext.API.Ngrams.Types import Gargantext.API.Ngrams.Types
import Gargantext.API.Prelude import Gargantext.API.Prelude
import Gargantext.Core.NodeStory import Gargantext.Core.NodeStory
......
...@@ -24,7 +24,7 @@ import Gargantext.Prelude.Config ...@@ -24,7 +24,7 @@ import Gargantext.Prelude.Config
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..)) import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
--import Gargantext.API.Admin.Types (HasSettings) --import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Job (jobLogSuccess) import Gargantext.API.Utils.Job (jobLogSuccess)
import Gargantext.Core (Lang(..), PosTagAlgo(..)) import Gargantext.Core (Lang(..), PosTagAlgo(..))
import qualified Gargantext.Core.Text.Corpus.API as API import qualified Gargantext.Core.Text.Corpus.API as API
import Gargantext.Core.Text.List (buildNgramsLists) import Gargantext.Core.Text.List (buildNgramsLists)
......
...@@ -14,7 +14,7 @@ import Servant.Job.Async ...@@ -14,7 +14,7 @@ import Servant.Job.Async
import qualified Data.Text as T import qualified Data.Text as T
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs) import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Job (jobLogSuccess) import Gargantext.API.Utils.Job (jobLogSuccess)
import Gargantext.API.Prelude import Gargantext.API.Prelude
import Gargantext.Core (Lang(..)) import Gargantext.Core (Lang(..))
import Gargantext.Core.Text.Corpus.Parsers.Date (dateSplit) import Gargantext.Core.Text.Corpus.Parsers.Date (dateSplit)
......
...@@ -24,7 +24,7 @@ import Data.Swagger ...@@ -24,7 +24,7 @@ import Data.Swagger
import qualified Data.Text as T import qualified Data.Text as T
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs) import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types (HasSettings) import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Job (jobLogSuccess, jobLogFailTotalWithMessage) import Gargantext.API.Utils.Job (jobLogSuccess, jobLogFailTotalWithMessage)
import Gargantext.API.Prelude (GargServer) import Gargantext.API.Prelude (GargServer)
import Gargantext.Core (Lang(..)) import Gargantext.Core (Lang(..))
import Gargantext.Core.Text.Corpus.Parsers.FrameWrite import Gargantext.Core.Text.Corpus.Parsers.FrameWrite
......
...@@ -18,7 +18,7 @@ import Servant.Job.Async ...@@ -18,7 +18,7 @@ import Servant.Job.Async
import Web.FormUrlEncoded (FromForm) import Web.FormUrlEncoded (FromForm)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs) import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Job (jobLogInit, jobLogSuccess, jobLogFail) import Gargantext.API.Utils.Job (jobLogInit, jobLogSuccess, jobLogFail)
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm) import Gargantext.API.Node.Corpus.New (addToCorpusWithForm)
import Gargantext.API.Node.Corpus.New.File (FileType(..)) import Gargantext.API.Node.Corpus.New.File (FileType(..))
import Gargantext.API.Node.Types (NewWithForm(..)) import Gargantext.API.Node.Types (NewWithForm(..))
......
...@@ -76,6 +76,7 @@ type GargServerC env err m = ...@@ -76,6 +76,7 @@ type GargServerC env err m =
, HasNodeStory env err m , HasNodeStory env err m
, EnvC env , EnvC env
, ErrC err , ErrC err
, MonadPushEvent JobLog m
, ToJSON err , ToJSON err
) )
......
...@@ -33,7 +33,8 @@ import Gargantext.API.Admin.Auth.Types (AuthRequest, AuthResponse, Authenticated ...@@ -33,7 +33,8 @@ import Gargantext.API.Admin.Auth.Types (AuthRequest, AuthResponse, Authenticated
import Gargantext.API.Admin.FrontEnd (FrontEndAPI) import Gargantext.API.Admin.FrontEnd (FrontEndAPI)
import Gargantext.API.Context import Gargantext.API.Context
import Gargantext.API.Count (CountAPI, count, Query) import Gargantext.API.Count (CountAPI, count, Query)
import Gargantext.API.Job (jobLogInit) import Gargantext.API.Utils.Job (jobLogInit)
import qualified Gargantext.API.Job as Job
import Gargantext.API.Ngrams (TableNgramsApi, apiNgramsTableDoc) import Gargantext.API.Ngrams (TableNgramsApi, apiNgramsTableDoc)
import Gargantext.API.Node import Gargantext.API.Node
import Gargantext.API.Prelude import Gargantext.API.Prelude
...@@ -174,6 +175,8 @@ type GargPrivateAPI' = ...@@ -174,6 +175,8 @@ type GargPrivateAPI' =
:<|> List.GETAPI :<|> List.GETAPI
:<|> List.JSONAPI :<|> List.JSONAPI
:<|> List.CSVAPI :<|> List.CSVAPI
:<|> Job.API
{- {-
:<|> "wait" :> Summary "Wait test" :<|> "wait" :> Summary "Wait test"
:> Capture "x" Int :> Capture "x" Int
...@@ -255,6 +258,8 @@ serverPrivateGargAPI' (AuthenticatedUser (NodeId uid)) ...@@ -255,6 +258,8 @@ serverPrivateGargAPI' (AuthenticatedUser (NodeId uid))
:<|> List.getApi :<|> List.getApi
:<|> List.jsonApi :<|> List.jsonApi
:<|> List.csvApi :<|> List.csvApi
:<|> Job.api
-- :<|> waitAPI -- :<|> waitAPI
......
module Gargantext.API.Utils.Job where
import Control.Lens (over, _Just)
import Data.IORef
import Data.Maybe
import qualified Data.Text as T
import Gargantext.Prelude
import Gargantext.API.Admin.Orchestrator.Types
jobLogInit :: Int -> JobLog
jobLogInit rem =
JobLog { _scst_succeeded = Just 0
, _scst_remaining = Just rem
, _scst_failed = Just 0
, _scst_events = Just [] }
addRem :: Int -> JobLog -> JobLog
addRem rem (JobLog { _scst_remaining = Nothing, .. }) = JobLog { _scst_remaining = Just rem, .. }
addRem rem (JobLog { _scst_remaining = Just r, .. }) = JobLog { _scst_remaining = Just (r + rem), .. }
addEvent :: T.Text -> T.Text -> JobLog -> JobLog
addEvent level message (JobLog { _scst_events = mEvts, .. }) = JobLog { _scst_events = Just (evts <> [ newEvt ]), .. }
where
evts = fromMaybe [] mEvts
newEvt = ScraperEvent { _scev_message = Just message
, _scev_level = Just level
, _scev_date = Nothing }
jobLogSuccess :: JobLog -> JobLog
jobLogSuccess jl = over (scst_succeeded . _Just) (+ 1) $
over (scst_remaining . _Just) (\x -> x - 1) jl
jobLogFail :: JobLog -> JobLog
jobLogFail jl = over (scst_failed . _Just) (+ 1) $
over (scst_remaining . _Just) (\x -> x - 1) jl
jobLogFailTotal :: JobLog -> JobLog
jobLogFailTotal (JobLog { _scst_succeeded = mSucc
, _scst_remaining = mRem
, _scst_failed = mFail
, _scst_events = evt }) =
JobLog { _scst_succeeded = mSucc
, _scst_remaining = newRem
, _scst_failed = newFail
, _scst_events = evt }
where
(newRem, newFail) = case mRem of
Nothing -> (Nothing, mFail)
Just rem -> (Just 0, (+ rem) <$> mFail)
jobLogFailTotalWithMessage :: T.Text -> JobLog -> JobLog
jobLogFailTotalWithMessage message jl = addEvent "ERROR" message $ jobLogFailTotal jl
jobLogEvt :: JobLog -> ScraperEvent -> JobLog
jobLogEvt jl evt = over (scst_events . _Just) (\evts -> (evt:evts)) jl
runJobLog :: MonadBase IO m => Int -> (JobLog -> m ()) -> m (m (), m (), m JobLog)
runJobLog num logStatus = do
jlRef <- liftBase $ newIORef $ jobLogInit num
return (logRefF jlRef, logRefSuccessF jlRef, getRefF jlRef)
where
logRefF ref = do
jl <- liftBase $ readIORef ref
logStatus jl
logRefSuccessF ref = do
jl <- liftBase $ readIORef ref
let jl' = jobLogSuccess jl
liftBase $ writeIORef ref jl'
logStatus jl'
getRefF ref = do
liftBase $ readIORef ref
...@@ -31,7 +31,7 @@ import Data.Either(Either(..)) ...@@ -31,7 +31,7 @@ import Data.Either(Either(..))
import Data.Either.Extra (partitionEithers) import Data.Either.Extra (partitionEithers)
import Data.List (concat, lookup) import Data.List (concat, lookup)
import Data.Ord() import Data.Ord()
import Data.String (String()) import Data.String ( String() )
import Data.String() import Data.String()
import Data.Text (Text) import Data.Text (Text)
import Data.Text.Encoding (decodeUtf8) import Data.Text.Encoding (decodeUtf8)
...@@ -42,7 +42,7 @@ import qualified Data.ByteString.Char8 as DBC ...@@ -42,7 +42,7 @@ import qualified Data.ByteString.Char8 as DBC
import qualified Data.ByteString.Lazy as DBL import qualified Data.ByteString.Lazy as DBL
import qualified Data.Map as DM import qualified Data.Map as DM
import qualified Data.Text as DT import qualified Data.Text as DT
import qualified Prelude as Prelude import qualified Prelude
import System.IO.Temp (emptySystemTempFile) import System.IO.Temp (emptySystemTempFile)
import Gargantext.Core (Lang(..)) import Gargantext.Core (Lang(..))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment