[worker] WorkerEnv, first draft for 'NewNodeAsync', 'AddCorpusFormAsync' jobs

parent c6620db7
Pipeline #6531 failed with stages
in 11 minutes and 46 seconds
......@@ -231,6 +231,7 @@ library
Gargantext.Core.Viz.Phylo.SynchronicClustering
Gargantext.Core.Viz.Types
Gargantext.Core.Worker
Gargantext.Core.Worker.Env
Gargantext.Core.Worker.Jobs
Gargantext.Core.Worker.Jobs.Types
Gargantext.Core.Worker.TOML
......
......@@ -74,7 +74,7 @@ data CheckAuth = InvalidUser | InvalidPassword | Valid Token TreeId UserId
data AuthenticatedUser = AuthenticatedUser
{ _auth_node_id :: NodeId
, _auth_user_id :: UserId
} deriving (Generic)
} deriving (Generic, Show, Eq)
makeLenses ''AuthenticatedUser
......
......@@ -24,30 +24,31 @@ import Control.Lens hiding (elements, Empty)
import Gargantext.API.Admin.Auth.Types
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs (..))
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Errors.Types
import Gargantext.API.Node.New.Types
import Gargantext.API.Prelude
import Gargantext.API.Routes.Named.Node qualified as Named
import Gargantext.Database.Action.Flow.Types
import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Node
import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Prelude (DBCmd')
import Gargantext.Database.Prelude (CmdM)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Utils.Jobs (serveJobsAPI)
import Servant.Server.Generic (AsServerT)
import Gargantext.API.Admin.Types (HasSettings)
------------------------------------------------------------------------
postNode :: (HasNodeError err, HasSettings env)
postNode :: (CmdM env err m, HasNodeError err, HasSettings env)
=> AuthenticatedUser
-- ^ The logged-in user
-> NodeId
-> PostNode
-> DBCmd' env err [NodeId]
postNode authenticatedUser pId (PostNode nodeName nt) = do
let userId = authenticatedUser ^. auth_user_id
mkNodeWithParent nt (Just pId) userId nodeName
-> m [NodeId]
postNode authenticatedUser pId pn = do
postNode' authenticatedUser pId pn
postNodeAsyncAPI
:: AuthenticatedUser
......@@ -56,26 +57,41 @@ postNodeAsyncAPI
-- ^ The target node
-> Named.PostNodeAsyncAPI (AsServerT (GargM Env BackendInternalError))
postNodeAsyncAPI authenticatedUser nId = Named.PostNodeAsyncAPI $ AsyncJobs $
serveJobsAPI NewNodeJob $ \jHandle p -> postNodeAsync authenticatedUser nId p jHandle
serveJobsAPI NewNodeJob $ \_jHandle p -> do
Jobs.sendJob $ Jobs.NewNodeAsync { Jobs._nna_node_id = nId
, Jobs._nna_authenticatedUser = authenticatedUser
, Jobs._nna_postNode = p }
-- postNodeAsync authenticatedUser nId p jHandle
------------------------------------------------------------------------
postNodeAsync :: (FlowCmdM env err m, MonadJobStatus m, HasSettings env)
=> AuthenticatedUser
-- ^ The logged in user
-> NodeId
-> PostNode
-> JobHandle m
-> m ()
postNodeAsync authenticatedUser nId (PostNode nodeName tn) jobHandle = do
postNode' :: (CmdM env err m, HasSettings env, HasNodeError err)
=> AuthenticatedUser
-- ^ The logged-in user
-> NodeId
-> PostNode
-> m [NodeId]
postNode' authenticatedUser pId (PostNode nodeName nt) = do
let userId = authenticatedUser ^. auth_user_id
mkNodeWithParent nt (Just pId) userId nodeName
-- printDebug "postNodeAsync" nId
markStarted 3 jobHandle
markProgress 1 jobHandle
-- _ <- threadDelay 1000
markProgress 1 jobHandle
-- postNodeAsync :: (FlowCmdM env err m, MonadJobStatus m, HasSettings env)
-- => AuthenticatedUser
-- -- ^ The logged in user
-- -> NodeId
-- -> PostNode
-- -> JobHandle m
-- -> m ()
-- postNodeAsync authenticatedUser nId (PostNode nodeName tn) jobHandle = do
let userId = authenticatedUser ^. auth_user_id
_ <- mkNodeWithParent tn (Just nId) userId nodeName
-- -- printDebug "postNodeAsync" nId
-- markStarted 3 jobHandle
-- markProgress 1 jobHandle
-- -- _ <- threadDelay 1000
-- markProgress 1 jobHandle
-- let userId = authenticatedUser ^. auth_user_id
-- _ <- mkNodeWithParent tn (Just nId) userId nodeName
markComplete jobHandle
-- markComplete jobHandle
{-|
Module : Gargantext.API.Node.New.Types
Description :
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.API.Node.New.Types (
PostNode(..)
......@@ -5,16 +15,16 @@ module Gargantext.API.Node.New.Types (
import Data.Aeson
import Data.Swagger
import Data.Text (Text)
import GHC.Generics
import Gargantext.Core.Types (NodeType (..))
import Gargantext.Prelude
import Test.QuickCheck
import Web.FormUrlEncoded
------------------------------------------------------------------------
data PostNode = PostNode { pn_name :: Text
, pn_typename :: NodeType}
deriving (Generic)
, pn_typename :: NodeType }
deriving (Generic, Eq, Show)
------------------------------------------------------------------------
-- TODO unPrefix "pn_" FromJSON, ToJSON, ToSchema, adapt frontend.
instance FromJSON PostNode
......
......@@ -31,11 +31,13 @@ import Gargantext.API.Node.Corpus.New qualified as New
import Gargantext.API.Prelude
import Gargantext.API.Routes.Named.Annuaire qualified as Named
import Gargantext.API.Routes.Named.Corpus qualified as Named
import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Database.Prelude (HasConfig(..))
import Gargantext.Prelude
import Gargantext.Core.Config (gc_max_docs_scrapers)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Utils.Jobs (serveJobsAPI)
import Servant
import Servant.Auth.Swagger ()
import Servant.Server.Generic (AsServerT)
......@@ -64,11 +66,15 @@ addCorpusWithQuery user = Named.AddWithQuery $ \cid -> AsyncJobs $
addCorpusWithForm :: User -> Named.AddWithForm (AsServerT (GargM Env BackendInternalError))
addCorpusWithForm user = Named.AddWithForm $ \cid -> AsyncJobs $
serveJobsAPI AddCorpusFormJob $ \jHandle i -> do
serveJobsAPI AddCorpusFormJob $ \_jHandle i -> do
-- /NOTE(adinapoli)/ Track the initial steps outside 'addToCorpusWithForm', because it's
-- called in a few places, and the job status might be different between invocations.
markStarted 3 jHandle
New.addToCorpusWithForm user cid i jHandle
-- markStarted 3 jHandle
-- New.addToCorpusWithForm user cid i jHandle
Jobs.sendJob $ Jobs.AddCorpusFormAsync { Jobs._acf_args = i
, Jobs._acf_user = user
, Jobs._acf_cid = cid }
--addCorpusWithFile :: User -> ServerT Named.AddWithFile (GargM Env BackendInternalError)
--addCorpusWithFile user cid =
......
......@@ -31,6 +31,7 @@ import Prelude qualified
data User = UserDBId UserId | UserName Text | RootId NodeId
deriving (Show, Eq, Generic)
instance FromJSON User
instance ToJSON User
renderUser :: User -> T.Text
......
......@@ -9,6 +9,9 @@ Portability : POSIX
-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-} -- orphan HasNodeError IOException
module Gargantext.Core.Worker where
......@@ -21,11 +24,11 @@ import Async.Worker.Types (HasWorkerBroker)
import Data.Text qualified as T
import Gargantext.API.Admin.Auth (forgotUserPassword)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Node.New (postNode')
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.Jobs
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.TOML (WorkerDefinition(..), wdToRedisConnectInfo)
import Gargantext.Database.Prelude (CmdCommon)
import Gargantext.Database.Query.Table.User (getUsersWithEmail)
import Gargantext.Prelude
......@@ -37,8 +40,8 @@ import Gargantext.Prelude
-- - progress report via notifications
-- - I think there is no point to save job result, as usually there is none (we have side-effects only)
-- - replace Servant.Job to use workers instead of garg API threads
withRedisWorker :: (HasWorkerBroker RedisBroker Job, HasSettings env, CmdCommon env)
=> env
withRedisWorker :: (HasWorkerBroker RedisBroker Job)
=> WorkerEnv
-> WorkerDefinition
-> (Async () -> Worker.State RedisBroker Job -> IO ())
-> IO ()
......@@ -60,8 +63,8 @@ withRedisWorker env wd@(WorkerDefinition { .. }) cb = do
withAsync (Worker.run state') (\a -> cb a state')
performAction :: (HasWorkerBroker b Job, HasSettings env, CmdCommon env)
=> env
performAction :: (HasWorkerBroker b Job)
=> WorkerEnv
-> Worker.State b Job
-> BrokerMessage b (Worker.Job Job)
-> IO ()
......@@ -69,10 +72,16 @@ performAction env _state bm = do
let job' = toA $ getMessage bm
case Worker.job job' of
Ping -> putStrLn ("ping" :: Text)
ForgotPasswordAsync { _fpa_args = ForgotPasswordAsyncParams { email } } -> flip runReaderT env $ do
AddCorpusFormAsync { } -> runWorkerMonad env $ do
liftBase $ putStrLn ("add corpus form" :: Text)
ForgotPasswordAsync { _fpa_args = ForgotPasswordAsyncParams { email } } -> runWorkerMonad env $ do
liftBase $ putStrLn ("forgot password: " <> email)
us <- getUsersWithEmail (T.toLower email)
case us of
[u] -> forgotUserPassword u
_ -> pure ()
NewNodeAsync { .. } -> runWorkerMonad env $ do
liftBase $ putStrLn ("new node async " :: Text)
void $ postNode' _nna_authenticatedUser _nna_node_id _nna_postNode
return ()
GargJob { _gj_garg_job } -> putStrLn ("Garg job: " <> show _gj_garg_job :: Text)
{-|
Module : Gargantext.Core.Worker.Env
Description : Asynchronous worker logic (environment)
Copyright : (c) CNRS, 2024
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-} -- orphan HasNodeError IOException
module Gargantext.Core.Worker.Env where
import Control.Lens (prism', to)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Pool (Pool)
import Data.Text qualified as T
import Database.PostgreSQL.Simple (Connection)
import Gargantext.API.Admin.EnvTypes (Mode(Dev), modeToLoggingLevels)
import Gargantext.API.Admin.Settings ( devJwkFile, devSettings, newPool, SettingsFile (..), IniFile (..) )
import Gargantext.API.Admin.Types (HasSettings(..), Settings(..))
import Gargantext.API.Prelude (GargM)
import Gargantext.Core.Config (GargConfig(..), readConfig)
import Gargantext.Core.Config.Mail qualified as Mail
import Gargantext.Core.Config.NLP qualified as NLP
import Gargantext.Core.Mail.Types (HasMail(..))
import Gargantext.Core.NLP (HasNLPServer(..), NLPServerMap, nlpServerMap)
import Gargantext.Core.NodeStory (NodeStoryEnv, fromDBNodeStoryEnv)
import Gargantext.Database.Prelude (HasConfig(..), HasConnectionPool(..), databaseParameters)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging (HasLogger(..), Logger, withLoggerHoisted)
import Prelude qualified
import System.Log.FastLogger qualified as FL
data WorkerEnv = WorkerEnv
{ _w_env_settings :: !Settings
, _w_env_config :: !GargConfig
, _w_env_logger :: !(Logger (GargM WorkerEnv IOException))
, _w_env_pool :: !(Pool Connection)
, _w_env_nodeStory :: !NodeStoryEnv
, _w_env_mail :: !Mail.MailConfig
, _w_env_nlp :: !NLPServerMap
}
withWorkerEnv :: IniFile -> SettingsFile -> (WorkerEnv -> IO a) -> IO a
withWorkerEnv (IniFile iniPath) settingsFile k = withLoggerHoisted Dev $ \logger -> do
env <- newWorkerEnv logger
k env -- `finally` cleanEnv env
where
newWorkerEnv logger = do
cfg <- readConfig iniPath
dbParam <- databaseParameters iniPath
--nodeStory_env <- fromDBNodeStoryEnv (_gc_repofilepath cfg)
pool <- newPool dbParam
nodeStory_env <- fromDBNodeStoryEnv pool
setts <- devSettings devJwkFile settingsFile
mail <- Mail.readConfig iniPath
nlp_config <- NLP.readConfig iniPath
pure $ WorkerEnv
{ _w_env_pool = pool
, _w_env_logger = logger
, _w_env_nodeStory = nodeStory_env
, _w_env_settings = setts
, _w_env_config = cfg
, _w_env_mail = mail
, _w_env_nlp = nlpServerMap nlp_config
}
instance HasConfig WorkerEnv where
hasConfig = to _w_env_config
instance HasSettings WorkerEnv where
settings = to _w_env_settings
instance HasLogger (GargM WorkerEnv IOException) where
data instance Logger (GargM WorkerEnv IOException) =
GargWorkerLogger {
w_logger_mode :: Mode
, w_logger_set :: FL.LoggerSet
}
type instance LogInitParams (GargM WorkerEnv IOException) = Mode
type instance LogPayload (GargM WorkerEnv IOException) = FL.LogStr
initLogger = \mode -> do
w_logger_set <- liftIO $ FL.newStderrLoggerSet FL.defaultBufSize
pure $ GargWorkerLogger mode w_logger_set
destroyLogger = \GargWorkerLogger{..} -> liftIO $ FL.rmLoggerSet w_logger_set
logMsg = \(GargWorkerLogger mode logger_set) lvl msg -> do
let pfx = "[" <> show lvl <> "] " :: Text
when (lvl `elem` (modeToLoggingLevels mode)) $
liftIO $ FL.pushLogStrLn logger_set $ FL.toLogStr pfx <> msg
logTxt lgr lvl msg = logMsg lgr lvl (FL.toLogStr $ T.unpack msg)
instance HasConnectionPool WorkerEnv where
connPool = to _w_env_pool
instance HasMail WorkerEnv where
mailSettings = to _w_env_mail
instance HasNLPServer WorkerEnv where
nlpServer = to _w_env_nlp
---------------
newtype WorkerMonad a =
WorkerMonad { _WorkerMonad :: GargM WorkerEnv IOException a }
deriving ( Functor
, Applicative
, Monad
, MonadIO
, MonadReader WorkerEnv
, MonadBase IO
, MonadBaseControl IO
, MonadError IOException
, MonadFail )
instance HasNodeError IOException where
_NodeError = prism' (Prelude.userError . show) (const Nothing)
runWorkerMonad :: WorkerEnv -> WorkerMonad a -> IO a
runWorkerMonad env m = do
res <- runExceptT . flip runReaderT env $ _WorkerMonad m
case res of
Left e -> throwIO e
Right x -> pure x
{-|
Module : Gargantext.Core.Worker.Jobs.Types
Description : Worker job definitions
Copyright : (c) CNRS, 2024
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
......@@ -15,14 +15,24 @@ module Gargantext.Core.Worker.Jobs.Types where
import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson.Types (prependFailure, typeMismatch)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams)
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser, ForgotPasswordAsyncParams)
import Gargantext.API.Admin.EnvTypes ( GargJob )
import Gargantext.API.Node.New.Types ( PostNode(..) )
import Gargantext.API.Node.Types (NewWithForm)
import Gargantext.Core.Types.Individu (User)
import Gargantext.Database.Admin.Types.Node (CorpusId, NodeId)
import Gargantext.Prelude
data Job =
Ping
| AddCorpusFormAsync { _acf_args :: NewWithForm
, _acf_user :: User
, _acf_cid :: CorpusId }
| ForgotPasswordAsync { _fpa_args :: ForgotPasswordAsyncParams }
| NewNodeAsync { _nna_node_id :: NodeId
, _nna_authenticatedUser :: AuthenticatedUser
, _nna_postNode :: PostNode }
| GargJob { _gj_garg_job :: GargJob }
deriving (Show, Eq)
instance FromJSON Job where
......@@ -30,16 +40,34 @@ instance FromJSON Job where
type_ <- o .: "type"
case type_ of
"Ping" -> return Ping
"AddCorpusFormAsync" -> do
_acf_args <- o .: "args"
_acf_user <- o .: "user"
_acf_cid <- o .: "cid"
return $ AddCorpusFormAsync { .. }
"ForgotPasswordAsync" -> do
_fpa_args <- o .: "args"
return $ ForgotPasswordAsync { _fpa_args }
"NewNodeAsync" -> do
_nna_node_id <- o .: "node_id"
_nna_authenticatedUser <- o .: "authenticated_user"
_nna_postNode <- o .: "post_node"
return $ NewNodeAsync { .. }
"GargJob" -> do
_gj_garg_job <- o .: "garg_job"
return $ GargJob { _gj_garg_job }
s -> prependFailure "parsing job type failed, " (typeMismatch "type" s)
instance ToJSON Job where
toJSON Ping = object [ ("type" .= ("Ping" :: Text)) ]
toJSON (ForgotPasswordAsync { _fpa_args }) = object [ ("type" .= ("ForgotPasswordAsync" :: Text))
, ("args" .= _fpa_args) ]
toJSON (GargJob { _gj_garg_job }) = object [ ("type" .= ("GargJob" :: Text))
, ("garg_job" .= _gj_garg_job) ]
toJSON (AddCorpusFormAsync { .. }) = object [ ("type" .= ("AddCorpusFormJob" :: Text))
, ("args" .= _acf_args)
, ("user" .= _acf_user)
, ("cid" .= _acf_cid) ]
toJSON (ForgotPasswordAsync { .. }) = object [ ("type" .= ("ForgotPasswordAsync" :: Text))
, ("args" .= _fpa_args) ]
toJSON (NewNodeAsync { .. }) = object [ ("type" .= ("NewNodeAsync" :: Text))
, ("node_id" .= _nna_node_id)
, ("authenticated_user" .= _nna_authenticatedUser)
, ("post_node" .= _nna_postNode) ]
toJSON (GargJob { .. }) = object [ ("type" .= ("GargJob" :: Text))
, ("garg_job" .= _gj_garg_job) ]
{-|
Module : Gargantext.Utils.Jobs.Internal
Description : Servant Jobs
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}
module Gargantext.Utils.Jobs.Internal (
serveJobsAPI
-- * Internals for testing
......
......@@ -32,12 +32,24 @@ instance Arbitrary EnvTypes.GargJob where
instance Arbitrary Job where
arbitrary = oneof [ pure Ping
, addCorpusFormAsyncGen
, forgotPasswordAsyncGen
, newNodeAsyncGen
, gargJobGen ]
where
forgotPasswordAsyncGen = do
email <- arbitrary
return $ ForgotPasswordAsync (ForgotPasswordAsyncParams { email })
addCorpusFormAsyncGen = do
_acf_args <- arbitrary
_acf_user <- arbitrary
_acf_cid <- arbitrary
return $ AddCorpusFormAsync { .. }
newNodeAsyncGen = do
_nna_node_id <- arbitrary
_nna_authenticatedUser <- arbitrary
_nna_postNode <- arbitrary
return $ NewNodeAsync { .. }
gargJobGen = do
_gj_garg_job <- arbitrary
return $ GargJob { _gj_garg_job }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment