Commit 0d4e0554 authored by Alfredo Di Napoli

Move terms updating to separate job as well

parent c62480c7
Pipeline #7207 passed with stages
in 49 minutes and 14 seconds
......@@ -10,6 +10,7 @@ Portability : POSIX
-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE ViewPatterns #-}
......@@ -20,17 +21,17 @@ import Data.ByteString.Lazy qualified as BSL
import Data.Csv qualified as Tsv
import Data.HashMap.Strict (HashMap)
import Data.HashMap.Strict qualified as HashMap
import Data.Map.Strict (toList)
import Data.Map.Strict qualified as Map
import Data.Map.Strict (toList)
import Data.Set qualified as Set
import Data.Text (concat, pack, splitOn)
import Data.Vector (Vector)
import Data.Vector qualified as Vec
import Data.Vector (Vector)
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types (BackendInternalError(InternalServerError))
import Gargantext.API.Ngrams (setListNgrams)
import Gargantext.API.Ngrams.List.Types (_wjf_data, _wtf_data)
import Gargantext.API.Ngrams.Prelude (getNgramsList)
import Gargantext.API.Ngrams (setListNgrams)
import Gargantext.API.Ngrams.Types
import Gargantext.API.Prelude (GargM, serverError, HasServerError)
import Gargantext.API.Routes.Named.List qualified as Named
......@@ -46,11 +47,13 @@ import Gargantext.Database.Schema.Ngrams ( text2ngrams, NgramsId )
import Gargantext.Database.Schema.Node (_node_parent_id)
import Gargantext.Database.Types (Indexed(..))
import Gargantext.Prelude hiding (concat, toList)
import Gargantext.System.Logging (logLocM, MonadLogger)
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Prelude qualified
import Protolude qualified as P
import Servant
import Servant.Server.Generic (AsServerT)
import Gargantext.System.Logging (LogLevel(..))
getAPI :: Named.GETAPI (AsServerT (GargM Env BackendInternalError))
......@@ -114,7 +117,7 @@ jsonPostAsync = Named.JSONAPI {
}
------------------------------------------------------------------------
postAsyncJSON :: (HasNodeStory env err m, MonadJobStatus m)
postAsyncJSON :: (HasNodeStory env err m, MonadJobStatus m, MonadLogger m)
=> ListId
-> NgramsList
-> JobHandle m
......@@ -123,13 +126,17 @@ postAsyncJSON l ngramsList jobHandle = do
markStarted 2 jobHandle
$(logLocM) DEBUG "[postAsyncJSON] Setting the Ngrams list ..."
setList
$(logLocM) DEBUG "[postAsyncJSON] Done."
markProgress 1 jobHandle
corpus_node <- getNode l -- (Proxy :: Proxy HyperdataList)
let corpus_id = fromMaybe (panicTrace "no parent_id") (_node_parent_id corpus_node)
$(logLocM) DEBUG "[postAsyncJSON] Executing re-indexing..."
_ <- reIndexWith corpus_id l NgramsTerms (Set.fromList [MapTerm, CandidateTerm])
$(logLocM) DEBUG "[postAsyncJSON] Re-indexing done."
markComplete jobHandle
......@@ -205,7 +212,7 @@ tsvToNgramsTableMap record = case Vec.toList record of
-- | This is for debugging the TSV parser in the REPL
importTsvFile :: forall env err m. (HasNodeStory env err m, HasServerError err, MonadJobStatus m)
importTsvFile :: forall env err m. (HasNodeStory env err m, HasServerError err, MonadJobStatus m, MonadLogger m)
=> ListId -> P.FilePath -> m ()
importTsvFile lId fp = do
contents <- liftBase $ P.readFile fp
......
......@@ -38,10 +38,10 @@ import Gargantext.Database.Action.Flow (addDocumentsToHyperCorpus)
import Gargantext.Database.Action.Flow.Types ( FlowCmdM )
import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataCorpus )
import Gargantext.Database.Admin.Types.Hyperdata.Document (HyperdataDocument(..))
import Gargantext.Database.Admin.Types.Node ( DocId, NodeId, NodeType(NodeCorpus) )
import Gargantext.Database.Admin.Types.Node ( DocId, NodeId, NodeType(NodeCorpus), ParentId )
import Gargantext.Database.Prelude (IsDBCmd)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError)
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType', getClosestParentIdByType)
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType')
import Gargantext.Database.Schema.Node (_node_hyperdata)
import Gargantext.Prelude
import Gargantext.System.Logging (logLocM, LogLevel(..), MonadLogger)
......@@ -114,18 +114,15 @@ remoteImportDocuments :: ( HasNodeError err
, MonadLogger m
, MonadIO m)
=> AuthenticatedUser
-> ParentId
-> NodeId
-> DocumentExport
-> m [NodeId]
remoteImportDocuments loggedInUser nodeId (DocumentExport documents _gargVersion) = do
mb_corpusId <- getClosestParentIdByType nodeId NodeCorpus
case mb_corpusId of
Nothing -> panicTrace $ "remoteImportDocuments: impossible, freshly imported doc node without parent corpus"
Just corpusId -> do
let la = Multi EN
nlpServerConfig <- view $ nlpServerGet (_tt_lang la)
$(logLocM) INFO $ "Importing " <> T.pack (show $ length documents) <> " documents for corpus node " <> T.pack (show nodeId)
docs <- addDocumentsToHyperCorpus nlpServerConfig (Nothing :: Maybe HyperdataCorpus) la corpusId (map (_node_hyperdata . _d_document) documents)
_versioned <- commitCorpus corpusId (RootId $ _auth_node_id loggedInUser)
$(logLocM) INFO $ "Done importing " <> T.pack (show $ length documents) <> " documents for corpus node " <> T.pack (show nodeId)
pure docs
-- Import the documents carried by a 'DocumentExport' into the corpus
-- identified by @corpusId@, commit that corpus on behalf of the logged-in
-- user, and return the ids yielded by 'addDocumentsToHyperCorpus'.
-- @nodeId@ is only used to label the log messages.
remoteImportDocuments loggedInUser corpusId nodeId (DocumentExport documents _gargVersion) = do
  let lang      = Multi EN
      docCount  = T.pack (show $ length documents)
      nodeLabel = T.pack (show nodeId)
  nlpServerConfig <- view $ nlpServerGet (_tt_lang lang)
  $(logLocM) INFO $ "Importing " <> docCount <> " documents for corpus node " <> nodeLabel
  docs <- addDocumentsToHyperCorpus nlpServerConfig (Nothing :: Maybe HyperdataCorpus) lang corpusId hyperdatas
  _versioned <- commitCorpus corpusId (RootId $ _auth_node_id loggedInUser)
  $(logLocM) INFO $ "Done importing " <> docCount <> " documents for corpus node " <> nodeLabel
  pure docs
  where
    -- Hyperdata of every exported document, in export order.
    hyperdatas = map (_node_hyperdata . _d_document) documents
......@@ -16,7 +16,7 @@ import Control.Exception.Safe qualified as Safe
import Control.Exception (toException)
import Control.Lens (view, (#))
import Control.Monad.Except (throwError, MonadError)
import Control.Monad (void)
import Control.Monad (void, liftM2)
import Data.Aeson qualified as JSON
import Data.ByteString.Builder qualified as B
import Data.ByteString.Lazy qualified as BL
......@@ -116,15 +116,13 @@ remoteImportHandler loggedInUser c = do
Just ty -> do
new_node <- insertNodeWithHyperdata ty (_node_name x) (_node_hyperdata x) mb_parent (_auth_user_id loggedInUser)
$(logLocM) INFO $ "Created a new node " <> T.pack (show $ new_node) <> " of type " <> T.pack (show ty)
for_ mb_docs $ \docsList -> do
for_ (liftM2 (,) mb_docs mb_parent) $ \(docsList, parentId) -> do
$(logLocM) INFO $ "Found document list to import..."
let payload = Jobs.ImportRemoteDocumentsPayload loggedInUser new_node docsList
let payload = Jobs.ImportRemoteDocumentsPayload loggedInUser parentId new_node docsList
void $ sendJob $ Jobs.ImportRemoteDocuments payload
for_ mb_terms $ \ngramsList -> do
$(logLocM) INFO $ "Found ngrams list to import..."
void $ sendJob $ Jobs.JSONPost { _jp_list_id = new_node
, _jp_ngrams_list = ngramsList
}
void $ sendJob $ Jobs.ImportRemoteTerms $ Jobs.ImportRemoteTermsPayload new_node ngramsList
pure new_node
insertTrees :: Maybe NodeId -> [NodeId] -> Tree ExportableNode -> m [NodeId]
......
......@@ -26,14 +26,15 @@ import Data.Text qualified as T
import Gargantext.API.Admin.Auth (forgotUserPassword)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
import Gargantext.API.Ngrams.List (postAsyncJSON)
import Gargantext.API.Node.Corpus.Annuaire qualified as Annuaire
import Gargantext.API.Node.Contact (addContact)
import Gargantext.API.Node.Corpus.Annuaire qualified as Annuaire
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm, addToCorpusWithQuery)
import Gargantext.API.Node.DocumentsFromWriteNodes (documentsFromWriteNodes)
import Gargantext.API.Node.DocumentUpload (documentUploadAsync, remoteImportDocuments)
import Gargantext.API.Node.FrameCalcUpload (frameCalcUploadAsync)
import Gargantext.API.Node.File (addWithFile)
import Gargantext.API.Node.FrameCalcUpload (frameCalcUploadAsync)
import Gargantext.API.Node.New (postNode')
import Gargantext.API.Node.Update.Types (UpdateNodeParams(..), Granularity (..))
import Gargantext.API.Node.Update (updateNode)
import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync)
import Gargantext.Core.Config (hasConfig, gc_database_config, gc_jobs, gc_notifications_config, gc_worker)
......@@ -44,8 +45,8 @@ import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Viz.Graph.API (graphRecompute)
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId, ImportRemoteDocumentsPayload(..), ImportRemoteTermsPayload(..))
import Gargantext.Core.Worker.PGMQTypes (BrokerMessage, HasWorkerBroker, WState)
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId, ImportRemoteDocumentsPayload(..))
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Query.Table.User (getUsersWithEmail)
import Gargantext.Prelude hiding (to)
......@@ -298,6 +299,17 @@ performAction env _state bm = do
void $ documentUploadAsync _ud_node_id _ud_args jh
-- | Remotely import documents
ImportRemoteDocuments (ImportRemoteDocumentsPayload loggedInUser corpusId docs) -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] import remote documents"
void $ remoteImportDocuments loggedInUser corpusId docs
ImportRemoteTerms (ImportRemoteTermsPayload list_id ngrams_list)
-> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] import remote terms"
void $ postAsyncJSON list_id ngrams_list jh
-- Trigger an 'UpdateNode' job to update the score(s)
$(logLocM) DEBUG $ "Updating node scores for corpus node " <> T.pack (show list_id)
void $ updateNode list_id (UpdateNodeParamsTexts Both) jh
$(logLocM) DEBUG $ "Done updating node scores for corpus node " <> T.pack (show list_id)
-- | Remotely import documents
ImportRemoteDocuments (ImportRemoteDocumentsPayload loggedInUser parentId corpusId docs)
-> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] import remote documents"
void $ remoteImportDocuments loggedInUser parentId corpusId docs
......@@ -61,6 +61,8 @@ updateJobData (NgramsPostCharts {}) sj = sj { W.timeout = 3000 }
updateJobData (RecomputeGraph {}) sj = sj { W.timeout = 3000 }
updateJobData (UpdateNode {}) sj = sj { W.timeout = 3000 }
updateJobData (UploadDocument {}) sj = sj { W.timeout = 3000 }
updateJobData (ImportRemoteDocuments {}) sj = sj { W.timeout = 3000 }
updateJobData (ImportRemoteTerms {}) sj = sj { W.timeout = 3000 }
-- | ForgotPasswordAsync, PostNodeAsync
updateJobData _ sj = sj { W.resendOnKill = False
, W.timeout = 60 }
......@@ -31,12 +31,31 @@ import Gargantext.API.Node.New.Types ( PostNode(..) )
import Gargantext.API.Node.Types (NewWithFile, NewWithForm, WithQuery(..))
import Gargantext.API.Node.Update.Types (UpdateNodeParams)
import Gargantext.Core.Types.Individu (User)
import Gargantext.Database.Admin.Types.Node (AnnuaireId, CorpusId, ListId, NodeId(UnsafeMkNodeId))
import Gargantext.Database.Admin.Types.Node (AnnuaireId, CorpusId, ListId, NodeId(UnsafeMkNodeId), ParentId)
import Gargantext.Prelude
-- | Payload carried by the 'ImportRemoteTerms' constructor of 'Job'.
data ImportRemoteTermsPayload
= ImportRemoteTermsPayload
{ _irtp_list_id :: ListId
-- ^ Id of the list node; the worker passes it to @postAsyncJSON@ and then
-- to @updateNode@. Serialised under the JSON key @list_id@.
, _irtp_ngrams_list :: NgramsList
-- ^ Ngrams list handed to @postAsyncJSON@ together with the list id.
-- Serialised under the JSON key @ngrams_list@.
} deriving (Show, Eq)
-- | Encode the payload as a JSON object with the keys @list_id@ and
-- @ngrams_list@.
instance ToJSON ImportRemoteTermsPayload where
  toJSON (ImportRemoteTermsPayload listId ngramsList) =
    object
      [ "list_id" .= listId
      , "ngrams_list" .= ngramsList
      ]
-- | Decode the payload from a JSON object; both @list_id@ and
-- @ngrams_list@ must be present.
instance FromJSON ImportRemoteTermsPayload where
  parseJSON = withObject "ImportRemoteTermsPayload" $ \o ->
    ImportRemoteTermsPayload
      <$> o .: "list_id"
      <*> o .: "ngrams_list"
-- | Payload carried by the 'ImportRemoteDocuments' constructor of 'Job'.
data ImportRemoteDocumentsPayload
= ImportRemoteDocumentsPayload
{ _irdp_user :: AuthenticatedUser
-- ^ The logged-in user on whose behalf the import is performed.
, _irdp_parent_id :: ParentId
-- ^ Parent of the imported node. The worker passes it to
-- @remoteImportDocuments@ as the corpus the documents are added to.
, _irdp_corpus_id :: NodeId
-- ^ NOTE(review): despite the name, @remoteImportHandler@ stores the freshly
-- created node here; @remoteImportDocuments@ only uses it in log messages,
-- and @getWorkerMNodeId@ returns it as the job's node id. Confirm the naming.
, _irdp_document_export :: DocumentExport
-- ^ The exported documents to import.
} deriving (Show, Eq)
......@@ -45,12 +64,14 @@ instance ToJSON ImportRemoteDocumentsPayload where
-- Encode the payload as a JSON object with the keys @user@, @corpus_id@,
-- @parent_id@ and @document_export@.
toJSON (ImportRemoteDocumentsPayload user parentId corpusId documentExport) =
  object
    [ "user" .= user
    , "corpus_id" .= corpusId
    , "parent_id" .= parentId
    , "document_export" .= documentExport
    ]
-- | Decode the payload from a JSON object; the keys @user@, @parent_id@,
-- @corpus_id@ and @document_export@ are all required.
instance FromJSON ImportRemoteDocumentsPayload where
  parseJSON = withObject "ImportRemoteDocumentsPayload" $ \o ->
    ImportRemoteDocumentsPayload
      <$> o .: "user"
      <*> o .: "parent_id"
      <*> o .: "corpus_id"
      <*> o .: "document_export"
......@@ -91,6 +112,7 @@ data Job =
| UploadDocument { _ud_node_id :: NodeId
, _ud_args :: DocumentUpload }
| ImportRemoteDocuments !ImportRemoteDocumentsPayload
| ImportRemoteTerms !ImportRemoteTermsPayload
deriving (Show, Eq)
instance FromJSON Job where
parseJSON = withObject "Job" $ \o -> do
......@@ -160,6 +182,8 @@ instance FromJSON Job where
return $ UploadDocument { .. }
"ImportRemoteDocuments" ->
ImportRemoteDocuments <$> parseJSON (JS.Object o)
"ImportRemoteTerms" ->
ImportRemoteTerms <$> parseJSON (JS.Object o)
s -> prependFailure "parsing job type failed, " (typeMismatch "type" s)
instance ToJSON Job where
toJSON Ping = object [ "type" .= ("Ping" :: Text) ]
......@@ -230,6 +254,12 @@ instance ToJSON Job where
let o1 = KM.fromList [ ("type", toJSON @T.Text "ImportRemoteDocuments") ]
in JS.Object $ o1 <> o
_ -> errorTrace "impossible, toJSON ImportRemoteDocuments did not return an Object."
toJSON (ImportRemoteTerms payload) =
case toJSON payload of
(JS.Object o) ->
let o1 = KM.fromList [ ("type", toJSON @T.Text "ImportRemoteTerms") ]
in JS.Object $ o1 <> o
_ -> errorTrace "impossible, toJSON ImportRemoteTerms did not return an Object."
-- | We want to have a way to specify 'Maybe NodeId' from given worker
-- parameters. The given 'Maybe CorpusId' is an alternative, when
......@@ -253,4 +283,5 @@ getWorkerMNodeId (PostNodeAsync { _pna_node_id }) = Just _pna_node_id
getWorkerMNodeId (RecomputeGraph { _rg_node_id }) = Just _rg_node_id
getWorkerMNodeId (UpdateNode { _un_node_id }) = Just _un_node_id
getWorkerMNodeId (UploadDocument { _ud_node_id }) = Just _ud_node_id
getWorkerMNodeId (ImportRemoteDocuments (ImportRemoteDocumentsPayload _ corpusId _)) = Just corpusId
getWorkerMNodeId (ImportRemoteDocuments (ImportRemoteDocumentsPayload _ _ corpusId _)) = Just corpusId
getWorkerMNodeId (ImportRemoteTerms (ImportRemoteTermsPayload listId _)) = Just listId
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment