Commit 00c23c73 authored by Alfredo Di Napoli

Introduce the DataProducer abstraction

This commit refactors the codebase so that, instead of passing a
Conduit around for the data fetched from external APIs, we now have a
`DataProducer`, which can eventually support splitting the fetching of
data into multiple async jobs.
parent ca7f0f26

Pipeline #7919 failed in 59 minutes and 29 seconds
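For orientation before the diff: the heart of the change is the new `DataProducer` type and the `ResultsCount`/`DataText` types that travel with it, all introduced in `Gargantext.Database.Action.Flow.Types` below. Condensed from the diff:

    data DataProducer m a
      = DataBatchProducer [m [a]]                   -- documents in small batches
      | DataStreamingProducer (ConduitT () a m ())  -- documents streamed via a conduit
      | DataAsyncBatchProducer [Job]                -- jobs to be run asynchronously

    newtype ResultsCount = ResultsCount { _ResultsCount :: Integer }

    data DataText m
      = DataOld ![NodeId]                                          -- cached data served directly
      | DataNew !ResultsCount !(DataProducer m HyperdataDocument)  -- data still to be produced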
@@ -32,7 +32,7 @@ import Gargantext.Core.Text.Terms (TermType(..))
 import Gargantext.Core.Types.Individu (User(..))
 import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
 import Gargantext.Database.Action.Flow (flow)
-import Gargantext.Database.Action.Flow.Types (FlowCmdM)
+import Gargantext.Database.Action.Flow.Types (FlowCmdM, ResultsCount (..), DataProducer (..))
 import Gargantext.Database.Admin.Types.Hyperdata.Contact ( HyperdataContact, hyperdataContact )
 import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataAnnuaire(..) )
 import Gargantext.Database.Admin.Types.Node ( CorpusId, NodeId )
@@ -67,7 +67,11 @@ addContact :: (FlowCmdM env err m, MonadJobStatus m, MonadCatch m)
 addContact u nId (AddContactParams fn ln) jobHandle = do
   markStarted 2 jobHandle
-  _ <- flow (Nothing :: Maybe HyperdataAnnuaire) (MkCorpusUserNormalCorpusIds u [nId]) (Multi EN) Nothing (1, yield $ hyperdataContact fn ln) jobHandle
+  _ <- flow (Nothing :: Maybe HyperdataAnnuaire)
+            (MkCorpusUserNormalCorpusIds u [nId])
+            (Multi EN)
+            Nothing
+            (ResultsCount 1, DataStreamingProducer $ yield $ hyperdataContact fn ln) jobHandle
   markComplete jobHandle
 addContact _uId _nId _p jobHandle = do
...
@@ -18,11 +18,16 @@ import Data.Aeson (genericParseJSON, genericToJSON)
 import Data.Swagger (ToSchema(..), genericDeclareNamedSchema)
 import Gargantext.API.Node.Corpus.New.Types qualified as NewTypes
 import Gargantext.Core (Lang(..))
+import Gargantext.Core.NodeStory.Types
+import Gargantext.Core.Notifications.CentralExchange.Types
+import Gargantext.Core.Types
 import Gargantext.Core.Utils.Aeson (jsonOptions)
 import Gargantext.Core.Utils.Prefix (unPrefixSwagger)
-import Gargantext.Database.Action.Flow.Types (FlowCmdM) -- flowAnnuaire
-import Gargantext.Database.Admin.Types.Node (AnnuaireId)
+import Gargantext.Database (IsDBCmdExtra)
+import Gargantext.Database.Query.Table.Node.Error
+import Gargantext.Database.Query.Tree
 import Gargantext.Prelude
+import Gargantext.System.Logging
 import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
 import Servant
 import Web.FormUrlEncoded (FromForm)
@@ -50,7 +55,15 @@ instance ToSchema AnnuaireWithForm where
   declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_wf_")
 ------------------------------------------------------------------------
-addToAnnuaireWithForm :: (FlowCmdM env err m, MonadJobStatus m)
+addToAnnuaireWithForm :: ( IsDBCmdExtra env err m
+                         , HasNodeStory env err m
+                         , HasNodeError err
+                         , HasValidationError err
+                         , HasTreeError err
+                         , MonadLogger m
+                         , HasCentralExchangeNotification env
+                         , MonadJobStatus m
+                         )
                       => AnnuaireId
                       -> AnnuaireWithForm
                       -> JobHandle m
...
@@ -34,20 +34,21 @@ import Gargantext.API.Admin.Orchestrator.Types qualified as API
 import Gargantext.API.Ngrams (commitStatePatch, Versioned(..))
 import Gargantext.API.Node.Corpus.New.Types ( FileType(..) )
 import Gargantext.API.Node.Corpus.Searx ( triggerSearxSearch )
-import Gargantext.API.Node.Corpus.Types ( Datafield(Web), datafield2origin )
+import Gargantext.API.Node.Corpus.Types ( Datafield(..), Database (..))
 import Gargantext.API.Node.Corpus.Update (addLanguageToCorpus)
 import Gargantext.API.Node.Types
 import Gargantext.Core (withDefaultLanguage, defaultLanguage)
 import Gargantext.Core.Config (gc_jobs, hasConfig)
 import Gargantext.Core.Config.Types (jc_max_docs_parsers)
 import Gargantext.Core.NodeStory (currentVersion, NgramsStatePatch', HasNodeStoryEnv (..))
+import Gargantext.Core.Text.Corpus.API (ExternalAPIs(..))
 import Gargantext.Core.Text.Corpus.Parsers qualified as Parser (FileType(..), parseFormatC, _ParseFormatError)
 import Gargantext.Core.Text.Corpus.Parsers.Types
 import Gargantext.Core.Text.Corpus.Query qualified as API
 import Gargantext.Core.Types.Individu (User(..))
 import Gargantext.Core.Utils.Prefix (unPrefix)
 import Gargantext.Database.Action.Flow (flowCorpus, getDataText, flowDataText, TermType(..){-, allDataOrigins-})
-import Gargantext.Database.Action.Flow.Types (FlowCmdM)
+import Gargantext.Database.Action.Flow.Types (FlowCmdM, DataOrigin(..), ResultsCount (..), DataProducer (..), hoistDataText)
 import Gargantext.Database.Action.Mail (sendMail)
 import Gargantext.Database.Action.Node (mkNodeWithParent)
 import Gargantext.Database.Action.User (getUserId)
@@ -66,59 +67,11 @@ import Gargantext.System.Logging ( logLocM, LogLevel(..) )
 import Gargantext.Utils.Jobs.Error (HumanFriendlyErrorText(..))
 import Gargantext.Utils.Jobs.Monad (JobHandle, MonadJobStatus(..))
-------------------------------------------------------------------------
-{-
-data Query = Query { query_query :: Text
-                   , query_node_id :: Int
-                   , query_lang :: Lang
-                   , query_databases :: [DataOrigin]
-                   }
-  deriving (Eq, Generic)
-
-deriveJSON (unPrefix "query_") 'Query
-
-instance Arbitrary Query where
-  arbitrary = elements [ Query q n la fs
-                       | q <- ["honeybee* AND collapse"
-                              ,"covid 19"
-                              ]
-                       , n <- [0..10]
-                       , la <- allLangs
-                       , fs <- take 3 $ repeat allDataOrigins
-                       ]
-
-instance ToSchema Query where
-  declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "query_")
--}
-------------------------------------------------------------------------
-{-
-type Api = PostApi
-      :<|> GetApi
-
-type PostApi = Summary "New Corpus endpoint"
-             :> ReqBody '[JSON] Query
-             :> Post '[JSON] CorpusId
-type GetApi = Get '[JSON] ApiInfo
--}
-
--- | TODO manage several apis
--- TODO-ACCESS
--- TODO this is only the POST
-{-
-api :: (FlowCmdM env err m) => UserId -> Query -> m CorpusId
-api uid (Query q _ as) = do
-  cId <- case head as of
-    Nothing      -> flowCorpusSearchInDatabase (UserDBId uid) EN q
-    Just API.All -> flowCorpusSearchInDatabase (UserDBId uid) EN q
-    Just a       -> do
-      docs <- liftBase $ API.get a q (Just 1000)
-      cId' <- flowCorpus (UserDBId uid) (Left q) (Multi EN) [docs]
-      pure cId'
-  pure cId
--}
+datafield2origin :: Datafield -> DataOrigin
+datafield2origin (External Empty) = InternalOrigin IsTex
+datafield2origin (External (DB db)) = ExternalOrigin db
+-- -- | This isn't really used
+datafield2origin _ = InternalOrigin IsTex
 ------------------------------------------------
 -- TODO use this route for Client implementation
@@ -202,7 +155,7 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
   $(logLocM) DEBUG "[addToCorpusWithQuery] Processing dataText results"
   markProgress 1 jobHandle
-  corpusId <- flowDataText user txt (Multi l) cid (Just flw) jobHandle
+  corpusId <- flowDataText user (hoistDataText liftBase txt) (Multi l) cid (Just flw) jobHandle
   $(logLocM) DEBUG $ "[addToCorpusWithQuery] corpus id " <> show corpusId
   _ <- commitCorpus cid user
@@ -288,7 +241,7 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
              (Multi l)
              (Just (nwtf ^. wtf_selection))
              --(Just $ fromIntegral $ length docs, docsC')
-             (count, docsC') -- TODO fix number of docs
+             (ResultsCount count, DataStreamingProducer docsC') -- TODO fix number of docs
              --(map (map toHyperdataDocument) docs)
              jobHandle
...
@@ -18,7 +18,6 @@ import Data.Aeson.Types ( Parser )
 import Data.Swagger
 import Data.Text qualified as T
 import Gargantext.API.Admin.Orchestrator.Types qualified as Types
-import Gargantext.Database.Action.Flow.Types (DataOrigin(..))
 import Gargantext.Prelude

 type EPOAPIToken = Text
@@ -43,12 +42,6 @@ instance ToSchema Database where
   declareNamedSchema = genericDeclareNamedSchemaUnrestricted defaultSchemaOptions
-datafield2origin :: Datafield -> DataOrigin
-datafield2origin (External Empty) = InternalOrigin Types.IsTex
-datafield2origin (External (DB db)) = ExternalOrigin db
--- -- | This isn't really used
-datafield2origin _ = InternalOrigin Types.IsTex
 ------------------------------------------------------------------------
 data Datafield = Gargantext
                | External Database
...
@@ -37,7 +37,7 @@ import Gargantext.Core.Text.Terms (TermType(..))
 import Gargantext.Core.Types.Individu (User(..))
 import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
 import Gargantext.Database.Action.Flow (flowDataText, DataText(..))
-import Gargantext.Database.Action.Flow.Types (FlowCmdM)
+import Gargantext.Database.Action.Flow.Types (FlowCmdM, ResultsCount (..), DataProducer (..))
 import Gargantext.Database.Admin.Types.Hyperdata.Document (HyperdataDocument(..))
 import Gargantext.Database.Admin.Types.Hyperdata.Frame ( HyperdataFrame(..), getHyperdataFrameContents )
 import Gargantext.Database.Admin.Types.Node ( NodeId, Node, NodeType(..) )
@@ -102,7 +102,7 @@ documentsFromWriteNodes authenticatedUser nId Params { selection, lang, paragrap
   let parsed = List.concat $ rights parsedE
   -- printDebug "DocumentsFromWriteNodes: uId" uId
   _ <- flowDataText (RootId userNodeId)
-                    (DataNew (Just $ fromIntegral $ length parsed, yieldMany parsed))
+                    (DataNew (ResultsCount $ fromIntegral $ length parsed) (DataStreamingProducer $ yieldMany parsed))
                     (Multi lang)
                     cId
                     (Just selection)
...
@@ -37,6 +37,7 @@ import Gargantext.Database.Admin.Types.Hyperdata.Document (HyperdataDocument(..)
 import Gargantext.Prelude hiding (get)
 import Gargantext.Utils.Jobs.Error
 import Servant.Client (ClientError)
+import Gargantext.Database.Action.Flow.Types

 data GetCorpusError
   = -- | We couldn't parse the user input query into something meaningful.
@@ -62,31 +63,38 @@ get :: ExternalAPIs
    -> Text
    -> Maybe Corpus.Limit
    -- -> IO [HyperdataDocument]
-   -> IO (Either GetCorpusError (Maybe Integer, ConduitT () HyperdataDocument IO ()))
+   -> IO (Either GetCorpusError (ResultsCount, DataProducer IO HyperdataDocument))
 get externalAPI lang q epoAPIUrl limit = do
   -- For PUBMED, HAL, IsTex, Isidore and OpenAlex, we want to send the query as-is.
   -- For Arxiv we parse the query into a structured boolean query we submit over.
   case externalAPI of
     PubMed mPubmedAPIKey ->
-      first (ExternalAPIError externalAPI) <$> PUBMED.get (fromMaybe "" mPubmedAPIKey) q limit
+      toStreamingProducer externalAPI <$> PUBMED.get (fromMaybe "" mPubmedAPIKey) q limit
     OpenAlex ->
-      first (ExternalAPIError externalAPI) <$> OpenAlex.get (fromMaybe "" Nothing {- email -}) q (Just $ toISO639 lang) limit
+      toStreamingProducer externalAPI <$> OpenAlex.get (fromMaybe "" Nothing {- email -}) q (Just $ toISO639 lang) limit
     Arxiv -> runExceptT $ do
       corpusQuery <- ExceptT (pure parse_query)
-      ExceptT $ fmap Right (Arxiv.get lang corpusQuery limit)
+      ExceptT $ fmap (Right . toConduitProducer) (Arxiv.get lang corpusQuery limit)
     HAL ->
-      first (ExternalAPIError externalAPI) <$> HAL.getC (Just $ toISO639 lang) (Corpus.getRawQuery q) (Corpus.getLimit <$> limit)
+      toStreamingProducer externalAPI <$> HAL.getC (Just $ toISO639 lang) (Corpus.getRawQuery q) (Corpus.getLimit <$> limit)
     IsTex -> do
       docs <- ISTEX.get lang (Corpus.getRawQuery q) (Corpus.getLimit <$> limit)
-      pure $ Right (Just $ fromIntegral $ length docs, yieldMany docs)
+      pure $ Right $ toConduitProducer (Just $ fromIntegral $ length docs, yieldMany docs)
     Isidore -> do
       docs <- ISIDORE.get lang (Corpus.getLimit <$> limit) (Just $ Corpus.getRawQuery q) Nothing
-      pure $ Right (Just $ fromIntegral $ length docs, yieldMany docs)
+      pure $ Right $ toConduitProducer (Just $ fromIntegral $ length docs, yieldMany docs)
     EPO mAPIUser mAPIToken -> do
       let mEPOAuthKey = EPO.AuthKey <$> (EPO.User <$> mAPIUser)
                                     <*> (EPO.Token <$> mAPIToken)
-      first (ExternalAPIError externalAPI) <$> EPO.get mEPOAuthKey epoAPIUrl q (toISO639 lang) limit
+      toStreamingProducer externalAPI <$> EPO.get mEPOAuthKey epoAPIUrl q (toISO639 lang) limit
  where
   parse_query = first (InvalidInputQuery q . T.pack) $ Corpus.parseQuery q

+toStreamingProducer :: ExternalAPIs
+                    -> Either ClientError (Maybe Integer, ConduitT () HyperdataDocument IO ())
+                    -> Either GetCorpusError (ResultsCount, DataProducer IO HyperdataDocument)
+toStreamingProducer externalAPI = bimap (ExternalAPIError externalAPI) toConduitProducer
+
+toConduitProducer :: (Maybe Integer, ConduitT () HyperdataDocument IO ())
+                  -> (ResultsCount, DataProducer IO HyperdataDocument)
+toConduitProducer (mb_r, conduitData) = (ResultsCount (fromMaybe 0 mb_r), DataStreamingProducer conduitData)
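To show how a caller consumes the new return type of `get`, here is a minimal sketch. The `consumeResult` helper is hypothetical (not part of this commit), it assumes a `Show` instance on `HyperdataDocument`, and its streaming branch mirrors what `printDataProducer` does in the Flow module below:

    import Conduit (runConduit, (.|))
    import Control.Monad (forM_)
    import Data.Conduit.List qualified as CL

    consumeResult :: (ResultsCount, DataProducer IO HyperdataDocument) -> IO ()
    consumeResult (ResultsCount n, producer) = do
      putStrLn $ "expecting " <> show n <> " documents"
      case producer of
        -- stream the documents one at a time
        DataStreamingProducer c     -> runConduit (c .| CL.mapM_ print)
        -- run each batch action in turn and print its documents
        DataBatchProducer batches   -> forM_ batches (>>= mapM_ print)
        -- async jobs are dispatched to the worker instead (cf. runDataProducer below)
        DataAsyncBatchProducer jobs -> putStrLn (show (length jobs) <> " async jobs queued")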
@@ -55,9 +55,9 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
     where

 import Conduit
+import Control.Exception.Safe (catch, MonadCatch)
 import Control.Exception.Safe qualified as CES
 import Control.Lens ( to, view )
-import Control.Exception.Safe (catch, MonadCatch)
 import Data.Conduit qualified as C
 import Data.Conduit.Internal (zipSources)
 import Data.Conduit.List qualified as CL
@@ -88,9 +88,10 @@ import Gargantext.Core.Text.Terms.Mono.Stem (stem, StemmingAlgorithm(..))
 import Gargantext.Core.Types (HasValidationError, TermsCount, TermsWeight(..))
 import Gargantext.Core.Types.Individu (User(..))
 import Gargantext.Core.Types.Main ( ListType(MapTerm) )
+import Gargantext.Core.Worker.Jobs (sendJob)
 import Gargantext.Database.Action.Flow.Extract () -- ExtractNgramsT instances
 import Gargantext.Database.Action.Flow.List ( flowList_DbRepo, toNodeNgramsW' )
-import Gargantext.Database.Action.Flow.Types ( do_api, DataOrigin(..), DataText(..), FlowCorpus, DocumentIdWithNgrams (..) )
+import Gargantext.Database.Action.Flow.Types ( do_api, DataOrigin(..), DataText(..), FlowCorpus, DocumentIdWithNgrams (..), DataProducer(..), ResultsCount(..))
 import Gargantext.Database.Action.Flow.Utils (insertDocNgrams, insertDocs, mkNodeIdNgramsMap, ngramsByDoc, documentIdWithNgrams)
 import Gargantext.Database.Action.Metrics (updateNgramsOccurrences, updateContextScore)
 import Gargantext.Database.Action.Search (searchDocInDatabase)
@@ -119,6 +120,7 @@ import Servant.Client.Core (ClientError)
 -- Imports for upgrade function
 import Gargantext.Database.Query.Tree.Error ( HasTreeError )
 import Gargantext.Core.Text.Terms.Multi (MultitermsExtractionException)
+import Gargantext.Core.Worker.Jobs.Types (Job)

 ------------------------------------------------------------------------
@@ -128,12 +130,20 @@ allDataOrigins = map InternalOrigin API.externalAPIs <> map ExternalOrigin API.e
 ---------------
 -- Show instance is not possible because of IO
-printDataText :: DataText -> IO ()
+printDataText :: DataText IO -> IO ()
 printDataText (DataOld xs) = putText $ show xs
-printDataText (DataNew (maybeInt, conduitData)) = do
-  res <- C.runConduit (conduitData .| CL.consume)
-  putText $ show (maybeInt, res)
+printDataText (DataNew resultsCount producer) = do
+  res <- printDataProducer producer
+  putText $ show (resultsCount, res)
+
+printDataProducer :: DataProducer IO HyperdataDocument -> IO (Either [HyperdataDocument] [Job])
+printDataProducer = \case
+  DataBatchProducer batches
+    -> Left . mconcat <$> sequence batches
+  DataStreamingProducer conduitData
+    -> Left <$> C.runConduit (conduitData .| CL.consume)
+  DataAsyncBatchProducer jobs
+    -> pure $ Right jobs

 -- TODO use the split parameter in config file
 getDataText :: (HasNodeError err)
@@ -141,11 +151,11 @@ getDataText :: (HasNodeError err)
             -> TermType Lang
             -> API.RawQuery
             -> Maybe API.Limit
-            -> DBCmdWithEnv env err (Either API.GetCorpusError DataText)
+            -> DBCmdWithEnv env err (Either API.GetCorpusError (DataText IO))
 getDataText (ExternalOrigin api) la q li = do
   cfg <- view hasConfig
   eRes <- liftBase $ API.get api (_tt_lang la) q (_ac_epo_api_url $ _gc_apis cfg) li
-  pure $ DataNew <$> eRes
+  pure $ uncurry DataNew <$> eRes
 getDataText (InternalOrigin _) la q _li = do
   cfg <- view hasConfig
   runDBTx $ do
@@ -179,7 +189,7 @@ flowDataText :: forall env err m.
                 , HasCentralExchangeNotification env
                 )
              => User
-             -> DataText
+             -> DataText m
             -> TermType Lang
             -> CorpusId
             -> Maybe FlowSocialListWith
@@ -196,10 +206,10 @@ flowDataText u (DataOld ids) tt cid mfslw _ = do
   flowCorpusUser (_tt_lang tt) u userCorpusId listId corpusType mfslw
   where
     corpusType = Nothing :: Maybe HyperdataCorpus
-flowDataText u (DataNew (mLen, txtC)) tt cid mfslw jobHandle = do
+flowDataText u (DataNew mLen dataProducer) tt cid mfslw jobHandle = do
   $(logLocM) DEBUG $ T.pack $ "Found " <> show mLen <> " new documents to process"
-  for_ (mLen <&> fromInteger) (`addMoreSteps` jobHandle)
-  flowCorpus (MkCorpusUserNormalCorpusIds u [cid]) tt mfslw (fromMaybe 0 mLen, transPipe liftBase txtC) jobHandle
+  addMoreSteps (fromIntegral $ _ResultsCount mLen) jobHandle
+  flowCorpus (MkCorpusUserNormalCorpusIds u [cid]) tt mfslw (mLen, dataProducer) jobHandle

 ------------------------------------------------------------------------
 -- TODO use proxy
@@ -220,7 +230,8 @@ flowAnnuaire :: ( IsDBCmd env err m
 flowAnnuaire mkCorpusUser l filePath jobHandle = do
   -- TODO Conduit for file
   docs <- liftBase (readFile_Annuaire filePath :: IO [HyperdataContact])
-  flow (Nothing :: Maybe HyperdataAnnuaire) mkCorpusUser l Nothing (fromIntegral $ length docs, yieldMany docs) jobHandle
+  let (mLen, producer) = (ResultsCount $ fromIntegral $ length docs, DataStreamingProducer $ yieldMany docs)
+  flow (Nothing :: Maybe HyperdataAnnuaire) mkCorpusUser l Nothing (mLen, producer) jobHandle

 ------------------------------------------------------------------------
 flowCorpusFile :: ( IsDBCmd env err m
@@ -244,7 +255,8 @@ flowCorpusFile mkCorpusUser la ft ff fp mfslw jobHandle = do
   eParsed <- liftBase $ parseFile ft ff fp
   case eParsed of
     Right parsed -> do
-      flowCorpus mkCorpusUser la mfslw (fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) jobHandle
+      let (mLen, producer) = (ResultsCount $ fromIntegral $ length parsed, DataStreamingProducer (yieldMany parsed .| mapC toHyperdataDocument))
+      flowCorpus mkCorpusUser la mfslw (mLen, producer) jobHandle
      --let docs = splitEvery 500 $ take l parsed
      --flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
     Left e -> panicTrace $ "Error: " <> e
@@ -267,7 +279,7 @@ flowCorpus :: ( IsDBCmd env err m
            => MkCorpusUser
            -> TermType Lang
            -> Maybe FlowSocialListWith
-           -> (Integer, ConduitT () a m ())
+           -> (ResultsCount, DataProducer m a)
            -> JobHandle m
            -> m CorpusId
 flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
@@ -291,18 +303,18 @@ flow :: forall env err m a c.
     -> MkCorpusUser
     -> TermType Lang
     -> Maybe FlowSocialListWith
-    -> (Integer, ConduitT () a m ())
+    -> (ResultsCount, DataProducer m a)
     -> JobHandle m
     -> m CorpusId
-flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do
+flow c mkCorpusUser la mfslw (count, dataProducer) jobHandle = do
+  $(logLocM) DEBUG $ "Starting to process " <> show count <> " results.."
   cfg <- view hasConfig
   (_userId, userCorpusId, listId, msgs) <- runDBTx $ createNodes cfg mkCorpusUser c
   forM_ msgs ce_notify
   -- TODO if public insertMasterDocs else insertUserDocs
-  runConduit (zipSources (yieldMany ([1..] :: [Int])) docsC
-           .| CList.chunksOf 5
-           .| mapM_C (addDocumentsWithProgress userCorpusId)
-           .| sinkNull) `CES.catches`
+  runDataProducer jobHandle (addDocumentsWithProgress userCorpusId) dataProducer
+    `CES.catches`
    [ CES.Handler $ \(e :: ClientError) -> do
        $(logLocM) ERROR ("Client error: " <> show e :: Text)
        markFailure 1 (Just e) jobHandle
@@ -319,7 +331,6 @@ flow c mkCorpusUser la mfslw (count, dataProducer) jobHandle = do
     $(logLocM) DEBUG "Calling flowCorpusUser"
     flowCorpusUser (la ^. tt_lang) u userCorpusId listId c mfslw
   where
-
     addDocumentsWithProgress :: CorpusId -> [(Int, a)] -> m ()
     addDocumentsWithProgress userCorpusId docsChunk = do
@@ -328,6 +339,50 @@ flow c mkCorpusUser la mfslw (count, dataProducer) jobHandle = do
       markProgress (length docs) jobHandle

+runDataProducer :: forall env err m a.
+                   ( IsDBCmd env err m
+                   , HasNodeStory env err m
+                   , MonadLogger m
+                   , HasNLPServer env
+                   , HasTreeError err
+                   , HasValidationError err
+                   , FlowCorpus a
+                   , ExtractNgrams m a
+                   , MonadJobStatus m
+                   , HasCentralExchangeNotification env
+                   , MonadCatch m
+                   )
+                => JobHandle m
+                -> ([(Int, a)] -> m ())
+                -> DataProducer m a
+                -> m ()
+runDataProducer jobHandle processData = \case
+  DataBatchProducer batches
+    -> forM_ (zip [1..] batches) $ \(curBatch, b) -> do
+         docs <- b
+         -- FIXME(adn) proper documentIndex!
+         processData $ zip (repeat curBatch) docs
+  DataStreamingProducer conduitData
+    -> runConduit (zipSources (yieldMany ([1..] :: [Int])) conduitData
+               .| CList.chunksOf 5
+               .| mapM_C processData
+               .| sinkNull) `CES.catches`
+       [ CES.Handler $ \(e :: ClientError) -> do
+           $(logLocM) ERROR ("Client error: " <> show e :: Text)
+           markFailure 1 (Just e) jobHandle
+           -- ignore this and proceed with list generation
+           pure ()
+       , CES.Handler $ \(e :: SomeException) -> do
+           $(logLocM) ERROR ("Exception during API call: " <> show e :: Text)
+           markFailureNoErr 1 jobHandle
+           -- ignore this and proceed with list generation
+           pure ()
+       ]
+  DataAsyncBatchProducer jobs
+    -> forM_ jobs sendJob

 -- | Given a list of corpus documents and a 'NodeId' identifying the 'CorpusId', adds
 -- the given documents to the corpus. Returns the Ids of the inserted documents.
 addDocumentsToHyperCorpus :: ( IsDBCmd env err m
...
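For completeness, a small sketch of how upstream code can construct each producer flavour. The helper names are illustrative, not part of this commit; the call sites above only use `DataStreamingProducer` so far:

    -- Streaming: what the existing call sites (flowAnnuaire, flowCorpusFile, ...) use.
    fromDocs :: Monad m => [HyperdataDocument] -> DataProducer m HyperdataDocument
    fromDocs docs = DataStreamingProducer (yieldMany docs)

    -- Batched: each action fetches one small batch, e.g. one page of an API.
    fromPages :: [IO [HyperdataDocument]] -> DataProducer IO HyperdataDocument
    fromPages = DataBatchProducer

    -- Async: hand the work to the jobs worker; runDataProducer dispatches via sendJob.
    fromJobs :: [Job] -> DataProducer m HyperdataDocument
    fromJobs = DataAsyncBatchProducer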
@@ -13,6 +13,8 @@ Portability : POSIX
 {-# LANGUAGE ConstrainedClassMethods #-}
 {-# LANGUAGE InstanceSigs #-}
 {-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE DerivingStrategies #-}

 module Gargantext.Database.Action.Flow.Types
   where
@@ -37,6 +39,8 @@ import Gargantext.Database.Types (Indexed)
 import Gargantext.Prelude
 import Gargantext.System.Logging ( MonadLogger )
 import Gargantext.Core.Notifications.CentralExchange.Types (HasCentralExchangeNotification)
+import Gargantext.Core.Worker.Jobs.Types (Job)
+import Data.Conduit (transPipe)

 type FlowCmdM env err m =
@@ -82,6 +86,34 @@ deriveJSON (unPrefix "_do_") ''DataOrigin
 instance ToSchema DataOrigin where
   declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_do_")

-data DataText = DataOld ![NodeId]
-              | DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
-              --- | DataNew ![[HyperdataDocument]]
+data DataProducer m a
+  = -- | Produces documents in batches, and as such each inner action must return
+    -- a small number of documents.
+    DataBatchProducer [m [a]]
+    -- | Produces documents in a streaming fashion, and as such it's well suited for
+    -- tasks which cannot be easily parallelised (for example external APIs returning tokens
+    -- to step through the pagination without giving access to ways to randomly jump to the
+    -- desired page of results).
+  | DataStreamingProducer (ConduitT () a m ())
+    -- | A data producer that knows how to generate jobs out of its task.
+  | DataAsyncBatchProducer [Job]
+
+hoistDataProducer :: Monad m => (forall x. m x -> n x) -> DataProducer m a -> DataProducer n a
+hoistDataProducer hoistFn = \case
+  DataBatchProducer batches         -> DataBatchProducer $ map hoistFn batches
+  DataStreamingProducer conduitData -> DataStreamingProducer (transPipe hoistFn conduitData)
+  DataAsyncBatchProducer jobs       -> DataAsyncBatchProducer jobs
+
+newtype ResultsCount = ResultsCount { _ResultsCount :: Integer }
+  deriving newtype (Show, Ord, Eq)
+
+data DataText m =
+  -- | We found some old (cached) data we can serve directly
+    DataOld ![NodeId]
+  -- | We need to produce the new data
+  | DataNew !ResultsCount !(DataProducer m HyperdataDocument)
+
+hoistDataText :: Monad m => (forall x. m x -> n x) -> DataText m -> DataText n
+hoistDataText hoistFn = \case
+  DataOld old          -> DataOld old
+  DataNew res producer -> DataNew res (hoistDataProducer hoistFn producer)
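The hoisting helpers are what let `addToCorpusWithQuery` (above) take the `DataText IO` returned by `getDataText` and run it in its own monad. A sketch of that pattern, assuming a `MonadBase IO m` context as at that call site:

    import Control.Monad.Base (MonadBase, liftBase)

    -- Lift an IO-based DataText into any m with MonadBase IO m; this is
    -- exactly the `hoistDataText liftBase txt` call in addToCorpusWithQuery.
    liftedDataText :: MonadBase IO m => DataText IO -> DataText m
    liftedDataText = hoistDataText liftBase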