[flow] PoC for threaded flow

parent c767088d
Pipeline #7925 passed with stages in 144 minutes and 38 seconds
...@@ -510,6 +510,8 @@ library ...@@ -510,6 +510,8 @@ library
, ansi-terminal , ansi-terminal
, array ^>= 0.5.4.0 , array ^>= 0.5.4.0
, async ^>= 2.2.4 , async ^>= 2.2.4
, lifted-async
, lifted-base
, attoparsec ^>= 0.14.4 , attoparsec ^>= 0.14.4
, base64-bytestring ^>= 1.2.1.0 , base64-bytestring ^>= 1.2.1.0
, bimap >= 0.5.0 , bimap >= 0.5.0
......
...@@ -55,6 +55,9 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list) ...@@ -55,6 +55,9 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
where where
import Conduit import Conduit
import Control.Concurrent.Async.Lifted qualified as AsyncL
import Control.Concurrent.QSem.Lifted qualified as QSemL
import Control.Exception.Lifted qualified as CEL
import Control.Exception.Safe qualified as CES import Control.Exception.Safe qualified as CES
import Control.Lens ( to, view ) import Control.Lens ( to, view )
import Control.Exception.Safe (catch, MonadCatch) import Control.Exception.Safe (catch, MonadCatch)
...@@ -300,7 +303,7 @@ flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do ...@@ -300,7 +303,7 @@ flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do
forM_ msgs ce_notify forM_ msgs ce_notify
-- TODO if public insertMasterDocs else insertUserDocs -- TODO if public insertMasterDocs else insertUserDocs
runConduit (zipSources (yieldMany ([1..] :: [Int])) docsC runConduit (zipSources (yieldMany ([1..] :: [Int])) docsC
.| CList.chunksOf 5 .| CList.chunksOf 10
.| mapM_C (addDocumentsWithProgress userCorpusId) .| mapM_C (addDocumentsWithProgress userCorpusId)
.| sinkNull) `CES.catches` .| sinkNull) `CES.catches`
[ CES.Handler $ \(e :: ClientError) -> do [ CES.Handler $ \(e :: ClientError) -> do
...@@ -544,13 +547,19 @@ extractNgramsFromDocuments :: forall doc env err m. ...@@ -544,13 +547,19 @@ extractNgramsFromDocuments :: forall doc env err m.
-> TermType Lang -> TermType Lang
-> [doc] -> [doc]
-> m (UncommittedNgrams doc) -> m (UncommittedNgrams doc)
extractNgramsFromDocuments nlpServer lang docs = extractNgramsFromDocuments nlpServer lang docs = do
foldlM go mempty docs sem <- QSemL.newQSem 10
where let f = extractNgramsFromDocument nlpServer lang
go :: UncommittedNgrams doc -> doc -> m (UncommittedNgrams doc) ret <- AsyncL.mapConcurrently (\doc ->
go !acc inputDoc = do CEL.bracket_ (QSemL.waitQSem sem) (QSemL.signalQSem sem) (f doc)
ngrams <- extractNgramsFromDocument nlpServer lang inputDoc ) docs
pure $ acc <> ngrams pure $ foldl (<>) mempty ret
-- foldlM go mempty docs
-- where
-- go :: UncommittedNgrams doc -> doc -> m (UncommittedNgrams doc)
-- go !acc inputDoc = do
-- ngrams <- extractNgramsFromDocument nlpServer lang inputDoc
-- pure $ acc <> ngrams
commitNgramsForDocuments :: UniqParameters doc commitNgramsForDocuments :: UniqParameters doc
=> UncommittedNgrams doc => UncommittedNgrams doc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment