Commit e7ce7bcb authored by Fabien Maniere

Merge branch 'dev-threaded-flow-poc' into 'dev'

Concurrent queries in NLP

See merge request !451
parents 3e57a2fe d90467da
Pipeline #7937 passed with stages in 59 minutes and 28 seconds
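In summary, this MR makes two related changes, alongside some import and lens cleanup: extractNgramsFromDocuments now sends the documents of a batch to the NLP server concurrently (via the new lifted-async dependency), and the conduit batch size, previously hard-coded to 5, becomes the nlp_conduit_chunk_size worker setting. Below is a minimal sketch of the resulting pattern in plain IO, outside the Gargantext monad stack; extractOne, extractAll and the local chunksOf are illustrative stand-ins, not project code.

import Control.Concurrent.Async (mapConcurrently)

-- Hypothetical stand-in for a single request to the NLP server.
extractOne :: String -> IO [String]
extractOne doc = pure (words doc)

-- Split a list into chunks of the given size.
chunksOf :: Int -> [a] -> [[a]]
chunksOf n xs
  | n <= 0    = [xs]
  | null xs   = []
  | otherwise = take n xs : chunksOf n (drop n xs)

-- Chunks are processed one after another, but the documents inside a chunk
-- are extracted concurrently, mirroring `CList.chunksOf chunkSize` followed
-- by `AsyncL.mapConcurrently` in the diff below.
extractAll :: Int -> [String] -> IO [[String]]
extractAll chunkSize docs =
  concat <$> mapM (mapConcurrently extractOne) (chunksOf chunkSize docs)

main :: IO ()
main = extractAll 10 ["hello world", "foo bar baz"] >>= print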
@@ -88,7 +88,8 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
       , _wsLongJobTimeout = 3000
       , _wsDefaultDelay = 0
       , _wsAdditionalDelayAfterRead = 5
-      , _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} }
+      , _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"}
+      , _wsNlpConduitChunkSize = 10 }
   , _gc_logging = Config.LogConfig {
         _lc_log_level = INFO
       , _lc_log_file = Nothing
...
@@ -166,6 +166,10 @@ default_job_timeout = 60
 # default timeout for "long" jobs (in seconds)
 long_job_timeout = 3000
+
+# Batch size when sending data to NLP.
+# Preferably, set this to the number of CPUs
+nlp_conduit_chunk_size = 10

 # if you leave the same credentials as in [database] section above,
 # workers will try to set up the `gargantext_pgmq` database
 # automatically
...
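The comment above recommends matching the chunk size to the CPU count. A small sketch, not part of this MR, of how a deployment script could derive a value for nlp_conduit_chunk_size, assuming the RTS capability count reflects the cores available to the worker:

import GHC.Conc (getNumCapabilities)

-- Suggest a chunk size equal to the number of RTS capabilities
-- (run with +RTS -N for this to match the machine's cores).
suggestedChunkSize :: IO Int
suggestedChunkSize = max 1 <$> getNumCapabilities

main :: IO ()
main = do
  n <- suggestedChunkSize
  putStrLn ("nlp_conduit_chunk_size = " <> show n)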
@@ -570,6 +570,7 @@ library
     , json-stream ^>= 0.4.2.4
     , lens >= 5.2.2 && < 5.3
     , lens-aeson < 1.3
+    , lifted-async >= 0.10 && < 0.12
     , list-zipper
     , massiv < 1.1
     , matrix ^>= 0.3.6.1
...
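The lifted-async bound is added because Control.Concurrent.Async.Lifted.mapConcurrently works in any MonadBaseControl IO monad, so it can be called directly from a ReaderT-style stack like Gargantext's instead of plain IO. A self-contained sketch of that usage, with a hypothetical App monad and step action standing in for the real types:

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted (mapConcurrently)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ReaderT, asks, runReaderT)

-- Hypothetical application monad, roughly the shape of a ReaderT-over-IO stack.
type App = ReaderT Int IO

-- Pretend each step is one NLP round trip; it can still read the environment.
step :: String -> App String
step doc = do
  tag <- asks show
  liftIO (threadDelay 1000)
  pure (doc <> " [" <> tag <> "]")

-- mapConcurrently from lifted-async runs directly in App, no manual unlifting.
runAll :: [String] -> App [String]
runAll = mapConcurrently step

main :: IO ()
main = runReaderT (runAll ["a", "b", "c"]) 42 >>= print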
@@ -9,19 +9,12 @@ Portability : POSIX
 -}
-{-# LANGUAGE TemplateHaskell #-}
 module Gargantext.Core.Config.Ini.NLP (
   -- * Types
     NLPConfig(..)
   -- * Utility functions
   , readConfig
-  -- * Lenses
-  , nlp_default
-  , nlp_languages
   )
 where
@@ -59,4 +52,3 @@ readConfig fp = do
                      , T.pack $ show m_nlp_other ]
     Just ret -> pure ret

-makeLenses ''NLPConfig
...
@@ -19,7 +19,6 @@ module Gargantext.Core.Config.NLP (
   -- * Lenses
   , nlp_default
   , nlp_languages
   )
 where
@@ -48,9 +47,9 @@ data NLPConfig = NLPConfig { _nlp_default :: URI
 instance FromValue NLPConfig where
   fromValue v = do
     _nlp_default <- parseTableFromValue (reqKey "EN") v
-    -- _nlp_languages <- fromValue <$> getTable
     MkTable t <- parseTableFromValue getTable v
     _nlp_languages <- mapM fromValue (snd <$> t)
     return $ NLPConfig { .. }

 instance ToValue NLPConfig where
   toValue = defaultTableToValue
@@ -58,7 +57,7 @@ instance ToTable NLPConfig where
   toTable (NLPConfig { .. }) =
     table ([ k .= v | (k, v) <- Map.toList _nlp_languages ]
            -- output the default "EN" language as well
-           <> [ ("EN" :: Text) .= _nlp_default ])
+           <> [ ("EN" :: Text) .= _nlp_default ] )

 -- readConfig :: SettingsFile -> IO NLPConfig
...
@@ -53,6 +53,8 @@ data WorkerSettings =
     , _wsDefaultDelay :: B.TimeoutS
     , _wsAdditionalDelayAfterRead :: B.TimeoutS
     , _wsDefinitions :: ![WorkerDefinition]
+    , _wsNlpConduitChunkSize :: Int
   } deriving (Show, Eq)

 instance FromValue WorkerSettings where
   fromValue = parseTableFromValue $ do
@@ -61,6 +63,7 @@ instance FromValue WorkerSettings where
     _wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout"
     _wsDefaultJobTimeout <- reqKey "default_job_timeout"
     _wsLongJobTimeout <- reqKey "long_job_timeout"
+    _wsNlpConduitChunkSize <- reqKey "nlp_conduit_chunk_size"
     defaultDelay <- reqKey "default_delay"
     additionalDelayAfterRead <- reqKey "additional_delay_after_read"
     return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig
@@ -69,7 +72,8 @@ instance FromValue WorkerSettings where
                             , _wsDefinitions
                             , _wsDefaultVisibilityTimeout
                             , _wsDefaultDelay = B.TimeoutS defaultDelay
-                            , _wsAdditionalDelayAfterRead = B.TimeoutS additionalDelayAfterRead }
+                            , _wsAdditionalDelayAfterRead = B.TimeoutS additionalDelayAfterRead
+                            , _wsNlpConduitChunkSize }

 instance ToValue WorkerSettings where
   toValue = defaultTableToValue

 instance ToTable WorkerSettings where
@@ -80,7 +84,8 @@ instance ToTable WorkerSettings where
             , "default_visibility_timeout" .= _wsDefaultVisibilityTimeout
             , "default_delay" .= B._TimeoutS _wsDefaultDelay
             , "additional_delay_after_read" .= B._TimeoutS _wsAdditionalDelayAfterRead
-            , "definitions" .= _wsDefinitions ]
+            , "definitions" .= _wsDefinitions
+            , "nlp_conduit_chunk_size" .= _wsNlpConduitChunkSize ]

 data WorkerDefinition =
   WorkerDefinition {
...
@@ -55,6 +55,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
 where

 import Conduit
+import Control.Concurrent.Async.Lifted qualified as AsyncL
 import Control.Exception.Safe qualified as CES
 import Control.Lens ( to, view )
 import Control.Exception.Safe (catch, MonadCatch)
@@ -70,7 +71,8 @@ import Data.Text qualified as T
 import Gargantext.API.Ngrams.Tools (getTermsWith)
 import Gargantext.API.Ngrams.Types (NgramsTerm)
 import Gargantext.Core (Lang(..), withDefaultLanguage, NLPServerConfig)
-import Gargantext.Core.Config (GargConfig(..), hasConfig)
+import Gargantext.Core.Config (GargConfig(..), hasConfig, gc_worker)
+import Gargantext.Core.Config.Worker (wsNlpConduitChunkSize)
 import Gargantext.Core.Config.Types (APIsConfig(..))
 import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
 import Gargantext.Core.NLP (HasNLPServer, nlpServerGet)
@@ -98,7 +100,8 @@ import Gargantext.Database.Admin.Types.Hyperdata.Contact ( HyperdataContact )
 import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataAnnuaire, HyperdataCorpus(_hc_lang) )
 import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(toHyperdataDocument), HyperdataDocument )
 import Gargantext.Database.Admin.Types.Node hiding (ERROR, DEBUG) -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
-import Gargantext.Database.Prelude
+import Gargantext.Database.Class ( DBCmdWithEnv, IsDBCmd )
+import Gargantext.Database.Transactional ( DBUpdate, runDBTx )
 import Gargantext.Database.Query.Table.ContextNodeNgrams2 ( ContextNodeNgrams2Poly(..), insertContextNodeNgrams2 )
 import Gargantext.Database.Query.Table.Node ( MkCorpus, insertDefaultNodeIfNotExists, getOrMkList, getNodeWith )
 import Gargantext.Database.Query.Table.Node.Document.Add qualified as Doc (add)
@@ -108,8 +111,8 @@ import Gargantext.Database.Query.Table.NodeContext (selectDocNodesOnlyId)
 import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
 import Gargantext.Database.Query.Tree.Root (MkCorpusUser(..), getOrMkRoot, getOrMkRootWithCorpus, userFromMkCorpusUser)
 import Gargantext.Database.Schema.Ngrams ( indexNgrams, NgramsId )
-import Gargantext.Database.Schema.Node
+import Gargantext.Database.Schema.Node ( NodePoly(_node_id, _node_hash_id), node_hyperdata )
-import Gargantext.Database.Types
+import Gargantext.Database.Types ( Indexed(Indexed) )
 import Gargantext.Prelude hiding (catch, onException, to)
 import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger )
 import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..), markFailureNoErr )
@@ -296,11 +299,12 @@ flow :: forall env err m a c.
      -> m CorpusId
 flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do
   cfg <- view hasConfig
+  let chunkSize = cfg ^. gc_worker . wsNlpConduitChunkSize
   (_userId, userCorpusId, listId, msgs) <- runDBTx $ createNodes cfg mkCorpusUser c
   forM_ msgs ce_notify
   -- TODO if public insertMasterDocs else insertUserDocs
   runConduit (zipSources (yieldMany ([1..] :: [Int])) docsC
-             .| CList.chunksOf 5
+             .| CList.chunksOf chunkSize
              .| mapM_C (addDocumentsWithProgress userCorpusId)
              .| sinkNull) `CES.catches`
     [ CES.Handler $ \(e :: ClientError) -> do
@@ -544,13 +548,20 @@ extractNgramsFromDocuments :: forall doc env err m.
                            -> TermType Lang
                            -> [doc]
                            -> m (UncommittedNgrams doc)
-extractNgramsFromDocuments nlpServer lang docs =
-  foldlM go mempty docs
-  where
-    go :: UncommittedNgrams doc -> doc -> m (UncommittedNgrams doc)
-    go !acc inputDoc = do
-      ngrams <- extractNgramsFromDocument nlpServer lang inputDoc
-      pure $ acc <> ngrams
+extractNgramsFromDocuments nlpServer lang docs = do
+  ret <- AsyncL.mapConcurrently (extractNgramsFromDocument nlpServer lang) docs
+  -- sem <- QSemL.newQSem 10
+  -- let f = extractNgramsFromDocument nlpServer lang
+  -- ret <- AsyncL.mapConcurrently (\doc ->
+  --   CEL.bracket_ (QSemL.waitQSem sem) (QSemL.signalQSem sem) (f doc)
+  --   ) docs
+  pure $ foldl (<>) mempty ret
+  -- foldlM go mempty docs
+  -- where
+  --   go :: UncommittedNgrams doc -> doc -> m (UncommittedNgrams doc)
+  --   go !acc inputDoc = do
+  --     ngrams <- extractNgramsFromDocument nlpServer lang inputDoc
+  --     pure $ acc <> ngrams

 commitNgramsForDocuments :: UniqParameters doc
                          => UncommittedNgrams doc
...
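The commented-out lines in the new extractNgramsFromDocuments hint at a variant that bounds the number of in-flight NLP requests with a semaphore but was left disabled. A self-contained sketch of that idea in plain IO; the real code would use the lifted equivalents and the actual extractNgramsFromDocument, and fetchNgrams plus the limit of 10 are illustrative:

import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)

-- Hypothetical stand-in for one NLP extraction request.
fetchNgrams :: String -> IO [String]
fetchNgrams doc = pure (words doc)

-- Start all extractions concurrently, but let at most `limit` of them run at
-- once: every worker acquires the semaphore before calling the NLP server and
-- releases it afterwards, even on exceptions.
extractBounded :: Int -> [String] -> IO [[String]]
extractBounded limit docs = do
  sem <- newQSem limit
  mapConcurrently
    (\doc -> bracket_ (waitQSem sem) (signalQSem sem) (fetchNgrams doc))
    docs

main :: IO ()
main = extractBounded 10 ["hello world", "foo bar"] >>= print

Bounding concurrency this way keeps NLP-server load predictable even for large batches; with the unbounded mapConcurrently that was merged, the effective limit is the chunk size itself, since each conduit chunk is extracted as one batch.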
@@ -99,6 +99,10 @@ default_job_timeout = 60
 # default timeout for "long" jobs (in seconds)
 long_job_timeout = 3000
+
+# Batch size when sending data to NLP.
+# Preferably, set this to the number of CPUs
+nlp_conduit_chunk_size = 10

 # NOTE This is overridden by Test.Database.Setup
 [worker.database]
 host = "127.0.0.1"
...