Commit 51c37532 authored by Alfredo Di Napoli

addDocumentsToHyperCorpus emits warnings for failed doc insertions

This commit only performs the refactoring required to propagate insertion
failures as part of the result of `insertMasterDocs` and to emit warnings
inside `addDocumentsToHyperCorpus`; it doesn't yet catch and handle errors
in the former.
parent ca7f0f26
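
In essence, the change replaces a plain `[DocId]` result with a pair that also carries the per-document failures, so callers can surface them as job warnings instead of silently dropping them. Below is a minimal, self-contained sketch of that pattern; the names (`InsertError`, `insertAll`, `addDocuments`) are illustrative stand-ins, not the actual Gargantext API:

```haskell
import Control.Exception (Exception (..))
import Data.Either (partitionEithers)

newtype DocId = DocId Int
  deriving Show

data InsertError = InsertionFailed String
  deriving Show

instance Exception InsertError

-- Stand-in for the insertion step: every document either fails or yields an id.
insertAll :: [String] -> ([InsertError], [DocId])
insertAll docs = partitionEithers (zipWith tryInsert [1 ..] docs)
  where
    tryInsert i doc
      | null doc  = Left (InsertionFailed ("document " <> show i <> " is empty"))
      | otherwise = Right (DocId i)

-- The caller keeps the ids but turns each failure into a warning,
-- analogous to what 'addDocumentsToHyperCorpus' now does with 'emitWarning'.
addDocuments :: [String] -> IO [DocId]
addDocuments docs = do
  let (failures, ids) = insertAll docs
  mapM_ (putStrLn . ("warning: skipping document due to " <>) . displayException) failures
  pure ids
```

With this hypothetical helper, `addDocuments ["abstract one", "", "abstract two"]` would return two ids and print a single warning for the empty document, rather than losing the failure silently.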
......@@ -122,15 +122,17 @@ insertSearxResponse :: ( MonadBase IO m
, HasValidationError err
, MonadCatch m
, MonadLogger m
, MonadJobStatus m
)
=> User
=> JobHandle m
-> User
-> CorpusId
-> ListId
-> Lang
-> Either Prelude.String SearxResponse
-> m ()
insertSearxResponse _ _ _ _ (Left _) = pure ()
insertSearxResponse user cId listId l (Right (SearxResponse { _srs_results })) = do
insertSearxResponse _ _ _ _ _ (Left _) = pure ()
insertSearxResponse jobHandle user cId listId l (Right (SearxResponse { _srs_results })) = do
-- docs :: [Either Text HyperdataDocument]
let docs = hyperdataDocumentFromSearxResult l <$> _srs_results
--printDebug "[triggerSearxSearch] docs" docs
......@@ -145,7 +147,7 @@ insertSearxResponse user cId listId l (Right (SearxResponse { _srs_results })) =
-}
--_ <- flowDataText user (DataNew [docs']) (Multi l) cId Nothing logStatus
let mCorpus = Nothing :: Maybe HyperdataCorpus
void $ addDocumentsToHyperCorpus mCorpus (Multi l) cId docs'
void $ addDocumentsToHyperCorpus jobHandle mCorpus (Multi l) cId docs'
_ <- buildSocialList l user cId listId mCorpus Nothing
......@@ -197,7 +199,7 @@ triggerSearxSearch user cId q l jobHandle = do
, _fsp_query = Query.getRawQuery q
, _fsp_url = surl }
insertSearxResponse user cId listId l res
insertSearxResponse jobHandle user cId listId l res
markProgress page jobHandle
) [1..numPages]
......
......@@ -63,15 +63,16 @@ documentUploadAsync :: (FlowCmdM env err m, MonadJobStatus m, MonadCatch m)
-> m ()
documentUploadAsync nId doc jobHandle = do
markStarted 1 jobHandle
_docIds <- documentUpload nId doc
_docIds <- documentUpload jobHandle nId doc
-- printDebug "documentUploadAsync" docIds
markComplete jobHandle
documentUpload :: (FlowCmdM env err m, MonadCatch m)
=> NodeId
documentUpload :: (FlowCmdM env err m, MonadCatch m, MonadJobStatus m)
=> JobHandle m
-> NodeId
-> DocumentUpload
-> m [DocId]
documentUpload nId doc = do
documentUpload jobHandle nId doc = do
mcId <- runDBQuery $ getClosestParentIdByType' nId NodeCorpus
let cId = case mcId of
Just c -> c
......@@ -100,7 +101,7 @@ documentUpload nId doc = do
, _hd_institutes_tree = Nothing }
let lang = EN
addDocumentsToHyperCorpus (Nothing :: Maybe HyperdataCorpus) (Multi lang) cId [hd]
addDocumentsToHyperCorpus jobHandle (Nothing :: Maybe HyperdataCorpus) (Multi lang) cId [hd]
-- | Imports the documents contained into this 'DocumentExport' into this (local) version
-- of the running node.
......@@ -110,20 +111,22 @@ remoteImportDocuments :: ( HasNodeError err
, HasNLPServer env
, HasNodeStoryEnv env err
, IsDBCmd env err m
, MonadJobStatus m
, MonadLogger m
, MonadCatch m
, MonadIO m)
=> AuthenticatedUser
=> JobHandle m
-> AuthenticatedUser
-> ParentId
-> NodeId
-> WorkSplit
-> [Document]
-- ^ Total docs
-> m [NodeId]
remoteImportDocuments loggedInUser corpusId nodeId WorkSplit{..} documents = do
remoteImportDocuments jobHandle loggedInUser corpusId nodeId WorkSplit{..} documents = do
let la = Multi EN
$(logLocM) INFO $ "Importing " <> T.pack (show _ws_current) <> "/" <> T.pack (show _ws_total) <> " documents for corpus node " <> T.pack (show nodeId)
docs <- addDocumentsToHyperCorpus (Nothing :: Maybe HyperdataCorpus) la corpusId (map (_node_hyperdata . _d_document) documents)
docs <- addDocumentsToHyperCorpus jobHandle (Nothing :: Maybe HyperdataCorpus) la corpusId (map (_node_hyperdata . _d_document) documents)
_versioned <- commitCorpus corpusId (RootId $ _auth_node_id loggedInUser)
$(logLocM) INFO $ "Done importing " <> T.pack (show _ws_current) <> "/" <> T.pack (show _ws_total) <> " documents for corpus node " <> T.pack (show nodeId)
pure docs
......@@ -319,4 +319,4 @@ performAction env _s bm = do
ImportRemoteDocuments (ImportRemoteDocumentsPayload loggedInUser parentId corpusId docs workSplit)
-> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] import remote documents"
void $ remoteImportDocuments loggedInUser parentId corpusId workSplit docs
void $ remoteImportDocuments jh loggedInUser parentId corpusId workSplit docs
......@@ -112,7 +112,9 @@ import Gargantext.Database.Schema.Node
import Gargantext.Database.Types
import Gargantext.Prelude hiding (catch, onException, to)
import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger )
import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..), markFailureNoErr )
import Gargantext.Utils.Jobs.Error (HumanFriendlyErrorText(..))
import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..) )
import Gargantext.Utils.Jobs.Monad ( markFailureNoErr )
import Servant.Client.Core (ClientError)
------------------------------------------------------------------------
......@@ -316,7 +318,6 @@ flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do
]
let u = userFromMkCorpusUser mkCorpusUser
$(logLocM) DEBUG "Calling flowCorpusUser"
flowCorpusUser (la ^. tt_lang) u userCorpusId listId c mfslw
......@@ -324,12 +325,13 @@ flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do
addDocumentsWithProgress :: CorpusId -> [(Int, a)] -> m ()
addDocumentsWithProgress userCorpusId docsChunk = do
$(logLocM) DEBUG $ T.pack $ "calling insertDoc, ([idx], mLength) = " <> show (fst <$> docsChunk, count)
docs <- addDocumentsToHyperCorpus c la userCorpusId (map snd docsChunk)
docs <- addDocumentsToHyperCorpus jobHandle c la userCorpusId (map snd docsChunk)
markProgress (length docs) jobHandle
-- | Given a list of corpus documents and a 'NodeId' identifying the 'CorpusId', adds
-- the given documents to the corpus. Returns the Ids of the inserted documents.
-- the given documents to the corpus. Returns the Ids of the inserted documents alongside
-- a list of documents that failed extraction for some reason
addDocumentsToHyperCorpus :: ( IsDBCmd env err m
, HasNodeError err
, HasNLPServer env
......@@ -338,25 +340,28 @@ addDocumentsToHyperCorpus :: ( IsDBCmd env err m
, MkCorpus corpus
, MonadLogger m
, MonadCatch m
, MonadJobStatus m
)
=> Maybe corpus
=> JobHandle m
-> Maybe corpus
-> TermType Lang
-> CorpusId
-> [document]
-> m [DocId]
addDocumentsToHyperCorpus mb_hyper la corpusId docs = do
addDocumentsToHyperCorpus jobHandle mb_hyper la corpusId docs = do
cfg <- view hasConfig
nlp <- view (nlpServerGet $ _tt_lang la)
-- First extract all the ngrams for the input documents via the nlp server,
-- collect errors (if any) and pass to 'insertMasterDocs' only the documents
-- for which the ngrams extraction succeeded. At the moment errors are just
-- logged, but in the future they could be returned upstream so that we can
-- display a final result of how many were skipped, how many succeded etc.
-- log errors (if any) and pass the final result to 'insertMasterDocs'.
uncommittedNgrams <- extractNgramsFromDocuments nlp la docs
runDBTx $ do
ids <- insertMasterDocs cfg uncommittedNgrams mb_hyper docs
void $ Doc.add corpusId (map nodeId2ContextId ids)
pure ids
(failures, ids) <- runDBTx $ do
(f,i) <- insertMasterDocs cfg uncommittedNgrams mb_hyper docs
void $ Doc.add corpusId (map nodeId2ContextId i)
pure (f,i)
forM_ failures $ \failure -> do
let msg = UnsafeMkHumanFriendlyErrorText (T.pack $ "Skipping adding document to hypercorpus due to " <> displayException failure)
emitWarning msg jobHandle
pure ids
------------------------------------------------------------------------
createNodes :: ( HasNodeError err
......@@ -485,8 +490,16 @@ newtype UncommittedNgrams doc = UncommittedNgrams
data InsertDocError
= NgramsNotFound !(Maybe DocumentHashId) !DocId
| DocumentInsertionError
deriving Show
instance Exception InsertDocError where
displayException = \case
NgramsNotFound _mb_hashId docId
-> "Couldn't find the associated ngrams for input document " <> show docId
DocumentInsertionError
-> "Document failed to be saved inside the database."
extractNgramsFromDocument :: ( UniqParameters doc
, HasText doc
, ExtractNgrams m doc
......@@ -573,7 +586,7 @@ insertMasterDocs :: ( HasNodeError err
-- with the node being created.
-> Maybe c
-> [doc]
-> DBUpdate err [DocId]
-> DBUpdate err ([InsertDocError], [DocId])
insertMasterDocs cfg uncommittedNgrams c hs = do
(masterUserId, _, masterCorpusId) <- getOrMkRootWithCorpus cfg MkCorpusUserMaster c
(ids', documentsWithId) <- insertDocs masterUserId masterCorpusId (map (toNode masterUserId Nothing) hs )
......@@ -588,7 +601,7 @@ insertMasterDocs cfg uncommittedNgrams c hs = do
lId <- getOrMkList masterCorpusId masterUserId
_ <- saveDocNgramsWith lId ngramsDocsMap
pure $ map contextId2NodeId ids'
pure $ ([], map contextId2NodeId ids') --FIXME: populate errors
saveDocNgramsWith :: ListId
......
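
The FIXME above (`pure $ ([], ...)`) marks where `insertMasterDocs` would eventually populate its failure list; per the commit message, this commit deliberately leaves it empty. Purely as an illustration of one possible shape, here is a self-contained sketch that partitions inserted documents by whether their ngrams were found, reporting the rest as `NgramsNotFound`-style failures. The types below are simplified stand-ins, not the real Gargantext ones (no `DocumentHashId` or `DBUpdate`):

```haskell
import qualified Data.Map.Strict as Map

newtype DocId = DocId Int
  deriving (Eq, Ord, Show)

-- Simplified: the real InsertDocError also carries a 'Maybe DocumentHashId'.
data InsertDocError = NgramsNotFound DocId
  deriving Show

-- Split freshly inserted documents into those whose ngrams are available
-- (and can be committed) and those that must be reported upstream.
commitOrReport :: Map.Map DocId [String]   -- ngrams keyed by document id
               -> [DocId]                  -- documents just inserted
               -> ([InsertDocError], [(DocId, [String])])
commitOrReport ngramsByDoc = foldr step ([], [])
  where
    step docId (errs, oks) =
      case Map.lookup docId ngramsByDoc of
        Nothing     -> (NgramsNotFound docId : errs, oks)
        Just ngrams -> (errs, (docId, ngrams) : oks)
```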
......@@ -31,7 +31,7 @@ import Test.API.Routes (get_corpus_sqlite_export)
import Test.API.Setup (withTestDBAndPort, dbEnvSetup, SpecContext (..))
import Test.API.UpdateList (createFortranDocsList)
import Test.Database.Operations.DocumentSearch (exampleDocument_01, exampleDocument_02)
import Test.Database.Types (runTestMonad)
import Test.Database.Types (runTestMonad, test_job_handle)
import Test.Hspec
import Test.Hspec.Wai.Internal (withApplication)
import Test.Utils (withValidLogin)
......@@ -54,7 +54,7 @@ tests = sequential $ around withTestDBAndPort $ beforeWith dbEnvSetup $ do
let docs = [ exampleDocument_01, exampleDocument_02 ]
let lang = EN
_ <- addDocumentsToHyperCorpus (Just $ corpus ^. node_hyperdata) (Multi lang) corpusId docs
_ <- addDocumentsToHyperCorpus (test_job_handle $ _sctx_env ctx) (Just $ corpus ^. node_hyperdata) (Multi lang) corpusId docs
(CorpusSQLiteData { .. }) <- mkCorpusSQLiteData corpusId Nothing
......
......@@ -131,7 +131,8 @@ addCorpusDocuments env = runTestMonad env $ do
let lang = EN
let docs = [exampleDocument_01, exampleDocument_02, exampleDocument_03, exampleDocument_04]
_ <- addDocumentsToHyperCorpus (Just $ _node_hyperdata $ corpus)
_ <- addDocumentsToHyperCorpus (test_job_handle env)
(Just $ _node_hyperdata $ corpus)
(Multi lang)
corpusId
docs
......
......@@ -145,6 +145,7 @@ setup = do
, test_nodeStory
, test_usernameGen = ugen
, test_logger = logger
, test_job_handle = TestNoJobHandle
, test_worker_tid
}
......
......@@ -61,6 +61,7 @@ data TestEnv = TestEnv {
, test_usernameGen :: !Counter
, test_logger :: !(Logger (GargM TestEnv BackendInternalError))
, test_worker_tid :: !ThreadId
, test_job_handle :: !(JobHandle (TestMonadM TestEnv BackendInternalError))
}
newtype TestMonadM env err a = TestMonad { _TestMonad :: ExceptT err (ReaderT env IO) a }
......