Commit f18e2dbc authored by Alfredo Di Napoli

Implement a proper incremental parser for TSV documents

This commit introduces/improves the `parseTsvWithDiagnostics`
function to parse the input TSV incrementally, collecting errors
as we go, and eventually reporting them upstream via the newly
added `emitTsvParseWarning` function.
parent b0d6365f
......@@ -57,6 +57,7 @@ data-files:
test-data/issue-381/Termes_A_Ajouter_T4SC_Intellixir.tsv
test-data/issue-381/Termes_A_Ajouter_T4SC_Intellixir12.csv
test-data/issue-380/corpus.tsv
test-data/issue-380/malformed_row.tsv
.clippy.dhall
-- common options
......@@ -227,6 +228,7 @@ library
Gargantext.Core.Text.Corpus.API.OpenAlex
Gargantext.Core.Text.Corpus.API.Pubmed
Gargantext.Core.Text.Corpus.Parsers
Gargantext.Core.Text.Corpus.Parsers.Types
Gargantext.Core.Text.Corpus.Parsers.Date
Gargantext.Core.Text.Corpus.Parsers.TSV
Gargantext.Core.Text.Corpus.Query
......
......@@ -187,6 +187,8 @@ instance MonadJobStatus (GargM DevEnv err) where
markFailed _ _ = pure ()
emitWarning _ _ = pure ()
addMoreSteps _ _ = pure ()
instance HasConfig DevEnv where
......
......@@ -20,6 +20,7 @@ module Gargantext.API.Job (
, jobLogFailTotalWithMessage
, RemainingSteps(..)
, addErrorEvent
, addWarningEvent
) where
import Control.Lens (over, _Just)
......@@ -49,6 +50,9 @@ addEvent level message (JobLog { _scst_events = mEvts, .. }) = JobLog { _scst_ev
-- | Records an event with level @"ERROR"@ in the job log, using the
-- human-friendly rendering of the supplied error as the event message.
addErrorEvent :: ToHumanFriendlyError e => e -> JobLog -> JobLog
addErrorEvent = addEvent "ERROR" . mkHumanFriendly
-- | Records an event with level @"WARNING"@ in the job log, using the
-- human-friendly rendering of the supplied value as the event message.
addWarningEvent :: ToHumanFriendlyError e => e -> JobLog -> JobLog
addWarningEvent = addEvent "WARNING" . mkHumanFriendly
-- | Registers @n@ more succeeded steps: the succeeded counter grows by @n@ and
-- the remaining counter shrinks by @n@, clamped so it never drops below zero.
-- A counter that is 'Nothing' is left untouched (both updates go through '_Just').
jobLogProgress :: Int -> JobLog -> JobLog
jobLogProgress n =
  over (scst_succeeded . _Just) (+ n)
    . over (scst_remaining . _Just) (max 0 . subtract n)
......
......@@ -15,15 +15,17 @@ New corpus means either:
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE LambdaCase #-}
module Gargantext.API.Node.Corpus.New
where
import Conduit ((.|), yieldMany, mapMC, mapC, transPipe)
import Conduit ((.|), yieldMany, mapMC, transPipe)
import Control.Exception.Safe (MonadMask)
import Control.Lens ( view, non )
import Data.Conduit.Internal (zipSources)
import Data.Conduit.List (mapMaybeM)
import Data.Swagger ( ToSchema(..) )
import Data.Text qualified as T
import Database.PostgreSQL.Simple.LargeObjects qualified as PSQL
......@@ -34,14 +36,15 @@ import Gargantext.API.Node.Corpus.Searx ( triggerSearxSearch )
import Gargantext.API.Node.Corpus.Types ( Datafield(Web), datafield2origin )
import Gargantext.API.Node.Corpus.Update (addLanguageToCorpus)
import Gargantext.API.Node.Types
import Gargantext.Core (withDefaultLanguage, defaultLanguage)
import Gargantext.Core.Config (gc_jobs, hasConfig)
import Gargantext.Core.Config.Types (jc_max_docs_parsers)
import Gargantext.Core.NodeStory (HasNodeStoryImmediateSaver, HasNodeArchiveStoryImmediateSaver, currentVersion, NgramsStatePatch', HasNodeStoryEnv)
import Gargantext.Core.Text.Corpus.Parsers qualified as Parser (FileType(..), parseFormatC, _ParseFormatError)
import Gargantext.Core.Text.Corpus.Parsers.Types
import Gargantext.Core.Text.Corpus.Query qualified as API
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Utils.Prefix (unPrefix)
import Gargantext.Core (withDefaultLanguage, defaultLanguage)
import Gargantext.Database.Action.Flow (flowCorpus, getDataText, flowDataText, TermType(..){-, allDataOrigins-})
import Gargantext.Database.Action.Flow.Types (FlowCmdM)
import Gargantext.Database.Action.Mail (sendMail)
......@@ -52,13 +55,14 @@ import Gargantext.Database.Admin.Types.Hyperdata.File ( HyperdataFile(..) )
import Gargantext.Database.Admin.Types.Node (CorpusId, NodeType(..), ParentId)
import Gargantext.Database.GargDB qualified as GargDB
import Gargantext.Database.Prelude (readLargeObject, IsDBCmd)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError)
import Gargantext.Database.Query.Table.Node (getNodeWith, getOrMkList)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Query.Tree.Root (MkCorpusUser(MkCorpusUserNormalCorpusIds))
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.System.Logging ( logLocM, LogLevel(..) )
import Gargantext.Utils.Jobs.Error (HumanFriendlyErrorText(..))
import Gargantext.Utils.Jobs.Monad (JobHandle, MonadJobStatus(..))
------------------------------------------------------------------------
......@@ -226,9 +230,7 @@ addToCorpusWithTempFile :: ( MonadMask m
-> JobHandle m
-> m ()
addToCorpusWithTempFile user cid nwtf jobHandle = do
-- printDebug "[addToCorpusWithForm] Parsing corpus: " cid
-- printDebug "[addToCorpusWithForm] fileType" ft
-- printDebug "[addToCorpusWithForm] fileFormat" ff
$(logLocM) DEBUG $ "Adding documents to corpus: " <> show cid
let l = nwtf ^. wtf_lang . non defaultLanguage
addLanguageToCorpus cid l
......@@ -256,7 +258,7 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
-- TODO Add progress (jobStatus) update for docs - this is a
-- long action
let docsC' = zipSources (yieldMany [1..]) docsC
let docsC' = zipSources (yieldMany [1..]) (transPipe liftBase docsC)
.| mapMC (\(idx, doc) ->
if idx > limit then do
--printDebug "[addToCorpusWithForm] number of docs exceeds the limit" (show limit)
......@@ -269,7 +271,10 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
panicTrace panicMsg
else
pure doc)
.| mapC toHyperdataDocument
.| mapMaybeM (\case
ParseRecordSucceeded r -> pure $ Just $ toHyperdataDocument r
ParseTsvRecordFailed r -> emitTsvParseWarning jobHandle r >> pure Nothing
)
--printDebug "Parsing corpus finished : " cid
--logStatus jobLog2
......@@ -282,7 +287,7 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
(Multi l)
(Just (nwtf ^. wtf_selection))
--(Just $ fromIntegral $ length docs, docsC')
(count, transPipe liftBase docsC') -- TODO fix number of docs
(count, docsC') -- TODO fix number of docs
--(map (map toHyperdataDocument) docs)
jobHandle
......@@ -298,6 +303,11 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
$(logLocM) ERROR $ "[addToCorpusWithTempFile] parse error: " <> Parser._ParseFormatError parseErr
markFailed (Just parseErr) jobHandle
-- | Reports a TSV row that could not be parsed as a warning on the given job.
-- The warning text carries the row number and the parser's reason; the raw
-- bytes attached to the failure are not included in the message.
emitTsvParseWarning :: MonadJobStatus m => JobHandle m -> AtRow (Text, ByteString) -> m ()
emitTsvParseWarning jh (AtRow row_num (reason, _raw)) = emitWarning warning jh
  where
    warning =
      UnsafeMkHumanFriendlyErrorText
        ("Parsing of record at row " <> T.pack (show row_num) <> " failed due to: " <> reason)
{-
addToCorpusWithFile :: FlowCmdM env err m
=> CorpusId
......
......@@ -13,6 +13,9 @@ Portability : POSIX
module Gargantext.API.Worker where
import Data.Aeson qualified as JSON
import Data.ByteString.Lazy qualified as BL
import Data.Text.Encoding qualified as TE
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.Worker.Jobs (sendJob)
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId)
......@@ -41,7 +44,7 @@ serveWorkerAPIM mkJob = WorkerAPI { workerAPIPost }
where
workerAPIPost i = do
job <- mkJob i
logM DEBUG $ "[serveWorkerAPI] sending job " <> show job
logM DEBUG $ "[serveWorkerAPI] sending job " <> TE.decodeUtf8 (BL.toStrict $ JSON.encode job)
mId <- sendJob job
pure $ JobInfo { _ji_message_id = mId
, _ji_mNode_id = getWorkerMNodeId job }
......
......@@ -62,6 +62,7 @@ import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Zip qualified as UZip
import Protolude ( show )
import System.FilePath (takeExtension)
import Gargantext.Core.Text.Corpus.Parsers.Types
------------------------------------------------------------------------
type ParseError = Text
......@@ -101,14 +102,14 @@ parseFormatC :: forall m. MonadBaseControl IO m
=> FileType
-> FileFormat
-> DB.ByteString
-> m (Either ParseFormatError (Integer, ConduitT () HyperdataDocument IO ()))
-> m (Either ParseFormatError (Integer, ConduitT () (ParseCorpusResult HyperdataDocument) IO ()))
parseFormatC ft ff bs0 = first ParseFormatError <$> do_parse ft ff bs0
where
do_parse :: MonadBaseControl IO m
=> FileType
-> FileFormat
-> DB.ByteString
-> m (Either DT.Text (Integer, ConduitT () HyperdataDocument IO ()))
-> m (Either DT.Text (Integer, ConduitT () (ParseCorpusResult HyperdataDocument) IO ()))
do_parse TsvGargV3 Plain bs = do
let eParsedC = parseTsvC $ DBL.fromStrict bs
pure (second (transPipe (pure . runIdentity)) <$> eParsedC)
......@@ -117,7 +118,7 @@ parseFormatC ft ff bs0 = first ParseFormatError <$> do_parse ft ff bs0
pure (second (transPipe (pure . runIdentity)) <$> eParsedC)
do_parse Istex Plain bs = do
ep <- liftBase $ parseIstex EN $ DBL.fromStrict bs
pure $ (\p -> (1, yieldMany [p])) <$> ep
pure $ (\p -> (1, yieldMany [ParseRecordSucceeded p])) <$> ep
do_parse RisPresse Plain bs = do
--docs <- enrichWith RisPresse
let eDocs = runParser' RisPresse bs
......@@ -126,7 +127,7 @@ parseFormatC ft ff bs0 = first ParseFormatError <$> do_parse ft ff bs0
, yieldMany docs
.| mapC presseEnrich
.| mapC (map $ both decodeUtf8)
.| mapMC (toDoc RIS)) ) <$> eDocs
.| mapMC (fmap ParseRecordSucceeded . toDoc RIS)) ) <$> eDocs
do_parse WOS Plain bs = do
let eDocs = runParser' WOS bs
pure $ (\docs ->
......@@ -134,7 +135,7 @@ parseFormatC ft ff bs0 = first ParseFormatError <$> do_parse ft ff bs0
, yieldMany docs
.| mapC (map $ first WOS.keys)
.| mapC (map $ both decodeUtf8)
.| mapMC (toDoc WOS)) ) <$> eDocs
.| mapMC (fmap ParseRecordSucceeded . toDoc WOS)) ) <$> eDocs
do_parse Iramuteq Plain bs = do
let eDocs = runParser' Iramuteq bs
pure $ (\docs ->
......@@ -142,13 +143,13 @@ parseFormatC ft ff bs0 = first ParseFormatError <$> do_parse ft ff bs0
, yieldMany docs
.| mapC (map $ first Iramuteq.keys)
.| mapC (map $ both decodeUtf8)
.| mapMC (toDoc Iramuteq . map (second (DT.replace "_" " ")))
.| mapMC (fmap ParseRecordSucceeded . toDoc Iramuteq . map (second (DT.replace "_" " ")))
)
)
<$> eDocs
do_parse JSON Plain bs = do
let eParsedC = parseJSONC $ DBL.fromStrict bs
pure (second (transPipe (pure . runIdentity)) <$> eParsedC)
pure (second (mapOutput ParseRecordSucceeded . transPipe (pure . runIdentity)) <$> eParsedC)
do_parse fty ZIP bs = liftBase $ UZip.withZipFileBS bs $ do
fileNames <- filter (filterZIPFileNameP ft) . DM.keys <$> getEntries
printDebug "[do_parse] fileNames" fileNames
......
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE ViewPatterns #-}
{-|
Module : Gargantext.Core.Text.Corpus.Parsers.TSV
......@@ -25,9 +27,6 @@ module Gargantext.Core.Text.Corpus.Parsers.TSV (
, writeDocs2Tsv
, readTsvHal
, TsvHal(..)
, AtRow(..)
, ParseTsvResult(..)
, parseTsvWithDiagnostics
-- * Used in tests
, testCorrectFile
, testErrorPerLine
......@@ -47,8 +46,12 @@ module Gargantext.Core.Text.Corpus.Parsers.TSV (
) where
import Conduit ( ConduitT, (.|), yieldMany, mapC )
import Control.Lens hiding ((.=))
import Data.ByteString (StrictByteString)
import Data.ByteString.Lazy qualified as BL
import Data.ByteString.Lazy.Char8 qualified as B8L
import Data.Csv
import Data.Csv.Incremental qualified as CSVI
import Data.Text (pack)
import Data.Text qualified as T
import Data.Text.Lazy qualified as TL
......@@ -57,10 +60,34 @@ import Data.Text.Read qualified as DTR
import Data.Time.Segment (jour)
import Data.Vector (Vector)
import Data.Vector qualified as V
import Gargantext.Core.Text.Corpus.Parsers.Types
import Gargantext.Database.Admin.Types.Hyperdata.Document ( HyperdataDocument(..) )
import Gargantext.Prelude hiding (length, show)
import Prelude (String)
import Protolude
import qualified Data.ByteString as B
-- | Little helper data structure to make working with the incremental
-- TSV parsing a bit nicer. It records which phase the parse is in: still
-- consuming the header, or already decoding records.
data ParsingContext a
-- The header has not been fully consumed yet.
= MkHeaderParsingContext (CSVI.HeaderParser (CSVI.Parser a))
-- The header phase is over (it either completed or failed); from here on the
-- state is tracked by a 'RecordParsingContext'.
| MkRecordParsingContext (RecordParsingContext a)
deriving Show
-- State threaded through the record-decoding phase of the incremental parse.
data RecordParsingContext a = RecordParsingContext
{ -- The incremental parser for the records (a header parser is never stored
-- here; it is carried by 'MkHeaderParsingContext' instead).
-- This field will contain 'Nothing' if the parser has been drained or has
-- failed, i.e. there is nothing left to step.
_prs_ctx_parser :: Maybe (CSVI.Parser a)
-- Results collected so far, most recent first: new results are prepended, so
-- the list has to be reversed to obtain document order.
, _prs_ctx_parsed_records :: [a]
-- | Keeps track of the current row we are in, so that we can insert proper error
-- diagnostics.
, _prs_ctx_row_cursor :: !Int
} deriving Show
makeLenses ''RecordParsingContext
---------------------------------------------------------------
headerTsvGargV3 :: Header
......@@ -291,6 +318,22 @@ testCorrectFile bs =
Left _err -> Left _err
Right headers -> testIfErrorInFile bl del headers
-- | Guesses the field delimiter of a document by inspecting its first line only.
--
-- Commas and tabs are counted on the first line. 'Comma' is returned when
-- commas strictly outnumber tabs; otherwise 'Tab' is returned, which includes
-- the tie where neither character occurs at all. An input that contains no
-- line (the empty input) yields a 'Left' with an explanatory message.
--
-- This is a total formulation of picking the most frequent of the two
-- candidate characters with ties going to the tab: no other delimiter can ever
-- be selected, so no further alternatives are needed.
detectDelimiter :: BL.ByteString -> Either Text Delimiter
detectDelimiter input =
  case B8L.lines input of
    (firstLine : _)
      | count ',' firstLine > count '\t' firstLine -> Right Comma
      | otherwise                                  -> Right Tab
    [] -> Left "Couldn't detect a valid delimiter for the input document."
-- | Number of occurrences of a character in a lazy 'BL.ByteString'.
-- The character is compared as a single byte, so only characters that fit in
-- one byte are counted meaningfully.
count :: Char -> BL.ByteString -> Int64
count = B8L.count
----------Test headers added to ggt
......@@ -317,41 +360,96 @@ getHeaders bl del = do
readTSVFile :: FilePath -> IO (Either Text (Header, Vector TsvDoc))
readTSVFile fp = do
file <- BL.readFile fp
case (testCorrectFile file) of
Left _err -> pure $ Left _err
case detectDelimiter file of
Left err -> pure $ Left err
Right del -> pure $ readTsvLazyBS del file
-- | Allows the parser to report errors happening at a given row in the document.
data AtRow a = AtRow Int a
data ParseTsvResult a
= ParseTsvResult
{ _ptr_parsed_records :: a
, _ptr_skipped_records :: [ AtRow (Text, Vector ByteString) ]
}
type TsvDocParser = CSVI.Parser (ParseCorpusResult TsvDoc)
parseTsvWithDiagnostics :: Delimiter
-> BL.ByteString
-> Either Text (ParseTsvResult [TsvDoc])
parseTsvWithDiagnostics d bs = case decodeWith @Record (tsvDecodeOptions d) HasHeader bs of
Left err -> Left $ pack err
Right rawRecords -> Right $ uncurry ParseTsvResult . first reverse $
foldl' parse_raw_record (mempty, mempty) (zip (V.toList rawRecords) [ 1 .. ])
-> [ParseCorpusResult TsvDoc]
parseTsvWithDiagnostics d bs =
  -- The chunks of the lazy bytestring must reach the incremental parser in
  -- document order, so we fold from the left. A right fold would apply 'go' to
  -- the /last/ chunk first (with the fresh header parser), which garbles any
  -- input made of more than one chunk.
  case drainParser (BL.foldlChunks (flip go) mkHdrParser bs) of
    -- Still in the header phase after draining: not a single record could be
    -- decoded, so report one failure at row 0.
    MkHeaderParsingContext _p ->
      [ ParseTsvRecordFailed (AtRow 0 ("The parsing choked on the header (delimiter was " <> show d <> "). This might be a malformed TSV we can't recover from.", mempty)) ]
    -- Results are accumulated most-recent-first, hence the 'reverse'.
    MkRecordParsingContext (RecordParsingContext{..})
      -> reverse _prs_ctx_parsed_records
where
parse_raw_record :: ([TsvDoc], [AtRow (Text, Vector ByteString)])
-> (Vector ByteString, Int)
-> ([TsvDoc], [AtRow (Text, Vector ByteString)])
parse_raw_record (!succeeded, !failed) (input_record, current_row) =
case decodeByNameWith @TsvDoc (tsvDecodeOptions d) (intercalateRecords d input_record) of
Left err -> (succeeded, AtRow current_row (pack err, input_record) : failed)
Right (_, V.toList -> [tsvDoc]) -> (tsvDoc : succeeded, failed)
-- The one below won't happen, but it's left for completeness.
Right (_, V.toList -> tsvDocs) -> (tsvDocs <> succeeded, failed)
-- | \"Reconstructs\" a line out of a parsed record, so that it can be fed back
-- into the invidual parser.
intercalateRecords :: Delimiter -> Record -> BL.ByteString
intercalateRecords d r = BL.fromStrict $ B.intercalate (B.pack [delimiter d]) (V.toList r)
drainParser :: ParsingContext (ParseCorpusResult TsvDoc)
-> ParsingContext (ParseCorpusResult TsvDoc)
drainParser = go mempty . go mempty -- step twice, once to apply the empty string, once to drain.
mkHdrParser :: ParsingContext (ParseCorpusResult TsvDoc)
mkHdrParser = MkHeaderParsingContext (CSVI.decodeByNameWith (tsvDecodeOptions d))
go :: StrictByteString
-> ParsingContext (ParseCorpusResult TsvDoc)
-> ParsingContext (ParseCorpusResult TsvDoc)
go strict_chunk ctx = case ctx of
MkHeaderParsingContext p -> go_hdr strict_chunk p
MkRecordParsingContext p -> go_rec strict_chunk p
-- Specialised parser for the header.
go_hdr :: StrictByteString
-> CSVI.HeaderParser TsvDocParser
-> ParsingContext (ParseCorpusResult TsvDoc)
go_hdr chunk p = case p of
CSVI.FailH unconsumed err ->
MkRecordParsingContext $ stepParser chunk $ toRecordParsingCtx (CSVI.Fail unconsumed err)
CSVI.PartialH continue ->
MkHeaderParsingContext (continue chunk)
CSVI.DoneH _header rec_parser ->
-- Turn this into a record parser by feeding the unconsumed plus the input.
MkRecordParsingContext $ stepParser chunk $ toRecordParsingCtx rec_parser
-- Specialised parser for the individual records.
go_rec :: StrictByteString
-> RecordParsingContext (ParseCorpusResult TsvDoc)
-> ParsingContext (ParseCorpusResult TsvDoc)
go_rec input_bs ctx = MkRecordParsingContext $ stepParser input_bs ctx
stepParser :: ByteString
-> RecordParsingContext (ParseCorpusResult TsvDoc)
-> RecordParsingContext (ParseCorpusResult TsvDoc)
stepParser input_bs ctx = case ctx ^. prs_ctx_parser of
Nothing
-> ctx
Just (CSVI.Fail unconsumed err)
-> ctx & over prs_ctx_parsed_records (mkErr ctx (T.pack err, unconsumed) :)
& prs_ctx_parser .~ Nothing
Just (CSVI.Many parsed continue)
-> (addParsedRecords parsed ctx) & prs_ctx_parser .~ (Just $ continue input_bs)
Just (CSVI.Done parsed)
-> (addParsedRecords parsed ctx) & prs_ctx_parser .~ Nothing
-- Convert a parser inside a 'ParsingContext' from a header parser into a record parser.
toRecordParsingCtx :: TsvDocParser
-> RecordParsingContext (ParseCorpusResult TsvDoc)
toRecordParsingCtx p =
RecordParsingContext
{ _prs_ctx_parser = Just p
, _prs_ctx_parsed_records = []
, _prs_ctx_row_cursor = 1
}
mkErr :: RecordParsingContext (ParseCorpusResult TsvDoc) -> (Text, ByteString) -> ParseCorpusResult TsvDoc
mkErr ctx pair = ParseTsvRecordFailed (AtRow (ctx ^. prs_ctx_row_cursor) pair)
addParsedRecords :: [Either String (ParseCorpusResult TsvDoc)]
-> RecordParsingContext (ParseCorpusResult TsvDoc)
-> RecordParsingContext (ParseCorpusResult TsvDoc)
addParsedRecords recs ctx = foldl' process_record ctx recs
-- Folds a single decoding outcome into the context. The row cursor is advanced
-- first, and it is the advanced value that gets attached to a failure; since
-- 'toRecordParsingCtx' starts the cursor at 1, the first record is reported as
-- row 2. A failure carries the parser's message and no raw bytes ('mempty').
process_record :: RecordParsingContext (ParseCorpusResult TsvDoc)
-> Either String (ParseCorpusResult TsvDoc)
-> RecordParsingContext (ParseCorpusResult TsvDoc)
process_record ctx result =
let ctx' = ctx & over prs_ctx_row_cursor succ
in case result of
Left err -> ctx' & over prs_ctx_parsed_records (mkErr ctx' (T.pack err, mempty) :)
Right rec -> ctx' & over prs_ctx_parsed_records (rec :)
-- | TODO use readByteStringLazy
readTsvLazyBS :: Delimiter
......@@ -519,13 +617,23 @@ parseTsv :: FilePath -> IO (Either Text [HyperdataDocument])
parseTsv fp = fmap (V.toList . V.map tsv2doc . snd) <$> readTSVFile fp
parseTsvC :: BL.ByteString
-> Either Text (Integer, ConduitT () HyperdataDocument Identity ())
parseTsvC bs =
(\(_h, rs) -> (fromIntegral $ V.length rs, yieldMany rs .| mapC tsv2doc)) <$> eResult
-> Either Text (Integer, ConduitT () (ParseCorpusResult HyperdataDocument) Identity ())
parseTsvC bs = convert_result <$> eResult
where
eResult = case testCorrectFile bs of
Left _err -> Left _err
Right del -> readTsvLazyBS del bs
eResult :: Either Text [ParseCorpusResult TsvDoc]
eResult = case detectDelimiter bs of
Left err -> Left err
Right del -> Right $ parseTsvWithDiagnostics del bs
convert_result :: [ParseCorpusResult TsvDoc]
-> (Integer, ConduitT () (ParseCorpusResult HyperdataDocument) Identity ())
convert_result rs =
(fromIntegral $ length rs, yieldMany rs .| mapC tsvResult2doc)
-- Converts a successfully parsed TSV row into a 'HyperdataDocument' through
-- 'tsv2doc'; a failed row is passed along unchanged.
tsvResult2doc :: ParseCorpusResult TsvDoc -> ParseCorpusResult HyperdataDocument
tsvResult2doc (ParseRecordSucceeded doc)  = ParseRecordSucceeded (tsv2doc doc)
tsvResult2doc (ParseTsvRecordFailed diag) = ParseTsvRecordFailed diag
------------------------------------------------------------------------
-- Tsv v3 weighted for phylo
......
module Gargantext.Core.Text.Corpus.Parsers.Types where
import Data.ByteString
import Data.Csv
import Data.Text
import Prelude
-- | Allows the parser to report errors happening at a given row in the document.
-- The 'Int' is the row number; the second field is the payload attached to it.
data AtRow a = AtRow Int a
deriving Show
-- | Outcome of parsing one record of a corpus document.
data ParseCorpusResult a
-- The record was parsed; carries the resulting value.
= ParseRecordSucceeded a
-- The record could not be parsed; carries the row it happened at together
-- with a 'Text' and a 'ByteString' describing the failure.
| ParseTsvRecordFailed (AtRow (Text, ByteString))
deriving Show
-- | Decodes the underlying @a@ and wraps it in 'ParseRecordSucceeded'. This
-- instance never produces 'ParseTsvRecordFailed' itself: if decoding @a@ fails,
-- the failure stays inside the cassava parser.
instance FromNamedRecord a => FromNamedRecord (ParseCorpusResult a) where
parseNamedRecord m = ParseRecordSucceeded <$> parseNamedRecord m
......@@ -18,8 +18,8 @@ Portability : POSIX
module Gargantext.Core.Worker where
import Async.Worker.Broker.Types (toA, getMessage, messageId)
import Async.Worker qualified as W
import Async.Worker.Broker.Types (toA, getMessage, messageId)
import Async.Worker.Types qualified as W
import Control.Exception.Safe qualified as CES
import Control.Lens (to)
......@@ -33,14 +33,14 @@ import Gargantext.API.Ngrams.List (postAsyncJSON)
import Gargantext.API.Node.Contact (addContact)
import Gargantext.API.Node.Corpus.Annuaire qualified as Annuaire
import Gargantext.API.Node.Corpus.New (addToCorpusWithTempFile, addToCorpusWithQuery)
import Gargantext.API.Node.DocumentsFromWriteNodes (documentsFromWriteNodes)
import Gargantext.API.Node.DocumentUpload (documentUploadAsync, remoteImportDocuments)
import Gargantext.API.Node.DocumentsFromWriteNodes (documentsFromWriteNodes)
import Gargantext.API.Node.File (addWithFile)
import Gargantext.API.Node.FrameCalcUpload (frameCalcUploadAsync)
import Gargantext.API.Node.New (postNode')
import Gargantext.API.Node.Types (_wtf_file_oid)
import Gargantext.API.Node.Update.Types (UpdateNodeParams(..), Granularity (..))
import Gargantext.API.Node.Update (updateNode)
import Gargantext.API.Node.Update.Types (UpdateNodeParams(..), Granularity (..))
import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync)
import Gargantext.Core.Config (hasConfig, gc_database_config, gc_jobs, gc_worker, gc_logging)
import Gargantext.Core.Config.Types (jc_max_docs_scrapers)
......@@ -56,7 +56,7 @@ import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Prelude (readLargeObject, removeLargeObject)
import Gargantext.Database.Query.Table.User (getUsersWithEmail)
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging ( logLocM, LogLevel(..), logMsg, withLogger )
import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Error (HumanFriendlyErrorText(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(markStarted, markComplete, markFailed))
import System.Posix.Signals (Handler(Catch), installHandler, keyboardSignal)
......@@ -90,7 +90,7 @@ notifyJobStarted env (W.State { name }) bm = do
let j = toA $ getMessage bm
let job = W.job j
withLogger (env ^. w_env_config . gc_logging) $ \ioL ->
logMsg ioL DEBUG $ "[notifyJobStarted] [" <> name <> " :: " <> show mId <> "] starting job: " <> show j
$(logLoc) ioL DEBUG $ T.pack $ "[notifyJobStarted] [" <> name <> " :: " <> show mId <> "] starting job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji }
......
......@@ -27,7 +27,7 @@ import Data.Maybe (fromJust)
import Data.Pool qualified as Pool
import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.API.Admin.Orchestrator.Types (JobLog, noJobLog)
import Gargantext.API.Job (RemainingSteps(..), jobLogStart, jobLogProgress, jobLogFailures, jobLogComplete, addErrorEvent, jobLogFailTotal, jobLogFailTotalWithMessage, jobLogAddMore)
import Gargantext.API.Job (RemainingSteps(..), jobLogStart, jobLogProgress, jobLogFailures, jobLogComplete, addErrorEvent, jobLogFailTotal, jobLogFailTotalWithMessage, jobLogAddMore, addWarningEvent)
import Gargantext.API.Prelude (GargM)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
......@@ -229,6 +229,7 @@ instance MonadJobStatus WorkerMonad where
updateJobProgress jh (\latest -> case mb_msg of
Nothing -> jobLogFailTotal latest
Just msg -> jobLogFailTotalWithMessage msg latest)
emitWarning msg jh = updateJobProgress jh (addWarningEvent msg)
addMoreSteps steps jh = updateJobProgress jh (jobLogAddMore steps)
......
{-# LANGUAGE TemplateHaskell #-}
{-|
Module : Gargantext.Core.Worker.Jobs
Description : Worker job definitions
......@@ -22,7 +23,7 @@ import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, MessageId, SendJob)
import Gargantext.Database.Prelude (Cmd)
import Gargantext.Prelude
import Gargantext.System.Logging (logMsg, withLogger, LogLevel(..))
import Gargantext.System.Logging
sendJob :: (HasWorkerBroker, HasConfig env)
......@@ -45,7 +46,7 @@ sendJobWithCfg gcConfig job = do
let queueName = _wdQueue wd
let job' = (updateJobData job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay }
withLogger (gcConfig ^. gc_logging) $ \ioL ->
logMsg ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
$(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
W.sendJob' job'
-- | We want to fine-tune job metadata parameters, for each job type
......
......@@ -240,9 +240,10 @@ fromField' field mb = do
where
valueToHyperdata v = case fromJSON v of
Success a -> pure a
Error _err -> returnError ConversionFailed field
Error err -> returnError ConversionFailed field
$ DL.unwords [ "cannot parse hyperdata for JSON: "
, show v
, err
]
dbCheck :: DBCmd err Bool
......
......@@ -104,6 +104,9 @@ class MonadJobStatus m where
-- message to the failure.
markFailed :: forall e. ToHumanFriendlyError e => Maybe e -> JobHandle m -> m ()
-- Logs a new event with \"WARNING\" severity.
emitWarning :: forall e. ToHumanFriendlyError e => e -> JobHandle m -> m ()
-- | Add 'n' more steps to the running computation, they will be marked as remaining.
addMoreSteps :: MonadJobStatus m => Int -> JobHandle m -> m ()
......
Title Authors Source Abstract Publication Year Publication Month Publication Day
#Ecuador | 🚨Se insta antisananews mastodon.social #Ecuador | 🚨Se instaló la audiencia de juicio contra el exalcalde de #Quito, Jorge Yunda, y 13 personas más, procesadas por #peculado en la compra de 100.000 pruebas para detectar #COVID19 y que presuntamente abrían causado un perjuicio al Estado por 2’235.491,16 dólares.#Comparta #Suscríbase↩👍Sígame enRadio Antisana Media Online: https://antisananews.blogspot.com/TikTok: https://www.tiktok.com/@antisanamediaonline?lang=esTelegram: https://t.me/AntisanaMediaOnlineVK: https://vk.com/antisanamultimediosX: https://twitter.com/AntisanaNews 2024 02 06
#NorthCarolina bann MatthewChat@mstdn.social toot.io #NorthCarolina banned immunocompromized people from wearing masks in public. This does NOT apply to the #KKK, as their is a specific exemption for them. #covid #masking 2024 05 16
'The coronation of t BenHigbie@mastodon.social fosstodon.org 'The coronation of the Serbian Tsar Stefan Dušan as East Roman Emperor' from 'The Slav Epic' by Alphonse Mucha (1926) #art #arts #artist #artists #artlover #artlovers #arthistory #illustration #painting #paintings #inspiration #artmuseum #museum #artmuseums #museums #artnet 2024 05 05
3/5 Krankenstand 202 ToveHarris mastodon.social 3/5 Krankenstand 2022 und 2023 komme Verlust von 350.000 Beschäftigten gleich. Arbeitsausfall werde derzeit durch Überstunden + erhöhte Produktivität aufgefangen. Ohne diese Leistungen der Beschäftigten gäbe es eine Lücke von 700.000 Beschäftigten.Und #CovidIsNotOver #LongCOVID #COVID #COVID19 #Coronahttps://www.vfa.de/de/wirtschaft-politik/macroscope/macroscope-hoher-krankenstand-drueckt-deutschland-in-die-rezession 2024 01 27
@ABScientist @Hidde,justafrog@mstdn.social,mastodon.social,@ABScientist @Hidde @roelgrif How about this one?https://nos.nl/artikel/2457983-viroloog-koopmans-coronagolf-in-china-nu-niet-heel-zorgelijk-voor-nederland 2023 12 28
@cassandra17lina In gemswinc counter.social @cassandra17lina In a way, Covid was a gift to introverts 2024 01 24
@erictopol This is t SpookieRobieTheCat@mastodon.social toot.io @erictopol This is the #CovidBrain that Trump and MAGA suffer from. All those #Antivaxx proponents will suffer the consequences too. And I'm ok with it. Be anti-science, be wilfully ignorant and live a life in agony, that's their choice. I shouldn't have to pay a dime for their stupidity. 2024 01 09
A bunch of maskless crowgirl@hachyderm.io toot.io "A bunch of maskless #Covid "experts" like Gregg Gonsalves are now trying to sound credible by warning about H5N1.I think H5N1 is a serious problem that requires airborne mitigations and decontaminating the food supply.And I also don't dine in restaurants. Gregg on the other hand is proud of his masklessness.Be warned. Don't let these professional Covid minimizers get away with this crap with #H5N1." 2024 06 03
......@@ -9,12 +9,17 @@ module Test.API.Private.List (
) where
import Data.Aeson.QQ
import Data.Text qualified as T
import Data.Text.IO qualified as TIO
import Fmt
import Gargantext.API.HashedResponse
import Gargantext.API.Ngrams.List.Types
import Gargantext.API.Ngrams.Types qualified as APINgrams
import Gargantext.API.Node.Corpus.New.Types qualified as FType
import Gargantext.API.Node.Types
import Gargantext.Core qualified as Lang
import Gargantext.Core.Config
import Gargantext.Core.Text.List.Social
import Gargantext.Core.Types
import Gargantext.Core.Types.Individu
import Gargantext.Core.Worker.Types
......@@ -25,13 +30,23 @@ import Servant.Client.Streaming
import Test.API.Prelude (newCorpusForUser, checkEither)
import Test.API.Routes
import Test.API.Setup
import Test.API.UpdateList qualified as UpdateList
import Test.Database.Types
import Test.Hspec (Spec, it, aroundAll, describe, sequential)
import Test.Hspec.Expectations
import Test.Hspec.Wai.Internal (withApplication)
import Test.Utils
-- | Builds the 'NewWithForm' payload used by these tests to upload a corpus:
-- the given @content@ and @name@, declared as a plain (non-zipped) TSV in
-- English, with the 'MySelfFirst' social-list priority.
mkNewWithForm :: T.Text -> T.Text -> NewWithForm
mkNewWithForm content name = NewWithForm
{ _wf_filetype = FType.TSV
, _wf_fileformat = FType.Plain
, _wf_data = content
, _wf_lang = Just Lang.EN
, _wf_name = name
, _wf_selection = FlowSocialListWithPriority MySelfFirst
}
importTermsTSV :: SpecContext () -> String -> IO (JobInfo, CorpusId, ListId)
importTermsTSV (SpecContext testEnv port app _) name = do
cId <- liftIO $ newCorpusForUser testEnv "alice"
......@@ -58,8 +73,8 @@ importCorpusTSV (SpecContext testEnv port app _) name = do
([listId] :: [NodeId]) <- protectedJSON token "POST" (mkUrl port ("/node/" <> build cId)) [aesonQQ|{"pn_typename":"NodeList","pn_name":"Testing"}|]
-- Upload the CSV doc
simpleNgrams <- liftIO (TIO.readFile =<< getDataFileName name)
let params = UpdateList.mkNewWithForm simpleNgrams "simple.tsv"
pendingJob <- checkEither $ liftIO $ runClientM (importCorpus token listId params) clientEnv
let params = mkNewWithForm simpleNgrams "simple.tsv"
pendingJob <- checkEither $ liftIO $ runClientM (importCorpus token cId params) clientEnv
jobInfo <- pollUntilWorkFinished log_cfg port pendingJob
pure (jobInfo, cId, listId)
......@@ -85,11 +100,21 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
length terms `shouldSatisfy` (>= 1)
it "should handle dirty TSV as per issue #380" $ \ctx@(SpecContext _testEnv port app _) -> do
(_, cId, listId) <- importCorpusTSV ctx "test-data/issue-380/corpus.tsv"
(_, cId, _listId) <- importCorpusTSV ctx "test-data/issue-380/corpus.tsv"
withApplication app $ do
withValidLogin port "alice" (GargPassword "alice") $ \clientEnv token -> do
-- Now check that we can retrieve the ngrams, and the ngrams list is not empty!
liftIO $ do
eRes <- checkEither $ runClientM (get_table_ngrams token cId APINgrams.Terms listId 50 Nothing (Just MapTerm) Nothing Nothing Nothing Nothing) clientEnv
let (APINgrams.NgramsTable terms) = APINgrams._vc_data eRes
length terms `shouldSatisfy` (>= 1)
eRes <- checkEither $ runClientM (get_table token cId (Just APINgrams.Docs) Nothing Nothing Nothing Nothing Nothing) clientEnv
let (HashedResponse _ TableResult{tr_docs}) = eRes
length tr_docs `shouldBe` 7
it "should skip problematic rows" $ \ctx@(SpecContext _testEnv port app _) -> do
(_, cId, _listId) <- importCorpusTSV ctx "test-data/issue-380/malformed_row.tsv"
withApplication app $ do
withValidLogin port "alice" (GargPassword "alice") $ \clientEnv token -> do
-- Now check that we can retrieve the ngrams, and the ngrams list is not empty!
liftIO $ do
eRes <- checkEither $ runClientM (get_table token cId (Just APINgrams.Docs) Nothing Nothing Nothing Nothing Nothing) clientEnv
let (HashedResponse _ TableResult{tr_docs}) = eRes
length tr_docs `shouldBe` 6 -- it must have skipped the broken row
......@@ -89,6 +89,7 @@ instance MonadJobStatus TestMonad where
markFailure _ _ _ = TestMonad $ pure ()
markComplete _ = TestMonad $ pure ()
markFailed _ _ = TestMonad $ pure ()
emitWarning _ _ = TestMonad $ pure ()
addMoreSteps _ _ = TestMonad $ pure ()
data DBHandle = DBHandle {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment