[worker] migrate all jobs to new-style worker jobs

Also, removed some old job stuff
parent ec6d4e91
Pipeline #6922 failed with stages
in 26 minutes and 12 seconds
......@@ -23,8 +23,8 @@ import Gargantext.Core.Config.Types (SettingsFile(..))
import Gargantext.Core.Config.Worker (WorkerDefinition(..), WorkerSettings(..), findDefinitionByName)
import Gargantext.Core.Worker (withPGMQWorkerCtrlC, withPGMQWorkerSingleCtrlC, initWorkerState)
import Gargantext.Core.Worker.Env (withWorkerEnv)
import Gargantext.Core.Worker.Jobs (sendJob)
import Gargantext.Core.Worker.Jobs.Types (Job(Ping))
-- import Gargantext.Core.Worker.Jobs (sendJob)
-- import Gargantext.Core.Worker.Jobs.Types (Job(Ping))
import Gargantext.Prelude
import Options.Applicative
import Prelude qualified
......@@ -63,7 +63,7 @@ workerCLI (CLIW_run (WorkerArgs { .. })) = do
wait a
else
withPGMQWorkerCtrlC env wd $ \a _state -> do
_ <- runReaderT (sendJob Ping) env
-- _ <- runReaderT (sendJob Ping) env
wait a
workerCLI (CLIW_stats (WorkerStatsArgs { .. })) = do
putStrLn ("worker toml: " <> _SettingsFile ws_toml)
......
......@@ -70,9 +70,9 @@ url = "http://localhost:6800"
[external.frames]
# FRAMES (i.e. iframe sources used in various places on the frontend)
#write_url = "http://write.frame.gargantext.org/"
#write_url = "http://write.frame.gargantext.org"
write_url = URL_TO_CHANGE
#calc_url = "http://calc.frame.gargantext.org/"
#calc_url = "http://calc.frame.gargantext.org"
calc_url = URL_TO_CHANGE
visio_url = URL_TO_CHANGE
......
......@@ -277,7 +277,6 @@ library
Gargantext.MicroServices.ReverseProxy
Gargantext.System.Logging
Gargantext.Utils.Dict
Gargantext.Utils.Jobs
Gargantext.Utils.Jobs.Error
Gargantext.Utils.Jobs.Internal
Gargantext.Utils.Jobs.Map
......
......@@ -57,7 +57,6 @@ import Gargantext.Core.NodeStory
import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Internal (pollJob)
import Gargantext.Utils.Jobs.Map (LoggerM, J(..), jTask, rjGetLog)
import Gargantext.Utils.Jobs.Monad qualified as Jobs
import Network.HTTP.Client (Manager)
......@@ -66,7 +65,6 @@ import Servant.Client (BaseUrl)
import Servant.Job.Async (HasJobEnv(..), Job)
import Servant.Job.Async qualified as SJ
import Servant.Job.Core qualified
import Servant.Job.Types qualified as SJ
import System.Log.FastLogger qualified as FL
data Mode = Dev | Mock | Prod
......@@ -104,6 +102,7 @@ instance HasLogger (GargM Env BackendInternalError) where
logTxt lgr lvl msg = logMsg lgr lvl (FL.toLogStr $ T.unpack msg)
-- {-# DEPRECATED GargJob "GargJob is deprecated, use Worker.Jobs.Types.Job instead" #-}
data GargJob
= AddAnnuaireFormJob
| AddContactJob
......@@ -248,20 +247,21 @@ mkJobHandle jId = JobHandle jId
-- | Updates the status of a 'JobHandle' by using the input 'updateJobStatus' function.
updateJobProgress :: ConcreteJobHandle err -> (JobLog -> JobLog) -> GargM Env err ()
updateJobProgress ConcreteNullHandle _ = pure ()
updateJobProgress hdl@(JobHandle jId logStatus) updateJobStatus = do
jobLog <- Jobs.getLatestJobStatus hdl
let jobLogNew = updateJobStatus jobLog
logStatus jobLogNew
mJb <- Jobs.findJob jId
case mJb of
Nothing -> pure ()
Just je -> do
-- We use the same endpoint as the one for polling jobs via
-- API. This way we can send the job status directly in the
-- notification
j <- pollJob (Just $ SJ.Limit 1) Nothing jId je
CET.ce_notify $ CET.UpdateJobProgress j
updateJobProgress _ _ = pure ()
-- updateJobProgress ConcreteNullHandle _ = pure ()
-- updateJobProgress hdl@(JobHandle jId logStatus) updateJobStatus = do
-- jobLog <- Jobs.getLatestJobStatus hdl
-- let jobLogNew = updateJobStatus jobLog
-- logStatus jobLogNew
-- mJb <- Jobs.findJob jId
-- case mJb of
-- Nothing -> pure ()
-- Just je -> do
-- -- We use the same endpoint as the one for polling jobs via
-- -- API. This way we can send the job status directly in the
-- -- notification
-- j <- pollJob (Just $ SJ.Limit 1) Nothing jId je
-- CET.ce_notify $ CET.UpdateJobProgress j
instance Jobs.MonadJobStatus (GargM Env err) where
......
......@@ -22,6 +22,7 @@ import Codec.Serialise (Serialise(), serialise)
import Control.Lens
import Control.Monad.Reader
import Data.ByteString.Lazy qualified as L
import Data.Map.Strict qualified as Map
import Data.Pool (Pool)
import Data.Pool qualified as Pool
import Database.PostgreSQL.Simple (Connection, connect, close, ConnectInfo)
......@@ -36,9 +37,7 @@ import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.NodeStory
import Gargantext.Prelude
import Gargantext.System.Logging
import Gargantext.Utils.Jobs qualified as Jobs
import Gargantext.Utils.Jobs.Monad qualified as Jobs
import Gargantext.Utils.Jobs.Queue qualified as Jobs
import Gargantext.Utils.Jobs.Settings qualified as Jobs
import Network.HTTP.Client.TLS (newTlsManager)
import Servant.Client (parseBaseUrl)
......@@ -151,16 +150,17 @@ readRepoEnv repoDir = do
--}
newEnv :: Logger (GargM Env BackendInternalError) -> PortNumber -> SettingsFile -> IO Env
newEnv logger port settingsFile@(SettingsFile sf) = do
newEnv logger port settingsFile = do
!manager_env <- newTlsManager
!config_env <- readConfig settingsFile <&> (gc_frontend_config . fc_appPort) .~ port -- TODO read from 'file'
when (port /= config_env ^. gc_frontend_config . fc_appPort) $
panicTrace "TODO: conflicting settings of port"
prios <- withLogger () $ \ioLogger -> Jobs.readPrios ioLogger (sf <> ".jobs")
let prios' = Jobs.applyPrios prios Jobs.defaultPrios
putStrLn ("Overrides: " <> show prios :: Text)
putStrLn ("New priorities: " <> show prios' :: Text)
-- prios <- withLogger () $ \ioLogger -> Jobs.readPrios ioLogger (sf <> ".jobs")
-- let prios' = Jobs.applyPrios prios Jobs.defaultPrios
-- putStrLn ("Overrides: " <> show prios :: Text)
-- putStrLn ("New priorities: " <> show prios' :: Text)
let prios = Map.empty
!self_url_env <- parseBaseUrl $ "http://0.0.0.0:" <> show port
!pool <- newPool $ _gc_database_config config_env
!nodeStory_env <- fromDBNodeStoryEnv pool
......@@ -170,7 +170,7 @@ newEnv logger port settingsFile@(SettingsFile sf) = do
let jobs_settings = (Jobs.defaultJobSettings 1 secret)
& Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_job_timeout)
& Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_id_timeout)
!jobs_env <- Jobs.newJobEnv jobs_settings prios' manager_env
!jobs_env <- Jobs.newJobEnv jobs_settings prios manager_env
!central_exchange <- forkIO $ CE.gServer (_gc_notifications_config config_env)
!dispatcher <- D.newDispatcher (_gc_notifications_config config_env)
......
......@@ -11,7 +11,6 @@ Portability : POSIX
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE ViewPatterns #-}
module Gargantext.API.Ngrams.List
......@@ -27,18 +26,19 @@ import Data.Set qualified as Set
import Data.Text (concat, pack, splitOn)
import Data.Vector (Vector)
import Data.Vector qualified as Vec
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..))
import Gargantext.API.Errors.Types (BackendInternalError)
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types (BackendInternalError(InternalServerError))
import Gargantext.API.Ngrams (setListNgrams)
import Gargantext.API.Ngrams.List.Types
import Gargantext.API.Ngrams.Prelude (getNgramsList)
import Gargantext.API.Ngrams.Types
import Gargantext.API.Prelude (GargM, serverError, HasServerError)
import Gargantext.API.Routes.Named.List qualified as Named
import Gargantext.API.Worker (serveWorkerAPI, serveWorkerAPIEJob)
import Gargantext.Core.NodeStory.Types ( HasNodeStory )
import Gargantext.Core.Text.Ngrams (Ngrams, NgramsType(NgramsTerms))
import Gargantext.Core.Types.Main (ListType(..))
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Flow (reIndexWith)
import Gargantext.Database.Admin.Types.Node ( NodeId(_NodeId), ListId )
import Gargantext.Database.Query.Table.Node (getNode)
......@@ -46,7 +46,7 @@ import Gargantext.Database.Schema.Ngrams ( text2ngrams, NgramsId )
import Gargantext.Database.Schema.Node (_node_parent_id)
import Gargantext.Database.Types (Indexed(..))
import Gargantext.Prelude hiding (concat, toList)
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Prelude qualified
import Protolude qualified as P
import Servant
......@@ -107,9 +107,11 @@ getTsv lId = do
------------------------------------------------------------------------
jsonPostAsync :: Named.JSONAPI (AsServerT (GargM Env BackendInternalError))
jsonPostAsync = Named.JSONAPI $ \lId -> AsyncJobs $
serveJobsAPI UpdateNgramsListJobJSON $ \jHandle f ->
postAsyncJSON lId (_wjf_data f) jHandle
jsonPostAsync = Named.JSONAPI {
updateListJSONEp = \lId -> serveWorkerAPI $ \p ->
Jobs.JSONPost { _jp_list_id = lId
, _jp_ngrams_list = _wjf_data p }
}
------------------------------------------------------------------------
postAsyncJSON :: (HasNodeStory env err m, MonadJobStatus m)
......@@ -148,11 +150,14 @@ tsvAPI = tsvPostAsync
------------------------------------------------------------------------
tsvPostAsync :: Named.TSVAPI (AsServerT (GargM Env BackendInternalError))
tsvPostAsync = Named.TSVAPI $ \lId -> AsyncJobs $
serveJobsAPI UpdateNgramsListJobTSV $ \jHandle f -> do
case ngramsListFromTSVData (_wtf_data f) of
Left err -> serverError $ err500 { errReasonPhrase = err }
Right ngramsList -> postAsyncJSON lId ngramsList jHandle
tsvPostAsync =
Named.TSVAPI {
updateListTSVEp = \lId -> serveWorkerAPIEJob $ \p ->
case ngramsListFromTSVData (_wtf_data p) of
Left err -> Left $ InternalServerError $ err500 { errReasonPhrase = err }
Right ngramsList -> Right $ Jobs.JSONPost { _jp_list_id = lId
, _jp_ngrams_list = ngramsList }
}
-- | Tries converting a text file into an 'NgramList', so that we can reuse the
-- existing JSON endpoint for the TSV upload.
......
......@@ -12,7 +12,6 @@ Portability : POSIX
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Node.Contact
......@@ -20,16 +19,17 @@ module Gargantext.API.Node.Contact
import Conduit ( yield )
import Gargantext.API.Admin.Auth.Types ( AuthenticatedUser(AuthenticatedUser) )
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..))
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError )
import Gargantext.API.Node ( nodeNodeAPI )
import Gargantext.API.Node.Contact.Types
import Gargantext.API.Prelude (GargM, simuLogs)
import Gargantext.API.Routes.Named.Contact qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core (Lang(..))
import Gargantext.Core.Text.Terms (TermType(..))
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Flow (flow)
import Gargantext.Database.Action.Flow.Types (FlowCmdM)
import Gargantext.Database.Admin.Types.Hyperdata.Contact ( HyperdataContact, hyperdataContact )
......@@ -37,21 +37,25 @@ import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataAnnuaire(..)
import Gargantext.Database.Admin.Types.Node ( CorpusId, NodeId )
import Gargantext.Database.Query.Tree.Root (MkCorpusUser(MkCorpusUserNormalCorpusIds))
import Gargantext.Prelude (($), Maybe(..))
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Servant
import Servant.Server.Generic (AsServerT)
contactAPI :: AuthenticatedUser -> CorpusId -> Named.ContactAPI (AsServerT (GargM Env BackendInternalError))
contactAPI authUser@(AuthenticatedUser userNodeId _userUserId) cid = Named.ContactAPI
{ contactAsyncAPI = api_async (RootId userNodeId) cid
{ contactAsyncAPI = apiAsync (RootId userNodeId) cid
, getContactEp = nodeNodeAPI (Proxy :: Proxy HyperdataContact) authUser cid
}
----------------------------------------------------------------------
api_async :: User -> NodeId -> Named.ContactAsyncAPI (AsServerT (GargM Env BackendInternalError))
api_async u nId = Named.ContactAsyncAPI $ AsyncJobs $
serveJobsAPI AddContactJob $ \jHandle p ->
addContact u nId p jHandle
apiAsync :: User -> NodeId -> Named.ContactAsyncAPI (AsServerT (GargM Env BackendInternalError))
apiAsync u nId = Named.ContactAsyncAPI {
addContactAsyncEp = serveWorkerAPI $ \p ->
Jobs.AddContact { _ac_args = p
, _ac_node_id = nId
, _ac_user = u }
}
-- addContact u nId p jHandle
addContact :: (FlowCmdM env err m, MonadJobStatus m)
=> User
......
{-|
Module : Gargantext.API.Node.Contact.Types
Description : Contact API types
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.API.Node.Contact.Types where
import Data.Aeson
import Data.Swagger
import Data.Text (Text)
import GHC.Generics
import Gargantext.Prelude
import Gargantext.Utils.Aeson qualified as GUA
import Test.QuickCheck
......@@ -14,7 +24,7 @@ data AddContactParams = AddContactParams { firstname :: !Text, lastname
, lastname :: !Text
-- TODO add others fields
}
deriving (Generic)
deriving (Generic, Show, Eq)
------------------------------------------------------------------------
-- TODO unPrefix "pn_" FromJSON, ToJSON, ToSchema, adapt frontend.
......
......@@ -23,7 +23,7 @@ import Gargantext.Core.Utils.Prefix (unPrefixSwagger)
import Gargantext.Database.Action.Flow.Types (FlowCmdM) -- flowAnnuaire
import Gargantext.Database.Admin.Types.Node (AnnuaireId)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (MonadJobStatus(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Servant
import Servant.Job.Utils (jsonOptions)
import Web.FormUrlEncoded (FromForm)
......
......@@ -26,7 +26,7 @@ import Gargantext.Database.Query.Table.Node.Error
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs
import Gargantext.Utils.Jobs.Monad (MonadJobStatus)
-- | Updates the 'HyperdataCorpus' with the input 'Lang'.
......
......@@ -11,23 +11,22 @@ Portability : POSIX
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Node.DocumentUpload where
import Control.Lens (view)
import Data.Text qualified as T
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError )
import Gargantext.API.Node.DocumentUpload.Types
import Gargantext.API.Prelude ( GargM )
import Gargantext.API.Routes.Named.Document qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core (Lang(..))
import Gargantext.Core.NLP (nlpServerGet)
import Gargantext.Core.Text.Corpus.Parsers.Date (mDateSplit)
import Gargantext.Core.Text.Terms (TermType(..))
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Flow (addDocumentsToHyperCorpus)
import Gargantext.Database.Action.Flow.Types ( FlowCmdM )
import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataCorpus )
......@@ -35,14 +34,16 @@ import Gargantext.Database.Admin.Types.Hyperdata.Document (HyperdataDocument(..)
import Gargantext.Database.Admin.Types.Node ( DocId, NodeId, NodeType(NodeCorpus) )
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType')
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Servant.Server.Generic (AsServerT)
api :: NodeId -> Named.DocumentUploadAPI (AsServerT (GargM Env BackendInternalError))
api nId = Named.DocumentUploadAPI $ AsyncJobs $
serveJobsAPI UploadDocumentJob $ \jHandle q -> do
documentUploadAsync nId q jHandle
api nId = Named.DocumentUploadAPI {
uploadDocAsyncEp = serveWorkerAPI $ \p ->
Jobs.UploadDocument { _ud_args = p
, _ud_node_id = nId }
}
documentUploadAsync :: (FlowCmdM env err m, MonadJobStatus m)
=> NodeId
......
{-|
Module : Gargantext.API.Node.DocumentUpload.Types
Description : Document upload types
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE TemplateHaskell #-}
module Gargantext.API.Node.DocumentUpload.Types where
import Data.Aeson ( Options(..), genericParseJSON, defaultOptions, genericToJSON, SumEncoding(..) )
......@@ -15,7 +27,7 @@ data DocumentUpload = DocumentUpload
, _du_date :: T.Text
, _du_language :: T.Text
}
deriving (Generic)
deriving (Generic, Show, Eq)
$(makeLenses ''DocumentUpload)
......
......@@ -20,19 +20,20 @@ import Conduit ( yieldMany )
import Data.List qualified as List
import Data.Text qualified as T
import Gargantext.API.Admin.Auth.Types ( AuthenticatedUser, auth_node_id, auth_user_id )
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError )
import Gargantext.API.Ngrams (commitStatePatch, Versioned(..))
import Gargantext.API.Node.DocumentsFromWriteNodes.Types
import Gargantext.API.Prelude (GargM)
import Gargantext.API.Routes.Named.Document qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core (Lang(..))
import Gargantext.Core.NodeStory (HasNodeStoryImmediateSaver, HasNodeArchiveStoryImmediateSaver, currentVersion)
import Gargantext.Core.Text.Corpus.Parsers.Date (split')
import Gargantext.Core.Text.Corpus.Parsers.FrameWrite
import Gargantext.Core.Text.Terms (TermType(..))
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Flow (flowDataText, DataText(..))
import Gargantext.Database.Action.Flow.Types (FlowCmdM)
import Gargantext.Database.Admin.Types.Hyperdata.Document (HyperdataDocument(..))
......@@ -42,7 +43,7 @@ import Gargantext.Database.Query.Table.Node (getChildrenByType, getClosestParent
import Gargantext.Database.Schema.Node (node_hyperdata, node_name, node_date)
import Gargantext.Prelude
import Gargantext.System.Logging
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Gargantext.Utils.Jobs.Error
import Servant.Server.Generic (AsServerT)
......@@ -50,9 +51,13 @@ api :: AuthenticatedUser
-- ^ The logged-in user
-> NodeId
-> Named.DocumentsFromWriteNodesAPI (AsServerT (GargM Env BackendInternalError))
api authenticatedUser nId = Named.DocumentsFromWriteNodesAPI $ AsyncJobs $
serveJobsAPI DocumentFromWriteNodeJob $ \jHandle p ->
documentsFromWriteNodes authenticatedUser nId p jHandle
api authenticatedUser nId =
Named.DocumentsFromWriteNodesAPI {
docFromWriteNodesEp = serveWorkerAPI $ \p ->
Jobs.DocumentsFromWriteNodes { _dfwn_args = p
, _dfwn_authenticatedUser = authenticatedUser
, _dfwn_node_id = nId }
}
documentsFromWriteNodes :: ( FlowCmdM env err m
, MonadJobStatus m
......
{-|
Module : Gargantext.API.Node.DocumentsFromWriteNodes.Types
Description : Documents from write nodes
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.API.Node.DocumentsFromWriteNodes.Types where
......@@ -14,7 +24,7 @@ data Params = Params
, lang :: Lang
, selection :: FlowSocialListWith
}
deriving (Generic, Show)
deriving (Generic, Show, Eq)
instance FromJSON Params where
parseJSON = genericParseJSON defaultOptions
instance ToJSON Params where
......
......@@ -11,7 +11,6 @@ Portability : POSIX
{-# OPTIONS_GHC -fno-warn-unused-matches #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE IncoherentInstances #-}
......@@ -20,13 +19,14 @@ module Gargantext.API.Node.File where
import Data.MIME.Types qualified as DMT
import Data.Text qualified as T
import Gargantext.API.Admin.Auth.Types ( AuthenticatedUser, auth_user_id )
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..))
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError )
import Gargantext.API.Node.File.Types
import Gargantext.API.Node.Types ( NewWithFile(NewWithFile) )
import Gargantext.API.Prelude ( GargM )
import Gargantext.API.Routes.Named.File qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Flow.Types ( FlowCmdM )
import Gargantext.Database.Action.Node (mkNodeWithParent)
import Gargantext.Database.Admin.Types.Hyperdata.File ( HyperdataFile(..) )
......@@ -36,10 +36,11 @@ import Gargantext.Database.Query.Table.Node (getNodeWith)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Servant
import Servant.Server.Generic (AsServerT)
fileApi :: (FlowCmdM env err m)
=> NodeId
-> m (Headers '[Servant.Header "Content-Type" Text] BSResponse)
......@@ -77,9 +78,13 @@ fileAsyncApi :: AuthenticatedUser
-- ^ The logged-in user
-> NodeId
-> Named.FileAsyncAPI (AsServerT (GargM Env BackendInternalError))
fileAsyncApi authenticatedUser nId = Named.FileAsyncAPI $ AsyncJobs $
serveJobsAPI AddFileJob $ \jHandle i ->
addWithFile authenticatedUser nId i jHandle
fileAsyncApi authenticatedUser nId =
Named.FileAsyncAPI {
addFileAsyncEp = serveWorkerAPI $ \i ->
Jobs.AddWithFile { _awf_args = i
, _awf_node_id = nId
, _awf_authenticatedUser = authenticatedUser }
}
addWithFile :: (FlowCmdM env err m, MonadJobStatus m)
......
......@@ -18,8 +18,7 @@ import Data.ByteString.Lazy qualified as BSL
import Data.ByteString.UTF8 qualified as BSU8
import Data.Text qualified as T
import Gargantext.API.Admin.Auth.Types ( auth_node_id, AuthenticatedUser )
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..))
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError )
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm)
import Gargantext.API.Node.Corpus.New.Types (FileFormat(..), FileType(..))
......@@ -27,25 +26,31 @@ import Gargantext.API.Node.FrameCalcUpload.Types
import Gargantext.API.Node.Types (NewWithForm(..))
import Gargantext.API.Prelude ( GargM )
import Gargantext.API.Routes.Named.FrameCalc qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.Config (HasConfig)
import Gargantext.Core.NodeStory.Types ( HasNodeArchiveStoryImmediateSaver )
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Flow.Types ( FlowCmdM )
import Gargantext.Database.Admin.Types.Hyperdata.Frame ( HyperdataFrame(..) )
import Gargantext.Database.Admin.Types.Node ( NodeId, NodeType(NodeCorpus) )
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType, getNodeWith)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..), markFailureNoErr)
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..), markFailureNoErr)
import Network.HTTP.Client (newManager, httpLbs, parseRequest, responseBody)
import Network.HTTP.Client.TLS (tlsManagerSettings)
import Servant.Server.Generic (AsServerT)
api :: AuthenticatedUser -> NodeId -> Named.FrameCalcAPI (AsServerT (GargM Env BackendInternalError))
api authenticatedUser nId = Named.FrameCalcAPI $ AsyncJobs $
serveJobsAPI UploadFrameCalcJob $ \jHandle p ->
frameCalcUploadAsync authenticatedUser nId p jHandle
api authenticatedUser nId =
Named.FrameCalcAPI {
frameCalcUploadEp = serveWorkerAPI $ \p ->
Jobs.FrameCalcUpload { _fca_args = p
, _fca_authenticatedUser = authenticatedUser
, _fca_node_id = nId }
}
......
{-|
Module : Gargantext.API.Node.FrameCalcUpload
Description : Frame calc upload types
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.API.Node.FrameCalcUpload.Types where
......@@ -11,7 +22,7 @@ data FrameCalcUpload = FrameCalcUpload {
_wf_lang :: !(Maybe Lang)
, _wf_selection :: !FlowSocialListWith
}
deriving (Generic)
deriving (Generic, Show, Eq)
instance FromForm FrameCalcUpload
instance FromJSON FrameCalcUpload
......
......@@ -19,23 +19,21 @@ module Gargantext.API.Node.New
import Control.Lens hiding (elements, Empty)
import Gargantext.API.Admin.Auth.Types
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs (..))
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types
import Gargantext.API.Node.New.Types
import Gargantext.API.Prelude
import Gargantext.API.Routes.Named.Node qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.Mail.Types (HasMail)
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CE
import Gargantext.Core.NLP (HasNLPServer)
import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Node
import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Prelude (CmdM, DBCmd')
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Servant.Server.Generic (AsServerT)
------------------------------------------------------------------------
......@@ -59,12 +57,13 @@ postNodeAsyncAPI
-> NodeId
-- ^ The target node
-> Named.PostNodeAsyncAPI (AsServerT (GargM Env BackendInternalError))
postNodeAsyncAPI authenticatedUser nId = Named.PostNodeAsyncAPI $ AsyncJobs $
serveJobsAPI NewNodeJob $ \_jHandle p -> do
void $ Jobs.sendJob $ Jobs.NewNodeAsync { Jobs._nna_node_id = nId
, Jobs._nna_authenticatedUser = authenticatedUser
, Jobs._nna_postNode = p }
-- postNodeAsync authenticatedUser nId p jHandle
postNodeAsyncAPI authenticatedUser nId =
Named.PostNodeAsyncAPI {
postNodeAsyncEp = serveWorkerAPI $ \p ->
Jobs.PostNodeAsync { _pna_node_id = nId
, _pna_authenticatedUser = authenticatedUser
, _pna_args = p }
}
------------------------------------------------------------------------
-- postNode' :: (CmdM env err m, HasSettings env, HasNodeError err)
......
......@@ -9,27 +9,26 @@ Portability : POSIX
-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Node.Update
where
import Control.Lens (view)
import Data.Set qualified as Set
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..))
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError )
import Gargantext.API.Metrics qualified as Metrics
import Gargantext.API.Ngrams.Types qualified as NgramsTypes
import Gargantext.API.Node.Update.Types
import Gargantext.API.Prelude (GargM, simuLogs)
import Gargantext.API.Routes.Named.Node qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.NodeStory.Types (HasNodeStory)
import Gargantext.Core.Text.Ngrams (NgramsType(NgramsTerms))
import Gargantext.Core.Types.Main (ListType(..))
import Gargantext.Core.Viz.Graph.API (recomputeGraph)
import Gargantext.Core.Viz.Phylo (subConfigAPI2config)
import Gargantext.Core.Viz.Phylo.API.Tools (flowPhyloAPI)
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Flow (reIndexWith)
import Gargantext.Database.Action.Flow.Pairing (pairing)
import Gargantext.Database.Action.Metrics (updateNgramsOccurrences, updateContextScore)
......@@ -40,15 +39,18 @@ import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_parent_id)
import Gargantext.Prelude
import Gargantext.System.Logging ( MonadLogger )
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Gargantext.Utils.UTCTime (timeMeasured)
import Servant.Server.Generic (AsServerT)
------------------------------------------------------------------------
api :: NodeId -> Named.UpdateAPI (AsServerT (GargM Env BackendInternalError))
api nId = Named.UpdateAPI $ AsyncJobs $
serveJobsAPI UpdateNodeJob $ \jHandle p ->
updateNode nId p jHandle
api nId =
Named.UpdateAPI {
updateNodeEp = serveWorkerAPI $ \p ->
Jobs.UpdateNode { _un_node_id = nId
, _un_args = p }
}
updateNode :: (HasNodeStory env err m
, MonadJobStatus m
......
......@@ -32,19 +32,19 @@ data UpdateNodeParams = UpdateNodeParamsList { methodList :: !Method }
, id :: !NodeId }
| UpdateNodePhylo { config :: !PhyloSubConfigAPI }
deriving (Generic)
deriving (Generic, Show, Eq)
----------------------------------------------------------------------
data Method = Basic | Advanced | WithModel
deriving (Generic, Eq, Ord, Enum, Bounded)
deriving (Generic, Eq, Ord, Enum, Bounded, Show)
----------------------------------------------------------------------
data Granularity = NewNgrams | NewTexts | Both
deriving (Generic, Eq, Ord, Enum, Bounded)
deriving (Generic, Eq, Ord, Enum, Bounded, Show)
----------------------------------------------------------------------
data Charts = Sources | Authors | Institutes | Ngrams | All
deriving (Generic, Eq, Ord, Enum, Bounded)
deriving (Generic, Eq, Ord, Enum, Bounded, Show)
------------------------------------------------------------------------
data UpdateNodeConfigGraph = UpdateNodeConfigGraph { methodGraphMetric :: !GraphMetric
......@@ -54,7 +54,7 @@ data UpdateNodeConfigGraph = UpdateNodeConfigGraph { methodGraphMetric ::
, methodGraphNodeType1 :: !NgramsType
, methodGraphNodeType2 :: !NgramsType
}
deriving (Generic)
deriving (Generic, Show, Eq)
------------------------------------------------------------------------
-- TODO unPrefix "pn_" FromJSON, ToJSON, ToSchema, adapt frontend.
......
......@@ -10,7 +10,6 @@ Portability : POSIX
-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE MonoLocalBinds #-}
module Gargantext.API.Prelude
......
......@@ -18,19 +18,15 @@ module Gargantext.API.Routes
where
import Data.Validity
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..))
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types
import Gargantext.API.Node.Corpus.Annuaire qualified as Annuaire
import Gargantext.API.Prelude
import Gargantext.API.Routes.Named.Annuaire qualified as Named
import Gargantext.API.Routes.Named.Corpus qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.Types.Individu (User(..))
-- import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Servant
import Servant.Auth.Swagger ()
import Servant.Server.Generic (AsServerT)
......@@ -61,41 +57,31 @@ waitAPI n = do
-}
addCorpusWithQuery :: User -> Named.AddWithQuery (AsServerT (GargM Env BackendInternalError))
addCorpusWithQuery user = Named.AddWithQuery $ \cId ->
serveWorkerAPI $ \p ->
Jobs.AddCorpusWithQuery { Jobs._acq_args = p
, Jobs._acq_user = user
, Jobs._acq_cid = cId }
-- addCorpusWithForm :: User -> Named.AddWithForm (AsServerT (GargM Env BackendInternalError))
-- addCorpusWithForm user = Named.AddWithForm $ \cid -> AsyncJobs $
-- serveJobsAPI AddCorpusFormJob $ \_jHandle i -> do
-- -- /NOTE(adinapoli)/ Track the initial steps outside 'addToCorpusWithForm', because it's
-- -- called in a few places, and the job status might be different between invocations.
-- -- markStarted 3 jHandle
-- -- New.addToCorpusWithForm user cid i jHandle
-- void $ Jobs.sendJob $ Jobs.AddCorpusFormAsync { Jobs._acf_args = i
-- , Jobs._acf_user = user
-- , Jobs._acf_cid = cid }
addCorpusWithQuery user =
Named.AddWithQuery {
addWithQueryEp = \cId -> serveWorkerAPI $ \p ->
Jobs.AddCorpusWithQuery { Jobs._acq_args = p
, Jobs._acq_user = user
, Jobs._acq_cid = cId }
}
addCorpusWithForm :: User -> Named.AddWithForm (AsServerT (GargM Env BackendInternalError))
addCorpusWithForm user = Named.AddWithForm $ \cId ->
serveWorkerAPI $ \p ->
-- /NOTE(adinapoli)/ Track the initial steps outside 'addToCorpusWithForm', because it's
-- called in a few places, and the job status might be different between invocations.
-- markStarted 3 jHandle
-- New.addToCorpusWithForm user cid i jHandle
Jobs.AddCorpusFormAsync { Jobs._acf_args = p
, Jobs._acf_user = user
, Jobs._acf_cid = cId }
--addCorpusWithFile :: User -> ServerT Named.AddWithFile (GargM Env BackendInternalError)
--addCorpusWithFile user cid =
-- serveJobsAPI AddCorpusFileJob $ \jHandle i ->
-- New.addToCorpusWithFile user cid i jHandle
addCorpusWithForm user =
Named.AddWithForm {
addWithFormEp = \cId -> serveWorkerAPI $ \p ->
-- /NOTE(adinapoli)/ Track the initial steps outside 'addToCorpusWithForm', because it's
-- called in a few places, and the job status might be different between invocations.
-- markStarted 3 jHandle
-- New.addToCorpusWithForm user cid i jHandle
Jobs.AddCorpusFormAsync { Jobs._acf_args = p
, Jobs._acf_user = user
, Jobs._acf_cid = cId }
}
addAnnuaireWithForm :: Named.AddAnnuaireWithForm (AsServerT (GargM Env BackendInternalError))
addAnnuaireWithForm = Named.AddAnnuaireWithForm $ \cid -> AsyncJobs $
serveJobsAPI AddAnnuaireFormJob $ \jHandle i ->
Annuaire.addToAnnuaireWithForm cid i jHandle
addAnnuaireWithForm =
Named.AddAnnuaireWithForm {
addWithFormEp = \aId -> serveWorkerAPI $ \i ->
Jobs.AddToAnnuaireWithForm { _aawf_annuaire_id = aId
, _aawf_args = i }
}
{-|
Module : Gargantext.API.Routes.Named.Annuaire
Description : Annuaire API routes
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Routes.Named.Annuaire (
......@@ -6,8 +17,8 @@ module Gargantext.API.Routes.Named.Annuaire (
) where
import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Node.Corpus.Annuaire (AnnuaireWithForm)
import Gargantext.API.Worker (WorkerAPI)
import Gargantext.Database.Admin.Types.Node
import Servant
......@@ -18,5 +29,5 @@ newtype AddAnnuaireWithForm mode = AddAnnuaireWithForm
:> "add"
:> "form"
:> "async"
:> NamedRoutes (AsyncJobs JobLog '[FormUrlEncoded] AnnuaireWithForm JobLog)
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] AnnuaireWithForm)
} deriving Generic
......@@ -11,9 +11,9 @@ module Gargantext.API.Routes.Named.Contact (
import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Node.Contact.Types (AddContactParams(..))
import Gargantext.API.Routes.Named.Node (NodeNodeAPI(..))
import Gargantext.API.Worker (WorkerAPI)
import Gargantext.Database.Admin.Types.Hyperdata.Contact
import Gargantext.Database.Admin.Types.Node
import Servant
......@@ -26,5 +26,5 @@ data ContactAPI mode = ContactAPI
newtype ContactAsyncAPI mode = ContactAsyncAPI
{ addContactAsyncEp :: mode :- NamedRoutes (AsyncJobs JobLog '[JSON] AddContactParams JobLog)
{ addContactAsyncEp :: mode :- NamedRoutes (WorkerAPI '[JSON] AddContactParams)
} deriving Generic
......@@ -33,7 +33,6 @@ newtype AddWithForm mode = AddWithForm
:> "add"
:> "form"
:> "async"
-- :> NamedRoutes (AsyncJobs JobLog '[FormUrlEncoded] NewWithForm JobLog)
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] NewWithForm)
} deriving Generic
......@@ -42,6 +41,5 @@ newtype AddWithQuery mode = AddWithQuery
:> "corpus"
:> Capture "corpus_id" CorpusId
:> "query"
-- :> NamedRoutes (AsyncJobs JobLog '[JSON] WithQuery JobLog)
:> NamedRoutes (WorkerAPI '[JSON] WithQuery)
} deriving Generic
{-# LANGUAGE TemplateHaskell #-}
{-|
Module : Gargantext.API.Routes.Named.Document
Description : Document API
Copyright : (c) CNRS, 2024
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Routes.Named.Document (
......@@ -15,10 +25,10 @@ module Gargantext.API.Routes.Named.Document (
import Data.Text (Text)
import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Node.Document.Export.Types
import Gargantext.API.Node.DocumentsFromWriteNodes.Types ( Params(..) )
import Gargantext.API.Node.DocumentUpload.Types ( DocumentUpload(..), )
import Gargantext.API.Worker (WorkerAPI)
import Gargantext.Utils.Servant (ZIP)
import Servant
......@@ -37,7 +47,7 @@ data DocumentExportEndpoints mode = DocumentExportEndpoints
newtype DocumentsFromWriteNodesAPI mode = DocumentsFromWriteNodesAPI
{ docFromWriteNodesEp :: mode :- Summary " Documents from Write nodes."
:> NamedRoutes (AsyncJobs JobLog '[JSON] Params JobLog)
:> NamedRoutes (WorkerAPI '[JSON] Params)
} deriving Generic
......@@ -46,5 +56,5 @@ newtype DocumentUploadAPI mode = DocumentUploadAPI
:> "document"
:> "upload"
:> "async"
:> NamedRoutes (AsyncJobs JobLog '[JSON] DocumentUpload JobLog)
:> NamedRoutes (WorkerAPI '[JSON] DocumentUpload)
} deriving Generic
......@@ -8,10 +8,10 @@ module Gargantext.API.Routes.Named.File (
import Data.Text (Text)
import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Node.File.Types
import Gargantext.API.Node.Types
import Gargantext.API.Worker (WorkerAPI)
import Servant
import Gargantext.API.Node.File.Types
data FileAPI mode = FileAPI
{ fileDownloadEp :: mode :- Summary "File download"
......@@ -24,6 +24,6 @@ data FileAsyncAPI mode = FileAsyncAPI
{ addFileAsyncEp :: mode :- Summary "File Async Api"
:> "file"
:> "add"
:> NamedRoutes (AsyncJobs JobLog '[FormUrlEncoded] NewWithFile JobLog)
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] NewWithFile)
} deriving Generic
......@@ -8,7 +8,7 @@ module Gargantext.API.Routes.Named.FrameCalc (
import Servant
import GHC.Generics
import Gargantext.API.Node.FrameCalcUpload.Types (FrameCalcUpload)
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Worker (WorkerAPI)
newtype FrameCalcAPI mode = FrameCalcAPI
......@@ -16,6 +16,6 @@ newtype FrameCalcAPI mode = FrameCalcAPI
:> "add"
:> "framecalc"
:> "async"
:> NamedRoutes (AsyncJobs JobLog '[JSON] FrameCalcUpload JobLog)
:> NamedRoutes (WorkerAPI '[JSON] FrameCalcUpload)
} deriving Generic
......@@ -10,10 +10,10 @@ module Gargantext.API.Routes.Named.List (
import Data.Text (Text)
import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Ngrams.List.Types
import Gargantext.API.Ngrams.Types
import Gargantext.API.Types (HTML)
import Gargantext.API.Worker (WorkerAPI)
import Gargantext.Database.Admin.Types.Node
import Gargantext.Utils.Servant qualified as GUS
import Servant
......@@ -40,7 +40,7 @@ newtype JSONAPI mode = JSONAPI
:> "add"
:> "form"
:> "async"
:> NamedRoutes (AsyncJobs JobLog '[FormUrlEncoded] WithJsonFile JobLog)
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] WithJsonFile)
} deriving Generic
......@@ -52,5 +52,5 @@ newtype TSVAPI mode = TSVAPI
:> "add"
:> "form"
:> "async"
:> NamedRoutes (AsyncJobs JobLog '[FormUrlEncoded] WithTextFile JobLog)
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] WithTextFile)
} deriving Generic
......@@ -29,7 +29,6 @@ module Gargantext.API.Routes.Named.Node (
) where
import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Auth.PolicyCheck ( PolicyChecked )
import Gargantext.API.Ngrams.Types (TabType(..))
import Gargantext.API.Routes.Named.Document
......@@ -43,6 +42,7 @@ import Gargantext.API.Routes.Named.Table
import Gargantext.API.Node.Types ( RenameNode(..), NodesToScore(..), NodesToCategory(..) )
import Gargantext.API.Node.New.Types ( PostNode(..) )
import Gargantext.API.Node.Update.Types ( UpdateNodeParams(..), Charts(..), Granularity(..), Method(..) )
import Gargantext.API.Worker (WorkerAPI)
import Gargantext.Core.Types
import Gargantext.Core.Types.Query
import Gargantext.Database.Admin.Types.Hyperdata.User ( HyperdataUser )
......@@ -133,7 +133,7 @@ newtype NodeNodeAPI a mode = NodeNodeAPI
newtype PostNodeAsyncAPI mode = PostNodeAsyncAPI
{ postNodeAsyncEp :: mode :- Summary "Post Node"
:> "async"
:> NamedRoutes (AsyncJobs JobLog '[FormUrlEncoded] PostNode JobLog)
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] PostNode)
} deriving Generic
......@@ -146,7 +146,7 @@ newtype CatAPI mode = CatAPI
newtype UpdateAPI mode = UpdateAPI
{ updateNodeEp :: mode :- Summary " Update node according to NodeType params"
:> NamedRoutes (AsyncJobs JobLog '[JSON] UpdateNodeParams JobLog)
:> NamedRoutes (WorkerAPI '[JSON] UpdateNodeParams)
} deriving Generic
......
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Routes.Named.Table (
......@@ -19,10 +18,10 @@ module Gargantext.API.Routes.Named.Table (
import Data.Text (Text)
import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.HashedResponse
import Gargantext.API.Ngrams.Types (TabType(..), UpdateTableNgramsCharts, Version, QueryParamR, Versioned, VersionedWithCount, NgramsTable, NgramsTablePatch)
import Gargantext.API.Ngrams.Types qualified as Ngrams
import Gargantext.API.Worker (WorkerAPI)
import Gargantext.API.Table.Types ( TableQuery(..), FacetTableResult )
import Gargantext.Core.Text.Corpus.Query (RawQuery)
import Gargantext.Core.Types.Main (ListType)
......@@ -106,5 +105,5 @@ data TableNgramsAsyncAPI mode = TableNgramsAsyncAPI
:> "async"
:> "charts"
:> "update"
:> NamedRoutes (AsyncJobs JobLog '[JSON] UpdateTableNgramsCharts JobLog)
:> NamedRoutes (WorkerAPI '[JSON] UpdateTableNgramsCharts)
} deriving Generic
......@@ -19,14 +19,13 @@ module Gargantext.API.Routes.Named.Viz (
import Data.Aeson ( Value )
import Data.Text (Text)
import GHC.Generics
import Gargantext.API.Admin.Orchestrator.Types ( JobLog )
import Gargantext.API.Viz.Types (PhyloData(..))
import Gargantext.API.Worker (WorkerAPI)
import Gargantext.Core.Types
import Gargantext.Core.Viz.Graph.Types
import Gargantext.Core.Viz.LegacyPhylo (Level)
import Gargantext.Core.Viz.Phylo.Legacy.LegacyMain (MinSizeBranch)
import Servant
import Servant.Job.Async (AsyncJobsAPI)
import Servant.XML.Conduit (XML)
......@@ -64,7 +63,7 @@ data GraphAPI mode = GraphAPI
newtype GraphAsyncAPI mode = GraphAsyncAPI
{ recomputeGraphEp :: mode :- Summary "Recompute graph"
:> "recompute"
:> AsyncJobsAPI JobLog () JobLog
:> NamedRoutes (WorkerAPI '[JSON] ())
} deriving Generic
......
......@@ -3,6 +3,7 @@ module Gargantext.API.Server.Named.Ngrams (
-- * Server handlers
apiNgramsTableCorpus
, apiNgramsTableDoc
, tableNgramsPostChartsAsync
) where
import Control.Lens ((%%~))
......@@ -11,16 +12,17 @@ import Data.Set qualified as Set
import Gargantext.API.Admin.Auth (withNamedAccess)
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser, PathId (..))
import Gargantext.API.Admin.EnvTypes
import Gargantext.API.Admin.Orchestrator.Types (AsyncJobs(..))
import Gargantext.API.Errors.Types (BackendInternalError)
import Gargantext.API.Metrics qualified as Metrics
import Gargantext.API.Ngrams
import Gargantext.API.Ngrams.Types
import Gargantext.API.Prelude
import Gargantext.API.Routes.Named.Table qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.NodeStory.Types (HasNodeStory)
import Gargantext.Core.Types hiding (Terms)
import Gargantext.Core.Types.Query (Limit(..), Offset(..))
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Admin.Config (userMaster)
import Gargantext.Database.Query.Table.Ngrams ( selectNgramsByDoc )
import Gargantext.Database.Query.Table.Node (getNode)
......@@ -28,7 +30,6 @@ import Gargantext.Database.Query.Table.Node.Error (HasNodeError)
import Gargantext.Database.Query.Table.Node.Select ( selectNodesWithUsername )
import Gargantext.Database.Schema.Node (node_id, node_parent_id, node_user_id)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Gargantext.Utils.Jobs.Monad
import Servant.Server.Generic (AsServerT)
......@@ -65,10 +66,12 @@ getTableNgramsVersion _nId _tabType listId = currentVersion listId
apiNgramsAsync :: NodeId -> Named.TableNgramsAsyncAPI (AsServerT (GargM Env BackendInternalError))
apiNgramsAsync _dId = Named.TableNgramsAsyncAPI $ AsyncJobs $
serveJobsAPI TableNgramsJob $ \jHandle i -> withTracer (printDebug "tableNgramsPostChartsAsync") jHandle $
\jHandle' -> tableNgramsPostChartsAsync i jHandle'
apiNgramsAsync nId =
Named.TableNgramsAsyncAPI {
updateTableNgramsChartsEp = serveWorkerAPI $ \p ->
Jobs.NgramsPostCharts { Jobs._npc_node_id = nId
, Jobs._npc_args = p }
}
tableNgramsPostChartsAsync :: ( HasNodeStory env err m
, MonadJobStatus m )
......
......@@ -38,3 +38,18 @@ serveWorkerAPI f = WorkerAPI { workerAPIPost }
mId <- sendJob job
pure $ JobInfo { _ji_message_id = mId
, _ji_mNode_id = getWorkerMNodeId job }
serveWorkerAPIEJob :: (MonadError err m, IsGargServer env err m)
=> (input -> Either err Job)
-> WorkerAPI contentType input (AsServerT m)
serveWorkerAPIEJob f = WorkerAPI { workerAPIPost }
where
workerAPIPost i = do
let eJob = f i
case eJob of
Left err -> throwError err
Right job -> do
mId <- sendJob job
pure $ JobInfo { _ji_message_id = mId
, _ji_mNode_id = getWorkerMNodeId job }
......@@ -72,11 +72,6 @@ gServer (NotificationsConfig { .. }) = do
forever $ do
r <- atomically $ TChan.readTChan tChan
case Aeson.decode (BSL.fromStrict r) of
Just _ujp@(UpdateJobProgress _s) -> do
-- logMsg ioLogger DEBUG $ "[central_exchange] " <> show ujp
-- send the same message that we received
-- void $ sendNonblocking s_dispatcher r
void $ timeout 100_000 $ send s_dispatcher r
Just (UpdateTreeFirstLevel _node_id) -> do
-- logMsg ioLogger DEBUG $ "[central_exchange] update tree: " <> show node_id
-- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id
......
......@@ -15,18 +15,13 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
module Gargantext.Core.Notifications.CentralExchange.Types where
import Codec.Binary.UTF8.String qualified as CBUTF8
import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson qualified as Aeson
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.ByteString.Lazy qualified as BSL
import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.Core.Types (NodeId)
import Gargantext.Core.Worker.Types (JobInfo)
import Gargantext.Prelude
import Prelude qualified
import Servant.Job.Core (Safety(Safe))
import Servant.Job.Types (JobStatus)
{-
......@@ -38,17 +33,14 @@ various events).
-- | INTERNAL MESSAGES
data CEMessage =
-- | Old-style jobs, update progress
UpdateJobProgress (JobStatus 'Safe JobLog)
-- | New-style jobs (async worker).
-- Please note that (I think) all jobs are associated with some NodeId
-- (providing a nodeId allows us to discover new jobs on the frontend).
-- | UpdateWorkerProgress JobInfo NodeId JobLog
| UpdateWorkerProgress JobInfo JobLog
UpdateWorkerProgress JobInfo JobLog
-- | Update tree for given nodeId
| UpdateTreeFirstLevel NodeId
instance Prelude.Show CEMessage where
show (UpdateJobProgress js) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode js)
-- show (UpdateWorkerProgress ji nodeId jl) = "UpdateWorkerProgress " <> show ji <> " " <> show nodeId <> " " <> show jl
show (UpdateWorkerProgress ji jl) = "UpdateWorkerProgress " <> show ji <> " " <> show jl
show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId
......@@ -56,9 +48,6 @@ instance FromJSON CEMessage where
parseJSON = withObject "CEMessage" $ \o -> do
type_ <- o .: "type"
case type_ of
"update_job_progress" -> do
js <- o .: "js"
pure $ UpdateJobProgress js
"update_worker_progress" -> do
ji <- o .: "ji"
jl <- o .: "jl"
......@@ -70,11 +59,6 @@ instance FromJSON CEMessage where
pure $ UpdateTreeFirstLevel node_id
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON CEMessage where
toJSON (UpdateJobProgress js) = object [
"type" .= toJSON ("update_job_progress" :: Text)
, "js" .= toJSON js
]
-- toJSON (UpdateWorkerProgress ji nodeId jl) = object [
toJSON (UpdateWorkerProgress ji jl) = object [
"type" .= toJSON ("update_worker_progress" :: Text)
, "ji" .= toJSON ji
......
......@@ -39,7 +39,6 @@ import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(..), withLogger, logMsg)
import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS
import Servant.Job.Types (job_id)
import StmContainers.Set qualified as SSet
{-
......@@ -150,10 +149,6 @@ sendNotification throttleTChan ceMessage sub = do
-- exchange message - decide whether to send this message via
-- that socket or not
case (topic, ceMessage) of
(UpdateJobProgress jId, CETypes.UpdateJobProgress jobStatus) -> do
if jId == jobStatus ^. job_id
then Just $ NUpdateJobProgress jId jobStatus -- (MJobStatus jobStatus)
else Nothing
-- (UpdateWorkerProgress jobInfo, CETypes.UpdateWorkerProgress jobInfo' nodeId jobLog) -> do
(UpdateWorkerProgress jobInfo, CETypes.UpdateWorkerProgress jobInfo' jobLog) -> do
if jobInfo == jobInfo'
......@@ -197,8 +192,6 @@ _filterCEMessageSubs ceMessage subscriptions = filter (ceMessageSubPred ceMessag
-- 'CETypes.CEMessage' (i.e. should given 'Subscription' be informed
-- of this message).
ceMessageSubPred :: CETypes.CEMessage -> Subscription -> Bool
ceMessageSubPred (CETypes.UpdateJobProgress js) (Subscription { s_topic }) =
s_topic == UpdateJobProgress (js ^. job_id)
-- ceMessageSubPred (CETypes.UpdateWorkerProgress ji nodeId _jl) (Subscription { s_topic }) =
ceMessageSubPred (CETypes.UpdateWorkerProgress ji _jl) (Subscription { s_topic }) =
s_topic == UpdateWorkerProgress ji
......
......@@ -216,26 +216,17 @@ class HasDispatcher env dispatcher where
-- | A notification is sent to clients who subscribed to specific topics
data Notification =
-- NUpdateJobProgress (JobID 'Safe) MJobStatus
NUpdateJobProgress (JobID 'Safe) (JobStatus 'Safe JobLog)
-- | NUpdateWorkerProgress JobInfo NodeId MJobLog
| NUpdateWorkerProgress JobInfo JobLog
NUpdateWorkerProgress JobInfo JobLog
| NUpdateTree NodeId
| NWorkerJobStarted NodeId JobInfo
| NWorkerJobFinished NodeId JobInfo
instance Prelude.Show Notification where
show (NUpdateJobProgress jId mjs) = "NUpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId) -- <> ", " <> show mjs
-- show (NUpdateWorkerProgress jobInfo nodeId mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show nodeId <> ", " <> show mJobLog
show (NUpdateWorkerProgress jobInfo mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show mJobLog
show (NUpdateTree nodeId) = "NUpdateTree " <> show nodeId
show (NWorkerJobStarted nodeId ji) = "NWorkerJobStarted " <> show nodeId <> ", " <> show ji
show (NWorkerJobFinished nodeId ji) = "NWorkerJobFinished " <> show nodeId <> ", " <> show ji
instance ToJSON Notification where
toJSON (NUpdateJobProgress jId mjs) = Aeson.object [
"type" .= ("update_job_progress" :: Text)
, "j_id" .= toJSON jId
, "job_status" .= toJSON mjs
]
-- toJSON (NUpdateWorkerProgress jobInfo nodeId mJobLog) = Aeson.object [
toJSON (NUpdateWorkerProgress jobInfo mJobLog) = Aeson.object [
"type" .= ("update_worker_progress" :: Text)
......@@ -262,10 +253,6 @@ instance FromJSON Notification where
parseJSON = Aeson.withObject "Notification" $ \o -> do
t <- o .: "type"
case t of
"update_job_progress" -> do
jId <- o .: "j_id"
mjs <- o .: "job_status"
pure $ NUpdateJobProgress jId mjs
"update_worker_progress" -> do
jobInfo <- o .: "job_info"
mJobLog <- o .: "job_log"
......
......@@ -13,18 +13,18 @@ Portability : POSIX
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE OverloadedLists #-} -- allows to write Map and HashMap as lists
{-# LANGUAGE TypeOperators #-}
module Gargantext.Core.Viz.Graph.API
where
import Control.Lens (set, _Just, (^?), at)
import Data.HashMap.Strict qualified as HashMap
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError )
import Gargantext.API.Ngrams.Tools
import Gargantext.API.Prelude (GargM)
import Gargantext.API.Routes.Named.Viz qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.Methods.Similarities (Similarity(..), GraphMetric(..), withMetric)
import Gargantext.Core.NodeStory.Types ( HasNodeStory, a_version, unNodeStory, NodeListStory )
import Gargantext.Core.Text.Ngrams (NgramsType(..))
......@@ -32,6 +32,7 @@ import Gargantext.Core.Types.Main ( ListType(MapTerm) )
import Gargantext.Core.Viz.Graph.GEXF ()
import Gargantext.Core.Viz.Graph.Tools -- (cooc2graph)
import Gargantext.Core.Viz.Graph.Types
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Metrics.NgramsByContext (getContextsByNgramsOnlyUser)
import Gargantext.Database.Action.Node (mkNodeWithParent)
import Gargantext.Database.Admin.Config ( userMaster )
......@@ -43,7 +44,7 @@ import Gargantext.Database.Query.Table.Node.Select ( selectNodesWithUsername )
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_hyperdata, node_name)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI, MonadJobStatus(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Servant
import Servant.Server.Generic (AsServerT)
......@@ -213,8 +214,10 @@ defaultGraphMetadata cId lId t repo gm str = do
-- (map (\n -> LegendField n "#FFFFFF" (pack $ show n)) [1..10])
graphAsync :: NodeId -> Named.GraphAsyncAPI (AsServerT (GargM Env BackendInternalError))
graphAsync n = Named.GraphAsyncAPI $
serveJobsAPI RecomputeGraphJob $ \jHandle _ -> graphRecompute n jHandle
graphAsync nId =
Named.GraphAsyncAPI {
recomputeGraphEp = serveWorkerAPI $ const $ Jobs.RecomputeGraph { _rg_node_id = nId }
}
--graphRecompute :: UserId
......
......@@ -25,11 +25,21 @@ import Async.Worker.Types (HasWorkerBroker)
import Data.Text qualified as T
import Gargantext.API.Admin.Auth (forgotUserPassword)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
import Gargantext.API.Ngrams.List (postAsyncJSON)
import Gargantext.API.Node.Corpus.Annuaire qualified as Annuaire
import Gargantext.API.Node.Contact (addContact)
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm, addToCorpusWithQuery)
import Gargantext.API.Node.DocumentsFromWriteNodes (documentsFromWriteNodes)
import Gargantext.API.Node.DocumentUpload (documentUploadAsync)
import Gargantext.API.Node.FrameCalcUpload (frameCalcUploadAsync)
import Gargantext.API.Node.File (addWithFile)
import Gargantext.API.Node.New (postNode')
import Gargantext.API.Node.Update (updateNode)
import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync)
import Gargantext.Core.Config (hasConfig, gc_jobs)
import Gargantext.Core.Config.Types (jc_max_docs_scrapers)
import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Viz.Graph.API (graphRecompute)
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId)
......@@ -203,6 +213,9 @@ performAction env _state bm = do
case job of
Ping -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] ping"
AddContact { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] add contact"
addContact _ac_user _ac_node_id _ac_args jh
AddCorpusFormAsync { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] add corpus form"
addToCorpusWithForm _acf_user _acf_cid _acf_args jh
......@@ -210,15 +223,43 @@ performAction env _state bm = do
$(logLocM) DEBUG "[performAction] add corpus with query"
let limit = Just $ fromIntegral $ env ^. hasConfig . gc_jobs . jc_max_docs_scrapers
addToCorpusWithQuery _acq_user _acq_cid _acq_args limit jh
AddToAnnuaireWithForm { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] add to annuaire with form"
Annuaire.addToAnnuaireWithForm _aawf_annuaire_id _aawf_args jh
AddWithFile { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] add with file"
addWithFile _awf_authenticatedUser _awf_node_id _awf_args jh
DocumentsFromWriteNodes { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] documents from write nodes"
documentsFromWriteNodes _dfwn_authenticatedUser _dfwn_node_id _dfwn_args jh
ForgotPasswordAsync { _fpa_args = ForgotPasswordAsyncParams { email } } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] forgot password: " <> email
us <- getUsersWithEmail (T.toLower email)
case us of
[u] -> forgotUserPassword u
_ -> pure ()
NewNodeAsync { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] new node async "
void $ postNode' _nna_authenticatedUser _nna_node_id _nna_postNode
FrameCalcUpload { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] frame calc upload"
frameCalcUploadAsync _fca_authenticatedUser _fca_node_id _fca_args jh
JSONPost { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] json post"
void $ postAsyncJSON _jp_list_id _jp_ngrams_list jh
NgramsPostCharts { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] ngrams post charts"
void $ tableNgramsPostChartsAsync _npc_args jh
PostNodeAsync { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] post node async"
void $ postNode' _pna_authenticatedUser _pna_node_id _pna_args
RecomputeGraph { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] recompute graph"
void $ graphRecompute _rg_node_id jh
UpdateNode { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] update node"
void $ updateNode _un_node_id _un_args jh
UploadDocument { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] upload document"
void $ documentUploadAsync _ud_node_id _ud_args jh
GargJob { _gj_garg_job } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "Garg job: " <> show _gj_garg_job <> " (handling of this job is still not implemented!)"
return ()
......@@ -18,7 +18,6 @@ import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker qualified as W
import Async.Worker.Types (HasWorkerBroker)
import Control.Lens (view)
import Gargantext.API.Admin.EnvTypes qualified as EnvTypes
import Gargantext.Core.Config (gc_worker, HasConfig(..))
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..))
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
......@@ -30,7 +29,6 @@ import Gargantext.Prelude
sendJob :: (HasWorkerBroker PGMQBroker Job, HasConfig env)
=> Job
-> Cmd' env err (MessageId PGMQBroker)
-- -> Cmd' env err ()
sendJob job = do
gcConfig <- view $ hasConfig
let WorkerSettings { _wsDefinitions } = gcConfig ^. gc_worker
......@@ -49,12 +47,3 @@ updateJobData :: Job -> W.SendJob PGMQBroker Job -> W.SendJob PGMQBroker Job
updateJobData (AddCorpusFormAsync {}) sj = sj { W.timeout = 300 }
updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 }
updateJobData _ sj = sj { W.resendOnKill = False }
-- | This is just a list of what's implemented and what not.
-- After we migrate to async workers, this should be removed
-- (see G.C.Worker -> performAction on what's implemented already)
handledJobs :: [ EnvTypes.GargJob ]
handledJobs = [ EnvTypes.AddCorpusFormJob
, EnvTypes.AddCorpusQueryJob
, EnvTypes.ForgotPasswordJob ]
......@@ -17,25 +17,55 @@ import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson.Types (prependFailure, typeMismatch)
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser, ForgotPasswordAsyncParams)
import Gargantext.API.Admin.EnvTypes ( GargJob )
import Gargantext.API.Ngrams.Types (NgramsList, UpdateTableNgramsCharts(_utn_list_id))
import Gargantext.API.Node.Corpus.Annuaire (AnnuaireWithForm)
import Gargantext.API.Node.Contact.Types (AddContactParams)
import Gargantext.API.Node.DocumentsFromWriteNodes.Types qualified as DFWN
import Gargantext.API.Node.DocumentUpload.Types (DocumentUpload)
import Gargantext.API.Node.FrameCalcUpload.Types (FrameCalcUpload)
import Gargantext.API.Node.New.Types ( PostNode(..) )
import Gargantext.API.Node.Types (NewWithForm, WithQuery(..))
import Gargantext.API.Node.Update.Types (UpdateNodeParams)
import Gargantext.API.Node.Types (NewWithFile, NewWithForm, WithQuery(..))
import Gargantext.Core.Types.Individu (User)
import Gargantext.Database.Admin.Types.Node (CorpusId, NodeId(UnsafeMkNodeId))
import Gargantext.Database.Admin.Types.Node (AnnuaireId, CorpusId, ListId, NodeId(UnsafeMkNodeId))
import Gargantext.Prelude
data Job =
Ping
| AddContact { _ac_args :: AddContactParams
, _ac_node_id :: NodeId
, _ac_user :: User }
| AddCorpusFormAsync { _acf_args :: NewWithForm
, _acf_user :: User
, _acf_cid :: CorpusId }
| AddCorpusWithQuery { _acq_args :: WithQuery
, _acq_user :: User
, _acq_cid :: CorpusId }
| AddWithFile { _awf_args :: NewWithFile
, _awf_authenticatedUser :: AuthenticatedUser
, _awf_node_id :: NodeId }
| AddToAnnuaireWithForm { _aawf_annuaire_id :: AnnuaireId
, _aawf_args :: AnnuaireWithForm }
| DocumentsFromWriteNodes { _dfwn_args :: DFWN.Params
, _dfwn_authenticatedUser :: AuthenticatedUser
, _dfwn_node_id :: NodeId }
| ForgotPasswordAsync { _fpa_args :: ForgotPasswordAsyncParams }
| NewNodeAsync { _nna_node_id :: NodeId
, _nna_authenticatedUser :: AuthenticatedUser
, _nna_postNode :: PostNode }
| FrameCalcUpload { _fca_args :: FrameCalcUpload
, _fca_authenticatedUser :: AuthenticatedUser
, _fca_node_id :: NodeId }
| JSONPost { _jp_list_id :: ListId
, _jp_ngrams_list :: NgramsList }
| NgramsPostCharts { _npc_node_id :: NodeId
, _npc_args :: UpdateTableNgramsCharts }
| PostNodeAsync { _pna_node_id :: NodeId
, _pna_authenticatedUser :: AuthenticatedUser
, _pna_args :: PostNode }
| RecomputeGraph { _rg_node_id :: NodeId }
| UpdateNode { _un_node_id :: NodeId
, _un_args :: UpdateNodeParams }
| UploadDocument { _ud_node_id :: NodeId
, _ud_args :: DocumentUpload }
| GargJob { _gj_garg_job :: GargJob }
deriving (Show, Eq)
instance FromJSON Job where
......@@ -43,6 +73,11 @@ instance FromJSON Job where
type_ <- o .: "type"
case type_ of
"Ping" -> return Ping
"AddContact" -> do
_ac_args <- o .: "args"
_ac_node_id <- o .: "node_id"
_ac_user <- o .: "user"
return $ AddContact { .. }
"AddCorpusFormAsync" -> do
_acf_args <- o .: "args"
_acf_user <- o .: "user"
......@@ -53,20 +88,63 @@ instance FromJSON Job where
_acq_user <- o .: "user"
_acq_cid <- o .: "cid"
return $ AddCorpusWithQuery { .. }
"AddToAnnuaireWithForm" -> do
_aawf_args <- o .: "args"
_aawf_annuaire_id <- o .: "annuaire_id"
return $ AddToAnnuaireWithForm { .. }
"AddWithFile" -> do
_awf_args <- o .: "args"
_awf_authenticatedUser <- o .: "authenticated_user"
_awf_node_id <- o .: "node_id"
return $ AddWithFile { .. }
"DocumentsFromWriteNodes" -> do
_dfwn_args <- o .: "args"
_dfwn_authenticatedUser <- o .: "authenticated_user"
_dfwn_node_id <- o .: "node_id"
return $ DocumentsFromWriteNodes { .. }
"ForgotPasswordAsync" -> do
_fpa_args <- o .: "args"
return $ ForgotPasswordAsync { _fpa_args }
"NewNodeAsync" -> do
_nna_node_id <- o .: "node_id"
_nna_authenticatedUser <- o .: "authenticated_user"
_nna_postNode <- o .: "post_node"
return $ NewNodeAsync { .. }
return $ ForgotPasswordAsync { .. }
"FrameCalcUpload" -> do
_fca_args <- o .: "args"
_fca_authenticatedUser <- o .: "authenticated_user"
_fca_node_id <- o .: "node_id"
return $ FrameCalcUpload { .. }
"JSONPost" -> do
_jp_list_id <- o .: "list_id"
_jp_ngrams_list <- o .: "ngrams_list"
return $ JSONPost { .. }
"NgramsPostCharts" -> do
_npc_node_id <- o .: "node_id"
_npc_args <- o .: "args"
return $ NgramsPostCharts { .. }
"PostNodeAsync" -> do
_pna_node_id <- o .: "node_id"
_pna_authenticatedUser <- o .: "authenticated_user"
_pna_args <- o .: "args"
return $ PostNodeAsync { .. }
"RecomputeGraph" -> do
_rg_node_id <- o .: "node_id"
return $ RecomputeGraph { .. }
"UpdateNode" -> do
_un_node_id <- o .: "node_id"
_un_args <- o .: "args"
return $ UpdateNode { .. }
"UploadDocument" -> do
_ud_node_id <- o .: "node_id"
_ud_args <- o .: "args"
return $ UploadDocument { .. }
"GargJob" -> do
_gj_garg_job <- o .: "garg_job"
return $ GargJob { _gj_garg_job }
return $ GargJob { .. }
s -> prependFailure "parsing job type failed, " (typeMismatch "type" s)
instance ToJSON Job where
toJSON Ping = object [ "type" .= ("Ping" :: Text) ]
toJSON (AddContact { .. }) =
object [ "type" .= ("AddContact" :: Text)
, "args" .= _ac_args
, "user" .= _ac_user
, "node_id" .= _ac_node_id ]
toJSON (AddCorpusFormAsync { .. }) =
object [ "type" .= ("AddCorpusFormAsync" :: Text)
, "args" .= _acf_args
......@@ -77,14 +155,52 @@ instance ToJSON Job where
, "args" .= _acq_args
, "user" .= _acq_user
, "cid" .= _acq_cid ]
toJSON (AddToAnnuaireWithForm { .. }) =
object [ "type" .= ("AddToAnnuaireWithForm" :: Text)
, "args" .= _aawf_args
, "annuaire_id" .= _aawf_annuaire_id ]
toJSON (AddWithFile { .. }) =
object [ "type" .= ("AddWithFile" :: Text)
, "args" .= _awf_args
, "authenticated_user" .= _awf_authenticatedUser
, "node_id" .= _awf_node_id ]
toJSON (DocumentsFromWriteNodes { .. }) =
object [ "type" .= ("DocumentsFromWriteNodes" :: Text)
, "args" .= _dfwn_args
, "authenticated_user" .= _dfwn_authenticatedUser
, "node_id" .= _dfwn_node_id ]
toJSON (ForgotPasswordAsync { .. }) =
object [ "type" .= ("ForgotPasswordAsync" :: Text)
, "args" .= _fpa_args ]
toJSON (NewNodeAsync { .. }) =
object [ "type" .= ("NewNodeAsync" :: Text)
, "node_id" .= _nna_node_id
, "authenticated_user" .= _nna_authenticatedUser
, "post_node" .= _nna_postNode ]
toJSON (FrameCalcUpload { .. }) =
object [ "type" .= ("FrameCalcUpload" :: Text)
, "args" .= _fca_args
, "authenticated_user" .= _fca_authenticatedUser
, "node_id" .= _fca_node_id ]
toJSON (JSONPost { .. }) =
object [ "type" .= ("JSONPost" :: Text)
, "list_id" .= _jp_list_id
, "ngrams_list" .= _jp_ngrams_list ]
toJSON (NgramsPostCharts { .. }) =
object [ "type" .= ("NgramsPostCharts" :: Text)
, "node_id" .= _npc_node_id
, "args" .= _npc_args ]
toJSON (PostNodeAsync { .. }) =
object [ "type" .= ("PostNodeAsync" :: Text)
, "node_id" .= _pna_node_id
, "authenticated_user" .= _pna_authenticatedUser
, "args" .= _pna_args ]
toJSON (RecomputeGraph { .. }) =
object [ "type" .= ("RecomputeGraph" :: Text)
, "node_id" .= _rg_node_id ]
toJSON (UpdateNode { .. }) =
object [ "type" .= ("UpdateNode" :: Text)
, "node_id" .= _un_node_id
, "args" .= _un_args ]
toJSON (UploadDocument { .. }) =
object [ "type" .= ("UploadDocument" :: Text)
, "node_id" .= _ud_node_id
, "args" .= _ud_args ]
toJSON (GargJob { .. }) =
object [ "type" .= ("GargJob" :: Text)
, "garg_job" .= _gj_garg_job ]
......@@ -101,9 +217,19 @@ instance ToJSON Job where
getWorkerMNodeId :: Job -> Maybe NodeId
getWorkerMNodeId Ping = Nothing
getWorkerMNodeId (AddContact { _ac_node_id }) = Just _ac_node_id
getWorkerMNodeId (AddCorpusFormAsync { _acf_args, _acf_cid }) = Just _acf_cid
getWorkerMNodeId (AddCorpusWithQuery { _acq_args = WithQuery { _wq_node_id }}) = Just $ UnsafeMkNodeId _wq_node_id
getWorkerMNodeId (NewNodeAsync { _nna_node_id }) = Just _nna_node_id
getWorkerMNodeId (AddToAnnuaireWithForm { _aawf_annuaire_id }) = Just _aawf_annuaire_id
getWorkerMNodeId (AddWithFile { _awf_node_id }) = Just _awf_node_id
getWorkerMNodeId (DocumentsFromWriteNodes { _dfwn_node_id }) = Just _dfwn_node_id
getWorkerMNodeId (ForgotPasswordAsync {}) = Nothing
getWorkerMNodeId (FrameCalcUpload { _fca_node_id }) = Just _fca_node_id
getWorkerMNodeId (JSONPost { _jp_list_id }) = Just _jp_list_id
getWorkerMNodeId (NgramsPostCharts { _npc_args }) = Just $ _utn_list_id _npc_args
getWorkerMNodeId (PostNodeAsync { _pna_node_id }) = Just _pna_node_id
getWorkerMNodeId (RecomputeGraph { _rg_node_id }) = Just _rg_node_id
getWorkerMNodeId (UpdateNode { _un_node_id }) = Just _un_node_id
getWorkerMNodeId (UploadDocument { _ud_node_id }) = Just _ud_node_id
getWorkerMNodeId (GargJob {}) = Nothing
{-|
Module : Gargantext.Utils.Jobs
Description : Gargantext utilities
Copyright : (c) CNRS, 2017
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.Utils.Jobs (
-- * Serving the JOBS API
serveJobsAPI
-- * Parsing and reading @GargJob@s from disk
, readPrios
-- * Handy re-exports
, MonadJobStatus(..)
, markFailureNoErr
, markFailedNoErr
) where
import Data.Text qualified as T
import Gargantext.API.Admin.EnvTypes ( mkJobHandle, parseGargJob, Env, GargJob(..) )
import Gargantext.API.Errors.Types ( BackendInternalError(InternalJobError) )
import Gargantext.API.Prelude ( GargM )
import Gargantext.Core.Worker.Jobs qualified as Jobs
-- import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Prelude
import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Internal qualified as Internal
import Gargantext.Utils.Jobs.Monad ( JobError, MonadJobStatus(..), markFailureNoErr, markFailedNoErr )
-- import Prelude
import Servant.Job.Async qualified as SJ
import System.Directory (doesFileExist)
jobErrorToGargError
:: JobError -> BackendInternalError
jobErrorToGargError = InternalJobError
serveJobsAPI
:: (
Foldable callbacks
, Ord (JobType m)
, Show (JobType m)
, ToJSON (JobEventType m)
, ToJSON (JobOutputType m)
, MonadJobStatus m
, m ~ GargM Env BackendInternalError
, JobEventType m ~ JobOutputType m
, MonadLogger m
)
=> JobType m
-> (JobHandle m -> input -> m ())
-> SJ.AsyncJobsServerT' ctI ctO callbacks (JobEventType m) input (JobOutputType m) m
serveJobsAPI jobType f = Internal.serveJobsAPI mkJobHandle ask jobType jobErrorToGargError $ \env jHandle i -> do
runExceptT $ flip runReaderT env $ do
$(logLocM) DEBUG (T.pack $ "Running job of type: " ++ show jobType)
when (jobType `elem` Jobs.handledJobs) $
panicTrace "[serveJobsAPI] WRONG! Use Garagntext.API.Worker.serveWorkerAPI instead!"
-- void $ Jobs.sendJob $ Jobs.GargJob { Jobs._gj_garg_job = jobType }
f jHandle i
getLatestJobStatus jHandle
parsePrios :: [Text] -> IO [(GargJob, Int)]
parsePrios [] = pure []
parsePrios (x : xs) = (:) <$> go (T.unpack x) <*> parsePrios xs
where
go s = case break (=='=') s of
([], _) -> errorTrace "parsePrios: empty jobname?"
(prop, valS)
| Just val <- readMaybe (T.tail $ T.pack valS)
, Just j <- parseGargJob (T.pack prop) -> pure (j, val)
| otherwise -> errorTrace $
"parsePrios: invalid input. " ++ show (prop, valS)
readPrios :: Logger IO -> FilePath -> IO [(GargJob, Int)]
readPrios logger fp = do
exists <- doesFileExist fp
case exists of
False -> do
$(logLoc) logger WARNING $ T.pack $ fp ++ " doesn't exist, using default job priorities."
pure []
True -> parsePrios . lines =<< readFile fp
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment