{-|
Module      : Gargantext.Core.Worker.Env
Description : Asynchronous worker logic (environment)
Copyright   : (c) CNRS, 2024
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX

-}

{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans      #-}  -- orphan HasNodeError IOException

module Gargantext.Core.Worker.Env where

import Control.Concurrent.STM.TVar (TVar, modifyTVar, newTVarIO, readTVarIO)
import Control.Lens (prism', to, view)
import Control.Lens.TH
import Control.Monad.Trans.Control (MonadBaseControl)
-- NOTE(review): 'fromJust' is no longer used by the code visible in this
-- module ('updateJobProgress' now pattern-matches instead). The import is
-- kept in case other code relies on it; drop it if the build reports it as
-- unused.
import Data.Maybe (fromJust)
import Data.Pool qualified as Pool
import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.API.Admin.Orchestrator.Types (JobLog, noJobLog)
import Gargantext.API.Job (RemainingSteps(..), jobLogStart, jobLogProgress, jobLogFailures, jobLogComplete, addErrorEvent, jobLogFailTotal, jobLogFailTotalWithMessage, jobLogAddMore)
import Gargantext.API.Prelude (GargM)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Config (GargConfig(..), HasConfig(..), gc_logging, LogConfig)
import Gargantext.Core.Config.Mail qualified as Mail
import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.Config.Types (SettingsFile(..))
import Gargantext.Core.Mail.Types (HasMail(..))
import Gargantext.Core.NLP (HasNLPServer(..), NLPServerMap, nlpServerMap)
import Gargantext.Core.NodeStory (HasNodeStoryEnv(..), HasNodeStoryImmediateSaver(..), HasNodeArchiveStoryImmediateSaver(..), NodeStoryEnv, fromDBNodeStoryEnv, nse_saver_immediate, nse_archive_saver_immediate)
import Gargantext.Core.Types (HasValidationError(..))
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Tree.Error (HasTreeError(..))
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging (HasLogger(..), Logger, LogLevel(..), MonadLogger(..), withLogger, logMsg, withLoggerIO)
import Gargantext.Utils.Jobs.Monad ( MonadJobStatus(..), JobHandle )
import GHC.IO.Exception (IOException(..), IOErrorType(OtherError))
import Prelude qualified
import System.Log.FastLogger qualified as FL
import Gargantext.System.Logging.Loggers


-- | Everything a worker needs while running jobs: configuration, logger,
-- database pool, node-story environment, mail and NLP settings, and the
-- mutable state of the job currently tracked.
--
-- All fields carry an explicit laziness annotation (@~@).
data WorkerEnv = WorkerEnv
  { _w_env_config    :: ~GargConfig
  , _w_env_logger    :: ~(Logger (GargM WorkerEnv IOException))
  -- the pool is a pool for gargantext db, not pgmq db!
  , _w_env_pool      :: ~(Pool.Pool PSQL.Connection)
  , _w_env_nodeStory :: ~NodeStoryEnv
  , _w_env_mail      :: ~Mail.MailConfig
  , _w_env_nlp       :: ~NLPServerMap
  -- 'Nothing' until the first progress update is recorded
  -- (see 'updateJobProgress').
  , _w_env_job_state :: ~(TVar (Maybe WorkerJobState))
  }


-- | Progress snapshot of a single job: which job it is and its latest log.
data WorkerJobState = WorkerJobState
  { _wjs_job_info :: JobInfo
  , _wjs_job_log  :: JobLog
  }
  deriving (Show, Eq)


-- | Read the configuration from the given settings file, set up a logger
-- from its logging section, build a 'WorkerEnv' and run the continuation
-- with it.
--
-- The logger is scoped by 'withLoggerIO'. The connection pool created here
-- is not explicitly destroyed by this function (the cleanup call is still
-- commented out below).
withWorkerEnv :: SettingsFile -> (WorkerEnv -> IO a) -> IO a
withWorkerEnv settingsFile k = do
  cfg <- readConfig settingsFile
  withLoggerIO (cfg ^. gc_logging) $ \logger -> do
    env <- newWorkerEnv logger cfg
    k env -- `finally` cleanEnv env
  where
    -- Assemble the environment: DB pool first, because the node-story
    -- environment is built from it.
    newWorkerEnv logger cfg = do
      --nodeStory_env <- fromDBNodeStoryEnv (_gc_repofilepath cfg)
      -- pool <- newPool $ _gc_database_config cfg
      let dbConfig = _gc_database_config cfg
      -- Single stripe; the trailing 60 and 4 are the numeric arguments of
      -- 'Pool.defaultPoolConfig' (presumably idle TTL in seconds and
      -- maximum number of connections -- confirm against resource-pool).
      pool <- Pool.newPool $ Pool.setNumStripes (Just 1) $ Pool.defaultPoolConfig (PSQL.connect dbConfig) PSQL.close 60 4
      nodeStory_env <- fromDBNodeStoryEnv pool
      -- No job is tracked yet.
      _w_env_job_state <- newTVarIO Nothing
      pure $ WorkerEnv
        { _w_env_pool      = pool
        , _w_env_logger    = logger
        , _w_env_nodeStory = nodeStory_env
        , _w_env_config    = cfg
        , _w_env_mail      = _gc_mail_config cfg
        , _w_env_nlp       = nlpServerMap $ _gc_nlp_config cfg
        , _w_env_job_state
        }


-- | Configuration is read straight from the '_w_env_config' field.
instance HasConfig WorkerEnv where
  hasConfig = to _w_env_config

-- | Logger for the plain @GargM WorkerEnv IOException@ stack; a thin
-- wrapper around a 'MonadicStdLogger' running in 'IO'.
instance HasLogger (GargM WorkerEnv IOException) where
  newtype instance Logger (GargM WorkerEnv IOException) =
    GargWorkerLogger { _GargWorkerLogger :: MonadicStdLogger FL.LogStr IO }
  type instance LogInitParams (GargM WorkerEnv IOException) = LogConfig
  type instance LogPayload (GargM WorkerEnv IOException)    = FL.LogStr
  initLogger cfg = fmap GargWorkerLogger $ (liftIO $ monadicStdLogger cfg)
  destroyLogger = liftIO . _msl_destroy . _GargWorkerLogger
  logMsg (GargWorkerLogger ioLogger) lvl msg = liftIO $ _msl_log_msg ioLogger lvl msg
  logTxt (GargWorkerLogger ioLogger) lvl msg = liftIO $ _msl_log_txt ioLogger lvl msg

-- | The pool stored in '_w_env_pool' (gargantext DB, see 'WorkerEnv').
instance HasConnectionPool WorkerEnv where
  connPool = to _w_env_pool

-- | Mail settings are read from '_w_env_mail'.
instance HasMail WorkerEnv where
  mailSettings = to _w_env_mail

-- | NLP server map is read from '_w_env_nlp'.
instance HasNLPServer WorkerEnv where
  nlpServer = to _w_env_nlp

-- | Node-story environment is read from '_w_env_nodeStory'.
instance HasNodeStoryEnv WorkerEnv where
  hasNodeStory = to _w_env_nodeStory

-- | Immediate saver, taken from the node-story environment.
instance HasNodeStoryImmediateSaver WorkerEnv where
  hasNodeStoryImmediateSaver = hasNodeStory . nse_saver_immediate

-- | Immediate archive saver, taken from the node-story environment.
instance HasNodeArchiveStoryImmediateSaver WorkerEnv where
  hasNodeArchiveStoryImmediateSaver = hasNodeStory . nse_archive_saver_immediate

-- | The logger stored in the environment.
instance MonadLogger (GargM WorkerEnv IOException) where
  getLogger = asks _w_env_logger

-- | Log the outgoing message at DEBUG level (using a logger created from
-- the logging configuration for this one call) and then forward it with
-- 'CE.notify'.
instance CET.HasCentralExchangeNotification WorkerEnv where
  ce_notify m = do
    c <- asks (view $ to _w_env_config)
    liftBase $ do
      withLogger (c ^. gc_logging) $ \ioL ->
        logMsg ioL DEBUG $ "[ce_notify]: " <> show (_gc_notifications_config c) <> " :: " <> show m
      CE.notify c m

---------

-- | Validation errors are turned into an 'IOException' whose description is
-- the shown error. The prism is build-only: matching always yields
-- 'Nothing'.
instance HasValidationError IOException where
  _ValidationError = prism' mkIOException (const Nothing)
    where
      mkIOException v = IOError { ioe_handle = Nothing
                                , ioe_type = OtherError
                                , ioe_location = "Worker job (validation)"
                                , ioe_description = show v
                                , ioe_errno = Nothing
                                , ioe_filename = Nothing }

-- | Tree errors are turned into an 'IOException' whose description is the
-- shown error. The prism is build-only: matching always yields 'Nothing'.
instance HasTreeError IOException where
  _TreeError = prism' mkIOException (const Nothing)
    where
      mkIOException v = IOError { ioe_handle = Nothing
                                , ioe_type = OtherError
                                , ioe_location = "Worker job (tree)"
                                , ioe_description = show v
                                , ioe_errno = Nothing
                                , ioe_filename = Nothing }

-- | Node errors become a 'Prelude.userError' carrying the shown error. The
-- prism is build-only: matching always yields 'Nothing'.
instance HasNodeError IOException where
  _NodeError = prism' (Prelude.userError . show) (const Nothing)

---------------

-- | Newtype around @GargM WorkerEnv IOException@ so that worker-specific
-- instances (logging, job status) can be attached to it.
newtype WorkerMonad a =
  WorkerMonad { _WorkerMonad :: GargM WorkerEnv IOException a }
  deriving ( Functor
           , Applicative
           , Monad
           , MonadIO
           , MonadReader WorkerEnv
           , MonadBase IO
           , MonadBaseControl IO
           , MonadError IOException
           , MonadFail )

-- | Logger for 'WorkerMonad'; same underlying 'MonadicStdLogger' as the
-- @GargM WorkerEnv IOException@ logger, under a different wrapper.
instance HasLogger WorkerMonad where
  newtype instance Logger WorkerMonad =
    WorkerMonadLogger { _WorkerMonadLogger :: MonadicStdLogger FL.LogStr IO }
  type instance LogInitParams WorkerMonad = LogConfig
  type instance LogPayload WorkerMonad    = FL.LogStr
  initLogger cfg = fmap WorkerMonadLogger $ (liftIO $ monadicStdLogger cfg)
  destroyLogger = liftIO . _msl_destroy . _WorkerMonadLogger
  logMsg (WorkerMonadLogger ioLogger) lvl msg = liftIO $ _msl_log_msg ioLogger lvl msg
  logTxt (WorkerMonadLogger ioLogger) lvl msg = liftIO $ _msl_log_txt ioLogger lvl msg

-- | Re-wrap the environment's logger: unwrap the 'GargWorkerLogger' stored
-- in 'WorkerEnv' and wrap the same underlying logger as 'WorkerMonadLogger'.
instance MonadLogger WorkerMonad where
  getLogger = do
    env <- ask
    let (GargWorkerLogger lgr) = _w_env_logger env
    pure $ WorkerMonadLogger lgr

-- | Run a 'WorkerMonad' action against the given environment in 'IO'.
--
-- An error raised inside the action (the 'Left' case of the underlying
-- 'ExceptT') is rethrown as an 'IO' exception with 'throwIO'.
runWorkerMonad :: WorkerEnv -> WorkerMonad a -> IO a
runWorkerMonad env m = do
  res <- runExceptT . flip runReaderT env $ _WorkerMonad m
  case res of
    Left e -> throwIO e
    Right x -> pure x


-- | Handle identifying the job whose progress is being reported, or no job
-- at all.
data WorkerJobHandle =
    -- No job: progress updates on this handle are ignored
    -- (see 'updateJobProgress').
    WorkerNoJobHandle
  | WorkerJobHandle { _w_job_info :: !JobInfo }
  deriving (Show, Eq)

-- | Worker handles 1 job at a time, hence it's enough to provide
-- simple progress tracking
instance MonadJobStatus WorkerMonad where
  type JobHandle      WorkerMonad = WorkerJobHandle
  type JobOutputType  WorkerMonad = JobLog
  type JobEventType   WorkerMonad = JobLog

  noJobHandle Proxy = WorkerNoJobHandle
  -- Always reports an empty log, regardless of the handle.
  getLatestJobStatus _ = WorkerMonad (pure noJobLog)
  -- No tracing: the continuation is simply run with the handle.
  withTracer _ jh n = n jh
  markStarted n jh = updateJobProgress jh (const $ jobLogStart $ RemainingSteps n)
  markProgress steps jh = updateJobProgress jh (jobLogProgress steps)
  markFailure steps mb_msg jh =
    updateJobProgress jh (\latest -> case mb_msg of
                                       Nothing -> jobLogFailures steps latest
                                       Just msg -> addErrorEvent msg (jobLogFailures steps latest)
                         )
  markComplete jh = updateJobProgress jh jobLogComplete
  markFailed mb_msg jh =
    updateJobProgress jh (\latest -> case mb_msg of
                                       Nothing -> jobLogFailTotal latest
                                       Just msg -> jobLogFailTotalWithMessage msg latest
                         )
  addMoreSteps steps jh = updateJobProgress jh (jobLogAddMore steps)


-- | Apply a 'JobLog' transformation to the tracked job state and notify the
-- central exchange about the result.
--
-- * With 'WorkerNoJobHandle' nothing happens.
-- * With a 'WorkerJobHandle', the state in '_w_env_job_state' is replaced
--   by a state for the handle's job. The transformation is applied to the
--   stored log when the stored state belongs to the same job, and to
--   'noJobLog' otherwise (no state yet, or a different job).
--
-- The state update and the subsequent read are two separate steps (one STM
-- transaction, then 'readTVarIO').
updateJobProgress :: WorkerJobHandle -> (JobLog -> JobLog) -> WorkerMonad ()
updateJobProgress WorkerNoJobHandle _ = pure ()
updateJobProgress (WorkerJobHandle (ji@JobInfo { _ji_message_id })) f = do
  stateTVar <- asks _w_env_job_state
  liftIO $ atomically $ modifyTVar stateTVar updateState
  state' <- liftIO $ readTVarIO stateTVar
  case state' of
    Nothing -> pure ()
    Just wjs -> do
      CET.ce_notify $ CET.UpdateWorkerProgress ji (_wjs_job_log wjs)
  where
    -- Total replacement for the former @if ... then fromJust ...@ test:
    -- reuse the stored log only when it belongs to this very job.
    updateState mwjs =
      let initJobLog = case mwjs of
            Just wjs | _wjs_job_info wjs == ji -> _wjs_job_log wjs
            _                                  -> noJobLog
      in
        Just (WorkerJobState { _wjs_job_info = ji
                             , _wjs_job_log = f initJobLog })


-- Template Haskell: generate lenses for the fields of 'WorkerEnv'.
makeLenses ''WorkerEnv