Commit 92cb0a6c authored by Przemyslaw Kaminski's avatar Przemyslaw Kaminski

[text-api] first rewrite using Conduit

NOTE This doesn't compile yet.
parent d54d5f06
Pipeline #2509 failed with stage
in 8 minutes and 43 seconds
......@@ -215,7 +215,8 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
-- if cid is corpus -> add to corpus
-- if cid is root -> create corpus in Private
txts <- mapM (\db -> getDataText db (Multi l) q maybeLimit) [database2origin dbs]
-- TODO Sum lenghts of each txt elements
logStatus JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just $ 1 + length txts
......
......@@ -16,6 +16,7 @@ Portability : POSIX
module Gargantext.API.Node.DocumentsFromWriteNodes
where
import Conduit
import Control.Lens ((^.))
import Data.Aeson
import Data.Either (Either(..), rights)
......@@ -100,7 +101,7 @@ documentsFromWriteNodes uId nId _p logStatus = do
let parsedE = (\(node, contents) -> hyperdataDocumentFromFrameWrite (node ^. node_hyperdata, contents)) <$> frameWritesWithContents
let parsed = rights parsedE
_ <- flowDataText (RootId (NodeId uId)) (DataNew [parsed]) (Multi EN) cId Nothing logStatus
_ <- flowDataText (RootId (NodeId uId)) (DataNew $ yield parsed) (Multi EN) cId Nothing logStatus
pure $ jobLogSuccess jobLog
------------------------------------------------------------------------
......
......@@ -18,6 +18,7 @@ module Gargantext.Core.Text.Corpus.API
)
where
import Conduit
import Data.Maybe
import Gargantext.API.Admin.Orchestrator.Types (ExternalAPIs(..), externalAPIs)
import Gargantext.Core (Lang(..))
......@@ -37,11 +38,18 @@ get :: ExternalAPIs
-> Lang
-> Query
-> Maybe Limit
-> IO [HyperdataDocument]
get PubMed _la q _l = PUBMED.get q default_limit -- EN only by default
-- -> IO [HyperdataDocument]
-> IO (ConduitT () HyperdataDocument IO ())
get PubMed _la q _l = do
res <- PUBMED.get q default_limit -- EN only by default
pure $ yieldMany res
get HAL la q _l = HAL.getC la q Nothing
get IsTex la q _l = ISTEX.get la q default_limit
get Isidore la q _l = ISIDORE.get la (fromIntegral <$> default_limit) (Just q) Nothing
get IsTex la q _l = do
res <- ISTEX.get la q default_limit
pure $ yieldMany res
get Isidore la q _l = do
res <- ISIDORE.get la (fromIntegral <$> default_limit) (Just q) Nothing
pure $ yieldMany res
get _ _ _ _ = undefined
-- | Some Sugar for the documentation
......
......@@ -30,12 +30,12 @@ get la q ml = do
eDocs <- HAL.getMetadataWith q (Just 0) ml
either (panic . pack . show) (\d -> mapM (toDoc' la) $ HAL._docs d) eDocs
getC :: Lang -> Text -> Maybe Integer -> IO [HyperdataDocument]
getC :: Lang -> Text -> Maybe Integer -> IO (ConduitT () HyperdataDocument IO ())
getC la q ml = do
eDocs <- HAL.getMetadataRecursively q (Just 0) ml
case eDocs of
Left err -> panic $ pack $ show err
Right docsC -> runConduit $ docsC .| mapMC (toDoc' la) .| sinkList
Right docsC -> pure $ docsC .| mapMC (toDoc' la)
toDoc' :: Lang -> HAL.Corpus -> IO HyperdataDocument
toDoc' la (HAL.Corpus i t ab d s aus affs struct_id) = do
......
......@@ -46,8 +46,10 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
)
where
import Conduit
import Control.Lens ((^.), view, _Just, makeLenses)
import Data.Aeson.TH (deriveJSON)
import Data.Conduit.Internal (zipSources)
import Data.Either
import Data.HashMap.Strict (HashMap)
import Data.Hashable (Hashable)
......@@ -103,6 +105,7 @@ import Gargantext.Prelude
import Gargantext.Prelude.Crypto.Hash (Hash)
import qualified Gargantext.Core.Text.Corpus.API as API
import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
import qualified Prelude as Prelude
------------------------------------------------------------------------
-- Imports for upgrade function
......@@ -127,7 +130,8 @@ allDataOrigins = map InternalOrigin API.externalAPIs
---------------
data DataText = DataOld ![NodeId]
| DataNew ![[HyperdataDocument]]
| DataNew !(ConduitT () HyperdataDocument IO ())
-- | DataNew ![[HyperdataDocument]]
-- TODO use the split parameter in config file
getDataText :: FlowCmdM env err m
......@@ -136,9 +140,9 @@ getDataText :: FlowCmdM env err m
-> API.Query
-> Maybe API.Limit
-> m DataText
getDataText (ExternalOrigin api) la q li = liftBase $ DataNew
<$> splitEvery 500
<$> API.get api (_tt_lang la) q li
getDataText (ExternalOrigin api) la q li = liftBase $ do
docsC <- API.get api (_tt_lang la) q li
pure $ DataNew docsC
getDataText (InternalOrigin _) _la q _li = do
(_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
......@@ -161,7 +165,7 @@ flowDataText :: ( FlowCmdM env err m
flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
where
corpusType = (Nothing :: Maybe HyperdataCorpus)
flowDataText u (DataNew txt) tt cid mfslw logStatus = flowCorpus u (Right [cid]) tt mfslw txt logStatus
flowDataText u (DataNew txtC) tt cid mfslw logStatus = flowCorpus u (Right [cid]) tt mfslw txtC logStatus
------------------------------------------------------------------------
-- TODO use proxy
......@@ -173,8 +177,9 @@ flowAnnuaire :: (FlowCmdM env err m)
-> (JobLog -> m ())
-> m AnnuaireId
flowAnnuaire u n l filePath logStatus = do
docs <- liftBase $ (( splitEvery 500 <$> readFile_Annuaire filePath) :: IO [[HyperdataContact]])
flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing docs logStatus
-- TODO Conduit for file
docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (yieldMany docs) logStatus
------------------------------------------------------------------------
flowCorpusFile :: (FlowCmdM env err m)
......@@ -189,8 +194,9 @@ flowCorpusFile u n l la ff fp mfslw logStatus = do
eParsed <- liftBase $ parseFile ff fp
case eParsed of
Right parsed -> do
let docs = splitEvery 500 $ take l parsed
flowCorpus u n la mfslw (map (map toHyperdataDocument) docs) logStatus
flowCorpus u n la mfslw (yieldMany parsed .| mapC toHyperdataDocument) logStatus
--let docs = splitEvery 500 $ take l parsed
--flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
Left e -> panic $ "Error: " <> (T.pack e)
------------------------------------------------------------------------
......@@ -201,7 +207,7 @@ flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
-> Either CorpusName [CorpusId]
-> TermType Lang
-> Maybe FlowSocialListWith
-> [[a]]
-> ConduitT () a IO ()
-> (JobLog -> m ())
-> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
......@@ -216,23 +222,36 @@ flow :: ( FlowCmdM env err m
-> Either CorpusName [CorpusId]
-> TermType Lang
-> Maybe FlowSocialListWith
-> [[a]]
-> ConduitT () a IO ()
-> (JobLog -> m ())
-> m CorpusId
flow c u cn la mfslw docs logStatus = do
flow c u cn la mfslw docsC logStatus = do
-- TODO if public insertMasterDocs else insertUserDocs
ids <- traverse (\(idx, doc) -> do
id <- insertMasterDocs c la doc
logStatus JobLog { _scst_succeeded = Just $ 1 + idx
, _scst_failed = Just 0
, _scst_remaining = Just $ length docs - idx
, _scst_events = Just []
}
pure id
) (zip [1..] docs)
flowCorpusUser (la ^. tt_lang) u cn c (concat ids) mfslw
ids <- liftBase $ runConduit $
zipSources (yieldMany [1..]) docsC
.| mapMC insertDoc
.| sinkList
-- ids <- traverse (\(idx, doc) -> do
-- id <- insertMasterDocs c la doc
-- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
-- , _scst_failed = Just 0
-- , _scst_remaining = Just $ length docs - idx
-- , _scst_events = Just []
-- }
-- pure id
-- ) (zip [1..] docs)
flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
where
insertDoc (idx, doc) = do
id <- insertMasterDocs c la [doc]
-- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
-- , _scst_failed = Just 0
-- , _scst_remaining = Just $ length docs - idx
-- , _scst_events = Just []
-- }
pure $ Prelude.head id
------------------------------------------------------------------------
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment