[flow] add chunk size to toml settings

Also, switch to a plain `mapConcurrently` (without the semaphore) when
extracting ngrams, as its concurrency is closely related to the chunk size.
parent 76fa3e75
Pipeline #7927 passed with stages
in 114 minutes and 37 seconds
...@@ -88,7 +88,8 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo = ...@@ -88,7 +88,8 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
, _wsLongJobTimeout = 3000 , _wsLongJobTimeout = 3000
, _wsDefaultDelay = 0 , _wsDefaultDelay = 0
, _wsAdditionalDelayAfterRead = 5 , _wsAdditionalDelayAfterRead = 5
, _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} } , _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"}
, _wsNlpConduitChunkSize = 10 }
, _gc_logging = Config.LogConfig { , _gc_logging = Config.LogConfig {
_lc_log_level = INFO _lc_log_level = INFO
, _lc_log_file = Nothing , _lc_log_file = Nothing
......
...@@ -166,6 +166,10 @@ default_job_timeout = 60 ...@@ -166,6 +166,10 @@ default_job_timeout = 60
# default timeout for "long" jobs (in seconds) # default timeout for "long" jobs (in seconds)
long_job_timeout = 3000 long_job_timeout = 3000
# Batch size when sending data to NLP.
# Preferably, set this to the number of available CPUs.
nlp_conduit_chunk_size = 10
# if you leave the same credentials as in [database] section above, # if you leave the same credentials as in [database] section above,
# workers will try to set up the `gargantext_pgmq` database # workers will try to set up the `gargantext_pgmq` database
# automatically # automatically
......
...@@ -510,8 +510,6 @@ library ...@@ -510,8 +510,6 @@ library
, ansi-terminal , ansi-terminal
, array ^>= 0.5.4.0 , array ^>= 0.5.4.0
, async ^>= 2.2.4 , async ^>= 2.2.4
, lifted-async
, lifted-base
, attoparsec ^>= 0.14.4 , attoparsec ^>= 0.14.4
, base64-bytestring ^>= 1.2.1.0 , base64-bytestring ^>= 1.2.1.0
, bimap >= 0.5.0 , bimap >= 0.5.0
...@@ -572,6 +570,7 @@ library ...@@ -572,6 +570,7 @@ library
, json-stream ^>= 0.4.2.4 , json-stream ^>= 0.4.2.4
, lens >= 5.2.2 && < 5.3 , lens >= 5.2.2 && < 5.3
, lens-aeson < 1.3 , lens-aeson < 1.3
, lifted-async >= 0.10 && < 0.12
, list-zipper , list-zipper
, massiv < 1.1 , massiv < 1.1
, matrix ^>= 0.3.6.1 , matrix ^>= 0.3.6.1
......
...@@ -9,19 +9,12 @@ Portability : POSIX ...@@ -9,19 +9,12 @@ Portability : POSIX
-} -}
{-# LANGUAGE TemplateHaskell #-}
module Gargantext.Core.Config.Ini.NLP ( module Gargantext.Core.Config.Ini.NLP (
-- * Types -- * Types
NLPConfig(..) NLPConfig(..)
-- * Utility functions -- * Utility functions
, readConfig , readConfig
-- * Lenses
, nlp_default
, nlp_languages
) )
where where
...@@ -59,4 +52,3 @@ readConfig fp = do ...@@ -59,4 +52,3 @@ readConfig fp = do
, T.pack $ show m_nlp_other ] , T.pack $ show m_nlp_other ]
Just ret -> pure ret Just ret -> pure ret
makeLenses ''NLPConfig
...@@ -19,7 +19,6 @@ module Gargantext.Core.Config.NLP ( ...@@ -19,7 +19,6 @@ module Gargantext.Core.Config.NLP (
-- * Lenses -- * Lenses
, nlp_default , nlp_default
, nlp_languages , nlp_languages
) )
where where
...@@ -48,9 +47,9 @@ data NLPConfig = NLPConfig { _nlp_default :: URI ...@@ -48,9 +47,9 @@ data NLPConfig = NLPConfig { _nlp_default :: URI
instance FromValue NLPConfig where instance FromValue NLPConfig where
fromValue v = do fromValue v = do
_nlp_default <- parseTableFromValue (reqKey "EN") v _nlp_default <- parseTableFromValue (reqKey "EN") v
-- _nlp_languages <- fromValue <$> getTable
MkTable t <- parseTableFromValue getTable v MkTable t <- parseTableFromValue getTable v
_nlp_languages <- mapM fromValue (snd <$> t) _nlp_languages <- mapM fromValue (snd <$> t)
return $ NLPConfig { .. } return $ NLPConfig { .. }
instance ToValue NLPConfig where instance ToValue NLPConfig where
toValue = defaultTableToValue toValue = defaultTableToValue
...@@ -58,7 +57,7 @@ instance ToTable NLPConfig where ...@@ -58,7 +57,7 @@ instance ToTable NLPConfig where
toTable (NLPConfig { .. }) = toTable (NLPConfig { .. }) =
table ([ k .= v | (k, v) <- Map.toList _nlp_languages ] table ([ k .= v | (k, v) <- Map.toList _nlp_languages ]
-- output the default "EN" language as well -- output the default "EN" language as well
<> [ ("EN" :: Text) .= _nlp_default ]) <> [ ("EN" :: Text) .= _nlp_default ] )
-- readConfig :: SettingsFile -> IO NLPConfig -- readConfig :: SettingsFile -> IO NLPConfig
......
...@@ -53,6 +53,8 @@ data WorkerSettings = ...@@ -53,6 +53,8 @@ data WorkerSettings =
, _wsDefaultDelay :: B.TimeoutS , _wsDefaultDelay :: B.TimeoutS
, _wsAdditionalDelayAfterRead :: B.TimeoutS , _wsAdditionalDelayAfterRead :: B.TimeoutS
, _wsDefinitions :: ![WorkerDefinition] , _wsDefinitions :: ![WorkerDefinition]
, _wsNlpConduitChunkSize :: Int
} deriving (Show, Eq) } deriving (Show, Eq)
instance FromValue WorkerSettings where instance FromValue WorkerSettings where
fromValue = parseTableFromValue $ do fromValue = parseTableFromValue $ do
...@@ -61,6 +63,7 @@ instance FromValue WorkerSettings where ...@@ -61,6 +63,7 @@ instance FromValue WorkerSettings where
_wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout" _wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout"
_wsDefaultJobTimeout <- reqKey "default_job_timeout" _wsDefaultJobTimeout <- reqKey "default_job_timeout"
_wsLongJobTimeout <- reqKey "long_job_timeout" _wsLongJobTimeout <- reqKey "long_job_timeout"
_wsNlpConduitChunkSize <- reqKey "nlp_conduit_chunk_size"
defaultDelay <- reqKey "default_delay" defaultDelay <- reqKey "default_delay"
additionalDelayAfterRead <- reqKey "additional_delay_after_read" additionalDelayAfterRead <- reqKey "additional_delay_after_read"
return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig
...@@ -69,7 +72,8 @@ instance FromValue WorkerSettings where ...@@ -69,7 +72,8 @@ instance FromValue WorkerSettings where
, _wsDefinitions , _wsDefinitions
, _wsDefaultVisibilityTimeout , _wsDefaultVisibilityTimeout
, _wsDefaultDelay = B.TimeoutS defaultDelay , _wsDefaultDelay = B.TimeoutS defaultDelay
, _wsAdditionalDelayAfterRead = B.TimeoutS additionalDelayAfterRead } , _wsAdditionalDelayAfterRead = B.TimeoutS additionalDelayAfterRead
, _wsNlpConduitChunkSize }
instance ToValue WorkerSettings where instance ToValue WorkerSettings where
toValue = defaultTableToValue toValue = defaultTableToValue
instance ToTable WorkerSettings where instance ToTable WorkerSettings where
...@@ -80,7 +84,8 @@ instance ToTable WorkerSettings where ...@@ -80,7 +84,8 @@ instance ToTable WorkerSettings where
, "default_visibility_timeout" .= _wsDefaultVisibilityTimeout , "default_visibility_timeout" .= _wsDefaultVisibilityTimeout
, "default_delay" .= B._TimeoutS _wsDefaultDelay , "default_delay" .= B._TimeoutS _wsDefaultDelay
, "additional_delay_after_read" .= B._TimeoutS _wsAdditionalDelayAfterRead , "additional_delay_after_read" .= B._TimeoutS _wsAdditionalDelayAfterRead
, "definitions" .= _wsDefinitions ] , "definitions" .= _wsDefinitions
, "nlp_conduit_chunk_size" .= _wsNlpConduitChunkSize ]
data WorkerDefinition = data WorkerDefinition =
WorkerDefinition { WorkerDefinition {
......
...@@ -56,8 +56,6 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list) ...@@ -56,8 +56,6 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
import Conduit import Conduit
import Control.Concurrent.Async.Lifted qualified as AsyncL import Control.Concurrent.Async.Lifted qualified as AsyncL
import Control.Concurrent.QSem.Lifted qualified as QSemL
import Control.Exception.Lifted qualified as CEL
import Control.Exception.Safe qualified as CES import Control.Exception.Safe qualified as CES
import Control.Lens ( to, view ) import Control.Lens ( to, view )
import Control.Exception.Safe (catch, MonadCatch) import Control.Exception.Safe (catch, MonadCatch)
...@@ -73,7 +71,8 @@ import Data.Text qualified as T ...@@ -73,7 +71,8 @@ import Data.Text qualified as T
import Gargantext.API.Ngrams.Tools (getTermsWith) import Gargantext.API.Ngrams.Tools (getTermsWith)
import Gargantext.API.Ngrams.Types (NgramsTerm) import Gargantext.API.Ngrams.Types (NgramsTerm)
import Gargantext.Core (Lang(..), withDefaultLanguage, NLPServerConfig) import Gargantext.Core (Lang(..), withDefaultLanguage, NLPServerConfig)
import Gargantext.Core.Config (GargConfig(..), hasConfig) import Gargantext.Core.Config (GargConfig(..), hasConfig, gc_worker)
import Gargantext.Core.Config.Worker (wsNlpConduitChunkSize)
import Gargantext.Core.Config.Types (APIsConfig(..)) import Gargantext.Core.Config.Types (APIsConfig(..))
import Gargantext.Core.Ext.IMTUser (readFile_Annuaire) import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
import Gargantext.Core.NLP (HasNLPServer, nlpServerGet) import Gargantext.Core.NLP (HasNLPServer, nlpServerGet)
...@@ -101,7 +100,8 @@ import Gargantext.Database.Admin.Types.Hyperdata.Contact ( HyperdataContact ) ...@@ -101,7 +100,8 @@ import Gargantext.Database.Admin.Types.Hyperdata.Contact ( HyperdataContact )
import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataAnnuaire, HyperdataCorpus(_hc_lang) ) import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataAnnuaire, HyperdataCorpus(_hc_lang) )
import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(toHyperdataDocument), HyperdataDocument ) import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(toHyperdataDocument), HyperdataDocument )
import Gargantext.Database.Admin.Types.Node hiding (ERROR, DEBUG) -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId) import Gargantext.Database.Admin.Types.Node hiding (ERROR, DEBUG) -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
import Gargantext.Database.Prelude import Gargantext.Database.Class ( DBCmdWithEnv, IsDBCmd )
import Gargantext.Database.Transactional ( DBUpdate, runDBTx )
import Gargantext.Database.Query.Table.ContextNodeNgrams2 ( ContextNodeNgrams2Poly(..), insertContextNodeNgrams2 ) import Gargantext.Database.Query.Table.ContextNodeNgrams2 ( ContextNodeNgrams2Poly(..), insertContextNodeNgrams2 )
import Gargantext.Database.Query.Table.Node ( MkCorpus, insertDefaultNodeIfNotExists, getOrMkList, getNodeWith ) import Gargantext.Database.Query.Table.Node ( MkCorpus, insertDefaultNodeIfNotExists, getOrMkList, getNodeWith )
import Gargantext.Database.Query.Table.Node.Document.Add qualified as Doc (add) import Gargantext.Database.Query.Table.Node.Document.Add qualified as Doc (add)
...@@ -111,8 +111,8 @@ import Gargantext.Database.Query.Table.NodeContext (selectDocNodesOnlyId) ...@@ -111,8 +111,8 @@ import Gargantext.Database.Query.Table.NodeContext (selectDocNodesOnlyId)
import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId) import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
import Gargantext.Database.Query.Tree.Root (MkCorpusUser(..), getOrMkRoot, getOrMkRootWithCorpus, userFromMkCorpusUser) import Gargantext.Database.Query.Tree.Root (MkCorpusUser(..), getOrMkRoot, getOrMkRootWithCorpus, userFromMkCorpusUser)
import Gargantext.Database.Schema.Ngrams ( indexNgrams, NgramsId ) import Gargantext.Database.Schema.Ngrams ( indexNgrams, NgramsId )
import Gargantext.Database.Schema.Node import Gargantext.Database.Schema.Node ( NodePoly(_node_id, _node_hash_id), node_hyperdata )
import Gargantext.Database.Types import Gargantext.Database.Types ( Indexed(Indexed) )
import Gargantext.Prelude hiding (catch, onException, to) import Gargantext.Prelude hiding (catch, onException, to)
import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger ) import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger )
import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..), markFailureNoErr ) import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..), markFailureNoErr )
...@@ -299,11 +299,12 @@ flow :: forall env err m a c. ...@@ -299,11 +299,12 @@ flow :: forall env err m a c.
-> m CorpusId -> m CorpusId
flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do
cfg <- view hasConfig cfg <- view hasConfig
let chunkSize = cfg ^. gc_worker . wsNlpConduitChunkSize
(_userId, userCorpusId, listId, msgs) <- runDBTx $ createNodes cfg mkCorpusUser c (_userId, userCorpusId, listId, msgs) <- runDBTx $ createNodes cfg mkCorpusUser c
forM_ msgs ce_notify forM_ msgs ce_notify
-- TODO if public insertMasterDocs else insertUserDocs -- TODO if public insertMasterDocs else insertUserDocs
runConduit (zipSources (yieldMany ([1..] :: [Int])) docsC runConduit (zipSources (yieldMany ([1..] :: [Int])) docsC
.| CList.chunksOf 10 .| CList.chunksOf chunkSize
.| mapM_C (addDocumentsWithProgress userCorpusId) .| mapM_C (addDocumentsWithProgress userCorpusId)
.| sinkNull) `CES.catches` .| sinkNull) `CES.catches`
[ CES.Handler $ \(e :: ClientError) -> do [ CES.Handler $ \(e :: ClientError) -> do
...@@ -548,11 +549,12 @@ extractNgramsFromDocuments :: forall doc env err m. ...@@ -548,11 +549,12 @@ extractNgramsFromDocuments :: forall doc env err m.
-> [doc] -> [doc]
-> m (UncommittedNgrams doc) -> m (UncommittedNgrams doc)
extractNgramsFromDocuments nlpServer lang docs = do extractNgramsFromDocuments nlpServer lang docs = do
sem <- QSemL.newQSem 10 ret <- AsyncL.mapConcurrently (extractNgramsFromDocument nlpServer lang) docs
let f = extractNgramsFromDocument nlpServer lang -- sem <- QSemL.newQSem 10
ret <- AsyncL.mapConcurrently (\doc -> -- let f = extractNgramsFromDocument nlpServer lang
CEL.bracket_ (QSemL.waitQSem sem) (QSemL.signalQSem sem) (f doc) -- ret <- AsyncL.mapConcurrently (\doc ->
) docs -- CEL.bracket_ (QSemL.waitQSem sem) (QSemL.signalQSem sem) (f doc)
-- ) docs
pure $ foldl (<>) mempty ret pure $ foldl (<>) mempty ret
-- foldlM go mempty docs -- foldlM go mempty docs
-- where -- where
......
...@@ -99,6 +99,10 @@ default_job_timeout = 60 ...@@ -99,6 +99,10 @@ default_job_timeout = 60
# default timeout for "long" jobs (in seconds) # default timeout for "long" jobs (in seconds)
long_job_timeout = 3000 long_job_timeout = 3000
# Batch size when sending data to NLP.
# Preferably, set this to the number of available CPUs.
nlp_conduit_chunk_size = 10
# NOTE This is overridden by Test.Database.Setup # NOTE This is overridden by Test.Database.Setup
[worker.database] [worker.database]
host = "127.0.0.1" host = "127.0.0.1"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment