Commit 0ce35337 authored by Alfredo Di Napoli

Fix the 'happiness' bug

Fixes #258

This commit fixes the issue of the "empty Ngrams". The culprit was a bug
in the `arxiv-api` package, which was fixed by:

gargantext/crawlers/arxiv-api!2 (commits)

In a nutshell, because the relevant `Conduit` never completed, it blocked
the `flow` function, which waited endlessly for the last part of the
results to arrive.

Hence we never called `flowCorpusUser`, the function responsible for
generating the Ngrams.
parent 052f4bf8
Pipeline #4513 failed with stages
in 9 minutes and 51 seconds
......@@ -79,7 +79,7 @@ source-repository-package
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/crawlers/arxiv-api.git
tag: 2d7e5753cbbce248b860b571a0e9885415c846f7
tag: eb130c71fa17adaceed6ff66beefbccb13df51ba
source-repository-package
type: git
......
......@@ -227,6 +227,8 @@ instance Jobs.MonadJobStatus (GargM Env err) where
Just msg -> jobLogFailTotalWithMessage msg latest
)
addMoreSteps steps jh = updateJobProgress jh (jobLogAddMore steps)
-- | Environment record with a single strict field holding a 'FireWall' value.
-- NOTE(review): presumably a stand-in environment for tests, judging by the
-- name — confirm against its use sites.
data MockEnv = MockEnv
{ _menv_firewall :: !FireWall
}
......@@ -292,6 +294,8 @@ instance Jobs.MonadJobStatus (GargM DevEnv err) where
markFailed _ _ = pure ()
addMoreSteps _ _ = pure ()
instance HasConfig DevEnv where
hasConfig = dev_env_config
......
......@@ -31,7 +31,7 @@ addErrorEvent message = addEvent "ERROR" message
-- | Record @n@ steps of progress on a 'JobLog': @n@ is added to
-- 'scst_succeeded' and subtracted from 'scst_remaining'. Both updates go
-- through '_Just', so a field that is 'Nothing' is left unchanged.
--
-- NOTE(review): the two 'scst_remaining' lines below appear to be the
-- before/after versions of the same line in a diff; the second one clamps the
-- result at 0 via @max 0@ — confirm that only that one is present in the file.
jobLogProgress :: Int -> JobLog -> JobLog
jobLogProgress n jl = over (scst_succeeded . _Just) (+ n) $
over (scst_remaining . _Just) (\x -> x - n) jl
over (scst_remaining . _Just) (\x -> max 0 (x - n)) jl
-- | Mark a job as completely done, by adding the 'remaining' into 'succeeded'.
-- At the end 'scst_remaining' will be 0, and 'scst_succeeded' will be 'oldvalue + remaining'.
......@@ -41,6 +41,9 @@ jobLogComplete jl =
in jl & over scst_succeeded (Just . maybe remainingNow ((+) remainingNow))
& over scst_remaining (const (Just 0))
-- | Extend a 'JobLog' with @moreSteps@ additional steps by increasing
-- 'scst_remaining'. The update goes through '_Just', so a 'JobLog' whose
-- 'scst_remaining' is 'Nothing' is returned unchanged.
jobLogAddMore :: Int -> JobLog -> JobLog
jobLogAddMore moreSteps jl =
  over (scst_remaining . _Just) (+ moreSteps) jl
-- | Record @n@ failed steps on a 'JobLog': @n@ is added to 'scst_failed' and
-- removed from 'scst_remaining'. Both updates go through '_Just', so a field
-- that is 'Nothing' is left unchanged.
--
-- The remaining count is clamped at 0, consistently with 'jobLogProgress', so
-- that reporting more failures than there are outstanding steps never
-- produces a negative 'scst_remaining'.
jobLogFailures :: Int -> JobLog -> JobLog
jobLogFailures n jl = over (scst_failed . _Just) (+ n) $
                      over (scst_remaining . _Just) (\x -> max 0 (x - n)) jl
......
......@@ -236,7 +236,7 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
case eTxt of
Right txt -> do
-- TODO Sum lengths of each txt element
$(logLocM) DEBUG "Processing dataText results"
markProgress 1 jobHandle
corpusId <- flowDataText user txt (Multi l) cid (Just flw) jobHandle
......@@ -248,6 +248,7 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
Left err -> do
-- printDebug "Error: " err
$(logLocM) ERROR (T.pack $ show err)
markFailed (Just $ T.pack (show err)) jobHandle
type AddWithForm = Summary "Add with FormUrlEncoded to corpus endpoint"
......
......@@ -55,9 +55,11 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
import Conduit
import Control.Lens hiding (elements, Indexed)
import Control.Monad (void)
import Data.Aeson.TH (deriveJSON)
import Data.Conduit.Internal (zipSources)
import Data.Either
import Data.Foldable (for_)
import Data.HashMap.Strict (HashMap)
import Data.Hashable (Hashable)
import Data.List (concat)
......@@ -69,6 +71,7 @@ import Data.Set (Set)
import Data.Swagger
import Data.Tuple.Extra (first, second)
import GHC.Generics (Generic)
import GHC.Num (fromInteger)
import Gargantext.API.Ngrams.Tools (getTermsWith)
import Gargantext.Core (Lang(..), PosTagAlgo(..))
import Gargantext.Core (withDefaultLanguage)
......@@ -97,7 +100,7 @@ import Gargantext.Database.Action.Metrics (updateNgramsOccurrences, updateContex
import Gargantext.Database.Action.Search (searchDocInDatabase)
import Gargantext.Database.Admin.Config (userMaster, corpusMasterName)
import Gargantext.Database.Admin.Types.Hyperdata
import Gargantext.Database.Admin.Types.Node -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
import Gargantext.Database.Admin.Types.Node hiding (DEBUG) -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
import Gargantext.Database.Prelude
import Gargantext.Database.Query.Table.ContextNodeNgrams2
import Gargantext.Database.Query.Table.Ngrams
......@@ -113,26 +116,28 @@ import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Database.Types
import Gargantext.Prelude
import Gargantext.Prelude.Crypto.Hash (Hash)
import Gargantext.System.Logging
import Gargantext.Utils.Jobs (JobHandle, MonadJobStatus(..))
import System.FilePath (FilePath)
import qualified Data.Conduit as C
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.List as CList
import qualified Data.HashMap.Strict as HashMap
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Gargantext.API.Ngrams.Types as NT
import qualified Gargantext.Core.Text.Corpus.API as API
import qualified Gargantext.Data.HashMap.Strict.Utils as HashMap
import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
import qualified Gargantext.Database.Query.Table.Node.Document.Add as Doc (add)
import qualified PUBMED.Types as PUBMED
------------------------------------------------------------------------
-- Imports for upgrade function
import Gargantext.Database.Query.Tree.Root (getRootId)
import Gargantext.Database.Query.Tree (findNodesId)
import qualified Data.List as List
------------------------------------------------------------------------
-- TODO use internal with API name (could be old data)
data DataOrigin = InternalOrigin { _do_api :: API.ExternalAPIs }
......@@ -205,12 +210,15 @@ flowDataText :: forall env err m.
-> JobHandle m
-> m CorpusId
flowDataText u (DataOld ids) tt cid mfslw _ = do
$(logLocM) DEBUG $ T.pack $ "Found " <> show (length ids) <> " old node IDs"
(_userId, userCorpusId, listId) <- createNodes u (Right [cid]) corpusType
_ <- Doc.add userCorpusId ids
flowCorpusUser (_tt_lang tt) u userCorpusId listId corpusType mfslw
where
corpusType = (Nothing :: Maybe HyperdataCorpus)
flowDataText u (DataNew (mLen, txtC)) tt cid mfslw jobHandle =
flowDataText u (DataNew (mLen, txtC)) tt cid mfslw jobHandle = do
$(logLocM) DEBUG $ T.pack $ "Found " <> show mLen <> " new documents to process"
for_ (mLen <&> fromInteger) (`addMoreSteps` jobHandle)
flowCorpus u (Right [cid]) tt mfslw (mLen, (transPipe liftBase txtC)) jobHandle
------------------------------------------------------------------------
......@@ -279,59 +287,23 @@ flow :: forall env err m a c.
flow c u cn la mfslw (mLength, docsC) jobHandle = do
(_userId, userCorpusId, listId) <- createNodes u cn c
-- TODO if public insertMasterDocs else insertUserDocs
_ <- runConduit $ zipSources (yieldMany [1..]) docsC
runConduit $ zipSources (yieldMany [1..]) docsC
.| CList.chunksOf 100
.| mapMC insertDocs'
.| mapM_C (\ids' -> do
_ <- Doc.add userCorpusId ids'
pure ())
.| sinkList
_ <- flowCorpusUser (la ^. tt_lang) u userCorpusId listId c mfslw
-- ids <- traverse (\(idx, doc) -> do
-- id <- insertMasterDocs c la doc
-- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
-- , _scst_failed = Just 0
-- , _scst_remaining = Just $ length docs - idx
-- , _scst_events = Just []
-- }
-- pure id
-- ) (zip [1..] docs)
--printDebug "[flow] calling flowCorpusUser" (0 :: Int)
pure userCorpusId
--flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
.| mapM_C (\docs -> void $ insertDocs' docs >>= Doc.add userCorpusId)
.| sinkNull
$(logLocM) DEBUG "Calling flowCorpusUser"
flowCorpusUser (la ^. tt_lang) u userCorpusId listId c mfslw
where
insertDocs' :: [(Integer, a)] -> m [NodeId]
insertDocs' [] = pure []
insertDocs' docs = do
-- printDebug "[flow] calling insertDoc, ([idx], mLength) = " (fst <$> docs, mLength)
$(logLocM) DEBUG $ T.pack $ "calling insertDoc, ([idx], mLength) = " <> show (fst <$> docs, mLength)
ids <- insertMasterDocs c la (snd <$> docs)
let maxIdx = maximum (fst <$> docs)
case mLength of
Nothing -> pure ()
Just _len -> do
let succeeded = fromIntegral (1 + maxIdx)
-- let remaining = fromIntegral (len - maxIdx)
-- Reconstruct the correct update state by using 'markStarted' and the other primitives.
-- We do this slightly awkward arithmetic such that when we call 'markProgress' we reduce
-- the number of 'remaining' of exactly '1 + maxIdx', and we will end up with a 'JobLog'
-- looking like this:
-- JobLog
-- { _scst_succeeded = Just $ fromIntegral $ 1 + maxIdx
-- , _scst_failed = Just 0
-- , _scst_remaining = Just $ fromIntegral $ len - maxIdx
-- , _scst_events = Just []
-- }
-- markStarted (remaining + succeeded) jobHandle
markProgress succeeded jobHandle
markProgress (length docs) jobHandle
pure ids
------------------------------------------------------------------------
createNodes :: ( FlowCmdM env err m
, MkCorpus c
......
......@@ -39,6 +39,13 @@ add pId ns = runPGSQuery queryAdd (Only $ Values fields inputData)
fields = map (\t-> QualifiedIdentifier Nothing t) inputSqlTypes
inputData = prepare pId ns
-- | Insert exactly one document (context) under the given corpus.
-- Handy when debugging; for real workloads prefer the bulk 'add',
-- which is more efficient.
add_one :: CorpusId -> ContextId -> Cmd err [Only Int]
add_one pId ctxId = runPGSQuery queryAdd (Only (Values columnTypes [row]))
  where
    -- The single input row: the corpus paired with the context to attach.
    row         = InputData pId ctxId
    -- SQL column types for the VALUES clause, as unqualified identifiers.
    columnTypes = map (QualifiedIdentifier Nothing) inputSqlTypes
add_debug :: CorpusId -> [ContextId] -> Cmd err ByteString
add_debug pId ns = formatPGSQuery queryAdd (Only $ Values fields inputData)
where
......
......@@ -212,3 +212,6 @@ class MonadJobStatus m where
-- | Finish tracking a job by marking all the remaining steps as failed. Attach an optional
-- message to the failure.
markFailed :: Maybe T.Text -> JobHandle m -> m ()
-- | Add @n@ more steps to the running computation; they will be marked as remaining.
addMoreSteps :: MonadJobStatus m => Int -> JobHandle m -> m ()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment