[corpus] support adding file with temp storage (in postgresql large object)

Related to
#444

The rationale behind this is that we don't want to pollute the worker
job queue with large file blobs. Instead, upon an API request, we create
a pg_largeobject and use that in the worker. After the job has finished
(with an error or not), the object is removed.
parent b44997b3
......@@ -37,15 +37,15 @@ import Prelude qualified
workerCLI :: CLIWorker -> IO ()
workerCLI (CLIW_run (WorkerArgs { .. })) = do
workerCLI (CLIW_run (WorkerArgs { .. })) = withLogger () $ \ioLogger -> do
let ___ = putStrLn ((List.concat
$ List.take 72
$ List.cycle ["_"]) :: Prelude.String)
___
putText "GarganText worker"
putText $ "worker_name: " <> worker_name
putText $ "worker toml: " <> T.pack (_SettingsFile worker_toml)
logMsg ioLogger INFO "GarganText worker"
logMsg ioLogger INFO $ "worker_name: " <> T.unpack worker_name
logMsg ioLogger INFO $ "worker toml: " <> _SettingsFile worker_toml
___
withWorkerEnv worker_toml $ \env -> do
......@@ -56,9 +56,9 @@ workerCLI (CLIW_run (WorkerArgs { .. })) = do
let availableWorkers = T.intercalate ", " workerNames
putText $ "Worker definition not found! Available workers: " <> availableWorkers
Just wd -> do
putText $ "Starting worker '" <> worker_name <> "'"
putText $ "gc config: " <> show (env ^. hasConfig)
putText $ "Worker settings: " <> show ws
logMsg ioLogger INFO $ "Starting worker '" <> T.unpack worker_name <> "'"
logMsg ioLogger DEBUG $ "gc config: " <> show (env ^. hasConfig)
logMsg ioLogger DEBUG $ "Worker settings: " <> show ws
___
if worker_run_single then
withPGMQWorkerSingleCtrlC env wd $ \a _state -> do
......
......@@ -9,8 +9,6 @@ Portability : POSIX
-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Ngrams.List.Types where
import Data.Aeson
......
......@@ -36,7 +36,7 @@ import Gargantext.API.Metrics
import Gargantext.API.Ngrams.Types (TabType(..))
import Gargantext.API.Node.DocumentUpload qualified as DocumentUpload
import Gargantext.API.Node.DocumentsFromWriteNodes qualified as DFWN
import Gargantext.API.Node.File ( fileApi, fileAsyncApi, tempFileAsyncApi )
import Gargantext.API.Node.File ( fileApi, fileAsyncApi )
import Gargantext.API.Node.FrameCalcUpload qualified as FrameCalcUpload
import Gargantext.API.Node.New ( postNode, postNodeAsyncAPI )
import Gargantext.API.Node.Share qualified as Share
......
......@@ -22,14 +22,14 @@ module Gargantext.API.Node.Corpus.New
import Conduit ((.|), yieldMany, mapMC, mapC, transPipe)
import Control.Lens ( view, non )
import Data.ByteString.Base64 qualified as BSB64
import Data.Conduit.Internal (zipSources)
import Data.Swagger ( ToSchema(..) )
import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PostgreSQL.Simple.LargeObjects qualified as PSQL
import Gargantext.API.Admin.Orchestrator.Types qualified as API
import Gargantext.API.Ngrams (commitStatePatch, Versioned(..))
import Gargantext.API.Node.Corpus.New.Types ( FileFormat(..), FileType(..) )
import Gargantext.API.Node.Corpus.New.Types ( FileType(..) )
import Gargantext.API.Node.Corpus.Searx ( triggerSearxSearch )
import Gargantext.API.Node.Corpus.Types ( Datafield(Web), datafield2origin )
import Gargantext.API.Node.Corpus.Update (addLanguageToCorpus)
......@@ -51,6 +51,7 @@ import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(
import Gargantext.Database.Admin.Types.Hyperdata.File ( HyperdataFile(..) )
import Gargantext.Database.Admin.Types.Node (CorpusId, NodeType(..), ParentId)
import Gargantext.Database.GargDB qualified as GargDB
import Gargantext.Database.Prelude (mkCmd)
import Gargantext.Database.Query.Table.Node (getNodeWith, getOrMkList)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Query.Tree.Root (MkCorpusUser(MkCorpusUserNormalCorpusIds))
......@@ -212,28 +213,28 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
$(logLocM) ERROR $ "[addToCorpusWithQuery] error: " <> show err -- log the full error
markFailed (Just err) jobHandle
addToCorpusWithForm :: ( FlowCmdM env err m
, MonadJobStatus m
, HasNodeStoryImmediateSaver env
, HasNodeArchiveStoryImmediateSaver env
)
=> User
-> CorpusId
-> NewWithForm
-> JobHandle m
-> m ()
addToCorpusWithForm user cid nwf jobHandle = do
addToCorpusWithTempFile :: ( FlowCmdM env err m
, MonadJobStatus m
, HasNodeStoryImmediateSaver env
, HasNodeArchiveStoryImmediateSaver env
)
=> User
-> CorpusId
-> NewWithTempFile
-> JobHandle m
-> m ()
addToCorpusWithTempFile user cid nwtf jobHandle = do
-- printDebug "[addToCorpusWithForm] Parsing corpus: " cid
-- printDebug "[addToCorpusWithForm] fileType" ft
-- printDebug "[addToCorpusWithForm] fileFormat" ff
let l = nwf ^. wf_lang . non defaultLanguage
let l = nwtf ^. wtf_lang . non defaultLanguage
addLanguageToCorpus cid l
limit' <- view $ hasConfig . gc_jobs . jc_max_docs_parsers
let limit = fromIntegral limit' :: Integer
let
parseC = case (nwf ^. wf_filetype) of
parseC = case nwtf ^. wtf_filetype of
TSV -> Parser.parseFormatC Parser.TsvGargV3
TSV_HAL -> Parser.parseFormatC Parser.TsvHal
Iramuteq -> Parser.parseFormatC Parser.Iramuteq
......@@ -243,12 +244,13 @@ addToCorpusWithForm user cid nwf jobHandle = do
WOS -> Parser.parseFormatC Parser.WOS
-- TODO granularity of the logStatus
let data' = case (nwf ^. wf_fileformat) of
Plain -> cs (nwf ^. wf_data)
ZIP -> case BSB64.decode $ TE.encodeUtf8 (nwf ^. wf_data) of
Left err -> panicTrace $ T.pack "[addToCorpusWithForm] error decoding base64: " <> T.pack err
Right decoded -> decoded
eDocsC <- liftBase $ parseC (nwf ^. wf_fileformat) data'
-- Read the whole temporary file back from its PostgreSQL large object.
-- A freshly opened descriptor is positioned at offset 0, so asking for the
-- current position with 'loTell' would yield a size of 0 and nothing would
-- be read. Seeking to the end returns the real size; we then rewind to the
-- start before reading.
data' <- mkCmd $ \c -> PSQL.withTransaction c $ do
    let oId = PSQL.Oid $ fromIntegral $ nwtf ^. wtf_file_oid
    loFd <- PSQL.loOpen c oId PSQL.ReadMode
    -- TODO: Chunks?
    size <- PSQL.loSeek c loFd PSQL.SeekFromEnd 0
    _ <- PSQL.loSeek c loFd PSQL.AbsoluteSeek 0
    PSQL.loRead c loFd size
eDocsC <- liftBase $ parseC (nwtf ^. wtf_fileformat) data'
case eDocsC of
Right (count, docsC) -> do
-- TODO Add progress (jobStatus) update for docs - this is a
......@@ -278,7 +280,7 @@ addToCorpusWithForm user cid nwf jobHandle = do
_cid' <- flowCorpus (MkCorpusUserNormalCorpusIds user [cid])
(Multi l)
(Just (nwf ^. wf_selection))
(Just (nwtf ^. wtf_selection))
--(Just $ fromIntegral $ length docs, docsC')
(count, transPipe liftBase docsC') -- TODO fix number of docs
--(map (map toHyperdataDocument) docs)
......
......@@ -24,6 +24,7 @@ import Gargantext.API.Node.File.Types
import Gargantext.API.Node.Types ( NewWithFile(NewWithFile) )
import Gargantext.API.Prelude ( GargM )
import Gargantext.API.Routes.Named.File qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Action.Flow.Types ( FlowCmdM )
import Gargantext.Database.Action.Node (mkNodeWithParent)
......@@ -58,9 +59,7 @@ fileDownload nId = do
Contents c <- GargDB.readGargFile $ T.unpack path
let (mMime, _) = DMT.guessType DMT.defaultmtd False $ T.unpack name'
mime = case mMime of
Just m -> m
Nothing -> "text/plain"
mime = fromMaybe "text/plain" mMime
pure $ addHeader (T.pack mime) $ BSResponse c
......
......@@ -15,15 +15,15 @@ Portability : POSIX
module Gargantext.API.Node.FrameCalcUpload where
import Data.ByteString.Lazy qualified as BSL
import Data.ByteString.UTF8 qualified as BSU8
import Data.Text qualified as T
import Database.PostgreSQL.Simple.LargeObjects qualified as PSQL
import Gargantext.API.Admin.Auth.Types ( auth_node_id, AuthenticatedUser )
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types ( BackendInternalError )
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm)
import Gargantext.API.Node.Corpus.New (addToCorpusWithTempFile)
import Gargantext.API.Node.Corpus.New.Types (FileFormat(..), FileType(..))
import Gargantext.API.Node.FrameCalcUpload.Types
import Gargantext.API.Node.Types (NewWithForm(..))
import Gargantext.API.Node.Types (NewWithTempFile(..))
import Gargantext.API.Prelude ( GargM )
import Gargantext.API.Routes.Named.FrameCalc qualified as Named
import Gargantext.API.Worker (serveWorkerAPI)
......@@ -35,6 +35,7 @@ import Gargantext.Database.Action.Flow.Types ( FlowCmdM )
import Gargantext.Database.Admin.Types.Hyperdata.Frame ( HyperdataFrame(..) )
import Gargantext.Database.Admin.Types.Node ( NodeId, NodeType(NodeCorpus) )
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType, getNodeWith)
import Gargantext.Database.Prelude (createLargeObject)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..), markFailureNoErr)
......@@ -65,7 +66,7 @@ frameCalcUploadAsync :: ( HasConfig env
-> FrameCalcUpload
-> JobHandle m
-> m ()
frameCalcUploadAsync authenticatedUser nId (FrameCalcUpload _wf_lang _wf_selection) jobHandle = do
frameCalcUploadAsync authenticatedUser nId (FrameCalcUpload _wtf_lang _wtf_selection) jobHandle = do
markStarted 5 jobHandle
-- printDebug "[frameCalcUploadAsync] uId" uId
......@@ -82,7 +83,8 @@ frameCalcUploadAsync authenticatedUser nId (FrameCalcUpload _wf_lang _wf_selecti
manager <- newManager tlsManagerSettings
req <- parseRequest $ T.unpack csvUrl
httpLbs req manager
let body = T.pack $ BSU8.toString $ BSL.toStrict $ responseBody res
let body = BSL.toStrict $ responseBody res
PSQL.Oid oId <- createLargeObject body
-- printDebug "body" body
mCId <- getClosestParentIdByType nId NodeCorpus
......@@ -92,14 +94,14 @@ frameCalcUploadAsync authenticatedUser nId (FrameCalcUpload _wf_lang _wf_selecti
Nothing -> markFailureNoErr 1 jobHandle
Just cId ->
-- FIXME(adn) Audit this conversion.
addToCorpusWithForm (RootId userNodeId)
cId
(NewWithForm { _wf_filetype = TSV
, _wf_fileformat = Plain
, _wf_data = body
, _wf_lang
, _wf_name = "calc-upload.csv"
, _wf_selection }) jobHandle
addToCorpusWithTempFile (RootId userNodeId)
cId
(NewWithTempFile { _wtf_filetype = TSV
, _wtf_fileformat = Plain
, _wtf_file_oid = fromIntegral oId
, _wtf_lang
, _wtf_name = "calc-upload.csv"
, _wtf_selection }) jobHandle
markComplete jobHandle
where
......
......@@ -31,6 +31,9 @@ import Gargantext.Prelude
import Web.FormUrlEncoded (FromForm, ToForm)
-------------------------------------------------------
-- | A file is uploaded with this type. Then, for internal job
-- creation for the worker, 'NewWithTempFile' is used with a large
-- object oid
data NewWithForm = NewWithForm
{ _wf_filetype :: !FileType
, _wf_fileformat :: !FileFormat
......@@ -51,24 +54,24 @@ instance ToSchema NewWithForm where
declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_wf_")
-------------------------------------------------------
data NewTempWithForm = NewTempWithForm
{ _twf_filetype :: !FileType
, _twf_fileformat :: !FileFormat
, _twf_file_oid :: !Int
, _twf_lang :: !(Maybe Lang)
, _twf_name :: !Text
, _twf_selection :: !FlowSocialListWith
-- | Same fields as 'NewWithForm', except that the file contents are not
-- carried inline: '_wtf_file_oid' refers to a PostgreSQL large object
-- holding them. This is the argument type of the temp-file worker job.
data NewWithTempFile = NewWithTempFile
{ _wtf_filetype :: !FileType
, _wtf_fileformat :: !FileFormat
, _wtf_file_oid :: !Int -- ^ oid of the large object storing the file contents
, _wtf_lang :: !(Maybe Lang)
, _wtf_name :: !Text
, _wtf_selection :: !FlowSocialListWith
} deriving (Eq, Show, Generic)
makeLenses ''NewTempWithForm
instance FromForm NewTempWithForm
instance ToForm NewTempWithForm
instance FromJSON NewTempWithForm where
parseJSON = genericParseJSON $ jsonOptions "_twf_"
instance ToJSON NewTempWithForm where
toJSON = genericToJSON $ jsonOptions "_twf_"
instance ToSchema NewTempWithForm where
declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_twf_")
makeLenses ''NewWithTempFile
instance FromForm NewWithTempFile
instance ToForm NewWithTempFile
instance FromJSON NewWithTempFile where
parseJSON = genericParseJSON $ jsonOptions "_wtf_"
instance ToJSON NewWithTempFile where
toJSON = genericToJSON $ jsonOptions "_wtf_"
instance ToSchema NewWithTempFile where
declareNamedSchema = genericDeclareNamedSchema (unPrefixSwagger "_wtf_")
-------------------------------------------------------
......
......@@ -16,22 +16,24 @@ Portability : POSIX
module Gargantext.API.Routes where
import Data.ByteString.Base64 qualified as BSB64
import Data.Text qualified as T
import Data.Text.Encoding qualified as TE
import Database.PostgreSQL.Simple.LargeObjects qualified as PSQL
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser, auth_user_id)
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Errors.Types (BackendInternalError)
import Gargantext.API.Node.Types (NewWithForm(..), NewTempWithForm(..))
import Gargantext.API.Prelude (GargServer, GargM, IsGargServer)
import Gargantext.API.Node.Corpus.New.Types ( FileFormat(..) )
import Gargantext.API.Node.Types (NewWithForm(..), NewWithTempFile(..))
import Gargantext.API.Prelude (GargServer, GargM)
import Gargantext.API.Routes.Named.Annuaire qualified as Named
import Gargantext.API.Routes.Named.Corpus qualified as Named
import Gargantext.API.Worker (serveWorkerAPI, serveWorkerAPIm, WorkerAPI)
import Gargantext.Core (Lang)
import Gargantext.API.Worker (serveWorkerAPI, serveWorkerAPIm)
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Admin.Types.Node (CorpusId)
import Gargantext.Database.Prelude (mkCmd)
import Gargantext.Database.Prelude (createLargeObject)
import Gargantext.Prelude
import Servant (Get, FormUrlEncoded, JSON)
import Servant (Get, JSON)
import Servant.Server.Generic (AsServerT)
----------------------------------------------------------------------
......@@ -68,45 +70,33 @@ addCorpusWithQuery user =
, Jobs._acq_cid = cId }
}
addCorpusWithForm :: User -> Named.AddWithForm (AsServerT (GargM Env BackendInternalError))
addCorpusWithForm user =
Named.AddWithForm {
addWithFormEp = \cId -> serveWorkerAPI $ \p ->
-- /NOTE(adinapoli)/ Track the initial steps outside 'addToCorpusWithForm', because it's
-- called in a few places, and the job status might be different between invocations.
-- markStarted 3 jHandle
-- New.addToCorpusWithForm user cid i jHandle
Jobs.AddCorpusFormAsync { Jobs._acf_args = p
, Jobs._acf_user = user
, Jobs._acf_cid = cId }
-- | Uses temporary file stored in postgres to add that file to a corpus.
--
-- The submitted 'NewWithForm' payload is turned into bytes (base64-decoded
-- for 'ZIP', converted as-is for 'Plain'), written to a large object with
-- 'createLargeObject', and only the resulting oid is placed in the
-- 'Jobs.AddCorpusTempFileAsync' job instead of the file contents.
--
-- NOTE(review): a payload that fails base64 decoding ends in 'panicTrace'
-- rather than a regular error response — confirm this is intended.
addWithTempFileApi :: AuthenticatedUser
-> Named.AddWithTempFile (AsServerT (GargM Env BackendInternalError))
-- -> WorkerAPI '[FormUrlEncoded] NewWithForm (AsServerT m)
addWithTempFileApi authenticatedUser =
Named.AddWithTempFile {
addWithTempFileEp = \cId ->
serveWorkerAPIm $ \(NewWithForm { .. }) -> do
-- Raw bytes of the uploaded file, decoded according to its declared format.
let bs = case _wf_fileformat of
Plain -> cs _wf_data
ZIP -> case BSB64.decode $ TE.encodeUtf8 _wf_data of
Left err -> panicTrace $ T.pack "[addWithTempFileApi] error decoding base64: " <> T.pack err
Right decoded -> decoded
-- Store the bytes in the database; the job only references them by oid.
(PSQL.Oid oId) <- createLargeObject bs
let args = NewWithTempFile { _wtf_filetype = _wf_filetype
, _wtf_fileformat = _wf_fileformat
, _wtf_file_oid = fromIntegral oId
, _wtf_lang = _wf_lang
, _wtf_name = _wf_name
, _wtf_selection = _wf_selection }
pure $ Jobs.AddCorpusTempFileAsync { _actf_args = args
, _actf_cid = cId
, _actf_user = userId }
}
-- | Same as 'addCorpusWithForm' but uses temporary file stored in postgres, so that
addWithTempFileApi :: (IsGargServer env err m)
=> AuthenticatedUser
-> CorpusId
-> WorkerAPI '[FormUrlEncoded] NewWithForm (AsServerT m)
addWithTempFileApi authenticatedUser cId =
serveWorkerAPIm $ \(NewWithForm { .. }) -> do
(PSQL.Oid oId) <- mkCmd (createLargeObject bs)
let args = NewTempWithForm { _twf_filetype = _wf_filetype
, _twf_fileformat = _wf_fileformat
, _twf_file_oid = fromIntegral oid
, _twf_lang = _wf_lang
, _twf_name = _wf_name
, _twf_selection = _wf_selection }
pure $ Jobs.AddCorpusTempFileAsync { _actf_args = args
, _actf_cid = cId
, _actf_user = userId }
where
userId = authenticatedUser ^. auth_user_id
createLargeObject bs c = do
oId <- PSQL.loCreat c
loFd <- PSQL.loOpen c oId PSQL.WriteMode
_ <- PSQL.loWrite c loFd bs
PSQL.loClose c loFd
pure oId
userId = UserDBId $ authenticatedUser ^. auth_user_id
addAnnuaireWithForm :: Named.AddAnnuaireWithForm (AsServerT (GargM Env BackendInternalError))
addAnnuaireWithForm =
......
......@@ -14,7 +14,6 @@ Portability : POSIX
module Gargantext.API.Routes.Named.Corpus (
-- * Routes types
CorpusExportAPI(..)
, AddWithForm(..)
, AddWithTempFile(..)
, AddWithQuery(..)
, MakeSubcorpusAPI(..)
......@@ -44,25 +43,15 @@ newtype CorpusExportAPI mode = CorpusExportAPI
:> Get '[JSON] (Headers '[Servant.Header "Content-Disposition" Text] Corpus)
} deriving Generic
newtype AddWithForm mode = AddWithForm
{ addWithFormEp :: mode :- Summary "Add with FormUrlEncoded to corpus endpoint"
:> "corpus"
:> Capture "corpus_id" CorpusId
:> "add"
:> "form"
:> "async"
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] NewWithForm)
} deriving Generic
data AddWithTempFile mode = AddWithTempFile
{ addWithTempFileEp :: mode :- Summary "Add with form via temp file"
:> "corpus"
:> Capture "corpus_id" CorpusId
:> "add"
:> "temp-file"
:> "async"
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] NewWithForm)
:> "corpus"
:> Capture "corpus_id" CorpusId
:> "add"
:> "temp-file"
:> "async"
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] NewWithForm)
} deriving Generic
newtype AddWithQuery mode = AddWithQuery
......
......@@ -45,7 +45,7 @@ import Gargantext.API.Node.New.Types ( PostNode(..) )
import Gargantext.API.Node.Types ( RenameNode(..), NodesToScore(..), NodesToCategory(..) )
import Gargantext.API.Node.Update.Types ( UpdateNodeParams(..), Charts(..), Granularity(..), Method(..) )
import Gargantext.API.Routes.Named.Document (DocumentsFromWriteNodesAPI, DocumentUploadAPI)
import Gargantext.API.Routes.Named.File (FileAsyncAPI, FileAPI, TempFileAsyncAPI)
import Gargantext.API.Routes.Named.File (FileAsyncAPI, FileAPI)
import Gargantext.API.Routes.Named.FrameCalc (FrameCalcAPI)
import Gargantext.API.Routes.Named.Metrics (ChartAPI, PieAPI, ScatterAPI, TreeAPI)
import Gargantext.API.Routes.Named.Publish (PublishAPI)
......
......@@ -30,7 +30,7 @@ import Gargantext.API.Admin.Auth.Types (AuthenticatedUser)
import Gargantext.API.Auth.PolicyCheck (PolicyChecked)
import Gargantext.API.Routes.Named.Contact (ContactAPI)
import Gargantext.API.Routes.Named.Context (ContextAPI)
import Gargantext.API.Routes.Named.Corpus (AddWithForm, AddWithQuery, CorpusExportAPI, MakeSubcorpusAPI)
import Gargantext.API.Routes.Named.Corpus (AddWithTempFile, AddWithQuery, CorpusExportAPI, MakeSubcorpusAPI)
import Gargantext.API.Routes.Named.Count (CountAPI, Query)
import Gargantext.API.Routes.Named.Document (DocumentExportAPI)
import Gargantext.API.Routes.Named.List (GETAPI, JSONAPI, TSVAPI)
......@@ -93,7 +93,6 @@ data GargPrivateAPI' mode = GargPrivateAPI'
:> Capture "tree_id" NodeId
:> NamedRoutes TreeFlatAPI
, membersAPI :: mode :- "members" :> Summary "Team node members" :> NamedRoutes MembersAPI
, addWithFormAPI :: mode :- NamedRoutes AddWithForm
, addWithTempFile :: mode :- NamedRoutes AddWithTempFile
, addWithQueryEp :: mode :- NamedRoutes AddWithQuery
, makeSubcorpusAPI :: mode :- NamedRoutes MakeSubcorpusAPI
......
......@@ -18,7 +18,7 @@ import Gargantext.API.Node.Document.Export (documentExportAPI)
import Gargantext.API.Node.Phylo.Export qualified as PhyloExport
import Gargantext.API.Node.ShareURL ( shareURL )
import Gargantext.API.Prelude (GargM)
import Gargantext.API.Routes (addCorpusWithForm, addCorpusWithQuery, addWithTempFileApi)
import Gargantext.API.Routes (addWithTempFileApi, addCorpusWithQuery)
import Gargantext.API.Routes.Named.Private qualified as Named
import Gargantext.API.Server.Named.Ngrams (apiNgramsTableDoc)
import Gargantext.API.Server.Named.Viz qualified as Viz
......@@ -59,8 +59,7 @@ serverPrivateGargAPI' authenticatedUser@(AuthenticatedUser userNodeId userId)
, treeAPI = Tree.treeAPI authenticatedUser
, treeFlatAPI = Tree.treeFlatAPI authenticatedUser
, membersAPI = members
, addWithFormAPI = addCorpusWithForm (RootId userNodeId)
, addWithTempFile = addWithTempFileApi (RootId userNodeId)
, addWithTempFile = addWithTempFileApi authenticatedUser
, addWithQueryEp = addCorpusWithQuery (RootId userNodeId)
, makeSubcorpusAPI = Subcorpus.makeSubcorpus userId
, listGetAPI = List.getAPI
......
......@@ -312,7 +312,8 @@ getMultipleLinefile bl del headers res x = do
then checkNextLine bl del headers res x
else
if (length tmp > length headers) || (V.length bl == (x + 1))
then Left (pack $ "Cannot parse the file at line " <> show x <> ". Maybe because of a delimiter")
then
Left (pack $ "Cannot parse the file at line " <> show x <> ". Maybe because of a delimiter")
else do
case BL.append res <$> ((V.!?) bl (x+1)) of
Nothing -> Left "getMultipleLinefile"
......@@ -591,7 +592,7 @@ parseTsv' bs = (V.toList . V.map tsv2doc . snd) <$> readTsvLazyBS Comma bs
parseTsv' :: BL.ByteString -> Either Text [HyperdataDocument]
parseTsv' bs = do
let
result = case (testCorrectFile bs) of
result = case testCorrectFile bs of
Left _err -> Left _err
Right del -> readTsvLazyBS del bs
V.toList . V.map tsv2doc . snd <$> result
......@@ -601,7 +602,7 @@ parseTsvC :: BL.ByteString
parseTsvC bs =
(\(_h, rs) -> (fromIntegral $ V.length rs, yieldMany rs .| mapC tsv2doc)) <$> eResult
where
eResult = case (testCorrectFile bs) of
eResult = case testCorrectFile bs of
Left _err -> Left _err
Right del -> readTsvLazyBS del bs
......
......@@ -21,6 +21,7 @@ module Gargantext.Core.Worker where
import Async.Worker.Broker.Types (toA, getMessage, messageId)
import Async.Worker qualified as W
import Async.Worker.Types qualified as W
import Control.Exception.Safe qualified as CES
import Control.Lens (to)
import Data.Text qualified as T
import Gargantext.API.Admin.Auth (forgotUserPassword)
......@@ -28,12 +29,13 @@ import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
import Gargantext.API.Ngrams.List (postAsyncJSON)
import Gargantext.API.Node.Corpus.Annuaire qualified as Annuaire
import Gargantext.API.Node.Contact (addContact)
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm, addToCorpusWithQuery)
import Gargantext.API.Node.Corpus.New (addToCorpusWithTempFile, addToCorpusWithQuery)
import Gargantext.API.Node.DocumentsFromWriteNodes (documentsFromWriteNodes)
import Gargantext.API.Node.DocumentUpload (documentUploadAsync)
import Gargantext.API.Node.FrameCalcUpload (frameCalcUploadAsync)
import Gargantext.API.Node.File (addWithFile)
import Gargantext.API.Node.New (postNode')
import Gargantext.API.Node.Types (_wtf_file_oid)
import Gargantext.API.Node.Update (updateNode)
import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync)
import Gargantext.Core.Config (hasConfig, gc_database_config, gc_jobs, gc_notifications_config, gc_worker)
......@@ -47,6 +49,7 @@ import Gargantext.Core.Worker.Env
import Gargantext.Core.Worker.PGMQTypes (BrokerMessage, HasWorkerBroker, WState)
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId)
import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Prelude (removeLargeObject)
import Gargantext.Database.Query.Table.User (getUsersWithEmail)
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging ( logLocM, LogLevel(..), logMsg, withLogger )
......@@ -223,15 +226,12 @@ performAction env _state bm = do
$(logLocM) DEBUG $ "[performAction] add contact"
addContact _ac_user _ac_node_id _ac_args jh
-- | Send a file with documents and index them in corpus
AddCorpusFormAsync { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] add corpus form"
addToCorpusWithForm _acf_user _acf_cid _acf_args jh
-- | Uses temporary file to add documents into corpus
AddCorpusTempFileAsync { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] add to corpus with temporary file"
addWithFile _awf_authenticatedUser _awf_node_id _awf_args jh
CES.finally (do
$(logLocM) DEBUG "[performAction] add to corpus with temporary file"
addToCorpusWithTempFile _actf_user _actf_cid _actf_args jh)
(removeLargeObject $ _wtf_file_oid _actf_args)
-- | Perform external API search query and index documents in corpus
AddCorpusWithQuery { .. } -> runWorkerMonad env $ do
......
......@@ -18,6 +18,7 @@ module Gargantext.Core.Worker.Env where
import Control.Concurrent.STM.TVar (TVar, modifyTVar, newTVarIO, readTVarIO)
import Control.Exception.Safe qualified as CES
import Control.Lens (prism', to, view)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Maybe (fromJust)
......@@ -177,7 +178,10 @@ newtype WorkerMonad a =
, MonadBase IO
, MonadBaseControl IO
, MonadError IOException
, MonadFail )
, MonadFail
, CES.MonadThrow
, CES.MonadCatch
, CES.MonadMask )
instance HasLogger WorkerMonad where
data instance Logger WorkerMonad =
......
......@@ -50,7 +50,6 @@ sendJobWithCfg gcConfig job = do
-- | We want to fine-tune job metadata parameters, for each job type
updateJobData :: Job -> SendJob -> SendJob
updateJobData (AddCorpusFormAsync {}) sj = sj { W.timeout = 3000 }
updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 }
updateJobData (AddToAnnuaireWithForm {}) sj = sj { W.timeout = 3000 }
updateJobData (AddWithFile {}) sj = sj { W.timeout = 3000 }
......
......@@ -24,7 +24,7 @@ import Gargantext.API.Node.DocumentUpload.Types (DocumentUpload)
import Gargantext.API.Node.FrameCalcUpload.Types (FrameCalcUpload)
import Gargantext.API.Node.New.Types ( PostNode(..) )
import Gargantext.API.Node.Update.Types (UpdateNodeParams)
import Gargantext.API.Node.Types (NewWithFile, NewWithForm, NewTempWithForm, WithQuery(..))
import Gargantext.API.Node.Types (NewWithFile, NewWithTempFile, WithQuery(..))
import Gargantext.Core.Types.Individu (User)
import Gargantext.Database.Admin.Types.Node (AnnuaireId, CorpusId, ListId, NodeId(UnsafeMkNodeId))
import Gargantext.Prelude
......@@ -35,10 +35,7 @@ data Job =
| AddContact { _ac_args :: AddContactParams
, _ac_node_id :: NodeId
, _ac_user :: User }
| AddCorpusFormAsync { _acf_args :: NewWithForm
, _acf_user :: User
, _acf_cid :: CorpusId }
| AddCorpusTempFileAsync { _actf_args :: NewTempWithForm
| AddCorpusTempFileAsync { _actf_args :: NewWithTempFile
, _actf_user :: User
, _actf_cid :: CorpusId }
| AddCorpusWithQuery { _acq_args :: WithQuery
......@@ -79,11 +76,6 @@ instance FromJSON Job where
_ac_node_id <- o .: "node_id"
_ac_user <- o .: "user"
return $ AddContact { .. }
"AddCorpusFormAsync" -> do
_acf_args <- o .: "args"
_acf_user <- o .: "user"
_acf_cid <- o .: "cid"
return $ AddCorpusFormAsync { .. }
"AddCorpusTempFileAsync" -> do
_actf_args <- o .: "args"
_actf_user <- o .: "user"
......@@ -148,11 +140,6 @@ instance ToJSON Job where
, "args" .= _ac_args
, "user" .= _ac_user
, "node_id" .= _ac_node_id ]
toJSON (AddCorpusFormAsync { .. }) =
object [ "type" .= ("AddCorpusFormAsync" :: Text)
, "args" .= _acf_args
, "user" .= _acf_user
, "cid" .= _acf_cid ]
toJSON (AddCorpusTempFileAsync { .. }) =
object [ "type" .= ("AddCorpusTempFileAsync" :: Text)
, "args" .= _actf_args
......@@ -223,7 +210,6 @@ instance ToJSON Job where
getWorkerMNodeId :: Job -> Maybe NodeId
getWorkerMNodeId Ping = Nothing
getWorkerMNodeId (AddContact { _ac_node_id }) = Just _ac_node_id
getWorkerMNodeId (AddCorpusFormAsync { _acf_args, _acf_cid }) = Just _acf_cid
getWorkerMNodeId (AddCorpusTempFileAsync { _actf_cid }) = Just _actf_cid
getWorkerMNodeId (AddCorpusWithQuery { _acq_args = WithQuery { _wq_node_id }}) = Just $ UnsafeMkNodeId _wq_node_id
getWorkerMNodeId (AddToAnnuaireWithForm { _aawf_annuaire_id }) = Just _aawf_annuaire_id
......
......@@ -51,6 +51,8 @@ module Gargantext.Database.Prelude
, fromField'
, mkCmd
, restrictMaybe
, createLargeObject
, removeLargeObject
)
where
......@@ -59,7 +61,7 @@ import Control.Lens (Getter, view)
import Control.Monad.Random ( MonadRandom )
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson (Result(..))
import Data.ByteString qualified as DB
import Data.ByteString qualified as BS
import Data.List qualified as DL
import Data.Pool (Pool, withResource)
import Data.Profunctor.Product.Default (Default)
......@@ -67,6 +69,7 @@ import Database.PostgreSQL.Simple (Connection)
import Database.PostgreSQL.Simple qualified as PGS
import Database.PostgreSQL.Simple.FromField ( Conversion, ResultError(ConversionFailed), fromField, returnError)
import Database.PostgreSQL.Simple.Internal (Field)
import Database.PostgreSQL.Simple.LargeObjects qualified as PSQL
import Database.PostgreSQL.Simple.Types (Query(..))
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Config (HasConfig(..))
......@@ -199,7 +202,7 @@ runCountOpaQuery q = do
-- countRows is guaranteed to return a list with exactly one row so DL.head is safe here
pure $ fromInt64ToInt $ DL.head counts
formatPGSQuery :: PGS.ToRow a => PGS.Query -> a -> DBCmd err DB.ByteString
formatPGSQuery :: PGS.ToRow a => PGS.Query -> a -> DBCmd err BS.ByteString
formatPGSQuery q a = mkCmd $ \conn -> PGS.formatQuery conn q a
runPGSQuery :: ( PGS.FromRow r, PGS.ToRow q )
......@@ -224,7 +227,7 @@ execPGSQuery :: PGS.ToRow a => PGS.Query -> a -> DBCmd err Int64
execPGSQuery q a = mkCmd $ \conn -> PGS.execute conn q a
fromField' :: (Typeable b, FromJSON b) => Field -> Maybe DB.ByteString -> Conversion b
fromField' :: (Typeable b, FromJSON b) => Field -> Maybe BS.ByteString -> Conversion b
fromField' field mb = do
v <- fromField field mb
valueToHyperdata v
......@@ -263,3 +266,15 @@ createDBIfNotExists connStr dbName = do
(result,) <$> SH.lastExitCode
return ()
-- | Store the given bytes in a newly created PostgreSQL large object and
-- return its oid. Creation, write and close all happen inside a single
-- transaction on the connection supplied by 'mkCmd'.
createLargeObject :: BS.ByteString -> DBCmd err PSQL.Oid
createLargeObject bs = mkCmd storeBlob
  where
    storeBlob conn = PGS.withTransaction conn $ do
      newOid <- PSQL.loCreat conn
      fd <- PSQL.loOpen conn newOid PSQL.WriteMode
      _written <- PSQL.loWrite conn fd bs
      newOid <$ PSQL.loClose conn fd
-- | Unlink (delete) the PostgreSQL large object whose oid is given as a
-- plain 'Int'.
removeLargeObject :: Int -> DBCmd err ()
removeLargeObject oId = mkCmd unlinkObject
  where
    target = PSQL.Oid (fromIntegral oId)
    unlinkObject conn = PSQL.loUnlink conn target
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment