Commit cf184841 authored by Przemyslaw Kaminski's avatar Przemyslaw Kaminski

[conduit] implement conduit for Hal, Pubmed

parent b16a5b54
Pipeline #2513 failed with stage
in 8 minutes and 37 seconds
......@@ -22,6 +22,7 @@ Portability : POSIX
module Gargantext.API.Node.Contact
where
import Conduit
import Data.Aeson
import Data.Either (Either(Right))
import Data.Maybe (Maybe(..))
......@@ -93,7 +94,7 @@ addContact u nId (AddContactParams fn ln) logStatus = do
, _scst_remaining = Just 1
, _scst_events = Just []
}
_ <- flow (Nothing :: Maybe HyperdataAnnuaire) u (Right [nId]) (Multi EN) Nothing [[hyperdataContact fn ln]] logStatus
_ <- flow (Nothing :: Maybe HyperdataAnnuaire) u (Right [nId]) (Multi EN) Nothing (Just 1, yield $ hyperdataContact fn ln) logStatus
pure JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
......
......@@ -19,6 +19,7 @@ module Gargantext.API.Node.Corpus.New
where
import Conduit
import Control.Lens hiding (elements, Empty)
import Data.Aeson
import Data.Aeson.TH (deriveJSON)
......@@ -40,7 +41,7 @@ import Gargantext.Prelude
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs, ScraperEvent(..), scst_events)
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Job (jobLogSuccess, jobLogFailTotal, jobLogFailTotalWithMessage)
import Gargantext.API.Job (addEvent, jobLogSuccess, jobLogFailTotal, jobLogFailTotalWithMessage)
import Gargantext.API.Node.Corpus.New.File
import Gargantext.API.Node.Corpus.Searx
import Gargantext.API.Node.Corpus.Types
......@@ -214,25 +215,36 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
-- TODO if cid is folder -> create Corpus
-- if cid is corpus -> add to corpus
-- if cid is root -> create corpus in Private
txts <- mapM (\db -> getDataText db (Multi l) q maybeLimit) [database2origin dbs]
-- TODO Sum lenghts of each txt elements
logStatus JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just $ 1 + length txts
, _scst_events = Just []
}
cids <- mapM (\txt -> flowDataText user txt (Multi l) cid Nothing logStatus) txts
printDebug "corpus id" cids
printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
sendMail user
-- TODO ...
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
eTxts <- mapM (\db -> getDataText db (Multi l) q maybeLimit) [database2origin dbs]
let lTxts = lefts eTxts
case lTxts of
[] -> do
let txts = rights eTxts
-- TODO Sum lenghts of each txt elements
logStatus JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just $ 1 + length txts
, _scst_events = Just []
}
cids <- mapM (\txt -> flowDataText user txt (Multi l) cid Nothing logStatus) txts
printDebug "corpus id" cids
printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
sendMail user
-- TODO ...
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
(err:_) -> do
pure $ addEvent "ERROR" (T.pack $ show err) $
JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 1
, _scst_remaining = Just 0
, _scst_events = Just []
}
type AddWithForm = Summary "Add with FormUrlEncoded to corpus endpoint"
......@@ -270,15 +282,16 @@ addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus jobLog = do
_ -> cs d
eDocs <- liftBase $ parse data'
case eDocs of
Right docs' -> do
Right docs -> do
-- TODO Add progress (jobStatus) update for docs - this is a
-- long action
limit' <- view $ hasConfig . gc_max_docs_parsers
let limit = fromIntegral limit'
if length docs' > limit then do
printDebug "[addToCorpusWithForm] number of docs exceeds the limit" (show $ length docs')
if length docs > limit then do
printDebug "[addToCorpusWithForm] number of docs exceeds the limit" (show $ length docs)
let panicMsg' = [ "[addToCorpusWithForm] number of docs ("
, show $ length docs'
, show $ length docs
, ") exceeds the MAX_DOCS_PARSERS limit ("
, show limit
, ")" ]
......@@ -287,7 +300,6 @@ addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus jobLog = do
panic panicMsg
else
pure ()
let docs = splitEvery 500 $ take limit docs'
printDebug "Parsing corpus finished : " cid
logStatus jobLog2
......@@ -298,7 +310,8 @@ addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus jobLog = do
(Right [cid])
(Multi $ fromMaybe EN l)
Nothing
(map (map toHyperdataDocument) docs)
(Just $ fromIntegral $ length docs, yieldMany docs .| mapC toHyperdataDocument)
--(map (map toHyperdataDocument) docs)
logStatus
printDebug "Extraction finished : " cid
......
......@@ -101,7 +101,7 @@ documentsFromWriteNodes uId nId _p logStatus = do
let parsedE = (\(node, contents) -> hyperdataDocumentFromFrameWrite (node ^. node_hyperdata, contents)) <$> frameWritesWithContents
let parsed = rights parsedE
_ <- flowDataText (RootId (NodeId uId)) (DataNew $ yieldMany parsed) (Multi EN) cId Nothing logStatus
_ <- flowDataText (RootId (NodeId uId)) (DataNew (Just $ fromIntegral $ length parsed, yieldMany parsed)) (Multi EN) cId Nothing logStatus
pure $ jobLogSuccess jobLog
------------------------------------------------------------------------
......
......@@ -19,6 +19,7 @@ module Gargantext.Core.Text.Corpus.API
where
import Conduit
import Data.Either (Either(..))
import Data.Maybe
import Gargantext.API.Admin.Orchestrator.Types (ExternalAPIs(..), externalAPIs)
import Gargantext.Core (Lang(..))
......@@ -28,6 +29,7 @@ import qualified Gargantext.Core.Text.Corpus.API.Hal as HAL
import qualified Gargantext.Core.Text.Corpus.API.Isidore as ISIDORE
import qualified Gargantext.Core.Text.Corpus.API.Istex as ISTEX
import qualified Gargantext.Core.Text.Corpus.API.Pubmed as PUBMED
import Servant.Client (ClientError)
-- | TODO put in gargantext.init
default_limit :: Maybe Integer
......@@ -39,17 +41,17 @@ get :: ExternalAPIs
-> Query
-> Maybe Limit
-- -> IO [HyperdataDocument]
-> IO (ConduitT () HyperdataDocument IO ())
get PubMed _la q _l = do
res <- PUBMED.get q default_limit -- EN only by default
pure $ yieldMany res
-> IO (Either ClientError (Maybe Integer, ConduitT () HyperdataDocument IO ()))
get PubMed _la q _l = PUBMED.get q Nothing
--docs <- PUBMED.get q default_limit -- EN only by default
--pure (Just $ fromIntegral $ length docs, yieldMany docs)
get HAL la q _l = HAL.getC la q Nothing
get IsTex la q _l = do
res <- ISTEX.get la q default_limit
pure $ yieldMany res
docs <- ISTEX.get la q default_limit
pure $ Right (Just $ fromIntegral $ length docs, yieldMany docs)
get Isidore la q _l = do
res <- ISIDORE.get la (fromIntegral <$> default_limit) (Just q) Nothing
pure $ yieldMany res
docs <- ISIDORE.get la (fromIntegral <$> default_limit) (Just q) Nothing
pure $ Right (Just $ fromIntegral $ length docs, yieldMany docs)
get _ _ _ _ = undefined
-- | Some Sugar for the documentation
......
......@@ -16,6 +16,7 @@ import Conduit
import Data.Either
import Data.Maybe
import Data.Text (Text, pack, intercalate)
import Servant.Client (ClientError)
import Gargantext.Core (Lang(..))
import Gargantext.Database.Admin.Types.Hyperdata (HyperdataDocument(..))
......@@ -30,12 +31,13 @@ get la q ml = do
eDocs <- HAL.getMetadataWith q (Just 0) ml
either (panic . pack . show) (\d -> mapM (toDoc' la) $ HAL._docs d) eDocs
getC :: Lang -> Text -> Maybe Integer -> IO (ConduitT () HyperdataDocument IO ())
getC :: Lang -> Text -> Maybe Integer -> IO (Either ClientError (Maybe Integer, ConduitT () HyperdataDocument IO ()))
getC la q ml = do
eDocs <- HAL.getMetadataRecursively q (Just 0) ml
case eDocs of
Left err -> panic $ pack $ show err
Right docsC -> pure $ docsC .| mapMC (toDoc' la)
eRes <- HAL.getMetadataWithC q (Just 0) ml
pure $ (\(len, docsC) -> (len, docsC .| mapMC (toDoc' la))) <$> eRes
-- case eRes of
-- Left err -> panic $ pack $ show err
-- Right (len, docsC) -> pure (len, docsC .| mapMC (toDoc' la))
toDoc' :: Lang -> HAL.Corpus -> IO HyperdataDocument
toDoc' la (HAL.Corpus i t ab d s aus affs struct_id) = do
......
......@@ -13,9 +13,12 @@ Portability : POSIX
module Gargantext.Core.Text.Corpus.API.Pubmed
where
import Conduit
import Data.Either (Either)
import Data.Maybe
import Data.Text (Text)
import qualified Data.Text as Text
import Servant.Client (ClientError)
import Gargantext.Prelude
import Gargantext.Core (Lang(..))
......@@ -31,9 +34,12 @@ type Limit = PubMed.Limit
-- | TODO put default pubmed query in gargantext.ini
-- by default: 10K docs
get :: Query -> Maybe Limit -> IO [HyperdataDocument]
get q l = either (\e -> panic $ "CRAWL: PubMed" <> e) (map (toDoc EN))
<$> PubMed.getMetadataWith q l
get :: Query -> Maybe Limit -> IO (Either ClientError (Maybe Integer, ConduitT () HyperdataDocument IO ()))
get q l = do
eRes <- PubMed.getMetadataWithC q l
pure $ (\(len, docsC) -> (len, docsC .| mapC (toDoc EN))) <$> eRes
--either (\e -> panic $ "CRAWL: PubMed" <> e) (map (toDoc EN))
-- <$> PubMed.getMetadataWithC q l
toDoc :: Lang -> PubMedDoc.PubMed -> HyperdataDocument
toDoc l (PubMedDoc.PubMed (PubMedDoc.PubMedArticle t j as aus)
......
......@@ -62,6 +62,7 @@ import qualified Data.Text as T
import Data.Traversable (traverse)
import Data.Tuple.Extra (first, second)
import GHC.Generics (Generic)
import Servant.Client (ClientError)
import System.FilePath (FilePath)
import qualified Data.HashMap.Strict as HashMap
import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
......@@ -130,7 +131,7 @@ allDataOrigins = map InternalOrigin API.externalAPIs
---------------
data DataText = DataOld ![NodeId]
| DataNew !(ConduitT () HyperdataDocument IO ())
| DataNew !(Maybe Integer, ConduitT () HyperdataDocument IO ())
-- | DataNew ![[HyperdataDocument]]
-- TODO use the split parameter in config file
......@@ -139,10 +140,10 @@ getDataText :: FlowCmdM env err m
-> TermType Lang
-> API.Query
-> Maybe API.Limit
-> m DataText
-> m (Either ClientError DataText)
getDataText (ExternalOrigin api) la q li = liftBase $ do
docsC <- API.get api (_tt_lang la) q li
pure $ DataNew docsC
eRes <- API.get api (_tt_lang la) q li
pure $ DataNew <$> eRes
getDataText (InternalOrigin _) _la q _li = do
(_masterUserId, _masterRootId, cId) <- getOrMk_RootWithCorpus
......@@ -150,7 +151,7 @@ getDataText (InternalOrigin _) _la q _li = do
(Left "")
(Nothing :: Maybe HyperdataCorpus)
ids <- map fst <$> searchDocInDatabase cId (stemIt q)
pure $ DataOld ids
pure $ Right $ DataOld ids
-------------------------------------------------------------------------------
flowDataText :: forall env err m.
......@@ -166,7 +167,8 @@ flowDataText :: forall env err m.
flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
where
corpusType = (Nothing :: Maybe HyperdataCorpus)
flowDataText u (DataNew txtC) tt cid mfslw logStatus = flowCorpus u (Right [cid]) tt mfslw (transPipe liftBase txtC) logStatus
flowDataText u (DataNew (mLen, txtC)) tt cid mfslw logStatus =
flowCorpus u (Right [cid]) tt mfslw (mLen, transPipe liftBase txtC) logStatus
------------------------------------------------------------------------
-- TODO use proxy
......@@ -180,7 +182,7 @@ flowAnnuaire :: (FlowCmdM env err m)
flowAnnuaire u n l filePath logStatus = do
-- TODO Conduit for file
docs <- liftBase $ ((readFile_Annuaire filePath) :: IO [HyperdataContact])
flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (yieldMany docs) logStatus
flow (Nothing :: Maybe HyperdataAnnuaire) u n l Nothing (Just $ fromIntegral $ length docs, yieldMany docs) logStatus
------------------------------------------------------------------------
flowCorpusFile :: (FlowCmdM env err m)
......@@ -195,7 +197,7 @@ flowCorpusFile u n _l la ff fp mfslw logStatus = do
eParsed <- liftBase $ parseFile ff fp
case eParsed of
Right parsed -> do
flowCorpus u n la mfslw (yieldMany parsed .| mapC toHyperdataDocument) logStatus
flowCorpus u n la mfslw (Just $ fromIntegral $ length parsed, yieldMany parsed .| mapC toHyperdataDocument) logStatus
--let docs = splitEvery 500 $ take l parsed
--flowCorpus u n la mfslw (yieldMany $ map (map toHyperdataDocument) docs) logStatus
Left e -> panic $ "Error: " <> (T.pack e)
......@@ -208,7 +210,7 @@ flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
-> Either CorpusName [CorpusId]
-> TermType Lang
-> Maybe FlowSocialListWith
-> ConduitT () a m ()
-> (Maybe Integer, ConduitT () a m ())
-> (JobLog -> m ())
-> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
......@@ -224,10 +226,10 @@ flow :: forall env err m a c.
-> Either CorpusName [CorpusId]
-> TermType Lang
-> Maybe FlowSocialListWith
-> ConduitT () a m ()
-> (Maybe Integer, ConduitT () a m ())
-> (JobLog -> m ())
-> m CorpusId
flow c u cn la mfslw docsC _logStatus = do
flow c u cn la mfslw (mLength, docsC) logStatus = do
-- TODO if public insertMasterDocs else insertUserDocs
ids <- runConduit $
zipSources (yieldMany [1..]) docsC
......@@ -245,14 +247,17 @@ flow c u cn la mfslw docsC _logStatus = do
flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
where
insertDoc :: (Int, a) -> m NodeId
insertDoc (_idx, doc) = do
insertDoc :: (Integer, a) -> m NodeId
insertDoc (idx, doc) = do
id <- insertMasterDocs c la [doc]
-- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
-- , _scst_failed = Just 0
-- , _scst_remaining = Just $ length docs - idx
-- , _scst_events = Just []
-- }
case mLength of
Nothing -> pure ()
Just len ->
logStatus JobLog { _scst_succeeded = Just $ fromIntegral $ 1 + idx
, _scst_failed = Just 0
, _scst_remaining = Just $ fromIntegral $ len - idx
, _scst_events = Just []
}
pure $ Prelude.head id
......@@ -272,7 +277,7 @@ flowCorpusUser l user corpusName ctype ids mfslw = do
-- User Flow
(userId, _rootId, userCorpusId) <- getOrMk_RootWithCorpus user corpusName ctype
-- NodeTexts is first
_tId <- insertDefaultNode NodeTexts userCorpusId userId
_tId <- insertDefaultNodeIfNotExists NodeTexts userCorpusId userId
-- printDebug "NodeTexts: " tId
-- NodeList is second
......@@ -298,8 +303,8 @@ flowCorpusUser l user corpusName ctype ids mfslw = do
-- _ <- insertOccsUpdates userCorpusId mastListId
-- printDebug "userListId" userListId
-- User Graph Flow
_ <- insertDefaultNode NodeDashboard userCorpusId userId
_ <- insertDefaultNode NodeGraph userCorpusId userId
_ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
_ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
--_ <- mkPhylo userCorpusId userId
-- Annuaire Flow
-- _ <- mkAnnuaire rootUserId userId
......@@ -344,7 +349,7 @@ saveDocNgramsWith :: ( FlowCmdM env err m)
-> m ()
saveDocNgramsWith lId mapNgramsDocs' = do
terms2id <- insertExtractedNgrams $ HashMap.keys mapNgramsDocs'
printDebug "terms2id" terms2id
--printDebug "terms2id" terms2id
let mapNgramsDocs = HashMap.mapKeys extracted2ngrams mapNgramsDocs'
......@@ -353,7 +358,7 @@ saveDocNgramsWith lId mapNgramsDocs' = do
$ map (first _ngramsTerms . second Map.keys)
$ HashMap.toList mapNgramsDocs
printDebug "saveDocNgramsWith" mapCgramsId
--printDebug "saveDocNgramsWith" mapCgramsId
-- insertDocNgrams
_return <- insertContextNodeNgrams2
$ catMaybes [ ContextNodeNgrams2 <$> Just nId
......
......@@ -255,6 +255,14 @@ insertDefaultNode :: HasDBid NodeType
=> NodeType -> ParentId -> UserId -> Cmd err [NodeId]
insertDefaultNode nt p u = insertNode nt Nothing Nothing p u
insertDefaultNodeIfNotExists :: HasDBid NodeType
=> NodeType -> ParentId -> UserId -> Cmd err [NodeId]
insertDefaultNodeIfNotExists nt p u = do
children <- getChildrenByType p nt
case children of
[] -> insertDefaultNode nt p u
xs -> pure xs
insertNode :: HasDBid NodeType
=> NodeType -> Maybe Name -> Maybe DefaultHyperdata -> ParentId -> UserId -> Cmd err [NodeId]
insertNode nt n h p u = insertNodesR [nodeW nt n h p u]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment