Commit 4739268a authored by Alfredo Di Napoli

Merge remote-tracking branch 'refs/remotes/origin/adinapoli/issue-513' into adinapoli/issue-513

parents d473eb5b ea615b2e
@@ -88,7 +88,8 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
, _wsLongJobTimeout = 3000
, _wsDefaultDelay = 0
, _wsAdditionalDelayAfterRead = 5
, _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} }
, _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"}
, _wsNlpConduitChunkSize = 10 }
, _gc_logging = Config.LogConfig {
_lc_log_level = INFO
, _lc_log_file = Nothing
@@ -166,6 +166,10 @@ default_job_timeout = 60
# default timeout for "long" jobs (in seconds)
long_job_timeout = 3000
# Batch size when sending data to NLP.
# Preferably, set this to the number of available CPUs
nlp_conduit_chunk_size = 10
# if you leave the same credentials as in [database] section above,
# workers will try to set up the `gargantext_pgmq` database
# automatically
@@ -570,6 +570,7 @@ library
, json-stream ^>= 0.4.2.4
, lens >= 5.2.2 && < 5.3
, lens-aeson < 1.3
, lifted-async >= 0.10 && < 0.12
, list-zipper
, massiv < 1.1
, matrix ^>= 0.3.6.1
@@ -753,6 +754,7 @@ common commonTestDependencies
, generic-arbitrary >= 1.0.1 && < 2
, graphviz ^>= 2999.20.1.0
, haskell-bee
, haskell-bee-pgmq
, hspec ^>= 2.11.1
, hspec-expectations >= 0.8 && < 0.9
, hspec-expectations-lifted < 0.11
@@ -23,7 +23,7 @@ module Gargantext.API.Job (
, addWarningEvent
) where
import Control.Lens (over, _Just)
import Control.Lens ((%~), over, _Just)
import Data.Text qualified as T
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.Prelude
@@ -66,7 +66,12 @@ jobLogComplete jl =
& over scst_remaining (const (Just 0))
jobLogAddMore :: Int -> JobLog -> JobLog
jobLogAddMore moreSteps jl = jl & over (scst_remaining . _Just) (+ moreSteps)
jobLogAddMore moreSteps jl =
jl & scst_remaining %~ (maybe (Just 0) Just)
& scst_succeeded %~ (maybe (Just 0) Just)
& scst_failed %~ (maybe (Just 0) Just)
& scst_events %~ (maybe (Just []) Just)
& (scst_remaining . _Just) %~ (+ moreSteps)
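Note: the rewritten jobLogAddMore first normalises every Nothing counter to a defined value before bumping the remaining steps, so the bump can no longer be a silent no-op on a fresh JobLog. A hypothetical GHCi illustration (constructor argument order assumed from the JobLog record; not part of this diff):
-- ghci> jobLogAddMore 5 (JobLog Nothing Nothing Nothing Nothing)
-- JobLog { _scst_succeeded = Just 0, _scst_failed = Just 0
--        , _scst_remaining = Just 5, _scst_events = Just [] }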
jobLogFailures :: Int -> JobLog -> JobLog
jobLogFailures n jl = over (scst_failed . _Just) (+ n) $
@@ -131,7 +131,7 @@ updateNode phyloId (UpdateNodePhylo config) jobHandle = do
let corpusId = fromMaybe (panicTrace "no corpus id") corpusId'
phy <- timeMeasured "updateNode.flowPhyloAPI" $ flowPhyloAPI (subConfigAPI2config config) mbComputeHistory corpusId
phy <- timeMeasured "updateNode.flowPhyloAPI" $ flowPhyloAPI (subConfigAPI2config config) mbComputeHistory corpusId jobHandle
markProgress 1 jobHandle
{-
@@ -4,7 +4,6 @@ module Gargantext.API.Routes.Named.Viz (
-- * Routes types
PhyloAPI(..)
, GetPhylo(..)
, PostPhylo(..)
, GraphAPI(..)
, GraphAsyncAPI(..)
, GraphVersionsAPI(..)
@@ -31,7 +30,6 @@ import Servant.XML.Conduit (XML)
data PhyloAPI mode = PhyloAPI
{ getPhyloEp :: mode :- Summary "Phylo API" :> NamedRoutes GetPhylo
, postPhyloEp :: mode :- NamedRoutes PostPhylo
} deriving Generic
@@ -43,9 +41,9 @@ newtype GetPhylo mode = GetPhylo
} deriving Generic
newtype PostPhylo mode = PostPhylo
{ postPhyloByListIdEp :: mode :- QueryParam "listId" ListId :> (Post '[JSON] NodeId)
} deriving Generic
-- newtype PostPhylo mode = PostPhylo
-- { postPhyloByListIdEp :: mode :- QueryParam "listId" ListId :> (Post '[JSON] NodeId)
-- } deriving Generic
-- | There is no Delete specific API for Graph since it can be deleted
@@ -9,19 +9,12 @@ Portability : POSIX
-}
{-# LANGUAGE TemplateHaskell #-}
module Gargantext.Core.Config.Ini.NLP (
-- * Types
NLPConfig(..)
-- * Utility functions
, readConfig
-- * Lenses
, nlp_default
, nlp_languages
)
where
@@ -59,4 +52,3 @@ readConfig fp = do
, T.pack $ show m_nlp_other ]
Just ret -> pure ret
makeLenses ''NLPConfig
@@ -19,7 +19,6 @@ module Gargantext.Core.Config.NLP (
-- * Lenses
, nlp_default
, nlp_languages
)
where
@@ -48,9 +47,9 @@ data NLPConfig = NLPConfig { _nlp_default :: URI
instance FromValue NLPConfig where
fromValue v = do
_nlp_default <- parseTableFromValue (reqKey "EN") v
-- _nlp_languages <- fromValue <$> getTable
MkTable t <- parseTableFromValue getTable v
_nlp_languages <- mapM fromValue (snd <$> t)
return $ NLPConfig { .. }
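With this change the FromValue instance reads the whole NLP table rather than a fixed set of keys: every key/URI pair lands in _nlp_languages, while EN doubles as _nlp_default. A hypothetical table of the accepted shape (server URLs are placeholders, not from this diff):
[nlp]
EN = "corenlp://localhost:9000"
FR = "spacy://localhost:8001"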
instance ToValue NLPConfig where
toValue = defaultTableToValue
@@ -58,7 +57,7 @@ instance ToTable NLPConfig where
toTable (NLPConfig { .. }) =
table ([ k .= v | (k, v) <- Map.toList _nlp_languages ]
-- output the default "EN" language as well
<> [ ("EN" :: Text) .= _nlp_default ])
<> [ ("EN" :: Text) .= _nlp_default ] )
-- readConfig :: SettingsFile -> IO NLPConfig
@@ -53,6 +53,8 @@ data WorkerSettings =
, _wsDefaultDelay :: B.TimeoutS
, _wsAdditionalDelayAfterRead :: B.TimeoutS
, _wsDefinitions :: ![WorkerDefinition]
, _wsNlpConduitChunkSize :: Int
} deriving (Show, Eq)
instance FromValue WorkerSettings where
fromValue = parseTableFromValue $ do
@@ -61,6 +63,7 @@ instance FromValue WorkerSettings where
_wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout"
_wsDefaultJobTimeout <- reqKey "default_job_timeout"
_wsLongJobTimeout <- reqKey "long_job_timeout"
_wsNlpConduitChunkSize <- reqKey "nlp_conduit_chunk_size"
defaultDelay <- reqKey "default_delay"
additionalDelayAfterRead <- reqKey "additional_delay_after_read"
return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig
@@ -69,7 +72,8 @@ instance FromValue WorkerSettings where
, _wsDefinitions
, _wsDefaultVisibilityTimeout
, _wsDefaultDelay = B.TimeoutS defaultDelay
, _wsAdditionalDelayAfterRead = B.TimeoutS additionalDelayAfterRead }
, _wsAdditionalDelayAfterRead = B.TimeoutS additionalDelayAfterRead
, _wsNlpConduitChunkSize }
instance ToValue WorkerSettings where
toValue = defaultTableToValue
instance ToTable WorkerSettings where
@@ -80,7 +84,8 @@ instance ToTable WorkerSettings where
, "default_visibility_timeout" .= _wsDefaultVisibilityTimeout
, "default_delay" .= B._TimeoutS _wsDefaultDelay
, "additional_delay_after_read" .= B._TimeoutS _wsAdditionalDelayAfterRead
, "definitions" .= _wsDefinitions ]
, "definitions" .= _wsDefinitions
, "nlp_conduit_chunk_size" .= _wsNlpConduitChunkSize ]
data WorkerDefinition =
WorkerDefinition {
@@ -26,14 +26,12 @@ import Gargantext.API.Routes.Named.Viz qualified as Named
import Gargantext.API.Viz.Types
import Gargantext.Core.Types.Phylo (GraphData(..))
import Gargantext.Core.Viz.LegacyPhylo hiding (Phylo(..))
import Gargantext.Core.Viz.Phylo (PhyloConfig(..), defaultConfig, _phylo_param, _phyloParam_config)
import Gargantext.Core.Viz.Phylo (PhyloConfig(..), _phylo_param, _phyloParam_config)
import Gargantext.Core.Viz.Phylo.API.Tools
import Gargantext.Database.Prelude
import Gargantext.Database.Admin.Types.Hyperdata
import Gargantext.Database.Admin.Types.Node -- (PhyloId, ListId, CorpusId, UserId, NodeId(..))
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType, defaultList)
import Gargantext.Database.Query.Table.Node.Error
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Prelude
import Servant
import Servant.Server.Generic (AsServerT)
@@ -42,7 +40,6 @@ import Web.HttpApiData (readTextData)
phyloAPI :: IsGargServer err env m => PhyloId -> Named.PhyloAPI (AsServerT m)
phyloAPI n = Named.PhyloAPI
{ getPhyloEp = getPhylo n
, postPhyloEp = postPhylo n
}
-- :<|> putPhylo n
-- :<|> deletePhylo n
@@ -94,35 +91,6 @@ getPhyloDataJson phyloId = do
-- pure (SVG p)
-- FIXME(adn) This handler mixes DB reads with updates outside of the same
-- transaction, due to the call to 'flowPhyloAPI' in the middle.
postPhylo :: IsGargServer err env m => PhyloId -> Named.PostPhylo (AsServerT m)
postPhylo phyloId = Named.PostPhylo $ \_lId -> do
-- TODO get Reader settings
-- s <- ask
-- let
-- _vrs = Just ("1" :: Text)
-- _sft = Just (Software "Gargantext" "4")
-- _prm = initPhyloParam vrs sft (Just q)
corpusId <- runDBQuery $ getClosestParentIdByType phyloId NodeCorpus
-- Being the first time we ask for the Phylo, there is no historical data
-- available about computing time, so we pass 'Nothing'.
phy <- flowPhyloAPI defaultConfig Nothing (fromMaybe (panicTrace "[G.C.V.P.API] no corpus ID found") corpusId) -- params
-- phyloId <- insertNodes [node NodePhylo "Phylo" (HyperdataPhylo Nothing (Just phy)) (Just corpusId) userId]
_ <- runDBTx $ updateHyperdata phyloId (HyperdataPhylo Nothing (Just phy))
pure phyloId
------------------------------------------------------------------------
-- | DELETE Phylo == delete a node
------------------------------------------------------------------------
------------------------------------------------------------------------
{-
type PutPhylo = (Put '[JSON] Phylo )
--putPhylo :: PhyloId -> Maybe ListId -> PhyloQueryBuild -> Phylo
putPhylo :: PhyloId -> GargServer PutPhylo
putPhylo = undefined
-}
-- | Instances
instance FromHttpApiData DisplayMode where parseUrlPiece = readTextData
@@ -49,6 +49,7 @@ import Gargantext.Database.Schema.Context ( ContextPoly(_context_hyperdata, _con
import Gargantext.Database.Schema.Node ( NodePoly(_node_hyperdata), node_hyperdata )
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging ( MonadLogger, LogLevel(DEBUG), logLocM )
import Gargantext.Utils.Jobs.Monad (JobHandle, MonadJobStatus(markProgress, addMoreSteps))
import Gargantext.Utils.UTCTime (timeMeasured, timeMeasured'')
import Prelude qualified
import System.FilePath ((</>))
@@ -110,25 +111,35 @@ phylo2dot phylo = do
_ -> pure value
flowPhyloAPI :: (HasNodeStory env err m, HasNodeError err, MonadLogger m)
flowPhyloAPI :: (HasNodeStory env err m, HasNodeError err, MonadLogger m, MonadJobStatus m)
=> PhyloConfig
-> Maybe ComputeTimeHistory
-- ^ Previous compute time historical data, if any.
-> CorpusId
-> JobHandle m
-> m Phylo
flowPhyloAPI config mbOldComputeHistory cId = do
flowPhyloAPI config mbOldComputeHistory cId jobHandle = do
env <- view hasNodeStory
addMoreSteps 5 jobHandle
corpus <- timeMeasured "flowPhyloAPI.corpusIdtoDocuments" $ runDBQuery $ corpusIdtoDocuments env (timeUnit config) cId
markProgress 1 jobHandle
-- writePhylo phyloWithCliquesFile phyloWithCliques
$(logLocM) DEBUG $ "PhyloConfig old: " <> show config
(t1, phyloWithCliques) <- timeMeasured'' DEBUG "flowPhyloAPI.phyloWithCliques" (pure $! toPhyloWithoutLink corpus config)
markProgress 1 jobHandle
(t2, phyloConfigured) <- timeMeasured'' DEBUG "flowPhyloAPI.phyloConfigured" (pure $! setConfig config phyloWithCliques)
markProgress 1 jobHandle
(t3, finalPhylo) <- timeMeasured'' DEBUG "flowPhyloAPI.toPhylo" (pure $! toPhylo phyloConfigured)
markProgress 1 jobHandle
-- As the phylo is computed fresh every time, without looking at the one stored (if any), we
-- have to manually propagate computing time across.
pure $! trackComputeTime (t1 + t2 + t3) (finalPhylo { _phylo_computeTime = mbOldComputeHistory })
let ret = trackComputeTime (t1 + t2 + t3) (finalPhylo { _phylo_computeTime = mbOldComputeHistory })
markProgress 1 jobHandle
pure ret
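flowPhyloAPI now registers five extra steps up front (addMoreSteps 5) and ticks one after each phase, so the job log tracks the phylo computation as it runs. If the manual interleaving ever grows unwieldy, a small wrapper could pair each action with its tick; a sketch under that assumption, not part of this diff:
-- Hypothetical convenience helper over MonadJobStatus.
step :: MonadJobStatus m => JobHandle m -> m a -> m a
step jobHandle action = do
  res <- action
  markProgress 1 jobHandle
  pure res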
--------------------------------------------------------------------
corpusIdtoDocuments :: HasNodeError err
@@ -16,7 +16,7 @@ module Gargantext.Core.Viz.Phylo.PhyloMaker where
import Control.Lens hiding (Level)
import Control.Parallel.Strategies (parMap, rpar)
import Control.Parallel.Strategies (parMap, rpar, Strategy)
import Data.Containers.ListUtils (nubOrd)
import Data.Discrimination qualified as D
import Data.List (partition, intersect, tail)
@@ -37,6 +37,13 @@ import Gargantext.Core.Viz.Phylo.SynchronicClustering (synchronicClustering)
import Gargantext.Core.Viz.Phylo.TemporalMatching (toPhyloQuality, temporalMatching, getNextPeriods, filterDocs, filterDiago, reduceDiagos, toSimilarity)
import Gargantext.Prelude hiding (empty, toList)
defaultStrategy :: Strategy a
defaultStrategy = rpar
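Factoring the Strategy out into defaultStrategy means the evaluation degree of every parMap site below can be changed in one place. For instance, a fully forcing variant would be (a sketch, not part of this diff; note the extra NFData constraint):
-- defaultStrategy :: NFData a => Strategy a
-- defaultStrategy = rparWith rdeepseq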
------------------
-- | To Phylo | --
------------------
@@ -151,7 +158,7 @@ evolvSeaLadder nbFdt lambda freq similarities graph = map snd
--------
-- 1.1) for each measure of similarity, prune the flat phylo, compute the branches and estimate the quality
qua :: [Double]
qua = parMap rpar (\thr ->
qua = parMap defaultStrategy (\thr ->
let edges = filter (\edge -> snd edge >= thr) graph
nodes = nubOrd $ concatMap (\((n,n'),_) -> [n,n']) edges
branches = toRelatedComponents nodes edges
@@ -192,7 +199,7 @@ findSeaLadder phylo = case getSeaElevation phylo of
docs = filterDocs (getDocsByDate phylo) ([period] ++ next)
diagos = filterDiago (getCoocByDate phylo) ([period] ++ next)
-- 1.2) compute the kinship similarities between pairs of source & target in parallel
pairs = parMap rpar (\source ->
pairs = parMap defaultStrategy (\source ->
let candidates = filter (\target -> (> 2) $ length
$ intersect (getGroupNgrams source) (getGroupNgrams target)) targets
in map (\target ->
@@ -330,7 +337,7 @@ filterCliqueBySize thr l = filter (\clq -> (length $ clq ^. clustering_roots) >=
-- To filter nested Fis
filterCliqueByNested :: Map (Date, Date) [Clustering] -> Map (Date, Date) [Clustering]
filterCliqueByNested m =
let clq = parMap rpar (\l ->
let clq = parMap defaultStrategy (\l ->
foldl (\mem f -> if (any (\f' -> isNested (f' ^. clustering_roots) (f ^. clustering_roots)) mem)
then mem
else
@@ -358,7 +365,7 @@ toSeriesOfClustering phylo phyloDocs = case (clique $ getConfig phylo) of
seriesOfClustering :: Map (Date,Date) [Clustering]
seriesOfClustering = case (clique $ getConfig phylo) of
Fis _ _ ->
let fis = parMap rpar (\(prd,docs) ->
let fis = parMap defaultStrategy (\(prd,docs) ->
case (corpusParser $ getConfig phylo) of
Tsv' _ -> let lst = toList
$ fisWithSizePolyMap' (Segment 1 20) 1 (map (\d -> (ngramsToIdx (text d) (getRoots phylo), (weight d, (sourcesToIdx (sources d) (getSources phylo))))) docs)
@@ -370,7 +377,7 @@ toSeriesOfClustering phylo phyloDocs = case (clique $ getConfig phylo) of
$ toList phyloDocs
in fromList fis
MaxClique _ thr filterType ->
let mcl = parMap rpar (\(prd,docs) ->
let mcl = parMap defaultStrategy (\(prd,docs) ->
let cooc = map round
$ foldl sumCooc empty
$ map listToMatrix
@@ -422,7 +429,7 @@ groupDocsByPeriodRec f prds docs acc =
groupDocsByPeriod' :: (NFData doc, Ord date, Enum date) => (doc -> date) -> [(date,date)] -> [doc] -> Map (date, date) [doc]
groupDocsByPeriod' f pds docs =
let docs' = groupBy (\d d' -> f d == f d') $ sortOn f docs
periods = parMap rpar (inPeriode f docs') pds
periods = parMap defaultStrategy (inPeriode f docs') pds
in tracePhylo ("\n" <> "-- | Group "
<> show (length docs)
<> " docs by "
@@ -440,7 +447,7 @@ groupDocsByPeriod' f pds docs =
groupDocsByPeriod :: (NFData doc, Ord date, Enum date) => (doc -> date) -> [(date,date)] -> [doc] -> Map (date, date) [doc]
groupDocsByPeriod _ _ [] = panic "[ERR][Viz.Phylo.PhyloMaker] Empty [Documents] can not have any periods"
groupDocsByPeriod f pds es =
let periods = parMap rpar (inPeriode f es) pds
let periods = parMap defaultStrategy (inPeriode f es) pds
in tracePhylo ("\n" <> "-- | Group "
<> show (length es) <> " docs by "
@@ -9,6 +9,7 @@ Portability : POSIX
-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
@@ -25,6 +26,7 @@ import Control.Lens.TH
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Maybe (fromJust)
import Data.Pool qualified as Pool
import Data.Text qualified as T
import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.API.Admin.Orchestrator.Types (JobLog, noJobLog)
import Gargantext.API.Errors (BackendInternalError)
@@ -42,7 +44,7 @@ import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging (HasLogger(..), Logger, LogLevel(..), MonadLogger(..), withLogger, logMsg, withLoggerIO)
import Gargantext.System.Logging (HasLogger(..), Logger, LogLevel(..), MonadLogger(..), withLogger, logMsg, logLocM, withLoggerIO)
import Gargantext.System.Logging.Loggers
import Gargantext.Utils.Jobs.Monad ( MonadJobStatus(..), JobHandle )
import System.Log.FastLogger qualified as FL
@@ -182,9 +184,21 @@ instance MonadJobStatus WorkerMonad where
type JobEventType WorkerMonad = JobLog
noJobHandle Proxy = WorkerNoJobHandle
getLatestJobStatus _ = WorkerMonad (pure noJobLog)
getLatestJobStatus WorkerNoJobHandle = pure noJobLog
getLatestJobStatus (WorkerJobHandle ji) = do
stateTVar <- asks _w_env_job_state
state' <- liftIO $ readTVarIO stateTVar
pure $ case state' of
Nothing -> noJobLog
Just wjs ->
if _wjs_job_info wjs == ji
then
_wjs_job_log wjs
else
noJobLog
withTracer _ jh n = n jh
markStarted n jh = updateJobProgress jh (const $ jobLogStart $ RemainingSteps n)
markStarted n jh =
updateJobProgress jh (const $ jobLogStart $ RemainingSteps n)
markProgress steps jh = updateJobProgress jh (jobLogProgress steps)
markFailure steps mb_msg jh =
updateJobProgress jh (\latest -> case mb_msg of
@@ -208,7 +222,9 @@ updateJobProgress (WorkerJobHandle (ji@JobInfo { _ji_message_id })) f = do
case state' of
Nothing -> pure ()
Just wjs -> do
CET.ce_notify $ CET.UpdateWorkerProgress ji (_wjs_job_log wjs)
(CET.ce_notify $ CET.UpdateWorkerProgress ji (_wjs_job_log wjs))
`CES.catch` (\(e :: SomeException) ->
$(logLocM) WARNING $ T.pack $ displayException e)
where
updateState mwjs =
let initJobLog =
@@ -47,9 +47,11 @@ sendJobWithCfg gcConfig job = do
b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws
let queueName = _wdQueue wd
let addDelayAfterRead = gcConfig ^. gc_worker . wsAdditionalDelayAfterRead
let job' = (updateJobData ws job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay
let sj = (W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay
, W.addDelayAfterRead = B._TimeoutS addDelayAfterRead
-- don't allow the job to repeat infinitely (see #495)
, W.toStrat = WT.TSDelete }
let job' = updateJobData ws job sj
withLogger (gcConfig ^. gc_logging) $ \ioL ->
$(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
W.sendJob' job'
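Note the ordering: the record overrides (delay, addDelayAfterRead, toStrat) are now applied to the freshly built job, and updateJobData runs last. Previously the record update was applied to updateJobData's result, so those overrides could clobber per-worker settings; presumably that is what this reshuffle fixes:
-- before: the record update had the final word
-- let job' = (updateJobData ws job $ W.mkDefaultSendJob' b queueName job) { ... }
-- after: updateJobData has the final word
-- let sj   = (W.mkDefaultSendJob' b queueName job) { ... }
-- let job' = updateJobData ws job sj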
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE StandaloneDeriving #-}
{-|
Module : Gargantext.Core.Worker.Types
Description : Some useful worker types
@@ -55,6 +55,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
where
import Conduit
import Control.Concurrent.Async.Lifted qualified as AsyncL
import Control.Exception.Safe qualified as CES
import Control.Lens ( to, view )
import Control.Exception.Safe (catch, MonadCatch)
@@ -70,7 +71,8 @@ import Data.Text qualified as T
import Gargantext.API.Ngrams.Tools (getTermsWith)
import Gargantext.API.Ngrams.Types (NgramsTerm)
import Gargantext.Core (Lang(..), withDefaultLanguage, NLPServerConfig)
import Gargantext.Core.Config (GargConfig(..), hasConfig)
import Gargantext.Core.Config (GargConfig(..), hasConfig, gc_worker)
import Gargantext.Core.Config.Worker (wsNlpConduitChunkSize)
import Gargantext.Core.Config.Types (APIsConfig(..))
import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
import Gargantext.Core.NLP (HasNLPServer, nlpServerGet)
@@ -98,7 +100,8 @@ import Gargantext.Database.Admin.Types.Hyperdata.Contact ( HyperdataContact )
import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataAnnuaire, HyperdataCorpus(_hc_lang) )
import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(toHyperdataDocument), HyperdataDocument )
import Gargantext.Database.Admin.Types.Node hiding (ERROR, DEBUG) -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
import Gargantext.Database.Prelude
import Gargantext.Database.Class ( DBCmdWithEnv, IsDBCmd )
import Gargantext.Database.Transactional ( DBUpdate, runDBTx )
import Gargantext.Database.Query.Table.ContextNodeNgrams2 ( ContextNodeNgrams2Poly(..), insertContextNodeNgrams2 )
import Gargantext.Database.Query.Table.Node ( MkCorpus, insertDefaultNodeIfNotExists, getOrMkList, getNodeWith )
import Gargantext.Database.Query.Table.Node.Document.Add qualified as Doc (add)
@@ -108,8 +111,8 @@ import Gargantext.Database.Query.Table.NodeContext (selectDocNodesOnlyId)
import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
import Gargantext.Database.Query.Tree.Root (MkCorpusUser(..), getOrMkRoot, getOrMkRootWithCorpus, userFromMkCorpusUser)
import Gargantext.Database.Schema.Ngrams ( indexNgrams, NgramsId )
import Gargantext.Database.Schema.Node
import Gargantext.Database.Types
import Gargantext.Database.Schema.Node ( NodePoly(_node_id, _node_hash_id), node_hyperdata )
import Gargantext.Database.Types ( Indexed(Indexed) )
import Gargantext.Prelude hiding (catch, onException, to)
import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger )
import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..), markFailureNoErr )
@@ -296,11 +299,12 @@ flow :: forall env err m a c.
-> m CorpusId
flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do
cfg <- view hasConfig
let chunkSize = cfg ^. gc_worker . wsNlpConduitChunkSize
(_userId, userCorpusId, listId, msgs) <- runDBTx $ createNodes cfg mkCorpusUser c
forM_ msgs ce_notify
-- TODO if public insertMasterDocs else insertUserDocs
runConduit (zipSources (yieldMany ([1..] :: [Int])) docsC
.| CList.chunksOf 5
.| CList.chunksOf chunkSize
.| mapM_C (addDocumentsWithProgress userCorpusId)
.| sinkNull) `CES.catches`
[ CES.Handler $ \(e :: ClientError) -> do
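The hard-coded batch size of 5 is gone: flow now chunks the incoming document stream by the configured nlp_conduit_chunk_size. A minimal, self-contained sketch of this batching pattern (names are illustrative, not this module's):
import Conduit (mapM_C, runConduit, yieldMany, (.|))
import qualified Data.Conduit.List as CList

-- Feed documents to 'send' in batches of 'chunkSize', mirroring the
-- CList.chunksOf usage above.
batched :: Int -> [doc] -> ([doc] -> IO ()) -> IO ()
batched chunkSize docs send =
  runConduit $ yieldMany docs .| CList.chunksOf chunkSize .| mapM_C send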
@@ -544,13 +548,20 @@ extractNgramsFromDocuments :: forall doc env err m.
-> TermType Lang
-> [doc]
-> m (UncommittedNgrams doc)
extractNgramsFromDocuments nlpServer lang docs =
foldlM go mempty docs
where
go :: UncommittedNgrams doc -> doc -> m (UncommittedNgrams doc)
go !acc inputDoc = do
ngrams <- extractNgramsFromDocument nlpServer lang inputDoc
pure $ acc <> ngrams
extractNgramsFromDocuments nlpServer lang docs = do
ret <- AsyncL.mapConcurrently (extractNgramsFromDocument nlpServer lang) docs
-- sem <- QSemL.newQSem 10
-- let f = extractNgramsFromDocument nlpServer lang
-- ret <- AsyncL.mapConcurrently (\doc ->
-- CEL.bracket_ (QSemL.waitQSem sem) (QSemL.signalQSem sem) (f doc)
-- ) docs
pure $ foldl (<>) mempty ret
-- foldlM go mempty docs
-- where
-- go :: UncommittedNgrams doc -> doc -> m (UncommittedNgrams doc)
-- go !acc inputDoc = do
-- ngrams <- extractNgramsFromDocument nlpServer lang inputDoc
-- pure $ acc <> ngrams
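extractNgramsFromDocuments now fires one NLP request per document through mapConcurrently, with no upper bound on in-flight requests; the commented-out QSem code above sketches a cap. A standalone version of that bounded variant (plain IO for clarity; 'cap' is a hypothetical parameter, not part of this diff):
import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_)

-- Run 'f' over 'xs' with at most 'cap' requests in flight at once.
boundedMapConcurrently :: Int -> (a -> IO b) -> [a] -> IO [b]
boundedMapConcurrently cap f xs = do
  sem <- newQSem cap
  mapConcurrently (\x -> bracket_ (waitQSem sem) (signalQSem sem) (f x)) xs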
commitNgramsForDocuments :: UniqParameters doc
=> UncommittedNgrams doc
@@ -132,7 +132,8 @@ getOccByNgramsOnlyFast_withSample cId int nt ngs =
HM.fromListWith (+) <$> selectNgramsOccurrencesOnlyByContextUser_withSample cId int nt ngs
-- Returns occurrences of ngrams in given corpus/list (for each ngram, a list of contexts is returned)
-- | Returns occurrences of ngrams in given corpus/list (for each
-- ngram, a list of contexts is returned)
getOccByNgramsOnlyFast :: CorpusId
-> ListId
-> NgramsType
@@ -154,34 +155,39 @@ getOccByNgramsOnlyFast cId lId nt = do
query :: DPS.Query
query = [sql|
WITH cnnv AS
( SELECT DISTINCT context_node_ngrams.context_id,
context_node_ngrams.ngrams_id,
nodes_contexts.node_id,
nodes_contexts.category
WITH nc AS (
SELECT DISTINCT context_id
FROM nodes_contexts
JOIN context_node_ngrams ON context_node_ngrams.context_id = nodes_contexts.context_id
WHERE node_id = ?
AND category > 0
),
node_context_ids AS
(SELECT context_id, ngrams_id, terms
FROM cnnv
JOIN ngrams ON cnnv.ngrams_id = ngrams.id
WHERE node_id = ? AND cnnv.category > 0
cnnv AS
( SELECT DISTINCT context_id,
ngrams_id
FROM context_node_ngrams
WHERE context_id IN (SELECT context_id FROM nc)
),
ncids_agg AS
(SELECT ngrams_id, terms, array_agg(DISTINCT context_id) AS agg
FROM node_context_ids
GROUP BY (ngrams_id, terms)),
( SELECT array_agg(DISTINCT context_id) AS agg,
ngrams_id,
terms
FROM cnnv
JOIN ngrams
ON cnnv.ngrams_id = ngrams.id
GROUP BY (ngrams_id, terms)
),
ns AS
(SELECT ngrams_id, terms
FROM node_stories
JOIN ngrams ON ngrams_id = ngrams.id
JOIN ngrams
ON ngrams_id = ngrams.id
WHERE node_id = ? AND ngrams_type_id = ?
)
SELECT ns.terms, CASE WHEN agg IS NULL THEN '{}' ELSE agg END
FROM ns
LEFT JOIN ncids_agg ON ns.ngrams_id = ncids_agg.ngrams_id
LEFT JOIN ncids_agg
ON ns.ngrams_id = ncids_agg.ngrams_id
|]
-- query = [sql|
-- WITH node_context_ids AS
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TypeFamilies #-}
module Gargantext.System.Logging.Types (
LogLevel(..)
@@ -99,6 +99,10 @@ default_job_timeout = 60
# default timeout for "long" jobs (in seconds)
long_job_timeout = 3000
# Batch size when sending data to NLP.
# Preferably, set this to the number of available CPUs
nlp_conduit_chunk_size = 10
# NOTE This is overridden by Test.Database.Setup
[worker.database]
host = "127.0.0.1"
@@ -14,7 +14,16 @@ Portability : POSIX
module Test.Utils.Jobs ( test ) where
import Async.Worker.Broker.PGMQ qualified as PGMQ
import Async.Worker.Broker.Types qualified as BT
import Data.Aeson qualified as Aeson
import Data.Maybe (fromJust)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), ScraperEvent(..))
import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.Types
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Test.Hspec
import Test.Instances () -- arbitrary instances
@@ -27,7 +36,6 @@ import Test.Instances () -- arbitrary instances
-- via the dispatcher notifications mechanism.
test :: Spec
test = do
pure ()
-- describe "job queue" $ do
-- it "respects max runners limit" $
-- testMaxRunners
@@ -37,13 +45,13 @@ test = do
-- testExceptions
-- it "fairly picks equal-priority-but-different-kind jobs" $
-- testFairness
-- describe "job status update and tracking" $ sequential $ do
describe "job status update and tracking" $ sequential $ do
-- it "can fetch the latest job status" $
-- testFetchJobStatus
-- it "can spin two separate jobs and track their status separately" $
-- testFetchJobStatusNoContention
-- it "marking stuff behaves as expected" $
-- testMarkProgress
it "marking stuff behaves as expected" $
testMarkProgress
@@ -270,110 +278,104 @@ data Counts = Counts { countAs :: Int, countBs :: Int }
-- (map _scst_remaining evts2' == [Just 50])
-- ) 500
-- testMarkProgress :: IO ()
-- testMarkProgress = do
-- myEnv <- newTestEnv
-- -- evts <- newTBQueueIO 7
-- evts <- newTVarIO []
-- let expectedEvents = 7
-- let getStatus hdl = do
-- liftIO $ threadDelay 100_000
-- st <- getLatestJobStatus hdl
-- -- liftIO $ atomically $ writeTBQueue evts st
-- liftIO $ atomically $ modifyTVar evts (\xs -> xs ++ [st])
-- readAllEvents = do
-- -- We will get thread blocking if there is ANY error in the job
-- -- Hence we assert the `readAllEvents` test doesn't take too long
-- mRet <- timeout 5_000_000 $ atomically $ do
-- -- allEventsArrived <- isFullTBQueue evts
-- evts' <- readTVar evts
-- -- STM retry if things failed
-- -- check allEventsArrived
-- check (length evts' == expectedEvents)
-- -- flushTBQueue evts
-- pure evts'
-- case mRet of
-- Nothing -> Prelude.fail $ "testMarkProgress: timeout exceeded, but didn't receive all 7 required events."
-- Just xs
-- | length xs == expectedEvents
-- -> pure xs
-- | otherwise
-- -> Prelude.fail $ "testMarkProgress: received some events, but they were not of the expected number (" <> show expectedEvents <> "): " <> show xs
-- withJob_ myEnv $ \hdl _input -> do
-- markStarted 10 hdl
-- getStatus hdl
-- markProgress 1 hdl
-- getStatus hdl
-- markFailureNoErr 1 hdl
-- getStatus hdl
-- markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl
-- getStatus hdl
-- markComplete hdl
-- getStatus hdl
-- markStarted 5 hdl
-- markProgress 1 hdl
-- getStatus hdl
-- markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl
-- getStatus hdl
-- evts' <- readAllEvents
-- -- This pattern match should never fail, because the precondition is
-- -- checked in 'readAllEvents'.
-- let [jl0, jl1, jl2, jl3, jl4, jl5, jl6] = evts'
-- -- Check the events are what we expect
-- jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
-- , _scst_failed = Just 0
-- , _scst_remaining = Just 10
-- , _scst_events = Just []
-- }
-- jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 0
-- , _scst_remaining = Just 9
-- , _scst_events = Just []
-- }
-- jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 1
-- , _scst_remaining = Just 8
-- , _scst_events = Just []
-- }
-- jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 2
-- , _scst_remaining = Just 7
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "boom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
-- jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
-- , _scst_failed = Just 2
-- , _scst_remaining = Just 0
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "boom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
-- jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 0
-- , _scst_remaining = Just 4
-- , _scst_events = Just []
-- }
-- jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
-- , _scst_failed = Just 4
-- , _scst_remaining = Just 0
-- , _scst_events = Just [
-- ScraperEvent { _scev_message = Just "kaboom"
-- , _scev_level = Just "ERROR"
-- , _scev_date = Nothing }
-- ]
-- }
testMarkProgress :: IO ()
testMarkProgress = do
withWorkerEnv "./test-data/test_config.toml" $ \env -> runWorkerMonad env $ do
let msgId = (fromJust $ Aeson.decode "0") :: BT.MessageId PGMQ.PGMQBroker
let hdl = WorkerJobHandle { _w_job_info = JobInfo { _ji_message_id = msgId
, _ji_mNode_id = Nothing } }
markStarted 10 hdl
jl0 <- getLatestJobStatus hdl
liftBase $ jl0 `shouldBe` JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 10
, _scst_events = Just []
}
markProgress 1 hdl
jl1 <- getLatestJobStatus hdl
liftBase $ jl1 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 9
, _scst_events = Just []
}
markFailure 1 (Nothing :: Maybe Void) hdl
jl2 <- getLatestJobStatus hdl
liftBase $ jl2 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 1
, _scst_remaining = Just 8
, _scst_events = Just []
}
markFailure 1 (Just $ UnsafeMkHumanFriendlyErrorText "boom") hdl
jl3 <- getLatestJobStatus hdl
liftBase $ jl3 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 2
, _scst_remaining = Just 7
, _scst_events = Just [
ScraperEvent { _scev_message = Just "boom"
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
]
}
markComplete hdl
jl4 <- getLatestJobStatus hdl
liftBase $ jl4 `shouldBe` JobLog { _scst_succeeded = Just 8
, _scst_failed = Just 2
, _scst_remaining = Just 0
, _scst_events = Just [
ScraperEvent { _scev_message = Just "boom"
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
]
}
markStarted 5 hdl
markProgress 1 hdl
jl5 <- getLatestJobStatus hdl
liftBase $ jl5 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 4
, _scst_events = Just []
}
markFailed (Just $ UnsafeMkHumanFriendlyErrorText "kaboom") hdl
jl6 <- getLatestJobStatus hdl
liftBase $ jl6 `shouldBe` JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 4
, _scst_remaining = Just 0
, _scst_events = Just [
ScraperEvent { _scev_message = Just "kaboom"
, _scev_level = Just "ERROR"
, _scev_date = Nothing }
]
}
let msgId2 = (fromJust $ Aeson.decode "2") :: BT.MessageId PGMQ.PGMQBroker
let hdl2 = WorkerJobHandle { _w_job_info = JobInfo { _ji_message_id = msgId2
, _ji_mNode_id = Nothing } }
addMoreSteps 11 hdl2
jl7 <- getLatestJobStatus hdl2
liftBase $ jl7 `shouldBe` JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 11
, _scst_events = Just []
}
markStarted 1 hdl2
jl8 <- getLatestJobStatus hdl2
liftBase $ jl8 `shouldBe` JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
addMoreSteps 11 hdl2
jl9 <- getLatestJobStatus hdl2
liftBase $ jl9 `shouldBe` JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 12
, _scst_events = Just []
}