[worker] more old jobs code removal

parent 99e9cd42
Pipeline #6924 failed with stages
in 17 minutes and 20 seconds
......@@ -11,10 +11,9 @@ Import a corpus binary.
-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE Strict #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE LambdaCase #-}
module CLI.Import where
......
......@@ -278,12 +278,7 @@ library
Gargantext.System.Logging
Gargantext.Utils.Dict
Gargantext.Utils.Jobs.Error
Gargantext.Utils.Jobs.Internal
Gargantext.Utils.Jobs.Map
Gargantext.Utils.Jobs.Monad
Gargantext.Utils.Jobs.Queue
Gargantext.Utils.Jobs.Settings
Gargantext.Utils.Jobs.State
Gargantext.Utils.SpacyNLP
Gargantext.Utils.SpacyNLP.Types
Gargantext.Utils.Tuple
......
......@@ -5,12 +5,9 @@
{-# LANGUAGE LambdaCase #-}
module Gargantext.API.Admin.EnvTypes (
GargJob(..)
, parseGargJob
, Env(..)
Env(..)
, Mode(..)
, modeToLoggingLevels
, mkJobHandle
, env_config
, env_logger
, env_manager
......@@ -28,23 +25,17 @@ module Gargantext.API.Admin.EnvTypes (
, MockEnv(..)
, DevEnv(..)
, DevJobHandle(..)
, ConcreteJobHandle -- opaque
) where
import Control.Lens hiding (Level, (:<), (.=))
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson qualified as Aeson
import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.List ((\\))
import Data.Pool (Pool)
import Data.Sequence (ViewL(..), viewl)
import Data.Text qualified as T
import Database.PostgreSQL.Simple (Connection)
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Errors.Types
import Gargantext.API.Job
import Gargantext.API.Prelude (GargM, IsGargServer)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
......@@ -57,13 +48,11 @@ import Gargantext.Core.NodeStory
import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Map (LoggerM, J(..), jTask, rjGetLog)
import Gargantext.Utils.Jobs.Monad qualified as Jobs
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Network.HTTP.Client (Manager)
import Servant.Auth.Server (JWTSettings)
import Servant.Client (BaseUrl)
import Servant.Job.Async (HasJobEnv(..), Job)
import Servant.Job.Async qualified as SJ
import Servant.Job.Core qualified
import System.Log.FastLogger qualified as FL
......@@ -102,71 +91,6 @@ instance HasLogger (GargM Env BackendInternalError) where
logTxt lgr lvl msg = logMsg lgr lvl (FL.toLogStr $ T.unpack msg)
-- {-# DEPRECATED GargJob "GargJob is deprecated, use Worker.Jobs.Types.Job instead" #-}
data GargJob
= AddAnnuaireFormJob
| AddContactJob
| AddCorpusFileJob
| AddCorpusFormJob
| AddCorpusQueryJob
| AddFileJob
| DocumentFromWriteNodeJob
| ForgotPasswordJob
| NewNodeJob
| RecomputeGraphJob
| TableNgramsJob
| UpdateNgramsListJobJSON
| UpdateNgramsListJobTSV
| UpdateNodeJob
| UploadDocumentJob
| UploadFrameCalcJob
deriving (Show, Eq, Ord, Enum, Bounded)
parseGargJob :: Text -> Maybe GargJob
parseGargJob s = case s of
"addannuaireform" -> Just AddAnnuaireFormJob
"addcontact" -> Just AddContactJob
"addcorpusfile" -> Just AddCorpusFileJob
"addcorpusform" -> Just AddCorpusFormJob
"addcorpusquery" -> Just AddCorpusQueryJob
"addfile" -> Just AddFileJob
"documentfromwritenode" -> Just DocumentFromWriteNodeJob
"forgotpassword" -> Just ForgotPasswordJob
"newnode" -> Just NewNodeJob
"recomputegraph" -> Just RecomputeGraphJob
"tablengrams" -> Just TableNgramsJob
"updatedocument" -> Just UploadDocumentJob
"updateframecalc" -> Just UploadFrameCalcJob
"updatengramslistjson" -> Just UpdateNgramsListJobJSON
"updatengramslisttsv" -> Just UpdateNgramsListJobTSV
"updatenode" -> Just UpdateNodeJob
_ -> Nothing
instance FromJSON GargJob where
parseJSON = withObject "GargJob" $ \o -> do
type_ <- o .: "type"
case parseGargJob type_ of
Just gj -> return gj
Nothing -> prependFailure "parsing garg job type failed, " (typeMismatch "type" $ Aeson.String type_)
instance ToJSON GargJob where
toJSON AddAnnuaireFormJob = object [ ("type" .= ("addannuaireform" :: Text))]
toJSON AddContactJob = object [ ("type" .= ("addcontact" :: Text))]
toJSON AddCorpusFileJob = object [ ("type" .= ("addcorpusfile" :: Text))]
toJSON AddCorpusFormJob = object [ ("type" .= ("addcorpusform" :: Text))]
toJSON AddCorpusQueryJob = object [ ("type" .= ("addcorpusquery" :: Text))]
toJSON AddFileJob = object [ ("type" .= ("addfile" :: Text))]
toJSON DocumentFromWriteNodeJob = object [ ("type" .= ("documentfromwritenode" :: Text))]
toJSON ForgotPasswordJob = object [ ("type" .= ("forgotpassword" :: Text))]
toJSON NewNodeJob = object [ ("type" .= ("newnode" :: Text))]
toJSON RecomputeGraphJob = object [ ("type" .= ("recomputegraph" :: Text))]
toJSON TableNgramsJob = object [ ("type" .= ("tablengrams" :: Text))]
toJSON UploadDocumentJob = object [ ("type" .= ("updatedocument" :: Text))]
toJSON UploadFrameCalcJob = object [ ("type" .= ("updateframecalc" :: Text))]
toJSON UpdateNgramsListJobJSON = object [ ("type" .= ("updatengramslistjson" :: Text))]
toJSON UpdateNgramsListJobTSV = object [ ("type" .= ("updatengramslisttsv" :: Text))]
toJSON UpdateNodeJob = object [ ("type" .= ("updatenode" :: Text))]
-- Do /not/ treat the data types of this type as strict, because it's convenient
-- to be able to partially initialise things like an 'Env' during tests, without
-- having to specify /everything/. This means that when we /construct/ an 'Env',
......@@ -178,7 +102,6 @@ data Env = Env
, _env_manager :: ~Manager
, _env_self_url :: ~BaseUrl
, _env_scrapers :: ~ScrapersEnv
, _env_jobs :: ~(Jobs.JobEnv GargJob (Seq JobLog) JobLog)
, _env_config :: ~GargConfig
, _env_central_exchange :: ~ThreadId
, _env_dispatcher :: ~Dispatcher
......@@ -221,94 +144,10 @@ instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where
instance HasJobEnv Env JobLog JobLog where
job_env = env_scrapers
instance Jobs.MonadJob (GargM Env err) GargJob (Seq JobLog) JobLog where
getJobEnv = asks (view env_jobs)
instance CET.HasCentralExchangeNotification Env where
ce_notify m = do
c <- asks (view env_config)
liftBase $ CE.notify (_gc_notifications_config c) m
-- | The /concrete/ 'JobHandle' in use with our 'GargM' (production) monad. Its
-- constructor it's not exported, to not leak internal details of its implementation.
data ConcreteJobHandle err =
ConcreteNullHandle
| JobHandle {
_jh_id :: !(SJ.JobID 'SJ.Safe)
, _jh_logger :: LoggerM (GargM Env err) JobLog
}
-- | Creates a new /concrete/ 'JobHandle', given its underlying 'JobID' and the logging function to
-- be used to report the status.
mkJobHandle :: SJ.JobID 'SJ.Safe
-> LoggerM (GargM Env err) JobLog
-> ConcreteJobHandle err
mkJobHandle jId = JobHandle jId
-- | Updates the status of a 'JobHandle' by using the input 'updateJobStatus' function.
updateJobProgress :: ConcreteJobHandle err -> (JobLog -> JobLog) -> GargM Env err ()
updateJobProgress _ _ = pure ()
-- updateJobProgress ConcreteNullHandle _ = pure ()
-- updateJobProgress hdl@(JobHandle jId logStatus) updateJobStatus = do
-- jobLog <- Jobs.getLatestJobStatus hdl
-- let jobLogNew = updateJobStatus jobLog
-- logStatus jobLogNew
-- mJb <- Jobs.findJob jId
-- case mJb of
-- Nothing -> pure ()
-- Just je -> do
-- -- We use the same endpoint as the one for polling jobs via
-- -- API. This way we can send the job status directly in the
-- -- notification
-- j <- pollJob (Just $ SJ.Limit 1) Nothing jId je
-- CET.ce_notify $ CET.UpdateJobProgress j
instance Jobs.MonadJobStatus (GargM Env err) where
type JobHandle (GargM Env err) = ConcreteJobHandle err
type JobType (GargM Env err) = GargJob
type JobOutputType (GargM Env err) = JobLog
type JobEventType (GargM Env err) = JobLog
noJobHandle Proxy = ConcreteNullHandle
getLatestJobStatus ConcreteNullHandle = pure noJobLog
getLatestJobStatus (JobHandle jId _) = do
mb_jb <- Jobs.findJob jId
case mb_jb of
Nothing -> pure noJobLog
Just j -> case jTask j of
QueuedJ _ -> pure noJobLog
RunningJ rj -> liftIO (rjGetLog rj) <&>
\lgs -> case viewl lgs of
EmptyL -> noJobLog
l :< _ -> l
DoneJ lgs _ -> pure $ case viewl lgs of
EmptyL -> noJobLog
l :< _ -> l
withTracer _ ConcreteNullHandle f = f ConcreteNullHandle
withTracer extraLogger (JobHandle jId logger) n = n (JobHandle jId (\w -> logger w >> liftIO (extraLogger w)))
markStarted n jh = updateJobProgress jh (const $ jobLogStart (RemainingSteps n))
markProgress steps jh = updateJobProgress jh (jobLogProgress steps)
markFailure steps mb_msg jh =
updateJobProgress jh (\latest -> case mb_msg of
Nothing -> jobLogFailures steps latest
Just msg -> addErrorEvent msg (jobLogFailures steps latest)
)
markComplete jh = updateJobProgress jh jobLogComplete
markFailed mb_msg jh =
updateJobProgress jh (\latest -> case mb_msg of
Nothing -> jobLogFailTotal latest
Just msg -> jobLogFailTotalWithMessage msg latest
)
addMoreSteps steps jh = updateJobProgress jh (jobLogAddMore steps)
data FireWall = FireWall { unFireWall :: Bool }
......@@ -357,11 +196,9 @@ instance CET.HasCentralExchangeNotification DevEnv where
-- | Our /mock/ job handle.
data DevJobHandle = DevJobHandle
instance Jobs.MonadJobStatus (GargM DevEnv err) where
instance MonadJobStatus (GargM DevEnv err) where
type JobHandle (GargM DevEnv err) = DevJobHandle
type JobType (GargM DevEnv err) = GargJob
type JobOutputType (GargM DevEnv err) = JobLog
type JobEventType (GargM DevEnv err) = JobLog
......
......@@ -22,7 +22,6 @@ import Codec.Serialise (Serialise(), serialise)
import Control.Lens
import Control.Monad.Reader
import Data.ByteString.Lazy qualified as L
import Data.Map.Strict qualified as Map
import Data.Pool (Pool)
import Data.Pool qualified as Pool
import Database.PostgreSQL.Simple (Connection, connect, close, ConnectInfo)
......@@ -31,14 +30,12 @@ import Gargantext.API.Errors.Types
import Gargantext.API.Prelude
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.Dispatcher qualified as D
import Gargantext.Core.Config (GargConfig(..), gc_jobs, gc_frontend_config, hasConfig)
import Gargantext.Core.Config.Types (PortNumber, SettingsFile(..), fc_appPort, jc_js_job_timeout, jc_js_id_timeout, jwtSettings)
import Gargantext.Core.Config (GargConfig(..), gc_frontend_config)
import Gargantext.Core.Config.Types (PortNumber, SettingsFile(..), fc_appPort, jwtSettings)
import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.NodeStory
import Gargantext.Prelude
import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Monad qualified as Jobs
import Gargantext.Utils.Jobs.Settings qualified as Jobs
import Network.HTTP.Client.TLS (newTlsManager)
import Servant.Client (parseBaseUrl)
import Servant.Job.Async (newJobEnv, defaultSettings)
......@@ -160,17 +157,15 @@ newEnv logger port settingsFile = do
-- let prios' = Jobs.applyPrios prios Jobs.defaultPrios
-- putStrLn ("Overrides: " <> show prios :: Text)
-- putStrLn ("New priorities: " <> show prios' :: Text)
let prios = Map.empty
!self_url_env <- parseBaseUrl $ "http://0.0.0.0:" <> show port
!pool <- newPool $ _gc_database_config config_env
!nodeStory_env <- fromDBNodeStoryEnv pool
!scrapers_env <- newJobEnv defaultSettings manager_env
secret <- Jobs.genSecret
let jobs_settings = (Jobs.defaultJobSettings 1 secret)
& Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_job_timeout)
& Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_id_timeout)
!jobs_env <- Jobs.newJobEnv jobs_settings prios manager_env
-- secret <- Jobs.genSecret
-- let jobs_settings = (Jobs.defaultJobSettings 1 secret)
-- & Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_job_timeout)
-- & Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_id_timeout)
!central_exchange <- forkIO $ CE.gServer (_gc_notifications_config config_env)
!dispatcher <- D.newDispatcher (_gc_notifications_config config_env)
......@@ -186,7 +181,6 @@ newEnv logger port settingsFile = do
, _env_nodeStory = nodeStory_env
, _env_manager = manager_env
, _env_scrapers = scrapers_env
, _env_jobs = jobs_env
, _env_self_url = self_url_env
, _env_config = config_env
, _env_central_exchange = central_exchange
......
......@@ -259,7 +259,3 @@ performAction env _state bm = do
UploadDocument { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] upload document"
void $ documentUploadAsync _ud_node_id _ud_args jh
GargJob { _gj_garg_job } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "Garg job: " <> show _gj_garg_job <> " (handling of this job is still not implemented!)"
return ()
......@@ -24,9 +24,8 @@ import Data.Maybe (fromJust)
import Data.Pool qualified as Pool
import Data.Text qualified as T
import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.API.Admin.EnvTypes (GargJob, Mode(Dev), modeToLoggingLevels)
import Gargantext.API.Admin.EnvTypes (Mode(Dev), modeToLoggingLevels)
import Gargantext.API.Admin.Orchestrator.Types (JobLog, noJobLog)
-- import Gargantext.API.Admin.Settings ( newPool )
import Gargantext.API.Job (RemainingSteps(..), jobLogStart, jobLogProgress, jobLogFailures, jobLogComplete, addErrorEvent, jobLogFailTotal, jobLogFailTotalWithMessage, jobLogAddMore)
import Gargantext.API.Prelude (GargM)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
......@@ -224,12 +223,9 @@ data WorkerJobHandle =
instance MonadJobStatus WorkerMonad where
-- type JobHandle WorkerMonad = WorkerJobHandle
type JobHandle WorkerMonad = WorkerJobHandle
type JobType WorkerMonad = GargJob
type JobOutputType WorkerMonad = JobLog
type JobEventType WorkerMonad = JobLog
-- noJobHandle _ = WorkerNoJobHandle
-- noJobHandle _ = noJobHandle (Proxy :: Proxy (GargM WorkerEnv IOException)) -- ConcreteNullHandle
noJobHandle Proxy = WorkerNoJobHandle
getLatestJobStatus _ = WorkerMonad (pure noJobLog)
withTracer _ jh n = n jh
......
......@@ -16,7 +16,6 @@ module Gargantext.Core.Worker.Jobs.Types where
import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson.Types (prependFailure, typeMismatch)
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser, ForgotPasswordAsyncParams)
import Gargantext.API.Admin.EnvTypes ( GargJob )
import Gargantext.API.Ngrams.Types (NgramsList, UpdateTableNgramsCharts(_utn_list_id))
import Gargantext.API.Node.Corpus.Annuaire (AnnuaireWithForm)
import Gargantext.API.Node.Contact.Types (AddContactParams)
......@@ -66,7 +65,6 @@ data Job =
, _un_args :: UpdateNodeParams }
| UploadDocument { _ud_node_id :: NodeId
, _ud_args :: DocumentUpload }
| GargJob { _gj_garg_job :: GargJob }
deriving (Show, Eq)
instance FromJSON Job where
parseJSON = withObject "Job" $ \o -> do
......@@ -134,9 +132,6 @@ instance FromJSON Job where
_ud_node_id <- o .: "node_id"
_ud_args <- o .: "args"
return $ UploadDocument { .. }
"GargJob" -> do
_gj_garg_job <- o .: "garg_job"
return $ GargJob { .. }
s -> prependFailure "parsing job type failed, " (typeMismatch "type" s)
instance ToJSON Job where
toJSON Ping = object [ "type" .= ("Ping" :: Text) ]
......@@ -201,9 +196,6 @@ instance ToJSON Job where
object [ "type" .= ("UploadDocument" :: Text)
, "node_id" .= _ud_node_id
, "args" .= _ud_args ]
toJSON (GargJob { .. }) =
object [ "type" .= ("GargJob" :: Text)
, "garg_job" .= _gj_garg_job ]
......@@ -231,5 +223,4 @@ getWorkerMNodeId (PostNodeAsync { _pna_node_id }) = Just _pna_node_id
getWorkerMNodeId (RecomputeGraph { _rg_node_id }) = Just _rg_node_id
getWorkerMNodeId (UpdateNode { _un_node_id }) = Just _un_node_id
getWorkerMNodeId (UploadDocument { _ud_node_id }) = Just _ud_node_id
getWorkerMNodeId (GargJob {}) = Nothing
{-|
Module : Gargantext.Utils.Jobs.Internal
Description : Servant Jobs
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Gargantext.Utils.Jobs.Internal (
serveJobsAPI
-- * Internals for testing
, newJob
, pollJob
) where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception.Safe
import Control.Lens
import Control.Monad
import Control.Monad.Except
import Data.Aeson (ToJSON)
import Data.Foldable (toList)
import Data.Monoid
import Data.Kind (Type)
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Gargantext.Prelude (panicTrace)
import Prelude
import Protolude qualified
import Servant.API.Alternative
import Servant.API.ContentTypes
import Gargantext.API.Errors.Types (BackendInternalError)
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad
import qualified Data.Text as T
import qualified Servant.Client as C
import qualified Servant.Job.Async as SJ
import qualified Servant.Job.Client as SJ
import qualified Servant.Job.Core as SJ
import qualified Servant.Job.Types as SJ
serveJobsAPI
:: ( Ord t, MonadError BackendInternalError m
, MonadJob m t (Seq event) output
, ToJSON event, ToJSON output, MimeRender JSON output
, Foldable callback
)
=> (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
-> m env
-> t
-> (JobError -> BackendInternalError)
-> (env -> JobHandle m -> input -> IO (Either BackendInternalError output))
-> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI newJobHandle getenv t joberr f
= newJob newJobHandle getenv t f (SJ.JobInput undefined Nothing)
:<|> newJob newJobHandle getenv t f
:<|> serveJobAPI t joberr
serveJobAPI
:: forall (m :: Type -> Type) t event output.
(Ord t, MonadError BackendInternalError m, MonadJob m t (Seq event) output, MimeRender JSON output)
=> t
-> (JobError -> BackendInternalError)
-> SJ.JobID 'SJ.Unsafe
-> SJ.AsyncJobServerT event output m
serveJobAPI t joberr jid' = wrap' (killJob t)
:<|> wrap' pollJob
:<|> wrap (waitJob joberr)
where wrap
:: forall a.
(SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
-> m a
wrap g = do
jid <- handleIDError joberr (checkJID jid')
job <- maybe (throwError $ joberr $ UnknownJob (SJ._id_number jid)) pure =<< findJob jid
g jid job
wrap' g limit offset = wrap (g limit offset)
newJob
:: ( Ord t, MonadJob m t (Seq event) output
, ToJSON event, ToJSON output
, MimeRender JSON output
, Foldable callbacks
)
=> (SJ.JobID 'SJ.Safe -> LoggerM m event -> JobHandle m)
-> m env
-> t
-> (env -> JobHandle m -> input -> IO (Either BackendInternalError output))
-> SJ.JobInput callbacks input
-> m (SJ.JobStatus 'SJ.Safe event)
newJob newJobHandle getenv jobkind f input = do
je <- getJobEnv
env <- getenv
let postCallback m = forM_ (input ^. SJ.job_callback) $ \url -> do
C.runClientM (SJ.clientMCallback m)
(C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
pushLog logF w = do
postCallback (SJ.mkChanEvent w)
logF w
f' jId inp logF = do
catch (do
r <- f env (newJobHandle jId (liftIO . pushLog logF . Seq.singleton)) inp
case r of
Left e -> postCallback (SJ.mkChanError e) >> throwIO e
Right a -> postCallback (SJ.mkChanResult a) >> pure a)
(\e -> do
-- We don't want jobs to fail silently
Protolude.putText $ Protolude.show (e :: SomeException)
_ <- panicTrace $ Protolude.show (e :: SomeException)
throwIO e)
jid <- queueJob jobkind (input ^. SJ.job_input) f'
pure (SJ.JobStatus jid [] SJ.IsPending Nothing)
pollJob
:: MonadJob m t (Seq event) output
=> Maybe SJ.Limit
-> Maybe SJ.Offset
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
-> m (SJ.JobStatus 'SJ.Safe event)
pollJob limit offset jid je = do
(logs, status, merr) <- case jTask je of
QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
<*> pure SJ.IsRunning
<*> pure Nothing
DoneJ ls r ->
let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
me = either (Just . T.pack . show) (const Nothing) r
in pure (ls, st, me)
-- /NOTE/: We need to be careful with the ordering of the logs here:
-- we want to return the logs ordered from the newest to the oldest,
-- because the API will use 'limit' to show only the newest ones,
-- taking 'limit' of them from the front of the list.
--
-- Due to the fact we do not force any 'Ord' constraint on an 'event' type,
-- and it would be inefficient to reverse the list here, it's important
-- that the concrete implementation of 'rjGetLog' returns the logs in the
-- correct order.
pure $ SJ.jobStatus jid limit offset (toList logs) status merr
waitJob
:: (MonadError e m, MonadJob m t (Seq event) output)
=> (JobError -> e)
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
-> m (SJ.JobOutput output)
waitJob joberr jid je = do
r <- case jTask je of
QueuedJ _qj -> do
m <- getJobsMap
erj <- waitTilRunning
case erj of
Left res -> pure res
Right rj -> do
(res, _logs) <- liftIO (waitJobDone jid rj m)
pure res
RunningJ rj -> do
m <- getJobsMap
(res, _logs) <- liftIO (waitJobDone jid rj m)
pure res
DoneJ _ls res -> pure res
either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
where waitTilRunning =
findJob jid >>= \mjob -> case mjob of
Nothing -> error "impossible"
Just je' -> case jTask je' of
QueuedJ _qj -> do
liftIO $ threadDelay 50000 -- wait 50ms
waitTilRunning
RunningJ rj -> pure (Right rj)
DoneJ _ls res -> pure (Left res)
killJob
:: (Ord t, MonadJob m t (Seq event) output)
=> t
-> Maybe SJ.Limit
-> Maybe SJ.Offset
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
-> m (SJ.JobStatus 'SJ.Safe event)
killJob t limit offset jid je = do
(logs, status, merr) <- case jTask je of
QueuedJ _ -> do
removeJob True t jid
pure (mempty, SJ.IsKilled, Nothing)
RunningJ rj -> do
liftIO $ cancel (rjAsync rj)
lgs <- liftIO (rjGetLog rj)
removeJob False t jid
pure (lgs, SJ.IsKilled, Nothing)
DoneJ lgs r -> do
let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
me = either (Just . T.pack . show) (const Nothing) r
removeJob False t jid
pure (lgs, st, me)
-- /NOTE/: Same proviso as in 'pollJob' applies here.
pure $ SJ.jobStatus jid limit offset (toList logs) status merr
module Gargantext.Utils.Jobs.Map (
-- * Types
JobMap(..)
, JobEntry(..)
, J(..)
, QueuedJob(..)
, RunningJob(..)
, LoggerM
, Logger
-- * Functions
, newJobMap
, lookupJob
, gcThread
, addJobEntry
, deleteJob
, runJob
, waitJobDone
, runJ
, waitJ
, pollJ
, killJ
) where
import Control.Concurrent
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM
import Control.Exception.Safe
import Control.Monad
import Data.Map.Strict (Map)
import Data.Time.Clock
import Prelude
import qualified Data.Map.Strict as Map
import Gargantext.Utils.Jobs.Settings
-- | (Mutable) 'Map' containing job id -> job info mapping.
newtype JobMap jid w a = JobMap
{ jobMap :: TVar (Map jid (JobEntry jid w a))
}
-- | Information associated to a job ID
data JobEntry jid w a = JobEntry
{ jID :: jid
, jTask :: J w a
, jTimeoutAfter :: Maybe UTCTime
, jRegistered :: UTCTime
, jStarted :: Maybe UTCTime
, jEnded :: Maybe UTCTime
}
-- | A job computation, which has a different representation depending on the
-- status of the job.
--
-- A queued job consists of the input to the computation and the computation.
-- A running job consists of an 'Async' as well as an action to get the current logs.
-- A done job consists of the result of the computation and the final logs.
data J w a
= QueuedJ (QueuedJob w a)
| RunningJ (RunningJob w a)
| DoneJ w (Either SomeException a)
-- | An unexecuted job is an input paired with a computation
-- to run with it. Input type is "hidden" to
-- be able to store different job types together.
data QueuedJob w r where
QueuedJob :: a -> (a -> Logger w -> IO r) -> QueuedJob w r
-- | A running job points to the async computation for the job and provides a
-- function to peek at the current logs.
data RunningJob w a = RunningJob
{ rjAsync :: Async.Async a
, rjGetLog :: IO w
}
-- | Polymorphic logger over any monad @m@.
type LoggerM m w = w -> m ()
-- | A @'Logger' w@ is a function that can do something with "messages" of type
-- @w@ in IO.
type Logger w = LoggerM IO w
newJobMap :: IO (JobMap jid w a)
newJobMap = JobMap <$> newTVarIO Map.empty
-- | Lookup a job by ID
lookupJob
:: Ord jid
=> jid
-> JobMap jid w a
-> IO (Maybe (JobEntry jid w a))
lookupJob jid (JobMap mvar) = Map.lookup jid <$> readTVarIO mvar
-- | Ready to use GC thread
gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
gcThread js (JobMap mvar) = go
where go = do
threadDelay (jsGcPeriod js * 1000000)
now <- getCurrentTime
candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
forM_ candidateEntries $ \je -> do
mrunningjob <- atomically $ do
case jTask je of
RunningJ rj -> modifyTVar' mvar (Map.delete (jID je))
>> pure (Just rj)
_ -> pure Nothing
case mrunningjob of
Nothing -> pure ()
Just a -> killJ a
go
expired now jobentry = case jTimeoutAfter jobentry of
Just t -> now >= t
_ -> False
-- | Make a 'Logger' that 'mappend's monoidal values in a 'TVar'.
-- /IMPORTANT/: The new value is appended in front. The ordering is important later on
-- when consuming logs from the API (see for example 'pollJob').
jobLog :: Semigroup w => TVar w -> Logger w -- w -> IO ()
jobLog logvar = \w -> atomically $ modifyTVar' logvar (\old_w -> w <> old_w)
-- | Generating new 'JobEntry's.
addJobEntry
:: Ord jid
=> UTCTime
-> jid
-> a
-> (jid -> a -> Logger w -> IO r)
-> JobMap jid w r
-> STM (JobEntry jid w r)
addJobEntry now jid input f (JobMap mvar) = do
let je = JobEntry
{ jID = jid
, jTask = QueuedJ (QueuedJob input (f jid))
, jRegistered = now
, jTimeoutAfter = Nothing
, jStarted = Nothing
, jEnded = Nothing
}
modifyTVar' mvar (Map.insert jid je)
pure je
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob jid (JobMap mvar) = modifyTVar' mvar (Map.delete jid)
runJob
:: (Ord jid, Monoid w)
=> jid
-> QueuedJob w a
-> JobMap jid w a
-> JobSettings
-> IO (RunningJob w a)
runJob jid qj (JobMap mvar) js = do
rj <- runJ qj
now <- getCurrentTime
atomically $ modifyTVar' mvar $
flip Map.adjust jid $ \je ->
je { jTask = RunningJ rj
, jStarted = Just now
, jTimeoutAfter = Just $ addUTCTime (fromIntegral (jsJobTimeout js)) now
}
pure rj
waitJobDone
:: Ord jid
=> jid
-> RunningJob w a
-> JobMap jid w a
-> IO (Either SomeException a, w)
waitJobDone jid rj (JobMap mvar) = do
r <- waitJ rj
now <- getCurrentTime
logs <- rjGetLog rj
atomically $ modifyTVar' mvar $
flip Map.adjust jid $ \je ->
je { jEnded = Just now, jTask = DoneJ logs r }
pure (r, logs)
-- | Turn a queued job into a running job by setting up the logging of @w@s and
-- firing up the async action.
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob a f) = do
logs <- newTVarIO mempty
act <- Async.async $ f a (jobLog logs)
let readLogs = readTVarIO logs
pure (RunningJob act readLogs)
-- | Wait for a running job to return (blocking).
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ (RunningJob act _) = Async.waitCatch act
-- | Poll a running job to see if it's done.
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ (RunningJob act _) = Async.poll act
-- | Kill a running job by cancelling the action.
killJ :: RunningJob w a -> IO ()
killJ (RunningJob act _) = Async.cancel act
{-# LANGUAGE MultiWayIf, FunctionalDependencies, TypeFamilies, ScopedTypeVariables #-}
{-|
Module : Gargantext.Utils.Jobs.Monad
Description : Job monad
Copyright : (c) CNRS, 2024
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Gargantext.Utils.Jobs.Monad (
-- * Types and classes
JobEnv(..)
, NumRunners
, JobError(..)
, MonadJob(..)
JobError(..)
-- * Reporting errors to users in a friendly way
, ToHumanFriendlyError(..)
......@@ -14,110 +24,24 @@ module Gargantext.Utils.Jobs.Monad (
, MonadJobStatus(..)
-- * Functions
, newJobEnv
, defaultJobSettings
, genSecret
, getJobsSettings
, getJobsState
, getJobsMap
, getJobsQueue
, queueJob
, findJob
, checkJID
, withJob
, handleIDError
, removeJob
, markFailedNoErr
, markFailureNoErr
) where
import Control.Concurrent.STM
import Control.Exception.Safe
import Control.Monad.Except
import Control.Monad.Reader
import Data.Kind (Type)
import Data.Map.Strict (Map)
import Data.Proxy
import Data.Text qualified as T
import Data.Time.Clock
import Data.Void (Void)
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Queue
import Gargantext.Utils.Jobs.Settings
import Gargantext.Utils.Jobs.State
import Network.HTTP.Client (Manager)
import Prelude
import Servant.Job.Core qualified as SJ
import Servant.Job.Types qualified as SJ
data JobEnv t w a = JobEnv
{ jeSettings :: JobSettings
, jeState :: JobsState t w a
, jeManager :: Manager
}
newJobEnv
:: (EnumBounded t, Monoid w)
=> JobSettings
-> Map t Prio
-> Manager
-> IO (JobEnv t w a)
newJobEnv js prios mgr = JobEnv js <$> newJobsState js prios <*> pure mgr
type NumRunners = Int
defaultJobSettings :: NumRunners -> SJ.SecretKey -> JobSettings
defaultJobSettings numRunners k = JobSettings
{ jsNumRunners = numRunners
, jsJobTimeout = 30 * 60 -- 30 minutes
, jsIDTimeout = 30 * 60 -- 30 minutes
, jsGcPeriod = 1 * 60 -- 1 minute
, jsSecretKey = k
, jsDebugLogs = False
}
genSecret :: IO SJ.SecretKey
genSecret = SJ.generateSecretKey
class MonadIO m => MonadJob m t w a | m -> t w a where
getJobEnv :: m (JobEnv t w a)
instance MonadIO m => MonadJob (ReaderT (JobEnv t w a) m) t w a where
getJobEnv = ask
getJobsSettings :: MonadJob m t w a => m JobSettings
getJobsSettings = jeSettings <$> getJobEnv
getJobsState :: MonadJob m t w a => m (JobsState t w a)
getJobsState = jeState <$> getJobEnv
getJobsMap :: MonadJob m t w a => m (JobMap (SJ.JobID 'SJ.Safe) w a)
getJobsMap = jobsData <$> getJobsState
getJobsQueue :: MonadJob m t w a => m (Queue t (SJ.JobID 'SJ.Safe))
getJobsQueue = jobsQ <$> getJobsState
queueJob
:: (MonadJob m t w a, Ord t)
=> t
-> i
-> (SJ.JobID 'SJ.Safe -> i -> Logger w -> IO a)
-> m (SJ.JobID 'SJ.Safe)
queueJob jobkind input f = do
js <- getJobsSettings
st <- getJobsState
liftIO (pushJob jobkind input f js st)
findJob
:: MonadJob m t w a
=> SJ.JobID 'SJ.Safe
-> m (Maybe (JobEntry (SJ.JobID 'SJ.Safe) w a))
findJob jid = do
jmap <- getJobsMap
liftIO $ lookupJob jid jmap
data JobError
=
-- | We expected to find a job tagged internall as \"job\", but we found the input @T.Text@ instead.
......@@ -129,55 +53,13 @@ data JobError
| JobException SomeException
deriving Show
checkJID
:: MonadJob m t w a
=> SJ.JobID 'SJ.Unsafe
-> m (Either JobError (SJ.JobID 'SJ.Safe))
checkJID (SJ.PrivateID tn n t d) = do
now <- liftIO getCurrentTime
js <- getJobsSettings
if | tn /= "job" -> pure (Left $ InvalidIDType $ T.pack tn)
| now > addUTCTime (fromIntegral $ jsIDTimeout js) t -> pure (Left $ IDExpired n)
| d /= SJ.macID tn (jsSecretKey js) t n -> pure (Left $ InvalidMacID $ T.pack d)
| otherwise -> pure $ Right (SJ.PrivateID tn n t d)
withJob
:: MonadJob m t w a
=> SJ.JobID 'SJ.Unsafe
-> (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) w a -> m r)
-> m (Either JobError (Maybe r))
withJob jid f = do
r <- checkJID jid
case r of
Left e -> pure (Left e)
Right jid' -> do
mj <- findJob jid'
case mj of
Nothing -> pure (Right Nothing)
Just j -> Right . Just <$> f jid' j
handleIDError
:: MonadError e m
=> (JobError -> e)
-> m (Either JobError a)
-> m a
handleIDError toE act = act >>= \r -> case r of
Left err -> throwError (toE err)
Right a -> pure a
removeJob
:: (Ord t, MonadJob m t w a)
=> Bool -- is it queued (and we have to remove jid from queue)
-> t
-> SJ.JobID 'SJ.Safe
-> m ()
removeJob queued t jid = do
q <- getJobsQueue
m <- getJobsMap
liftIO . atomically $ do
when queued $
deleteQueue t jid q
deleteJob jid m
-- | Polymorphic logger over any monad @m@.
type LoggerM m w = w -> m ()
-- | A @'Logger' w@ is a function that can do something with "messages" of type
-- @w@ in IO.
type Logger w = LoggerM IO w
--
-- Tracking jobs status
......@@ -191,7 +73,6 @@ class MonadJobStatus m where
-- can decide how this will look like.
type JobHandle m :: Type
type JobType m :: Type
type JobOutputType m :: Type
type JobEventType m :: Type
......
{-# LANGUAGE ConstraintKinds, TypeFamilies, ScopedTypeVariables #-}
module Gargantext.Utils.Jobs.Queue where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception.Safe
import Control.Monad
import Data.Function
import Data.Maybe
import Data.Ord
import Prelude
import System.IO
import Data.List
import qualified Data.Map as Map
import qualified Data.Vector as Vector
type EnumBounded t = (Ord t, Enum t, Bounded t)
data Q a = Q [a] [a] !Int
emptyQ :: Q a
emptyQ = Q [] [] 0
singletonQ :: a -> Q a
singletonQ a = Q [a] [] 1
snocQ :: a -> Q a -> Q a
snocQ a (Q xs ys sz) = Q xs (a:ys) (sz+1)
normalizeQ :: Q a -> Q a
normalizeQ (Q [] ys sz) = Q (reverse ys) [] sz
normalizeQ q = q
deleteQ :: Eq a => a -> Q a -> Q a
deleteQ x (Q xs ys sz) = Q xs' ys' sz'
where (xs_num_x, xs') = go xs (0, [])
(ys_num_x, ys') = go ys (0, [])
sz' = sz - xs_num_x - ys_num_x
go [] (n, bs) = (n, reverse bs)
go (a:as) (n, bs)
| a == x = go as (n+1, bs)
| otherwise = go as (n, a:bs)
popQ :: Q a -> Maybe (a, Q a)
popQ q@(Q as bs sz) = case as of
x:xs -> Just (x, Q xs bs (sz-1))
_ -> case normalizeQ q of
Q (x:xs) ys sz' -> Just (x, Q xs ys (sz'-1))
_ -> Nothing
sizeQ :: Q a -> Int
sizeQ (Q _ _ sz) = sz
peekQ :: Q a -> Maybe a
peekQ (Q _ _ 0) = Nothing
peekQ q = case normalizeQ q of
Q (x:_) _ _ -> Just x
_ -> Nothing
dropQ :: Q a -> Q a
dropQ (Q [] [] _) = Q [] [] 0
dropQ (Q (_x:xs) ys sz) = Q xs ys (sz-1)
dropQ q@(Q [] _ _) = dropQ (normalizeQ q)
-- | A priority is just a number. The greater, the earlier the job will get picked.
type Prio = Int
applyPrios
:: Ord t
=> [(t, Prio)] -> Map.Map t Prio -> Map.Map t Prio
applyPrios changes prios = foldl' (\m (t, p) -> Map.insert t p m) prios changes
-- | A queue with different kinds of values, described by @t@, where each
-- kind can have a higher or lower priority than other kinds, as described
-- by the 'queuePrios' field.
data Queue t a = Queue
{ queueData :: Vector.Vector (TVar (Q a))
, queueIndices :: Map.Map t Int -- indices into queueData
, queuePrios :: Map.Map t Prio
}
-- | Default priorities for the enumeration of job types @t@: everyone at 0.
defaultPrios :: EnumBounded t => Map.Map t Prio
defaultPrios = Map.fromList [ (t, 0) | t <- [minBound..maxBound] ]
-- | Create a new queue that'll apply the given priorities
newQueue :: EnumBounded t => Map.Map t Prio -> IO (Queue t a)
newQueue prios = do
let allTs = [ minBound .. maxBound ]
indices = Map.fromList (zip allTs [0..])
n = Map.size indices
vars <- Vector.replicateM n (newTVarIO emptyQ)
pure $ Queue vars indices prios
-- | Add a new element to the queue, with the given kind.
addQueue :: Ord t => t -> a -> Queue t a -> STM ()
addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just i -> modifyTVar (queueData q Vector.! i) (snocQ a)
Nothing -> error "addQueue: couldn't find queue for given job kind"
deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM ()
deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a)
Nothing -> error "deleteQueue: queue type not found?!"
-- | Dump the contents of the queue, for debugging purposes.
debugDumpQueue :: (Enum t, Bounded t, Ord t) => Queue t a -> STM [(t, a)]
debugDumpQueue q = mconcat <$> (forM [minBound..maxBound] $ \t -> do
readTVar (queueData q Vector.! (i t)) >>= debugDumpQ t)
where
i t = fromJust $ Map.lookup t (queueIndices q)
debugDumpQ t (Q xs ys _) = pure $ map (\x -> (t, x)) (xs ++ reverse ys)
type Picker a = [(a, STM ())] -> STM (a, STM ())
-- | Figure out the candidates for being popped from the various queues.
-- We always look at highest priority queues first, and will pick between
-- equal priority items of different queues (candidates, elements of the
-- returned lists) by choosing the one that was queued first.
popQueue :: forall a t. Ord t => Picker a -> Queue t a -> IO (Maybe a)
popQueue picker q = atomically $ select prioLevels
where -- TODO: cache this in the 'Queue' data structure?
prioLevels :: [[(t, Prio)]]
prioLevels = groupBy ((==) `on` snd) . sortOn (Down . snd) $
Map.toList (queuePrios q)
select :: [[(t, Prio)]] -> STM (Maybe a)
select [] = pure Nothing
select (level:levels) = do
mres <- selectLevel level
case mres of
Nothing -> select levels
Just res -> pure $ Just res
selectLevel :: [(t, Prio)] -> STM (Maybe a)
selectLevel xs = do
let indices = catMaybes $ map (flip Map.lookup (queueIndices q) . fst) xs
queues = map (queueData q Vector.!) indices
go qvar = readTVar qvar >>= \qu ->
pure (peekQ qu, modifyTVar' qvar dropQ)
mtopItems <- catMaybesFst <$> traverse go queues
case mtopItems of
Nothing -> pure Nothing
Just [] -> pure Nothing
Just topItems -> do
(earliestItem, popItem) <- picker topItems
popItem
pure (Just earliestItem)
catMaybesFst ((Nothing, _b) : xs) = catMaybesFst xs
catMaybesFst ((Just a, b) : xs) = ((a, b) :) <$> catMaybesFst xs
catMaybesFst [] = Just []
-- | A ready-to-use runner that pops the highest priority item off the queue
-- and processes it using the given function.
queueRunner :: Ord t => Picker a -> (a -> IO ()) -> Queue t a -> IO ()
queueRunner picker f q = go
where go = do
mres <- popQueue picker q
case mres of
Just a -> f a `catch` exc
Nothing -> pure ()
threadDelay 5000 -- 5ms
go
exc :: SomeException -> IO ()
exc e = hPutStrLn stderr ("Queue runner exception: " ++ show e)
-- | Create a queue and @n@ runner actions for it, with the given priorities
-- for the runners to apply when picking a new item.
newQueueWithRunners
:: EnumBounded t
=> Int -- ^ number of runners
-> Map.Map t Prio -- ^ priorities
-> Picker a -- ^ how to pick between equal priority items
-> (a -> IO ()) -- ^ what to do with each item
-> IO (Queue t a, [IO ()])
newQueueWithRunners n prios picker f = do
q <- newQueue prios
let runners = replicate n (queueRunner picker f q)
pure (q, runners)
{-# LANGUAGE TemplateHaskell #-}
module Gargantext.Utils.Jobs.Settings where
import Control.Lens
import Prelude
import qualified Servant.Job.Core as SJ
-- | A few control knobs for the job system.
data JobSettings = JobSettings
{ jsNumRunners :: Int
, jsJobTimeout :: Int -- in seconds. TODO: timeout per job type? Map t Int
, jsIDTimeout :: Int -- in seconds, how long a job ID is valid
, jsGcPeriod :: Int -- in seconds, how long between each GC
, jsSecretKey :: SJ.SecretKey
, jsDebugLogs :: Bool -- if 'True', enable debug logs
}
makeLensesFor [ ("jsJobTimeout", "l_jsJobTimeout")
, ("jsIDTimeout", "l_jsIDTimeout")] ''JobSettings
module Gargantext.Utils.Jobs.State where
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Queue
import Gargantext.Utils.Jobs.Settings
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad
import qualified Data.List as List
import Data.Map.Strict (Map)
import Data.Maybe
import Data.Ord
import Data.Proxy
import Data.Time.Clock
import Prelude
import qualified Data.Map.Strict as Map
import qualified Servant.Job.Core as SJ
import qualified Servant.Job.Types as SJ
type IDGenerator = TVar Int
data JobsState t w a = JobsState
{ jobsData :: JobMap (SJ.JobID 'SJ.Safe) w a
, jobsQ :: Queue t (SJ.JobID 'SJ.Safe)
, jobsIdGen :: IDGenerator
, jsGC :: Async ()
, jsRunners :: [Async ()]
}
nextID :: UTCTime -> JobSettings -> JobsState t w a -> STM (SJ.JobID 'SJ.Safe)
nextID now js st = do
n <- stateTVar (jobsIdGen st) $ \i -> (i, i+1)
pure $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
newJobsState
:: forall t w a.
(EnumBounded t, Monoid w)
=> JobSettings
-> Map t Prio
-> IO (JobsState t w a)
newJobsState js prios = do
jmap <- newJobMap
idgen <- newTVarIO 0
(q, runners) <- newQueueWithRunners (jsNumRunners js) prios (picker jmap) $ \jid -> do
mje <- lookupJob jid jmap
case mje of
Nothing -> pure ()
Just je -> case jTask je of
QueuedJ qj -> do
rj <- runJob jid qj jmap js
(_res, _logs) <- waitJobDone jid rj jmap
pure ()
_ -> pure ()
when (jsDebugLogs js) $ putStrLn $ "Starting " ++ show (jsNumRunners js) ++ " job runners."
gcAsync <- async $ gcThread js jmap
runnersAsyncs <- traverse async runners
pure (JobsState jmap q idgen gcAsync runnersAsyncs)
where picker
:: JobMap (SJ.JobID 'SJ.Safe) w a
-> Picker (SJ.JobID 'SJ.Safe)
picker (JobMap jmap) xs = do
jinfos <- fmap catMaybes . forM xs $ \(jid, popjid) -> do
mje <- Map.lookup jid <$> readTVar jmap
case mje of
Nothing -> pure Nothing
Just je -> pure $ Just (jid, popjid, jRegistered je)
let (jid, popjid, _) = List.minimumBy (comparing _3) jinfos
pure (jid, popjid)
_3 (_, _, c) = c
pushJob
:: Ord t
=> t
-> a
-> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
-> JobSettings
-> JobsState t w r
-> IO (SJ.JobID 'SJ.Safe)
pushJob jobkind input f js st = do
now <- getCurrentTime
atomically $ pushJobWithTime now jobkind input f js st
pushJobWithTime
:: Ord t
=> UTCTime
-> t
-> a
-> (SJ.JobID 'SJ.Safe -> a -> Logger w -> IO r)
-> JobSettings
-> JobsState t w r
-> STM (SJ.JobID 'SJ.Safe)
pushJobWithTime now jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do
jid <- nextID now js st
_je <- addJobEntry now jid input f jmap
addQueue jobkind jid jqueue
pure jid
......@@ -30,7 +30,6 @@ import Gargantext.Core.Config (GargConfig(..))
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Internal (newJob)
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad hiding (withJob)
import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment