[large object] trying to make the tests pass, but still no luck

parent 70c413a8
Pipeline #7313 failed with stages
in 65 minutes and 10 seconds
......@@ -21,11 +21,13 @@ module Gargantext.API.Node.Corpus.New
import Conduit ((.|), yieldMany, mapMC, mapC, transPipe)
import Control.Exception.Safe (MonadMask)
import Control.Lens ( view, non )
import Data.ByteString.Lazy qualified as BSL
import Data.Conduit.Internal (zipSources)
import Data.Swagger ( ToSchema(..) )
import Data.Text qualified as T
import Database.PostgreSQL.Simple qualified as PSQL
import Data.Text.Encoding qualified as TE
import Database.PostgreSQL.Simple.LargeObjects qualified as PSQL
import Gargantext.API.Admin.Orchestrator.Types qualified as API
import Gargantext.API.Ngrams (commitStatePatch, Versioned(..))
......@@ -51,7 +53,7 @@ import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(
import Gargantext.Database.Admin.Types.Hyperdata.File ( HyperdataFile(..) )
import Gargantext.Database.Admin.Types.Node (CorpusId, NodeType(..), ParentId)
import Gargantext.Database.GargDB qualified as GargDB
import Gargantext.Database.Prelude (mkCmd)
import Gargantext.Database.Prelude (readLargeObjectViaTempFile)
import Gargantext.Database.Query.Table.Node (getNodeWith, getOrMkList)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Query.Tree.Root (MkCorpusUser(MkCorpusUserNormalCorpusIds))
......@@ -213,7 +215,8 @@ addToCorpusWithQuery user cid (WithQuery { _wq_query = q
$(logLocM) ERROR $ "[addToCorpusWithQuery] error: " <> show err -- log the full error
markFailed (Just err) jobHandle
addToCorpusWithTempFile :: ( FlowCmdM env err m
addToCorpusWithTempFile :: ( MonadMask m
, FlowCmdM env err m
, MonadJobStatus m
, HasNodeStoryImmediateSaver env
, HasNodeArchiveStoryImmediateSaver env
......@@ -244,12 +247,11 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
WOS -> Parser.parseFormatC Parser.WOS
-- TODO granularity of the logStatus
data' <- mkCmd $ \c -> PSQL.withTransaction c $ do
let oId = PSQL.Oid $ fromIntegral $ nwtf ^. wtf_file_oid
loFd <- PSQL.loOpen c oId PSQL.ReadMode
-- TODO: Chunks?
size <- PSQL.loTell c loFd
PSQL.loRead c loFd size
(data'', size) <- readLargeObjectViaTempFile oId
let data' = BSL.toStrict data''
$(logLocM) DEBUG $ "[addToCorpusWithTempFile] size: " <> show size
$(logLocM) DEBUG $ "[addToCorpusWithTempFile] data': " <> TE.decodeUtf8 data'
eDocsC <- liftBase $ parseC (nwtf ^. wtf_fileformat) data'
case eDocsC of
Right (count, docsC) -> do
......@@ -260,7 +262,7 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
.| mapMC (\(idx, doc) ->
if idx > limit then do
--printDebug "[addToCorpusWithForm] number of docs exceeds the limit" (show limit)
let panicMsg' = [ "[addToCorpusWithForm] number of docs "
let panicMsg' = [ "[addToCorpusWithTempFile] number of docs "
, "exceeds the MAX_DOCS_PARSERS limit ("
, show limit
, ")" ]
......@@ -295,7 +297,7 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
markComplete jobHandle
Left parseErr -> do
$(logLocM) ERROR $ "[addToCorpusWithForm] parse error: " <> (Parser._ParseFormatError parseErr)
$(logLocM) ERROR $ "[addToCorpusWithTempFile] parse error: " <> (Parser._ParseFormatError parseErr)
markFailed (Just parseErr) jobHandle
{-
......
......@@ -14,6 +14,7 @@ Portability : POSIX
module Gargantext.API.Node.FrameCalcUpload where
import Control.Exception.Safe (MonadMask)
import Data.ByteString.Lazy qualified as BSL
import Data.Text qualified as T
import Database.PostgreSQL.Simple.LargeObjects qualified as PSQL
......@@ -55,7 +56,8 @@ api authenticatedUser nId =
frameCalcUploadAsync :: ( HasConfig env
frameCalcUploadAsync :: ( MonadMask m
, HasConfig env
, FlowCmdM env err m
, MonadJobStatus m
, HasNodeArchiveStoryImmediateSaver env
......
......@@ -11,6 +11,7 @@ Portability : POSIX
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
......@@ -33,6 +34,7 @@ import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.Database.Prelude (createLargeObject)
import Gargantext.Prelude
import Gargantext.System.Logging ( logLocM, LogLevel(..) )
import Servant (Get, JSON)
import Servant.Server.Generic (AsServerT)
......@@ -84,6 +86,7 @@ addWithTempFileApi authenticatedUser =
Left err -> panicTrace $ T.pack "[addWithTempFileApi] error decoding base64: " <> T.pack err
Right decoded -> decoded
(PSQL.Oid oId) <- createLargeObject bs
$(logLocM) DEBUG $ "[addWithTempFileApi] oId': " <> show oId
let args = NewWithTempFile { _wtf_filetype = _wf_filetype
, _wtf_fileformat = _wf_fileformat
, _wtf_file_oid = fromIntegral oId
......
......@@ -49,7 +49,7 @@ data AddWithTempFile mode = AddWithTempFile
:> "corpus"
:> Capture "corpus_id" CorpusId
:> "add"
:> "temp-file"
:> "form"
:> "async"
:> NamedRoutes (WorkerAPI '[FormUrlEncoded] NewWithForm)
} deriving Generic
......
......@@ -32,14 +32,7 @@ data WorkerAPI contentType input mode = WorkerAPI
serveWorkerAPI :: IsGargServer env err m
=> (input -> Job)
-> WorkerAPI contentType input (AsServerT m)
serveWorkerAPI f = WorkerAPI { workerAPIPost }
where
workerAPIPost i = do
let job = f i
logM DDEBUG $ "[serveWorkerAPI] sending job " <> show job
mId <- sendJob job
pure $ JobInfo { _ji_message_id = mId
, _ji_mNode_id = getWorkerMNodeId job }
serveWorkerAPI f = serveWorkerAPIm (pure . f)
serveWorkerAPIEJob :: (MonadError err m, IsGargServer env err m)
=> (input -> Either err Job)
......@@ -62,7 +55,7 @@ serveWorkerAPIm f = WorkerAPI { workerAPIPost }
where
workerAPIPost i = do
job <- f i
logM DDEBUG $ "[serveWorkerAPI] sending job " <> show job
logM DDEBUG $ "[serveWorkerAPIm] sending job " <> show job
mId <- sendJob job
pure $ JobInfo { _ji_message_id = mId
, _ji_mNode_id = getWorkerMNodeId job }
......@@ -21,7 +21,7 @@ module Gargantext.Core.Worker where
import Async.Worker.Broker.Types (toA, getMessage, messageId)
import Async.Worker qualified as W
import Async.Worker.Types qualified as W
import Control.Exception.Safe qualified as CES
-- import Control.Exception.Safe qualified as CES
import Control.Lens (to)
import Data.Text qualified as T
import Gargantext.API.Admin.Auth (forgotUserPassword)
......@@ -228,10 +228,13 @@ performAction env _state bm = do
-- | Uses temporary file to add documents into corpus
AddCorpusTempFileAsync { .. } -> runWorkerMonad env $ do
CES.finally (do
-- TODO CES.finally
$(logLocM) DEBUG "[performAction] add to corpus with temporary file"
addToCorpusWithTempFile _actf_user _actf_cid _actf_args jh)
(removeLargeObject $ _wtf_file_oid _actf_args)
addToCorpusWithTempFile _actf_user _actf_cid _actf_args jh
let oId = _wtf_file_oid _actf_args
$(logLocM) DEBUG $ "[performAction] removing large object: " <> show oId
removeLargeObject oId
-- | Perform external API search query and index documents in corpus
AddCorpusWithQuery { .. } -> runWorkerMonad env $ do
......
......@@ -52,16 +52,19 @@ module Gargantext.Database.Prelude
, mkCmd
, restrictMaybe
, createLargeObject
, readLargeObject
, readLargeObjectViaTempFile
, removeLargeObject
)
where
import Control.Exception.Safe (throw)
import Control.Exception.Safe qualified as CES
import Control.Lens (Getter, view)
import Control.Monad.Random ( MonadRandom )
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson (Result(..))
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BSL
import Data.List qualified as DL
import Data.Pool (Pool, withResource)
import Data.Profunctor.Product.Default (Default)
......@@ -81,6 +84,8 @@ import Opaleye.Aggregate (countRows)
import Opaleye.Internal.Constant qualified
import Opaleye.Internal.Operators qualified
import Shelly qualified as SH
import System.Directory (removeFile)
import System.IO.Temp (emptySystemTempFile)
-- $typesAndConstraints
......@@ -212,7 +217,7 @@ runPGSQuery q a = mkCmd $ \conn -> catch (PGS.query conn q a) (printError conn)
printError c (SomeException e) = do
q' <- PGS.formatQuery c q a
hPutStrLn stderr q'
throw (SomeException e)
CES.throw (SomeException e)
-- | TODO catch error
runPGSQuery_ :: ( PGS.FromRow r )
......@@ -221,7 +226,7 @@ runPGSQuery_ q = mkCmd $ \conn -> catch (PGS.query_ conn q) printError
where
printError (SomeException e) = do
hPutStrLn stderr (fromQuery q)
throw (SomeException e)
CES.throw (SomeException e)
execPGSQuery :: PGS.ToRow a => PGS.Query -> a -> DBCmd err Int64
execPGSQuery q a = mkCmd $ \conn -> PGS.execute conn q a
......@@ -267,6 +272,11 @@ createDBIfNotExists connStr dbName = do
return ()
------------------------------
-- PostgreSQL Large Object functionality
-- https://www.postgresql.org/docs/17/largeobjects.html
createLargeObject :: BS.ByteString -> DBCmd err PSQL.Oid
createLargeObject bs = mkCmd $ \c -> PGS.withTransaction c $ do
oId <- PSQL.loCreat c
......@@ -275,6 +285,32 @@ createLargeObject bs = mkCmd $ \c -> PGS.withTransaction c $ do
PSQL.loClose c loFd
pure oId
-- | Read a whole PostgreSQL large object into memory, chunk by chunk.
--
-- Returns the object's contents together with the number of bytes read.
-- Everything runs inside a single transaction, because large-object
-- descriptors are only valid for the duration of the transaction that
-- opened them.
readLargeObject :: PSQL.Oid -> DBCmd err (BSL.ByteString, Int)
readLargeObject oId = mkCmd $ \c -> PGS.withTransaction c $ do
  loFd <- PSQL.loOpen c oId PSQL.ReadMode
  let -- Bytes requested per round-trip to the server.
      chunkSize = 64 * 1024
      -- Accumulate chunks in reverse; an empty chunk signals end of object.
      go acc = do
        chunk <- PSQL.loRead c loFd chunkSize
        if BS.null chunk
          then pure (reverse acc)
          else go (chunk : acc)
  chunks <- go []
  -- Release the descriptor explicitly, mirroring 'createLargeObject'.
  PSQL.loClose c loFd
  pure (BSL.fromChunks chunks, sum (map BS.length chunks))
-- | Read a whole PostgreSQL large object by exporting it to a temporary
-- file and loading that file back into memory.
--
-- Returns the object's contents together with its size in bytes. The
-- temporary file is removed afterwards, whether or not the read succeeded.
readLargeObjectViaTempFile :: (CES.MonadMask m, IsDBCmd env err m)
                           => PSQL.Oid -> m (BSL.ByteString, Int)
readLargeObjectViaTempFile oId =
  CES.bracket (liftBase $ emptySystemTempFile "large-object")
              (liftBase . removeFile)
              (\fp -> do
                  -- 'loExport' works through a large-object descriptor, and
                  -- such descriptors are only valid inside a transaction, so
                  -- the export must not run in autocommit mode.
                  mkCmd $ \c -> PGS.withTransaction c $ PSQL.loExport c oId fp
                  -- Read strictly: the file is deleted as soon as this action
                  -- returns, so its contents must be fully in memory by then.
                  bytes <- liftBase $ BS.readFile fp
                  pure (BSL.fromStrict bytes, BS.length bytes))
removeLargeObject :: Int -> DBCmd err ()
removeLargeObject oId = mkCmd $ \c -> do
PSQL.loUnlink c $ PSQL.Oid $ fromIntegral oId
......@@ -46,7 +46,7 @@ import Gargantext.API.Ngrams.Types
import Gargantext.API.Node.Corpus.New.Types qualified as FType
import Gargantext.API.Node.Types
import Gargantext.API.Routes.Named
import Gargantext.API.Routes.Named.Corpus
import Gargantext.API.Routes.Named.Corpus (addWithTempFileEp)
import Gargantext.API.Routes.Named.Node
import Gargantext.API.Routes.Named.Private
import Gargantext.API.Worker (workerAPIPost)
......@@ -388,8 +388,8 @@ add_file_async (toServantToken -> token) corpusId nwf =
& gargPrivateAPI
& mkPrivateAPI
& ($ token)
& addWithFormAPI
& addWithFormEp
& addWithTempFile
& addWithTempFileEp
& ($ corpusId)
& workerAPIPost
& (\submitForm -> submitForm nwf)
......
......@@ -89,12 +89,12 @@ setup = do
-- putText $ "[setup] database: " <> show (gargConfig ^. gc_database_config)
-- putText $ "[setup] worker db: " <> show (gargConfig ^. gc_worker . wsDatabase)
let idleTime = 60.0
let maxResources = 2
let maxResources = 5
let poolConfig = defaultPoolConfig (PG.connectPostgreSQL (Tmp.toConnectionString db))
PG.close
idleTime
maxResources
pool <- newPool (setNumStripes (Just 2) poolConfig)
pool <- newPool (setNumStripes (Just 4) poolConfig)
bootstrapDB db pool gargConfig
ugen <- emptyCounter
test_nodeStory <- fromDBNodeStoryEnv pool
......@@ -104,7 +104,7 @@ setup = do
PG.close
idleTime
maxResources
wPool <- newPool (setNumStripes (Just 2) wPoolConfig)
wPool <- newPool (setNumStripes (Just 4) wPoolConfig)
wNodeStory <- fromDBNodeStoryEnv wPool
_w_env_job_state <- newTVarIO Nothing
withLoggerHoisted Mock $ \wioLogger -> do
......
......@@ -39,7 +39,7 @@ import Gargantext.API.Node.Get (GetNodeParams)
import Gargantext.API.Node.New.Types (PostNode(..))
import Gargantext.API.Node.Share.Types (ShareNodeParams(..))
import Gargantext.API.Node.Update.Types qualified as NU
import Gargantext.API.Node.Types (NewWithForm, RenameNode(..), WithQuery)
import Gargantext.API.Node.Types (NewWithForm, NewWithTempFile(..), RenameNode(..), WithQuery)
import Gargantext.API.Public.Types (PublicData(..))
import Gargantext.API.Routes.Named.Publish (PublishRequest(..))
import Gargantext.API.Search.Types (SearchQuery(..), SearchResult(..), SearchResultTypes(..), SearchType(..))
......@@ -569,6 +569,14 @@ genFrontendErr be = do
pure $ Errors.mkFrontendErr' txt $ Errors.FE_job_generic_exception err
-- | Generates a 'NewWithTempFile' by drawing every field independently
-- from that field type's own 'Arbitrary' instance.
instance Arbitrary NewWithTempFile where
  arbitrary = do
    fileType   <- arbitrary  -- _wtf_filetype
    fileFormat <- arbitrary  -- _wtf_fileformat
    fileOid    <- arbitrary  -- _wtf_file_oid
    lang       <- arbitrary  -- _wtf_lang
    name       <- arbitrary  -- _wtf_name
    selection  <- arbitrary  -- _wtf_selection
    pure $ NewWithTempFile fileType fileFormat fileOid lang name selection
instance Arbitrary Job where
arbitrary = oneof [ pure Ping
......@@ -588,7 +596,7 @@ instance Arbitrary Job where
, uploadDocumentGen ]
where
addContactGen = AddContact <$> arbitrary <*> arbitrary <*> arbitrary
addCorpusFormAsyncGen = AddCorpusFormAsync <$> arbitrary <*> arbitrary <*> arbitrary
addCorpusFormAsyncGen = AddCorpusTempFileAsync <$> arbitrary <*> arbitrary <*> arbitrary
addCorpusWithQueryGen = AddCorpusWithQuery <$> arbitrary <*> arbitrary <*> arbitrary
-- addWithFileGen = AddWithFile <$> arbitrary <*> arbitrary <*> arbitrary
addToAnnuaireWithFormGen = AddToAnnuaireWithForm <$> arbitrary <*> arbitrary
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment