Commit b2f7a9a8 authored by Alfredo Di Napoli's avatar Alfredo Di Napoli

Chunks the insertion of remote docs

parent 0d4e0554
Pipeline #7208 passed with stages
in 54 minutes and 17 seconds
......@@ -613,6 +613,7 @@ library
, singletons ^>= 3.0.2
, singletons-th >= 3.1 && < 3.2
, smtp-mail >= 0.3.0.0
, split >= 0.2.0
, stemmer == 0.5.2
, stm >= 2.5.1.0 && < 2.6
, stm-containers >= 1.2.0.3 && < 1.3
......
......@@ -22,7 +22,6 @@ import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError (..) )
import Gargantext.API.Node.Corpus.New (commitCorpus)
import Gargantext.API.Node.Document.Export.Types ( Document(..))
import Gargantext.API.Node.Document.Export.Types (DocumentExport(..))
import Gargantext.API.Node.DocumentUpload.Types
import Gargantext.API.Prelude ( GargM )
import Gargantext.API.Routes.Named.Document qualified as Named
......@@ -34,6 +33,7 @@ import Gargantext.Core.Text.Corpus.Parsers.Date (mDateSplit)
import Gargantext.Core.Text.Terms (TermType(..))
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Core.Worker.Jobs.Types (WorkSplit(..))
import Gargantext.Database.Action.Flow (addDocumentsToHyperCorpus)
import Gargantext.Database.Action.Flow.Types ( FlowCmdM )
import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataCorpus )
-- remoteImportDocuments: worker-side import of documents (shipped from a
-- remote Gargantext instance) into the given corpus node, followed by a
-- corpus commit.
-- Diff note: this hunk replaces the single 'DocumentExport' argument with a
-- 'WorkSplit' (progress counters) plus the '[Document]' chunk, so callers
-- can send the documents in batches.
......@@ -116,13 +116,15 @@ remoteImportDocuments :: ( HasNodeError err
=> AuthenticatedUser
-> ParentId
-> NodeId
-- REMOVED by this commit: the whole export used to arrive in one value.
-> DocumentExport
-- ADDED: current/total counters describing where this batch sits in the
-- overall import (used only for log messages below).
-> WorkSplit
-> [Document]
-- ^ The documents to import in this call (one chunk of the full set, per
-- the chunking at the call site — no longer necessarily the total).
-> m [NodeId]
-- REMOVED (old clause): destructured the DocumentExport directly.
remoteImportDocuments loggedInUser corpusId nodeId (DocumentExport documents _gargVersion) = do
-- ADDED (new clause). NOTE(review): the parameter named 'corpusId' here
-- binds the ParentId argument and 'nodeId' binds the NodeId — the call
-- site passes (parentId, corpusId) in that order. Confirm which node is
-- really the corpus; the names disagree between caller and callee.
remoteImportDocuments loggedInUser corpusId nodeId WorkSplit{..} documents = do
let la = Multi EN
nlpServerConfig <- view $ nlpServerGet (_tt_lang la)
-- REMOVED: old log counted the documents of this call directly.
$(logLocM) INFO $ "Importing " <> T.pack (show $ length documents) <> " documents for corpus node " <> T.pack (show nodeId)
-- ADDED: new log reports cumulative progress as current/total.
$(logLocM) INFO $ "Importing " <> T.pack (show _ws_current) <> "/" <> T.pack (show _ws_total) <> " documents for corpus node " <> T.pack (show nodeId)
docs <- addDocumentsToHyperCorpus nlpServerConfig (Nothing :: Maybe HyperdataCorpus) la corpusId (map (_node_hyperdata . _d_document) documents)
-- NOTE(review): with chunked imports this commit now runs once per chunk
-- instead of once per import — confirm repeated commits are intended.
_versioned <- commitCorpus corpusId (RootId $ _auth_node_id loggedInUser)
-- REMOVED/ADDED pair: same old-vs-new log change as above.
$(logLocM) INFO $ "Done importing " <> T.pack (show $ length documents) <> " documents for corpus node " <> T.pack (show nodeId)
$(logLocM) INFO $ "Done importing " <> T.pack (show _ws_current) <> "/" <> T.pack (show _ws_total) <> " documents for corpus node " <> T.pack (show nodeId)
pure docs
......@@ -16,13 +16,14 @@ import Control.Exception.Safe qualified as Safe
import Control.Exception (toException)
import Control.Lens (view, (#))
import Control.Monad.Except (throwError, MonadError)
import Control.Monad (void, liftM2)
import Control.Monad (void, liftM2, forM_)
import Data.Aeson qualified as JSON
import Data.ByteString.Builder qualified as B
import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Combinators qualified as C
import Data.Conduit.List qualified as CL
import Data.Foldable (for_, foldlM)
import Data.List.Split qualified as Split
import Data.Text qualified as T
import Gargantext.API.Admin.Auth
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(..))
-- remoteImportHandler (interior hunk): after creating the new node, enqueue
-- worker jobs for any documents and ngrams lists found in the payload.
......@@ -118,8 +119,15 @@ remoteImportHandler loggedInUser c = do
$(logLocM) INFO $ "Created a new node " <> T.pack (show $ new_node) <> " of type " <> T.pack (show ty)
for_ (liftM2 (,) mb_docs mb_parent) $ \(docsList, parentId) -> do
$(logLocM) INFO $ "Found document list to import..."
-- REMOVED by this commit: a single job carried the whole DocumentExport.
let payload = Jobs.ImportRemoteDocumentsPayload loggedInUser parentId new_node docsList
void $ sendJob $ Jobs.ImportRemoteDocuments payload
-- ADDED: split the documents into fixed-size chunks and enqueue one job
-- per chunk, each tagged with its cumulative progress (WorkSplit).
let totalDocs = _de_documents docsList
let chunks = Split.chunksOf 100 totalDocs
forM_ (zip [1..] chunks) $ \(local_ix, chunk) -> do
let ws = Jobs.WorkSplit
-- BUG(review): the stride uses 'length chunk', but the LAST chunk of
-- 'chunksOf 100' may be shorter than 100. For 250 docs the 3rd chunk
-- has 50 items, giving (3-1)*50 + 50 = 150 instead of 250, so the
-- final job logs "150/250". Use the fixed chunk size for the stride,
-- e.g. min (length totalDocs) (local_ix * 100), or accumulate the
-- processed count across the loop.
{ Jobs._ws_current = min (length totalDocs) (((local_ix - 1) * length chunk) + length chunk)
, Jobs._ws_total = length totalDocs
}
let payload = Jobs.ImportRemoteDocumentsPayload loggedInUser parentId new_node chunk ws
void $ sendJob $ Jobs.ImportRemoteDocuments payload
for_ mb_terms $ \ngramsList -> do
$(logLocM) INFO $ "Found ngrams list to import..."
void $ sendJob $ Jobs.ImportRemoteTerms $ Jobs.ImportRemoteTermsPayload new_node ngramsList
......
-- performAction (interior hunk): dispatch for the ImportRemoteDocuments job.
......@@ -309,7 +309,7 @@ performAction env _state bm = do
$(logLocM) DEBUG $ "Done updating node scores for corpus node " <> T.pack (show list_id)
-- | Remotely import documents
-- REMOVED (old pattern): payload had four fields (DocumentExport-based).
ImportRemoteDocuments (ImportRemoteDocumentsPayload loggedInUser parentId corpusId docs)
-- ADDED (new pattern): payload now also carries the WorkSplit.
ImportRemoteDocuments (ImportRemoteDocumentsPayload loggedInUser parentId corpusId docs workSplit)
-> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] import remote documents"
-- REMOVED/ADDED pair: the new call threads workSplit through, before docs.
void $ remoteImportDocuments loggedInUser parentId corpusId docs
void $ remoteImportDocuments loggedInUser parentId corpusId workSplit docs
......@@ -23,7 +23,7 @@ import Gargantext.API.Admin.Auth.Types (AuthenticatedUser, ForgotPasswordAsyncPa
import Gargantext.API.Ngrams.Types (NgramsList, UpdateTableNgramsCharts(_utn_list_id))
import Gargantext.API.Node.Contact.Types (AddContactParams)
import Gargantext.API.Node.Corpus.Annuaire (AnnuaireWithForm)
import Gargantext.API.Node.Document.Export.Types (DocumentExport)
import Gargantext.API.Node.Document.Export.Types (Document)
import Gargantext.API.Node.DocumentsFromWriteNodes.Types qualified as DFWN
import Gargantext.API.Node.DocumentUpload.Types (DocumentUpload)
import Gargantext.API.Node.FrameCalcUpload.Types (FrameCalcUpload)
-- Tail of the existing ImportRemoteTermsPayload parser (context lines,
-- unchanged by this commit).
......@@ -52,12 +52,30 @@ instance FromJSON ImportRemoteTermsPayload where
_irtp_ngrams_list <- o .: "ngrams_list"
pure ImportRemoteTermsPayload{..}
-- | Progress counters for one batch of a chunked ("split") unit of work:
-- '_ws_current' is the cumulative number of items processed once this batch
-- completes, '_ws_total' the overall item count. Used for progress
-- reporting in log messages.
data WorkSplit
= WorkSplit { _ws_current :: Int, _ws_total :: Int }
deriving (Show, Eq)
-- | Serialise a 'WorkSplit' as @{"current": ..., "total": ...}@,
-- mirroring the 'FromJSON' instance.
instance ToJSON WorkSplit where
  toJSON (WorkSplit cur tot) =
    object
      [ "current" .= cur
      , "total"   .= tot
      ]
-- | Parse a 'WorkSplit' from @{"current": ..., "total": ...}@,
-- mirroring the 'ToJSON' instance.
instance FromJSON WorkSplit where
  parseJSON = withObject "WorkSplit" $ \o ->
    WorkSplit
      <$> o .: "current"
      <*> o .: "total"
-- | Payload for the remote-document-import worker job.
data ImportRemoteDocumentsPayload
= ImportRemoteDocumentsPayload
{ _irdp_user :: AuthenticatedUser
, _irdp_parent_id :: ParentId
, _irdp_corpus_id :: NodeId
-- REMOVED by this commit: the full export in a single field.
, _irdp_document_export :: DocumentExport
-- ADDED: one chunk of documents (the handler enqueues one job per chunk).
, _irdp_documents :: [Document]
-- | Useful to compute total progress in logs.
, _irdp_work_split :: WorkSplit
} deriving (Show, Eq)
-- Serialisation of the job payload; keys mirror the FromJSON instance below.
instance ToJSON ImportRemoteDocumentsPayload where
......@@ -65,7 +83,8 @@ instance ToJSON ImportRemoteDocumentsPayload where
object [ "user" .= _irdp_user
, "corpus_id" .= _irdp_corpus_id
, "parent_id" .= _irdp_parent_id
-- REMOVED key: "document_export".
, "document_export" .= _irdp_document_export
-- ADDED keys: "documents" (the chunk) and "work_split" (progress).
, "documents" .= _irdp_documents
, "work_split" .= _irdp_work_split
]
-- Parser for the job payload; keys mirror the ToJSON instance above.
instance FromJSON ImportRemoteDocumentsPayload where
......@@ -73,7 +92,8 @@ instance FromJSON ImportRemoteDocumentsPayload where
_irdp_user <- o .: "user"
_irdp_parent_id <- o .: "parent_id"
_irdp_corpus_id <- o .: "corpus_id"
-- REMOVED key: "document_export".
_irdp_document_export <- o .: "document_export"
-- ADDED keys: "documents" and "work_split".
_irdp_documents <- o .: "documents"
_irdp_work_split <- o .: "work_split"
pure ImportRemoteDocumentsPayload{..}
data Job =
-- getWorkerMNodeId (tail clauses): extract the node a job targets, if any.
......@@ -283,5 +303,5 @@ getWorkerMNodeId (PostNodeAsync { _pna_node_id }) = Just _pna_node_id
getWorkerMNodeId (RecomputeGraph { _rg_node_id }) = Just _rg_node_id
getWorkerMNodeId (UpdateNode { _un_node_id }) = Just _un_node_id
getWorkerMNodeId (UploadDocument { _ud_node_id }) = Just _ud_node_id
-- REMOVED (old clause): payload had four fields.
getWorkerMNodeId (ImportRemoteDocuments (ImportRemoteDocumentsPayload _ _ corpusId _)) = Just corpusId
-- ADDED (new clause): extra wildcard for the new WorkSplit field.
getWorkerMNodeId (ImportRemoteDocuments (ImportRemoteDocumentsPayload _ _ corpusId _ _)) = Just corpusId
getWorkerMNodeId (ImportRemoteTerms (ImportRemoteTermsPayload listId _)) = Just listId
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment