Commit 29aa887a authored by Alfredo Di Napoli

Add FetchDocumentsHAL job type

The idea is that, for the HAL case, API.get will produce a data
producer which submits a list of jobs, each of which fetches its
slice of documents incrementally.
parent 00c23c73
Pipeline #7920 passed with stages in 62 minutes and 28 seconds
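To make the commit message concrete, here is a hedged sketch of how such a data producer could split a HAL query into incremental FetchDocumentsHAL jobs. Only FetchDocumentsHALPayload and the FetchDocumentsHAL job constructor introduced below are part of this commit; the submitHALJobsSketch name, the enqueueJob submission action, and the total/pageSize parameters are hypothetical stand-ins.

-- Hedged sketch only: 'enqueueJob' stands in for whatever worker submission
-- function the data producer will eventually use; it is not defined in this commit.
-- Assumed imports: Control.Monad (forM_), Data.Text (Text),
-- Data.LanguageCodes qualified as ISO639,
-- Gargantext.Core.Worker.Jobs.Types (Job(..), FetchDocumentsHALPayload(..)),
-- Gargantext.Database.Admin.Types.Node (CorpusId).
submitHALJobsSketch :: Monad m
                    => (Job -> m ())           -- hypothetical job submission action
                    -> Maybe CorpusId
                    -> Maybe ISO639.ISO639_1
                    -> Text                    -- HAL query
                    -> Int                     -- total number of documents to fetch
                    -> Int                     -- page size handled by each job
                    -> m ()
submitHALJobsSketch enqueueJob corpusId lang query total pageSize =
  forM_ [0, pageSize .. total - 1] $ \offset ->
    -- One job per page, so documents are fetched incrementally.
    enqueueJob $ FetchDocumentsHAL $ FetchDocumentsHALPayload
      { _fdhp_corpus_id = corpusId
      , _fdhp_query     = query
      , _fdhp_lang      = lang
      , _fdhp_offset    = offset
      , _fdhp_limit     = pageSize
      }

Each such job would then run fetchHALDocuments (added below in Gargantext.API.Node.Update) for its own offset/limit window.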
@@ -10,12 +10,17 @@ Portability : POSIX
-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE MultiWayIf #-}
module Gargantext.API.Node.Update
where
import Control.Exception.Safe (MonadCatch)
import Control.Lens (view, (^?), _Just)
import Data.LanguageCodes qualified as ISO639
import Data.Set qualified as Set
import Data.Text qualified as T
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError )
import Gargantext.API.Metrics qualified as Metrics
@@ -24,27 +29,41 @@ import Gargantext.API.Node.Update.Types (Method(..), UpdateNodeParams(..), Updat
import Gargantext.API.Prelude (GargM, simuLogs)
import Gargantext.API.Routes.Named.Node qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.NLP (HasNLPServer)
import Gargantext.Core.NodeStory.Types (HasNodeStory, hasNodeStory)
import Gargantext.Core.Text.Ngrams (NgramsType(NgramsTerms))
import Gargantext.Core.Text.Terms
import Gargantext.Core.Types.Main (ListType(..))
import Gargantext.Core.Viz.Graph.API (recomputeGraph)
import Gargantext.Core.Viz.Phylo (subConfigAPI2config, phylo_computeTime)
import Gargantext.Core.Viz.Phylo.API.Tools (flowPhyloAPI)
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
-import Gargantext.Database.Action.Flow (reIndexWith)
+import Gargantext.Database.Action.Flow (reIndexWith, addDocumentsToHyperCorpus)
import Gargantext.Database.Action.Flow.Pairing (pairing)
import Gargantext.Database.Action.Flow.Types
import Gargantext.Database.Action.Metrics (updateNgramsOccurrences, updateContextScore)
import Gargantext.Database.Admin.Types.Hyperdata (HyperdataDocument)
import Gargantext.Database.Admin.Types.Hyperdata.Phylo ( HyperdataPhylo(HyperdataPhylo), hp_data )
-import Gargantext.Database.Admin.Types.Node ( NodeId, NodeType(NodeCorpus, NodeAnnuaire, NodeTexts, NodeGraph, NodePhylo, NodeList) )
+import Gargantext.Database.Admin.Types.Node ( NodeId, NodeType(NodeCorpus, NodeAnnuaire, NodeTexts, NodeGraph, NodePhylo, NodeList), CorpusId )
import Gargantext.Database.Prelude
import Gargantext.Database.Query.Table.Node (defaultList, getNode, getChildrenByType, getNodeWith)
import Gargantext.Database.Query.Table.Node.Error
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_parent_id, node_hyperdata)
import Gargantext.Database.Prelude
import Gargantext.Prelude
import Gargantext.System.Logging ( MonadLogger )
import Gargantext.System.Logging (logLocM)
import Gargantext.System.Logging.Types (LogLevel(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Gargantext.Utils.UTCTime (timeMeasured)
import HAL qualified
import Servant.Server.Generic (AsServerT)
import Data.Conduit
import Data.Conduit.List hiding (mapM_)
import Gargantext.Core
import Gargantext.Core.Text.Corpus.API.Hal (toDoc')
import Conduit (mapMC)
import Gargantext.Database.Admin.Types.Hyperdata.Corpus (HyperdataCorpus)
------------------------------------------------------------------------
api :: NodeId -> Named.UpdateAPI (AsServerT (GargM Env BackendInternalError))
@@ -207,3 +226,40 @@ updateDocs cId jobHandle = do
  -- printDebug "updateContextsScore" (cId, lId, u)
  pure ()
fetchHALDocuments :: ( IsDBCmd env err m
                     , HasNodeError err
                     , HasNLPServer env
                     , FlowCorpus HyperdataDocument
                     , ExtractNgrams m HyperdataDocument
                     , MonadLogger m
                     , MonadCatch m
                     )
                  => Maybe CorpusId
                  -> Maybe ISO639.ISO639_1
                  -> Text
                  -> Int
                  -> Int
                  -> m ()
fetchHALDocuments Nothing _lang _query _offset _limit = do
  $(logLocM) ERROR $ "fetchHALDocuments failed because no corpusId was provided."
fetchHALDocuments (Just corpusId) lang query offset limit = do
  docs_e <- liftBase $ HAL.getMetadataWithOptsC HAL.defaultHalOptions [query] (Just offset) (Just $ fromIntegral limit) lang
  case docs_e of
    Left err -> do
      $(logLocM) ERROR $ T.pack (show err)
    Right (_len, docsC) -> do
      docs <- liftBase $ runConduit $ (docsC .| mapMC (toDoc' lang) .| consume)
      -- FIXME(adn) How can we pass the TermType correctly in a serialised fashion?
      void $ addDocumentsToHyperCorpus @_ @_ @_ @_ @HyperdataCorpus Nothing (Mono $ fromISOLang lang) corpusId docs
-- FIXME(adn) Implement this properly.
fromISOLang :: Maybe ISO639.ISO639_1 -> Lang
fromISOLang Nothing  = EN
fromISOLang (Just l) =
  if | l == ISO639.EN -> EN
     | l == ISO639.FR -> FR
     | otherwise      -> EN
{-# LANGUAGE TemplateHaskell #-}
{-|
Module : Gargantext.Core.Text.Corpus.API.Hal
Description : Pubmed API connection
...
@@ -39,7 +39,7 @@ import Gargantext.API.Node.File (addWithFile)
import Gargantext.API.Node.FrameCalcUpload (frameCalcUploadAsync)
import Gargantext.API.Node.New (postNode')
import Gargantext.API.Node.Types (_wtf_file_oid)
-import Gargantext.API.Node.Update (updateNode)
+import Gargantext.API.Node.Update (updateNode, fetchHALDocuments)
import Gargantext.API.Node.Update.Types (UpdateNodeParams(..), Granularity (..))
import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync)
import Gargantext.Core.Config (hasConfig, gc_database_config, gc_jobs, gc_worker, gc_logging)
@@ -50,7 +50,7 @@ import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Viz.Graph.API (graphRecompute)
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env
-import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId, ImportRemoteDocumentsPayload(..), ImportRemoteTermsPayload(..))
+import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId, ImportRemoteDocumentsPayload(..), ImportRemoteTermsPayload(..), FetchDocumentsHALPayload (..))
import Gargantext.Core.Worker.PGMQTypes (BrokerMessage, HasWorkerBroker, WState)
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Prelude
@@ -320,3 +320,9 @@ performAction env _s bm = do
  -> runWorkerMonad env $ do
       $(logLocM) DEBUG $ "[performAction] import remote documents"
       void $ remoteImportDocuments loggedInUser parentId corpusId workSplit docs
-- | Fetch some documents from HAL
FetchDocumentsHAL (FetchDocumentsHALPayload corpusId query lang offset limit)
  -> runWorkerMonad env $ do
       $(logLocM) DEBUG $ "[performAction] fetch documents from HAL"
       fetchHALDocuments corpusId lang query offset limit
@@ -33,6 +33,34 @@ import Gargantext.API.Node.Update.Types (UpdateNodeParams)
import Gargantext.Core.Types.Individu (User)
import Gargantext.Database.Admin.Types.Node (AnnuaireId, CorpusId, ListId, NodeId(UnsafeMkNodeId), ParentId)
import Gargantext.Prelude
import qualified Data.LanguageCodes as ISO639
data FetchDocumentsHALPayload
  = FetchDocumentsHALPayload
      { _fdhp_corpus_id :: Maybe CorpusId
      , _fdhp_query     :: Text
      , _fdhp_lang      :: Maybe ISO639.ISO639_1
      , _fdhp_offset    :: Int
      , _fdhp_limit     :: Int
      } deriving (Show, Eq)
instance ToJSON FetchDocumentsHALPayload where
  toJSON FetchDocumentsHALPayload{..} =
    object [ "corpus_id" .= _fdhp_corpus_id
           , "query"     .= _fdhp_query
           , "lang"      .= _fdhp_lang
           , "offset"    .= _fdhp_offset
           , "limit"     .= _fdhp_limit
           ]
instance FromJSON FetchDocumentsHALPayload where
  parseJSON = withObject "FetchDocumentsHALPayload" $ \o -> do
    _fdhp_corpus_id <- o .: "corpus_id"
    _fdhp_query     <- o .: "query"
    _fdhp_lang      <- o .: "lang"
    _fdhp_offset    <- o .: "offset"
    _fdhp_limit     <- o .: "limit"
    pure FetchDocumentsHALPayload{..}
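For reference, a hedged example of the payload these instances serialise; the examplePayload name and the concrete field values are illustrative only, and the exact JSON rendering of the corpus id and language fields depends on the NodeId and ISO639_1 instances defined elsewhere in the codebase.

-- Illustrative value, not part of this commit; the corpus id is hypothetical
-- (CorpusId is assumed to be a synonym for NodeId, per the import above).
examplePayload :: FetchDocumentsHALPayload
examplePayload = FetchDocumentsHALPayload
  { _fdhp_corpus_id = Just (UnsafeMkNodeId 1234)
  , _fdhp_query     = "deep learning"
  , _fdhp_lang      = Just ISO639.FR
  , _fdhp_offset    = 0
  , _fdhp_limit     = 100
  }
-- 'toJSON examplePayload' produces an object with the keys "corpus_id",
-- "query", "lang", "offset" and "limit"; when wrapped in the FetchDocumentsHAL
-- Job constructor, the ToJSON Job instance below additionally injects
-- "type": "FetchDocumentsHAL".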
data ImportRemoteTermsPayload
  = ImportRemoteTermsPayload
@@ -134,6 +162,7 @@ data Job =
                   , _ud_args :: DocumentUpload }
  | ImportRemoteDocuments !ImportRemoteDocumentsPayload
  | ImportRemoteTerms !ImportRemoteTermsPayload
  | FetchDocumentsHAL !FetchDocumentsHALPayload
  deriving (Show, Eq)
instance FromJSON Job where
  parseJSON = withObject "Job" $ \o -> do
@@ -283,6 +312,12 @@ instance ToJSON Job where
        let o1 = KM.fromList [ ("type", toJSON @T.Text "ImportRemoteTerms") ]
        in JS.Object $ o1 <> o
      _ -> errorTrace "impossible, toJSON ImportRemoteTerms did not return an Object."
  toJSON (FetchDocumentsHAL payload) =
    case toJSON payload of
      (JS.Object o) ->
        let o1 = KM.fromList [ ("type", toJSON @T.Text "FetchDocumentsHAL") ]
        in JS.Object $ o1 <> o
      _ -> errorTrace "impossible, toJSON FetchDocumentsHAL did not return an Object."
-- | We want to have a way to specify 'Maybe NodeId' from given worker
-- parameters. The given 'Maybe CorpusId' is an alternative, when
@@ -308,3 +343,4 @@ getWorkerMNodeId (UpdateNode { _un_node_id }) = Just _un_node_id
getWorkerMNodeId (UploadDocument { _ud_node_id }) = Just _ud_node_id
getWorkerMNodeId (ImportRemoteDocuments (ImportRemoteDocumentsPayload _ _ corpusId _ _)) = Just corpusId
getWorkerMNodeId (ImportRemoteTerms (ImportRemoteTermsPayload listId _)) = Just listId
getWorkerMNodeId (FetchDocumentsHAL (FetchDocumentsHALPayload{_fdhp_corpus_id})) = _fdhp_corpus_id