Commit 9dd874c4 authored by Alfredo Di Napoli

Barebone but buggy prototype

parent 0d1d0886
Pipeline #7921 passed with stages in 57 minutes and 11 seconds
@@ -146,7 +146,7 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
   let db = datafield2origin datafield
   -- mPubmedAPIKey <- getUserPubmedAPIKey user
   -- printDebug "[addToCorpusWithQuery] mPubmedAPIKey" mPubmedAPIKey
-  eTxt <- getDataText db (Multi l) q maybeLimit
+  eTxt <- getDataText cid db (Multi l) q maybeLimit
   -- printDebug "[G.A.N.C.New] lTxts" lTxts
   case eTxt of
...
@@ -57,12 +57,10 @@ import Gargantext.System.Logging.Types (LogLevel(..))
 import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
 import Gargantext.Utils.UTCTime (timeMeasured)
 import HAL qualified
+import HAL.Types qualified as HAL
 import Servant.Server.Generic (AsServerT)
-import Data.Conduit
-import Data.Conduit.List hiding (mapM_)
 import Gargantext.Core
 import Gargantext.Core.Text.Corpus.API.Hal (toDoc')
-import Conduit (mapMC)
 import Gargantext.Database.Admin.Types.Hyperdata.Corpus (HyperdataCorpus)
 ------------------------------------------------------------------------
@@ -234,21 +232,19 @@ fetchHALDocuments :: ( IsDBCmd env err m
                      , MonadLogger m
                      , MonadCatch m
                      )
-                  => Maybe CorpusId
+                  => CorpusId
                   -> Maybe ISO639.ISO639_1
                   -> Text
                   -> Int
                   -> Int
                   -> m ()
-fetchHALDocuments Nothing _query _lang _offset _limit = do
-  $(logLocM) ERROR $ "fetchHALDocuments failed because no corpusId was provided."
-fetchHALDocuments (Just corpusId) lang query offset limit = do
-  docs_e <- liftBase $ HAL.getMetadataWithOptsC HAL.defaultHalOptions [query] (Just offset) (Just $ fromIntegral limit) lang
+fetchHALDocuments corpusId lang query offset limit = do
+  docs_e <- liftBase $ HAL.getMetadataWith [query] (Just offset) (Just $ fromIntegral limit) lang
   case docs_e of
     Left err -> do
       $(logLocM) ERROR $ T.pack (show err)
-    Right (_len, docsC) -> do
-      docs <- liftBase $ runConduit $ (docsC .| mapMC (toDoc' lang) .| consume)
+    Right HAL.Response{HAL._docs} -> do
+      docs <- mapM (liftBase . toDoc' lang) _docs
       -- FIXME(adn) How can we pass the TermType correctly in a serialised fashion?
       void $ addDocumentsToHyperCorpus @_ @_ @_ @_ @HyperdataCorpus Nothing (Mono $ fromISOLang lang) corpusId docs
...
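The new worker step above reduces to: fetch one (offset, limit) window from HAL, convert each record, and insert the whole batch into the target corpus. Below is a minimal, self-contained sketch of that shape; fetchPage, convertDoc and insertIntoCorpus are hypothetical stand-ins for the real HAL client, toDoc' and addDocumentsToHyperCorpus.

module BatchFetchSketch where

type CorpusId = Int

newtype Doc = Doc String
  deriving Show

-- Hypothetical stand-in for the HAL metadata call: returns one page of raw records.
fetchPage :: String -> Int -> Int -> IO (Either String [String])
fetchPage _query offset limit =
  pure $ Right [ "record-" <> show i | i <- [offset .. offset + limit - 1] ]

-- Hypothetical stand-ins for the document conversion and the database insert.
convertDoc :: String -> IO Doc
convertDoc = pure . Doc

insertIntoCorpus :: CorpusId -> [Doc] -> IO ()
insertIntoCorpus cid docs =
  putStrLn $ "inserting " <> show (length docs) <> " docs into corpus " <> show cid

-- One worker job: fetch a single (offset, limit) window, convert it, insert it.
-- e.g. fetchBatch 42 "haskell" 0 100 processes records 0..99 for corpus 42.
fetchBatch :: CorpusId -> String -> Int -> Int -> IO ()
fetchBatch cid query offset limit = do
  page_e <- fetchPage query offset limit
  case page_e of
    Left err  -> putStrLn $ "fetchBatch failed: " <> err
    Right raw -> do
      docs <- mapM convertDoc raw
      insertIntoCorpus cid docs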
@@ -38,6 +38,7 @@ import Gargantext.Prelude hiding (get)
 import Gargantext.Utils.Jobs.Error
 import Servant.Client (ClientError)
 import Gargantext.Database.Action.Flow.Types
+import Gargantext.Core.Types (CorpusId)

 data GetCorpusError
   = -- | We couldn't parse the user input query into something meaningful.
@@ -54,7 +55,8 @@ instance ToHumanFriendlyError GetCorpusError where
     "There was a network problem while contacting the " <> T.pack (show api) <> " API provider. Please try again later or contact your network administrator."

 -- | Get External API metadata main function
-get :: ExternalAPIs
+get :: CorpusId
+    -> ExternalAPIs
     -> Lang
     -- ^ A user-selected language in which documents needs to be retrieved.
     -- If the provider doesn't support the search filtered by language, or if the language
@@ -64,7 +66,7 @@ get :: ExternalAPIs
     -> Maybe Corpus.Limit
     -- -> IO [HyperdataDocument]
     -> IO (Either GetCorpusError (ResultsCount, DataProducer IO HyperdataDocument))
-get externalAPI lang q epoAPIUrl limit = do
+get corpusId externalAPI lang q epoAPIUrl limit = do
   -- For PUBMED, HAL, IsTex, Isidore and OpenAlex, we want to send the query as-it.
   -- For Arxiv we parse the query into a structured boolean query we submit over.
   case externalAPI of
@@ -80,7 +82,7 @@ get externalAPI lang q epoAPIUrl limit = do
       -- let's create a data producer that spins out separate jobs, and process batches
       -- of 25 documents at the time.
       first (ExternalAPIError externalAPI) <$>
-        HAL.getDataProducer (Just $ toISO639 lang) (Corpus.getRawQuery q) (Corpus.getLimit <$> limit)
+        HAL.getDataProducer corpusId (Just $ toISO639 lang) (Corpus.getRawQuery q) (Corpus.getLimit <$> limit)
     IsTex -> do
       docs <- ISTEX.get lang (Corpus.getRawQuery q) (Corpus.getLimit <$> limit)
       pure $ Right $ toConduitProducer (Just $ fromIntegral $ length docs, yieldMany docs)
...
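The HAL branch above now hands back a list of worker jobs rather than a fully materialised stream, while the IsTex branch still wraps an in-memory result in a conduit producer. A simplified, hypothetical mirror of that split (the real DataProducer in Gargantext.Database.Action.Flow.Types may differ in shape):

-- Hypothetical, simplified mirror of the two producer shapes used above.
data MiniProducer doc job
  = MiniStreamProducer [doc]       -- everything fetched up front (the IsTex branch)
  | MiniAsyncBatchProducer [job]   -- one asynchronous worker job per batch (the HAL branch)

-- A consumer either processes documents directly or dispatches the batch jobs.
runMiniProducer :: (doc -> IO ()) -> (job -> IO ()) -> MiniProducer doc job -> IO ()
runMiniProducer onDoc _sendJob (MiniStreamProducer docs)     = mapM_ onDoc docs
runMiniProducer _onDoc sendJob (MiniAsyncBatchProducer jobs) = mapM_ sendJob jobs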
@@ -27,8 +27,8 @@ import HAL qualified
 import HAL.Doc.Document qualified as HAL
 import HAL.Types qualified as HAL
 import Servant.Client (ClientError (..))
-import System.IO.Error (userError)
 import Gargantext.Core.Worker.Jobs.Types (FetchDocumentsHALPayload(..), Job (..))
+import Gargantext.Core.Types (CorpusId)

 get :: Maybe ISO639.ISO639_1 -> Text -> Maybe Int -> IO [HyperdataDocument]
 get la q ml = do
@@ -74,26 +74,25 @@ toDoc' la (HAL.Document { .. }) = do
 -- A Simple ExceptT to make working with network requests a bit more pleasant.
 type HALMonad a = ExceptT ClientError IO a

-getDataProducer :: Maybe ISO639.ISO639_1
+getDataProducer :: CorpusId
+                -> Maybe ISO639.ISO639_1
                 -> Text
                 -> Maybe Int
                 -> IO (Either ClientError (ResultsCount, DataProducer IO HyperdataDocument))
-getDataProducer la q _mb_limit = runExceptT $ do
+getDataProducer corpusId la q _mb_limit = runExceptT $ do
   -- First of all, make a trivial query to fetch the full number of documents. Then, split the
   -- total requests into suitable batches and turn them into Jobs.
-  (mb_docs, _) <- ExceptT $ HAL.getMetadataWithC [q] (Just 0) (Just 1) la
-  case mb_docs of
-    Nothing -> throwError $ ConnectionError (toException $ userError "impossible, hal didn't return numDocs in the response.")
-    Just total -> do
-      let (batches,finalBatchSize) = (fromInteger total) `divMod` halBatchSize
-      pure (ResultsCount total, DataAsyncBatchProducer $ mkBatches batches finalBatchSize 0)
+  total <- ExceptT $ HAL.countResultsOpts' (HAL.defaultHalOptions { HAL._hco_batchSize = 1 }) q la
+  putStrLn $ "Found " <> show total <> " documents matching the query."
+  let (batches,finalBatchSize) = (fromInteger total) `divMod` halBatchSize
+  pure (ResultsCount total, DataAsyncBatchProducer $ mkBatches (max 0 batches) finalBatchSize 0)
   where
-    mkBatches 1 finalBatchSize offset =
-      [FetchDocumentsHAL (FetchDocumentsHALPayload Nothing q la offset finalBatchSize)]
+    mkBatches 0 finalBatchSize offset =
+      [FetchDocumentsHAL (FetchDocumentsHALPayload corpusId q la offset finalBatchSize)]
     mkBatches curBatch finalBatchSize offset =
-      FetchDocumentsHAL (FetchDocumentsHALPayload Nothing q la offset halBatchSize)
+      FetchDocumentsHAL (FetchDocumentsHALPayload corpusId q la offset halBatchSize)
        : mkBatches (curBatch - 1) finalBatchSize (offset + halBatchSize)

 -- | The size of a single batch.
 halBatchSize :: Int
-halBatchSize = 500
+halBatchSize = 100
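The divMod plus mkBatches logic above splits the total result count into full pages of halBatchSize documents followed by one remainder batch. A standalone sketch of that arithmetic, with the job payload reduced to (offset, size) pairs and the types simplified for illustration (the batch size of 100 follows the diff):

halBatchSize :: Int
halBatchSize = 100

-- Mirrors mkBatches above: full batches of halBatchSize, then a final batch
-- holding the remainder of the division.
planBatches :: Int -> [(Int, Int)]   -- one (offset, size) pair per worker job
planBatches total = go (max 0 batches) 0
  where
    (batches, finalBatchSize) = total `divMod` halBatchSize
    go 0 offset        = [(offset, finalBatchSize)]
    go curBatch offset = (offset, halBatchSize) : go (curBatch - 1) (offset + halBatchSize)

-- planBatches 250 == [(0,100),(100,100),(200,50)]
-- planBatches 200 == [(0,100),(100,100),(200,0)]   -- note the zero-sized tail batch
-- planBatches  42 == [(0,42)]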
@@ -37,7 +37,7 @@ import qualified Data.LanguageCodes as ISO639
 data FetchDocumentsHALPayload
   = FetchDocumentsHALPayload
-    { _fdhp_corpus_id :: Maybe CorpusId
+    { _fdhp_corpus_id :: CorpusId
     , _fdhp_query :: Text
     , _fdhp_lang :: Maybe ISO639.ISO639_1
     , _fdhp_offset :: Int
@@ -345,4 +345,4 @@ getWorkerMNodeId (UpdateNode { _un_node_id }) = Just _un_node_id
 getWorkerMNodeId (UploadDocument { _ud_node_id }) = Just _ud_node_id
 getWorkerMNodeId (ImportRemoteDocuments (ImportRemoteDocumentsPayload _ _ corpusId _ _)) = Just corpusId
 getWorkerMNodeId (ImportRemoteTerms (ImportRemoteTermsPayload listId _)) = Just listId
-getWorkerMNodeId (FetchDocumentsHAL (FetchDocumentsHALPayload{_fdhp_corpus_id})) = _fdhp_corpus_id
+getWorkerMNodeId (FetchDocumentsHAL (FetchDocumentsHALPayload{_fdhp_corpus_id})) = Just _fdhp_corpus_id
@@ -98,7 +98,7 @@ import Gargantext.Database.Action.Search (searchDocInDatabase)
 import Gargantext.Database.Admin.Types.Hyperdata.Contact ( HyperdataContact )
 import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataAnnuaire, HyperdataCorpus(_hc_lang) )
 import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(toHyperdataDocument), HyperdataDocument )
-import Gargantext.Database.Admin.Types.Node hiding (ERROR, DEBUG) -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
+import Gargantext.Database.Admin.Types.Node hiding (INFO, ERROR, DEBUG) -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
 import Gargantext.Database.Prelude
 import Gargantext.Database.Query.Table.ContextNodeNgrams2 ( ContextNodeNgrams2Poly(..), insertContextNodeNgrams2 )
 import Gargantext.Database.Query.Table.Node ( MkCorpus, insertDefaultNodeIfNotExists, getOrMkList, getNodeWith )
@@ -112,7 +112,7 @@ import Gargantext.Database.Schema.Ngrams ( indexNgrams, NgramsId )
 import Gargantext.Database.Schema.Node
 import Gargantext.Database.Types
 import Gargantext.Prelude hiding (catch, onException, to)
-import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger )
+import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, INFO, ERROR), MonadLogger )
 import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..), markFailureNoErr )
 import Servant.Client.Core (ClientError)
@@ -147,16 +147,17 @@ printDataProducer = \case

 -- TODO use the split parameter in config file
 getDataText :: (HasNodeError err)
-            => DataOrigin
+            => CorpusId
+            -> DataOrigin
             -> TermType Lang
             -> API.RawQuery
             -> Maybe API.Limit
             -> DBCmdWithEnv env err (Either API.GetCorpusError (DataText IO))
-getDataText (ExternalOrigin api) la q li = do
+getDataText corpusId (ExternalOrigin api) la q li = do
   cfg <- view hasConfig
-  eRes <- liftBase $ API.get api (_tt_lang la) q (_ac_epo_api_url $ _gc_apis cfg) li
+  eRes <- liftBase $ API.get corpusId api (_tt_lang la) q (_ac_epo_api_url $ _gc_apis cfg) li
   pure $ uncurry DataNew <$> eRes
-getDataText (InternalOrigin _) la q _li = do
+getDataText _ (InternalOrigin _) la q _li = do
   cfg <- view hasConfig
   runDBTx $ do
     (_masterUserId, _masterRootId, cId) <- getOrMkRootWithCorpus cfg MkCorpusUserMaster (Nothing :: Maybe HyperdataCorpus)
@@ -164,13 +165,14 @@ getDataText (InternalOrigin _) la q _li = do
     pure $ Right $ DataOld ids

 getDataText_Debug :: (HasNodeError err)
-                  => DataOrigin
+                  => CorpusId
+                  -> DataOrigin
                   -> TermType Lang
                   -> API.RawQuery
                   -> Maybe API.Limit
                   -> DBCmdWithEnv env err ()
-getDataText_Debug a l q li = do
-  result <- getDataText a l q li
+getDataText_Debug cid a l q li = do
+  result <- getDataText cid a l q li
   case result of
     Left err -> liftBase $ putText $ show err
     Right res -> liftBase $ printDataText res
@@ -381,7 +383,9 @@ runDataProducer jobHandle processData = \case
        ]
   DataAsyncBatchProducer jobs
-    -> forM_ jobs sendJob
+    -> do
+      addMoreSteps (fromIntegral $ length jobs) jobHandle
+      forM_ jobs sendJob

 -- | Given a list of corpus documents and a 'NodeId' identifying the 'CorpusId', adds
 -- the given documents to the corpus. Returns the Ids of the inserted documents.
@@ -400,6 +404,7 @@ addDocumentsToHyperCorpus :: ( IsDBCmd env err m
                           -> [document]
                           -> m [DocId]
 addDocumentsToHyperCorpus mb_hyper la corpusId docs = do
+  $(logLocM) INFO $ "Adding " <> T.pack (show $ length docs) <> " to the hyper corpus " <> T.pack (show corpusId)
   cfg <- view hasConfig
   nlp <- view (nlpServerGet $ _tt_lang la)
   -- First extract all the ngrams for the input documents via the nlp server,
...
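The runDataProducer change above registers one extra progress step per batch job before dispatching them, so the job's progress accounting covers work that happens in separate worker jobs. A minimal sketch of that bookkeeping idea, using an IORef-backed MiniJobHandle as a hypothetical stand-in for the real JobHandle/MonadJobStatus machinery (markProgress is an illustrative name; addMoreSteps here only mimics the call shown in the diff):

import Control.Monad (forM_)
import Data.IORef

-- Hypothetical, IORef-backed stand-in for a job handle tracking progress.
data MiniJobHandle = MiniJobHandle
  { totalSteps :: IORef Int
  , doneSteps  :: IORef Int
  }

addMoreSteps :: Int -> MiniJobHandle -> IO ()
addMoreSteps n h = modifyIORef' (totalSteps h) (+ n)

markProgress :: MiniJobHandle -> IO ()
markProgress h = modifyIORef' (doneSteps h) (+ 1)

-- Before dispatching one job per batch, bump the step count by the number of
-- jobs so the remaining work is visible to progress reporting.
dispatchBatches :: MiniJobHandle -> [IO ()] -> IO ()
dispatchBatches h jobs = do
  addMoreSteps (length jobs) h
  forM_ jobs $ \job -> job >> markProgress h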