Commit 0d1d0886 authored by Alfredo Di Napoli

Switch HAL to use the jobs producer

This doesn't do much at the moment: because the CorpusId in the payload is
empty, each worker will immediately discard the message.
parent 29aa887a
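
A minimal, self-contained sketch of the discard behaviour mentioned in the commit message: the producer introduced below enqueues FetchDocumentsHAL jobs with an empty CorpusId, so the worker has nothing to attach the fetched documents to and drops the message. The stand-in types, record field names and the handlePayload helper are hypothetical; only the payload shape (corpus id, query, language, offset, limit) follows the diff.

import Data.Text (Text)
import qualified Data.Text as T

newtype CorpusId = CorpusId Int deriving Show

data FetchDocumentsHALPayload = FetchDocumentsHALPayload
  { _fdhp_corpusId :: Maybe CorpusId  -- the producer currently passes Nothing here
  , _fdhp_query    :: Text
  , _fdhp_lang     :: Maybe Text
  , _fdhp_offset   :: Int
  , _fdhp_limit    :: Int
  }

-- Worker-side guard (hypothetical): with no corpus id the job is a no-op.
handlePayload :: FetchDocumentsHALPayload -> IO ()
handlePayload (FetchDocumentsHALPayload Nothing _ _ _ _) =
  putStrLn "no corpus id: discarding FetchDocumentsHAL message"
handlePayload (FetchDocumentsHALPayload (Just cid) q _ offset limit) =
  putStrLn $ "fetching " <> show limit <> " docs at offset " <> show offset
          <> " for query " <> T.unpack q <> " into " <> show cid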
@@ -75,8 +75,12 @@ get externalAPI lang q epoAPIUrl limit = do
     Arxiv -> runExceptT $ do
       corpusQuery <- ExceptT (pure parse_query)
       ExceptT $ fmap (Right . toConduitProducer) (Arxiv.get lang corpusQuery limit)
-    HAL ->
-      toStreamingProducer externalAPI <$> HAL.getC (Just $ toISO639 lang) (Corpus.getRawQuery q) (Corpus.getLimit <$> limit)
+    HAL -> do
+      -- /Beta quality/ -- see issue #511. For HAL
+      -- let's create a data producer that spins out separate jobs, and processes batches
+      -- of 25 documents at a time.
+      first (ExternalAPIError externalAPI) <$>
+        HAL.getDataProducer (Just $ toISO639 lang) (Corpus.getRawQuery q) (Corpus.getLimit <$> limit)
     IsTex -> do
       docs <- ISTEX.get lang (Corpus.getRawQuery q) (Corpus.getLimit <$> limit)
       pure $ Right $ toConduitProducer (Just $ fromIntegral $ length docs, yieldMany docs)
...
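
A side note on the new HAL branch above: first rewrites only the Left side of an Either, which is how the low-level client error gets tagged with the API that produced it. A tiny illustration with stand-in error types (not the real Servant/Gargantext ones):

import Data.Bifunctor (first)

data ClientError      = Timeout deriving Show
data ExternalAPIError = ExternalAPIError String ClientError deriving Show

-- Tags the error side and leaves successful results untouched.
tagError :: Either ClientError a -> Either ExternalAPIError a
tagError = first (ExternalAPIError "HAL")

-- >>> tagError (Left Timeout)
-- Left (ExternalAPIError "HAL" Timeout)
-- >>> tagError (Right (42 :: Int))
-- Right 42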
@@ -19,13 +19,16 @@ import Data.Map.Strict qualified as Map
 import Data.Text (pack)
 import Gargantext.Core.Text.Corpus.Parsers.Date qualified as Date
 import Gargantext.Core.Utils (nonemptyIntercalate)
+import Gargantext.Database.Action.Flow.Types
 import Gargantext.Database.Admin.Types.Hyperdata.Document ( HyperdataDocument(..) )
 import Gargantext.Defaults qualified as Defaults
 import Gargantext.Prelude hiding (intercalate)
 import HAL qualified
 import HAL.Doc.Document qualified as HAL
 import HAL.Types qualified as HAL
-import Servant.Client (ClientError)
+import Servant.Client (ClientError (..))
+import System.IO.Error (userError)
+import Gargantext.Core.Worker.Jobs.Types (FetchDocumentsHALPayload(..), Job (..))

 get :: Maybe ISO639.ISO639_1 -> Text -> Maybe Int -> IO [HyperdataDocument]
 get la q ml = do
@@ -67,3 +70,30 @@ toDoc' la (HAL.Document { .. }) = do
     , _hd_publication_second = Nothing
     , _hd_language_iso2 = Just $ show la
     , _hd_institutes_tree = Just _document_institutes_tree }
+
+-- A simple ExceptT to make working with network requests a bit more pleasant.
+type HALMonad a = ExceptT ClientError IO a
+
+getDataProducer :: Maybe ISO639.ISO639_1
+                -> Text
+                -> Maybe Int
+                -> IO (Either ClientError (ResultsCount, DataProducer IO HyperdataDocument))
+getDataProducer la q _mb_limit = runExceptT $ do
+  -- First of all, make a trivial query to fetch the full number of documents. Then, split the
+  -- total requests into suitable batches and turn them into Jobs.
+  (mb_docs, _) <- ExceptT $ HAL.getMetadataWithC [q] (Just 0) (Just 1) la
+  case mb_docs of
+    Nothing    -> throwError $ ConnectionError (toException $ userError "impossible, hal didn't return numDocs in the response.")
+    Just total -> do
+      let (batches, finalBatchSize) = fromInteger total `divMod` halBatchSize
+      pure (ResultsCount total, DataAsyncBatchProducer $ mkBatches batches finalBatchSize 0)
+  where
+    mkBatches 1 finalBatchSize offset =
+      [FetchDocumentsHAL (FetchDocumentsHALPayload Nothing q la offset finalBatchSize)]
+    mkBatches curBatch finalBatchSize offset =
+      FetchDocumentsHAL (FetchDocumentsHALPayload Nothing q la offset halBatchSize)
+        : mkBatches (curBatch - 1) finalBatchSize (offset + halBatchSize)
+
+-- | The size of a single batch.
+halBatchSize :: Int
+halBatchSize = 500
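
To make the divMod arithmetic in getDataProducer concrete, here is a standalone sketch of turning a total document count into (offset, limit) pairs with the same 500-document batch size; batchPlan is a hypothetical helper for illustration, not the committed mkBatches.

-- Splits a total document count into (offset, limit) pairs: full 500-document batches
-- first, then one final batch holding the remainder (if any). Illustration only.
batchPlan :: Int -> Int -> [(Int, Int)]
batchPlan batchSize total =
  let (fullBatches, remainder) = total `divMod` batchSize
      full = [ (i * batchSize, batchSize) | i <- [0 .. fullBatches - 1] ]
      rest = [ (fullBatches * batchSize, remainder) | remainder > 0 ]
  in  full ++ rest

-- >>> batchPlan 500 1234
-- [(0,500),(500,500),(1000,234)]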
@@ -322,7 +322,7 @@ performAction env _s bm = do
     void $ remoteImportDocuments loggedInUser parentId corpusId workSplit docs

   -- | Fetch some documents from HAL
-  FetchDocumentsHAL (FetchDocumentsHALPayload corpusId lang query offset limit)
+  FetchDocumentsHAL (FetchDocumentsHALPayload corpusId query lang offset limit)
     -> runWorkerMonad env $ do
-      $(logLocM) DEBUG $ "[performAction] fetch documents from HAL"
-      fetchHALDocuments corpusId query lang offset limit
+      $(logLocM) DEBUG $ "[performAction] fetch documents from HAL (query: " <> query <> ", offset: " <> T.pack (show offset) <> ", limit: " <> T.pack (show limit)
+      fetchHALDocuments corpusId lang query offset limit
@@ -235,6 +235,8 @@ instance FromJSON Job where
         ImportRemoteDocuments <$> parseJSON (JS.Object o)
       "ImportRemoteTerms" ->
         ImportRemoteTerms <$> parseJSON (JS.Object o)
+      "FetchDocumentsHAL" ->
+        FetchDocumentsHAL <$> parseJSON (JS.Object o)
       s -> prependFailure "parsing job type failed, " (typeMismatch "type" s)
 instance ToJSON Job where
   toJSON Ping = object [ "type" .= ("Ping" :: Text) ]
...
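
For context on the parser extension above, a self-contained sketch of the "type"-tagged dispatch it follows; MiniJob and its payload fields are made up, and only the dispatch on the "type" key mirrors the committed FromJSON instance.

{-# LANGUAGE OverloadedStrings #-}
import Data.Aeson
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.Text (Text)

data MiniJob
  = MiniPing
  | MiniFetchHAL Text Int   -- query and offset; illustrative fields only
  deriving Show

instance FromJSON MiniJob where
  parseJSON = withObject "MiniJob" $ \o -> do
    -- Dispatch on the "type" key, then parse the rest of the same object as the payload.
    ty <- o .: "type"
    case (ty :: Text) of
      "Ping"              -> pure MiniPing
      "FetchDocumentsHAL" -> MiniFetchHAL <$> o .: "query" <*> o .: "offset"
      s                   -> prependFailure "parsing job type failed, "
                                            (typeMismatch "type" (String s))

-- >>> decode "{\"type\":\"FetchDocumentsHAL\",\"query\":\"haskell\",\"offset\":0}" :: Maybe MiniJob
-- Just (MiniFetchHAL "haskell" 0)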