Commit 046f3b91 authored by Alfredo Di Napoli's avatar Alfredo Di Napoli

Restore functionality of insertMasterDocs

Now the tests pass again, but crucially `insertMasterDocs` runs in a
single atomic DB update, meaning we can rollback cleanly in case
disaster strikes.
parent 47aa6cf9
Pipeline #7647 passed with stages
in 44 minutes and 12 seconds
...@@ -15,6 +15,7 @@ Portability : POSIX ...@@ -15,6 +15,7 @@ Portability : POSIX
-- TODO-EVENTS: InsertedNodes -- TODO-EVENTS: InsertedNodes
-} -}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ConstrainedClassMethods #-} {-# LANGUAGE ConstrainedClassMethods #-}
{-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DerivingStrategies #-}
...@@ -25,7 +26,6 @@ Portability : POSIX ...@@ -25,7 +26,6 @@ Portability : POSIX
{-# LANGUAGE TupleSections #-} {-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-} {-# LANGUAGE TypeOperators #-}
{-# LANGUAGE BangPatterns #-}
module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list) module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
( DataText(..) ( DataText(..)
...@@ -56,6 +56,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list) ...@@ -56,6 +56,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
import Conduit import Conduit
import Control.Lens ( to, view ) import Control.Lens ( to, view )
import Control.Monad.Catch
import Data.Conduit qualified as C import Data.Conduit qualified as C
import Data.Conduit.Internal (zipSources) import Data.Conduit.Internal (zipSources)
import Data.Conduit.List qualified as CL import Data.Conduit.List qualified as CL
...@@ -74,6 +75,7 @@ import Gargantext.Core.Ext.IMTUser (readFile_Annuaire) ...@@ -74,6 +75,7 @@ import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
import Gargantext.Core.NLP (HasNLPServer, nlpServerGet) import Gargantext.Core.NLP (HasNLPServer, nlpServerGet)
import Gargantext.Core.NodeStory.Types (HasNodeStory, NodeStoryEnv, HasNodeStoryEnv (..)) import Gargantext.Core.NodeStory.Types (HasNodeStory, NodeStoryEnv, HasNodeStoryEnv (..))
import Gargantext.Core.Notifications.CentralExchange.Types (HasCentralExchangeNotification(ce_notify), CEMessage(..)) import Gargantext.Core.Notifications.CentralExchange.Types (HasCentralExchangeNotification(ce_notify), CEMessage(..))
import Gargantext.Core.Text (HasText)
import Gargantext.Core.Text.Corpus.API qualified as API import Gargantext.Core.Text.Corpus.API qualified as API
import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType) import Gargantext.Core.Text.Corpus.Parsers (parseFile, FileFormat, FileType)
import Gargantext.Core.Text.List (buildNgramsLists) import Gargantext.Core.Text.List (buildNgramsLists)
...@@ -99,7 +101,7 @@ import Gargantext.Database.Prelude ...@@ -99,7 +101,7 @@ import Gargantext.Database.Prelude
import Gargantext.Database.Query.Table.ContextNodeNgrams2 ( ContextNodeNgrams2Poly(..), insertContextNodeNgrams2 ) import Gargantext.Database.Query.Table.ContextNodeNgrams2 ( ContextNodeNgrams2Poly(..), insertContextNodeNgrams2 )
import Gargantext.Database.Query.Table.Node ( MkCorpus, insertDefaultNodeIfNotExists, getOrMkList, getNodeWith ) import Gargantext.Database.Query.Table.Node ( MkCorpus, insertDefaultNodeIfNotExists, getOrMkList, getNodeWith )
import Gargantext.Database.Query.Table.Node.Document.Add qualified as Doc (add) import Gargantext.Database.Query.Table.Node.Document.Add qualified as Doc (add)
import Gargantext.Database.Query.Table.Node.Document.Insert ( ToNode(toNode), UniqParameters (..) ) -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..)) import Gargantext.Database.Query.Table.Node.Document.Insert ( ToNode(toNode), UniqParameters (..), newUniqIdHash ) -- (insertDocuments, ReturnId(..), addUniqIdsDoc, addUniqIdsContact, ToDbData(..))
import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..)) import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Table.NodeContext (selectDocNodesOnlyId) import Gargantext.Database.Query.Table.NodeContext (selectDocNodesOnlyId)
import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId) import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
...@@ -110,7 +112,6 @@ import Gargantext.Database.Types ...@@ -110,7 +112,6 @@ import Gargantext.Database.Types
import Gargantext.Prelude hiding (catch, onException, to) import Gargantext.Prelude hiding (catch, onException, to)
import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger ) import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger )
import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..) ) import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..) )
import Control.Monad.Catch
------------------------------------------------------------------------ ------------------------------------------------------------------------
-- Imports for upgrade function -- Imports for upgrade function
...@@ -257,7 +258,7 @@ flowCorpus :: ( IsDBCmd env err m ...@@ -257,7 +258,7 @@ flowCorpus :: ( IsDBCmd env err m
, FlowCorpus a , FlowCorpus a
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , MonadCatch m
, HasCentralExchangeNotification env ) , HasCentralExchangeNotification env, Show a )
=> MkCorpusUser => MkCorpusUser
-> TermType Lang -> TermType Lang
-> Maybe FlowSocialListWith -> Maybe FlowSocialListWith
...@@ -278,7 +279,7 @@ flow :: forall env err m a c. ...@@ -278,7 +279,7 @@ flow :: forall env err m a c.
, MkCorpus c , MkCorpus c
, MonadJobStatus m , MonadJobStatus m
, HasCentralExchangeNotification env , HasCentralExchangeNotification env
, MonadCatch m , MonadCatch m, Show a
) )
=> Maybe c => Maybe c
-> MkCorpusUser -> MkCorpusUser
...@@ -318,7 +319,7 @@ addDocumentsToHyperCorpus :: ( IsDBCmd env err m ...@@ -318,7 +319,7 @@ addDocumentsToHyperCorpus :: ( IsDBCmd env err m
, FlowCorpus document , FlowCorpus document
, MkCorpus corpus , MkCorpus corpus
, MonadLogger m , MonadLogger m
, MonadCatch m , MonadCatch m, Show document
) )
=> Maybe corpus => Maybe corpus
-> TermType Lang -> TermType Lang
...@@ -452,17 +453,24 @@ buildSocialList l user userCorpusId listId ctype = \case ...@@ -452,17 +453,24 @@ buildSocialList l user userCorpusId listId ctype = \case
type CommittedNgrams = type CommittedNgrams =
HashMap.HashMap ExtractedNgrams (Map NgramsType (Map NodeId (TermsWeight, TermsCount))) HashMap.HashMap ExtractedNgrams (Map NgramsType (Map NodeId (TermsWeight, TermsCount)))
newtype DocumentHashId
= DocumentHashId { _DocumentHashId :: T.Text }
deriving stock (Show, Eq)
deriving newtype Ord
-- | Ngrams that have been extracted from the input 'doc' but not fully associated with -- | Ngrams that have been extracted from the input 'doc' but not fully associated with
-- a persisted entity on the database. -- a persisted entity on the database.
newtype UncommittedNgrams doc = UncommittedNgrams newtype UncommittedNgrams doc = UncommittedNgrams
{ _UncommittedNgrams :: HashMap.HashMap ContextId (DocumentIdWithNgrams ContextId doc ExtractedNgrams) } { _UncommittedNgrams :: Map.Map DocumentHashId (DocumentIdWithNgrams DocumentHashId doc ExtractedNgrams) }
deriving stock Show deriving stock Show
deriving newtype (Semigroup, Monoid) deriving newtype (Semigroup, Monoid)
data InsertDocError data InsertDocError
= NgramsNotFound !ContextId !DocId = NgramsNotFound !(Maybe DocumentHashId) !DocId
deriving Show
extractNgramsFromDocument :: ( UniqParameters doc extractNgramsFromDocument :: ( UniqParameters doc
, HasText doc
, ExtractNgrams doc , ExtractNgrams doc
, IsDBCmd err env m , IsDBCmd err env m
, MonadLogger m , MonadLogger m
...@@ -477,34 +485,38 @@ extractNgramsFromDocument nlpServer lang doc = ...@@ -477,34 +485,38 @@ extractNgramsFromDocument nlpServer lang doc =
-- but still index it in the final map, so that later reconciliation still works. -- but still index it in the final map, so that later reconciliation still works.
-- Pratically speaking it means this won't have any ngrams associated, but the document -- Pratically speaking it means this won't have any ngrams associated, but the document
-- will still be added to the corpus and we can try to regen the ngrams at a later stage. -- will still be added to the corpus and we can try to regen the ngrams at a later stage.
UncommittedNgrams . HashMap.singleton docId <$> UncommittedNgrams . Map.singleton docId <$>
(documentIdWithNgrams (extractNgrams nlpServer $ withLang lang [doc]) (Indexed docId doc) (documentIdWithNgrams (extractNgrams nlpServer $ withLang lang [doc]) (Indexed docId doc)
`catch` \(e :: SomeException) -> do `catch` \(e :: SomeException) -> do
$(logLocM) ERROR $ T.pack $ "Document with hash " <> show docId <> " failed ngrams extraction due to an exception: " <> displayException e $(logLocM) ERROR $ T.pack $ "Document with hash " <> show docId <> " failed ngrams extraction due to an exception: " <> displayException e
pure $ DocumentIdWithNgrams (Indexed docId doc) mempty pure $ DocumentIdWithNgrams (Indexed docId doc) mempty
) )
where where
docId = UnsafeMkContextId $ hash $ uniqParameters doc docId = DocumentHashId $ newUniqIdHash doc
commitNgramsForDocument :: UniqParameters doc commitNgramsForDocument :: UniqParameters doc
=> UncommittedNgrams doc => UncommittedNgrams doc
-> Node doc -> Indexed ContextId (Node doc)
-> Either InsertDocError CommittedNgrams -> Either InsertDocError CommittedNgrams
commitNgramsForDocument (UncommittedNgrams ng) node = commitNgramsForDocument (UncommittedNgrams ng) (Indexed oldIx node) = do
case HashMap.lookup docId ng of docId <- mb_docId
Nothing -> Left $ NgramsNotFound docId (_node_id node) case Map.lookup docId ng of
Nothing -> Left $ NgramsNotFound (Just docId) (_node_id node)
Just ngs -> Right $ mkNodeIdNgramsMap [reIndex ngs] Just ngs -> Right $ mkNodeIdNgramsMap [reIndex ngs]
where where
docId = UnsafeMkContextId $ hash $ uniqParameters $ _node_hyperdata node mb_docId = case DocumentHashId <$> _node_hash_id node of
Nothing -> Left $ NgramsNotFound Nothing (_node_id node)
Just dId -> Right dId
reIndex :: DocumentIdWithNgrams ContextId doc ExtractedNgrams reIndex :: DocumentIdWithNgrams DocumentHashId doc ExtractedNgrams
-> DocumentIdWithNgrams NodeId doc ExtractedNgrams -> DocumentIdWithNgrams NodeId doc ExtractedNgrams
reIndex did = reIndex did =
let (Indexed _ a) = documentWithId did let (Indexed _ a) = documentWithId did
in did { documentWithId = Indexed (_node_id node) a } in did { documentWithId = Indexed (contextId2NodeId oldIx) a }
extractNgramsFromDocuments :: forall doc env err m. extractNgramsFromDocuments :: forall doc env err m.
( UniqParameters doc ( HasText doc
, UniqParameters doc
, ExtractNgrams doc , ExtractNgrams doc
, IsDBCmd env err m , IsDBCmd env err m
, MonadLogger m , MonadLogger m
...@@ -524,7 +536,7 @@ extractNgramsFromDocuments nlpServer lang docs = ...@@ -524,7 +536,7 @@ extractNgramsFromDocuments nlpServer lang docs =
commitNgramsForDocuments :: UniqParameters doc commitNgramsForDocuments :: UniqParameters doc
=> UncommittedNgrams doc => UncommittedNgrams doc
-> [Node doc] -> [Indexed ContextId (Node doc)]
-> ([InsertDocError], CommittedNgrams) -> ([InsertDocError], CommittedNgrams)
commitNgramsForDocuments ng nodes = commitNgramsForDocuments ng nodes =
let (errs, successes) = partitionEithers $ map (commitNgramsForDocument ng) nodes let (errs, successes) = partitionEithers $ map (commitNgramsForDocument ng) nodes
...@@ -533,7 +545,7 @@ commitNgramsForDocuments ng nodes = ...@@ -533,7 +545,7 @@ commitNgramsForDocuments ng nodes =
insertMasterDocs :: ( HasNodeError err insertMasterDocs :: ( HasNodeError err
, UniqParameters doc , UniqParameters doc
, FlowCorpus doc , FlowCorpus doc
, MkCorpus c , MkCorpus c, Show doc
) )
=> GargConfig => GargConfig
-> UncommittedNgrams doc -> UncommittedNgrams doc
...@@ -554,7 +566,7 @@ insertMasterDocs cfg uncommittedNgrams c hs = do ...@@ -554,7 +566,7 @@ insertMasterDocs cfg uncommittedNgrams c hs = do
-- add documents to the corpus (create node_node link) -- add documents to the corpus (create node_node link)
-- this will enable global database monitoring -- this will enable global database monitoring
let (_failedExtraction, ngramsDocsMap) = commitNgramsForDocuments uncommittedNgrams (map _unIndex documentsWithId) let (_failedExtraction, ngramsDocsMap) = commitNgramsForDocuments uncommittedNgrams documentsWithId
lId <- getOrMkList masterCorpusId masterUserId lId <- getOrMkList masterCorpusId masterUserId
_ <- saveDocNgramsWith lId ngramsDocsMap _ <- saveDocNgramsWith lId ngramsDocsMap
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment