Verified Commit df22d747 authored by Przemyslaw Kaminski's avatar Przemyslaw Kaminski

Merge branch 'dev' into 465-dev-share-url-fixes

parents cca0e2fa 397b19a5
...@@ -82,6 +82,8 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo = ...@@ -82,6 +82,8 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
, _ac_scrapyd_url } , _ac_scrapyd_url }
, _gc_worker = WorkerSettings { _wsDefinitions = [ wd ] , _gc_worker = WorkerSettings { _wsDefinitions = [ wd ]
, _wsDefaultVisibilityTimeout = 1 , _wsDefaultVisibilityTimeout = 1
, _wsDefaultJobTimeout = 60
, _wsLongJobTimeout = 3000
, _wsDefaultDelay = 0 , _wsDefaultDelay = 0
, _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} } , _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} }
, _gc_logging = Config.LogConfig { , _gc_logging = Config.LogConfig {
...@@ -141,5 +143,8 @@ defaultNotificationsConfig :: CTypes.NotificationsConfig ...@@ -141,5 +143,8 @@ defaultNotificationsConfig :: CTypes.NotificationsConfig
defaultNotificationsConfig = defaultNotificationsConfig =
CTypes.NotificationsConfig { _nc_central_exchange_bind = "tcp://*:5560" CTypes.NotificationsConfig { _nc_central_exchange_bind = "tcp://*:5560"
, _nc_central_exchange_connect = "tcp://localhost:5560" , _nc_central_exchange_connect = "tcp://localhost:5560"
, _nc_ce_send_timeout_ms = 200
, _nc_dispatcher_bind = "tcp://*:5561" , _nc_dispatcher_bind = "tcp://*:5561"
, _nc_dispatcher_connect = "tcp://localhost:5561" } , _nc_dispatcher_connect = "tcp://localhost:5561"
, _nc_dispatcher_send_timeout_ms = 500
, _nc_dispatcher_throttle_ms = 500 }
...@@ -16,8 +16,8 @@ fi ...@@ -16,8 +16,8 @@ fi
# with the `sha256sum` result calculated on the `cabal.project` and # with the `sha256sum` result calculated on the `cabal.project` and
# `cabal.project.freeze`. This ensures the files stay deterministic so that CI # `cabal.project.freeze`. This ensures the files stay deterministic so that CI
# cache can kick in. # cache can kick in.
expected_cabal_project_hash="c7e0466c8d4c1ca88b4f3d62d022bd29329d44afc48fffbcfacf0f65293acba8" expected_cabal_project_hash="eb8fdb1a14aa2f7a13f565cf7fa9f6ab0e2dab9212538aed0db5691015be286b"
expected_cabal_project_freeze_hash="553b98aadb35506a305bd740cdd71f5fadc1e6d55d10f91cf39daa6735a63d78" expected_cabal_project_freeze_hash="553b98aadb35506a305bd740cdd71f5fadc1e6d55d10f91cf39daa6735a63d78"
cabal --store-dir=$STORE_DIR v2-build --dry-run cabal --store-dir=$STORE_DIR v2-build --dry-run
......
...@@ -151,12 +151,12 @@ source-repository-package ...@@ -151,12 +151,12 @@ source-repository-package
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-pgmq location: https://gitlab.iscpif.fr/gargantext/haskell-pgmq
tag: 1dd92f0aa8e9f8096064e5656c336e562680f4e3 tag: 9a869df2842eccc86a0f31a69fb8dc5e5ca218a8
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-bee location: https://gitlab.iscpif.fr/gargantext/haskell-bee
tag: 4a9c709613554eed0189b486de2126c18797088c tag: 05c39e424d15149dc32097b3318cb6007e0e7052
subdir: haskell-bee/ subdir: haskell-bee/
haskell-bee-pgmq/ haskell-bee-pgmq/
haskell-bee-tests/ haskell-bee-tests/
......
...@@ -122,9 +122,19 @@ smtp_host = "localhost" ...@@ -122,9 +122,19 @@ smtp_host = "localhost"
# HOST_password = password # HOST_password = password
[notifications] [notifications.central-exchange]
central-exchange = { bind = "tcp://*:5560", connect = "tcp://127.0.0.1:5560" } bind = "tcp://:5560"
dispatcher = { bind = "tcp://*:5561", connect = "tcp://127.0.0.1:5561" } connect = "tcp://127.0.0.1:5560"
# see https://gitlab.iscpif.fr/gargantext/haskell-gargantext/commit/77a687ea1483441675320fd2413fac52bb112a4c
send_timeout_ms = 200
[notifications.dispatcher]
bind = "tcp://:5561"
connect = "tcp://127.0.0.1:5561"
# see https://gitlab.iscpif.fr/gargantext/haskell-gargantext/commit/77a687ea1483441675320fd2413fac52bb112a4c
send_timeout_ms = 500
# Same dispatcher messages are throttled, this is the throttle delay
throttle_ms = 500
[nlp] [nlp]
...@@ -148,6 +158,11 @@ default_visibility_timeout = 1 ...@@ -148,6 +158,11 @@ default_visibility_timeout = 1
# default delay before job is visible to the worker # default delay before job is visible to the worker
default_delay = 0 default_delay = 0
# default timeout (in seconds)
default_job_timeout = 60
# default timeout for "long" jobs (in seconds)
long_job_timeout = 3000
# if you leave the same credentials as in [database] section above, # if you leave the same credentials as in [database] section above,
# workers will try to set up the `gargantext_pgmq` database # workers will try to set up the `gargantext_pgmq` database
# automatically # automatically
......
...@@ -21,6 +21,7 @@ rec { ...@@ -21,6 +21,7 @@ rec {
gargGhc gargGhc
cabal_install cabal_install
pkgs.haskellPackages.alex pkgs.haskellPackages.alex
pkgs.haskellPackages.ghcid
pkgs.haskellPackages.happy pkgs.haskellPackages.happy
pkgs.haskellPackages.pretty-show pkgs.haskellPackages.pretty-show
]; ];
......
...@@ -55,7 +55,7 @@ defaultSettingsFile :: SettingsFile ...@@ -55,7 +55,7 @@ defaultSettingsFile :: SettingsFile
defaultSettingsFile = SettingsFile "gargantext-settings.toml" defaultSettingsFile = SettingsFile "gargantext-settings.toml"
-- | Run Cmd Sugar for the Repl (GHCI) -- | Run Cmd Sugar for the Repl (GHCI)
runCmdRepl :: Show err => CmdRandom DevEnv err a -> IO a runCmdRepl :: (Typeable err, Show err) => CmdRandom DevEnv err a -> IO a
runCmdRepl f = withDevEnv defaultSettingsFile $ \env -> runCmdDev env f runCmdRepl f = withDevEnv defaultSettingsFile $ \env -> runCmdDev env f
runCmdReplServantErr :: CmdRandom DevEnv ServerError a -> IO a runCmdReplServantErr :: CmdRandom DevEnv ServerError a -> IO a
...@@ -65,7 +65,7 @@ runCmdReplServantErr = runCmdRepl ...@@ -65,7 +65,7 @@ runCmdReplServantErr = runCmdRepl
-- the command. -- the command.
-- This function is constrained to the DevEnv rather than -- This function is constrained to the DevEnv rather than
-- using HasConnectionPool and HasRepoVar. -- using HasConnectionPool and HasRepoVar.
runCmdDev :: Show err => DevEnv -> CmdRandom DevEnv err a -> IO a runCmdDev :: (Typeable err, Show err) => DevEnv -> CmdRandom DevEnv err a -> IO a
runCmdDev env f = runCmdDev env f =
either (fail . show) pure =<< runCmd env f either (fail . show) pure =<< runCmd env f
......
...@@ -257,6 +257,8 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do ...@@ -257,6 +257,8 @@ addToCorpusWithTempFile user cid nwtf jobHandle = do
-- TODO Add progress (jobStatus) update for docs - this is a -- TODO Add progress (jobStatus) update for docs - this is a
-- long action -- long action
markStarted (fromIntegral count) jobHandle
let docsC' = zipSources (yieldMany [1..]) (transPipe liftBase docsC) let docsC' = zipSources (yieldMany [1..]) (transPipe liftBase docsC)
.| mapMC (\(idx, doc) -> .| mapMC (\(idx, doc) ->
if idx > limit then do if idx > limit then do
......
...@@ -332,23 +332,29 @@ makeLenses ''APIsConfig ...@@ -332,23 +332,29 @@ makeLenses ''APIsConfig
data NotificationsConfig = data NotificationsConfig =
NotificationsConfig { _nc_central_exchange_bind :: ~T.Text NotificationsConfig { _nc_central_exchange_bind :: ~T.Text
, _nc_central_exchange_connect :: ~T.Text , _nc_central_exchange_connect :: ~T.Text
, _nc_dispatcher_bind :: ~T.Text , _nc_ce_send_timeout_ms :: ~Int
, _nc_dispatcher_connect :: ~T.Text } , _nc_dispatcher_bind :: ~T.Text
, _nc_dispatcher_connect :: ~T.Text
, _nc_dispatcher_send_timeout_ms :: ~Int
, _nc_dispatcher_throttle_ms :: ~Int }
deriving (Show, Eq) deriving (Show, Eq)
instance FromValue NotificationsConfig where instance FromValue NotificationsConfig where
fromValue = parseTableFromValue $ do fromValue = parseTableFromValue $ do
(_nc_central_exchange_bind, _nc_central_exchange_connect) <- (_nc_central_exchange_bind, _nc_central_exchange_connect, _nc_ce_send_timeout_ms) <-
reqKeyOf "central-exchange" $ parseTableFromValue $ do reqKeyOf "central-exchange" $ parseTableFromValue $ do
b <- reqKey "bind" b <- reqKey "bind"
c <- reqKey "connect" c <- reqKey "connect"
pure (b, c) t <- reqKey "send_timeout_ms"
(_nc_dispatcher_bind, _nc_dispatcher_connect) <- pure (b, c, t)
(_nc_dispatcher_bind, _nc_dispatcher_connect, _nc_dispatcher_send_timeout_ms, _nc_dispatcher_throttle_ms) <-
reqKeyOf "dispatcher" $ parseTableFromValue $ do reqKeyOf "dispatcher" $ parseTableFromValue $ do
b <- reqKey "bind" b <- reqKey "bind"
c <- reqKey "connect" c <- reqKey "connect"
pure (b, c) t <- reqKey "send_timeout_ms"
tt <- reqKey "throttle_ms"
pure (b, c, t, tt)
return $ NotificationsConfig { .. } return $ NotificationsConfig { .. }
instance ToValue NotificationsConfig where instance ToValue NotificationsConfig where
toValue = defaultTableToValue toValue = defaultTableToValue
...@@ -356,8 +362,11 @@ instance ToTable NotificationsConfig where ...@@ -356,8 +362,11 @@ instance ToTable NotificationsConfig where
toTable (NotificationsConfig { .. }) = toTable (NotificationsConfig { .. }) =
table [ "central-exchange" .= table [ "central-exchange" .=
table [ "bind" .= _nc_central_exchange_bind table [ "bind" .= _nc_central_exchange_bind
, "connect" .= _nc_central_exchange_connect ] , "connect" .= _nc_central_exchange_connect
, "send_timeout_ms" .= _nc_ce_send_timeout_ms ]
, "dispatcher" .= , "dispatcher" .=
table [ "bind" .= _nc_dispatcher_bind table [ "bind" .= _nc_dispatcher_bind
, "connect" .= _nc_dispatcher_connect ] , "connect" .= _nc_dispatcher_connect
, "send_timeout_ms" .= _nc_dispatcher_send_timeout_ms
, "throttle" .= _nc_dispatcher_throttle_ms ]
] ]
...@@ -38,8 +38,13 @@ type WorkerName = Text ...@@ -38,8 +38,13 @@ type WorkerName = Text
data WorkerSettings = data WorkerSettings =
WorkerSettings { WorkerSettings {
_wsDatabase :: !PGS.ConnectInfo _wsDatabase :: !PGS.ConnectInfo
-- After this number of seconds, the job will be available again.
-- | default job timeout, in seconds
, _wsDefaultJobTimeout :: ~Int
-- | default "long" job timeout, in seconds
, _wsLongJobTimeout :: ~Int
-- After this number of seconds, the job will be available again.
-- You can set timeout for each job individually and this is the -- You can set timeout for each job individually and this is the
-- preferred method over using defaultVt. -- preferred method over using defaultVt.
, _wsDefaultVisibilityTimeout :: PGMQ.VisibilityTimeout , _wsDefaultVisibilityTimeout :: PGMQ.VisibilityTimeout
...@@ -53,8 +58,12 @@ instance FromValue WorkerSettings where ...@@ -53,8 +58,12 @@ instance FromValue WorkerSettings where
dbConfig <- reqKey "database" dbConfig <- reqKey "database"
_wsDefinitions <- reqKey "definitions" _wsDefinitions <- reqKey "definitions"
_wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout" _wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout"
_wsDefaultJobTimeout <- reqKey "default_job_timeout"
_wsLongJobTimeout <- reqKey "long_job_timeout"
defaultDelay <- reqKey "default_delay" defaultDelay <- reqKey "default_delay"
return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig
, _wsDefaultJobTimeout
, _wsLongJobTimeout
, _wsDefinitions , _wsDefinitions
, _wsDefaultVisibilityTimeout , _wsDefaultVisibilityTimeout
, _wsDefaultDelay = B.TimeoutS defaultDelay } , _wsDefaultDelay = B.TimeoutS defaultDelay }
...@@ -63,6 +72,8 @@ instance ToValue WorkerSettings where ...@@ -63,6 +72,8 @@ instance ToValue WorkerSettings where
instance ToTable WorkerSettings where instance ToTable WorkerSettings where
toTable (WorkerSettings { .. }) = toTable (WorkerSettings { .. }) =
table [ "database" .= TOMLConnectInfo _wsDatabase table [ "database" .= TOMLConnectInfo _wsDatabase
, "default_job_timeout" .= _wsDefaultJobTimeout
, "long_job_timeout" .= _wsLongJobTimeout
, "default_visibility_timeout" .= _wsDefaultVisibilityTimeout , "default_visibility_timeout" .= _wsDefaultVisibilityTimeout
, "default_delay" .= B._TimeoutS _wsDefaultDelay , "default_delay" .= B._TimeoutS _wsDefaultDelay
, "definitions" .= _wsDefinitions ] , "definitions" .= _wsDefinitions ]
......
...@@ -74,7 +74,7 @@ gServer cfg = do ...@@ -74,7 +74,7 @@ gServer cfg = do
-- C.putStrLn $ "[central_exchange] " <> r -- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
NotificationsConfig{..} = cfg ^. gc_notifications_config nc@NotificationsConfig{..} = cfg ^. gc_notifications_config
log_cfg = cfg ^. gc_logging log_cfg = cfg ^. gc_logging
worker s_dispatcher tChan = do worker s_dispatcher tChan = do
withLogger log_cfg $ \ioLogger -> do withLogger log_cfg $ \ioLogger -> do
...@@ -99,29 +99,24 @@ gServer cfg = do ...@@ -99,29 +99,24 @@ gServer cfg = do
-- process, independent of the server. -- process, independent of the server.
-- send the same message that we received -- send the same message that we received
-- void $ sendNonblocking s_dispatcher r -- void $ sendNonblocking s_dispatcher r
sendTimeout ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Just (UpdateWorkerProgress _ji _jl) -> do Just (UpdateWorkerProgress _ji _jl) -> do
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl -- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
sendTimeout ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Just Ping -> do Just Ping -> do
sendTimeout ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Nothing -> Nothing ->
$(logLoc) ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r $(logLoc) ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r
-- | A static send timeout in microseconds. -- | Sends the given payload ensure the send doesn't take more than the
send_timeout_us :: Int -- 'nc_ce_send_timeout_ms', logging a message if the timeouts kicks in.
send_timeout_us = 50_000 sendTimeout :: Sender a => NotificationsConfig -> Logger IO -> Socket a -> ByteString -> IO ()
sendTimeout (NotificationsConfig { _nc_ce_send_timeout_ms }) ioLogger sock payload = withFrozenCallStack $ do
-- | Sends the given payload ensure the send doesn't take more than the static timeoutKickedIn <- timeout (_nc_ce_send_timeout_ms * 1000) $ send sock $ payload
-- 'send_timeout_ns', logging a message if the timeouts kicks in.
sendTimeout :: Sender a => Logger IO -> Socket a -> ByteString -> IO ()
sendTimeout ioLogger sock payload = withFrozenCallStack $ do
timeoutKickedIn <- timeout send_timeout_us $ send sock $ payload
case timeoutKickedIn of case timeoutKickedIn of
Nothing -> Nothing ->
$(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion." $(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion."
Just () -> Just () -> pure ()
$(logLoc) ioLogger DEBUG $ "[central_exchange] message sent."
notify :: HasCallStack => GargConfig -> CEMessage -> IO () notify :: HasCallStack => GargConfig -> CEMessage -> IO ()
notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do
...@@ -130,12 +125,11 @@ notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do ...@@ -130,12 +125,11 @@ notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do
connectEndpoint <- connect s $ T.unpack _nc_central_exchange_connect connectEndpoint <- connect s $ T.unpack _nc_central_exchange_connect
let do_work = do let do_work = do
let str = Aeson.encode ceMessage let str = Aeson.encode ceMessage
$(logLoc) ioLogger DEBUG $ "[central_exchange] sending to " <> _nc_central_exchange_connect
$(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str) $(logLoc) ioLogger DEBUG $ "[central_exchange] sending: " <> (TE.decodeUtf8 $ BSL.toStrict str)
-- err <- sendNonblocking s $ BSL.toStrict str -- err <- sendNonblocking s $ BSL.toStrict str
-- putText $ "[notify] err: " <> show err -- putText $ "[notify] err: " <> show err
sendTimeout ioLogger s (BSL.toStrict str) sendTimeout nc ioLogger s (BSL.toStrict str)
do_work `finally` shutdown s connectEndpoint do_work `finally` shutdown s connectEndpoint
where where
NotificationsConfig { _nc_central_exchange_connect } = cfg ^. gc_notifications_config nc@NotificationsConfig { _nc_central_exchange_connect } = cfg ^. gc_notifications_config
log_cfg = cfg ^. gc_logging log_cfg = cfg ^. gc_logging
...@@ -31,16 +31,19 @@ import Data.Aeson qualified as Aeson ...@@ -31,16 +31,19 @@ import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Data.Text qualified as T import Data.Text qualified as T
import DeferredFolds.UnfoldlM qualified as UnfoldlM import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Gargantext.Core.Config
( GargConfig, LogConfig, gc_logging, gc_notifications_config )
import Gargantext.Core.Config.Types (NotificationsConfig(..)) import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes
import Gargantext.Core.Notifications.Dispatcher.Types import Gargantext.Core.Notifications.Dispatcher.Types
import Gargantext.Core.Worker.Types (JobInfo(..)) import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging
( HasLogger(logMsg), LogLevel(..), withLogger, logLoc )
import Nanomsg (Pull(..), bind, recv, withSocket) import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import StmContainers.Set qualified as SSet import StmContainers.Set qualified as SSet
import Gargantext.Core.Config import System.Timeout (timeout)
import Gargantext.System.Logging
{- {-
...@@ -84,7 +87,8 @@ dispatcherListener config subscriptions = do ...@@ -84,7 +87,8 @@ dispatcherListener config subscriptions = do
-- NOTE I'm not sure that we need more than 1 worker here, but in -- NOTE I'm not sure that we need more than 1 worker here, but in
-- theory, the worker can perform things like user authentication, -- theory, the worker can perform things like user authentication,
-- DB queries etc so it can be slow sometimes. -- DB queries etc so it can be slow sometimes.
Async.withAsync (throttle 500_000 throttleTChan (sendDataMessageThrottled log_cfg)) $ \_ -> do Async.withAsync (throttle (_nc_dispatcher_throttle_ms * 1000) throttleTChan
(sendDataMessageThrottled nc log_cfg)) $ \_ -> do
void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do
forever $ do forever $ do
-- putText "[dispatcher_listener] receiving" -- putText "[dispatcher_listener] receiving"
...@@ -92,7 +96,7 @@ dispatcherListener config subscriptions = do ...@@ -92,7 +96,7 @@ dispatcherListener config subscriptions = do
-- C.putStrLn $ "[dispatcher_listener] " <> r -- C.putStrLn $ "[dispatcher_listener] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
NotificationsConfig { _nc_dispatcher_bind } = config ^. gc_notifications_config nc@NotificationsConfig { _nc_dispatcher_bind, _nc_dispatcher_throttle_ms } = config ^. gc_notifications_config
log_cfg = config ^. gc_logging log_cfg = config ^. gc_logging
worker tChan throttleTChan = withLogger log_cfg $ \ioL -> do worker tChan throttleTChan = withLogger log_cfg $ \ioL -> do
tId <- myThreadId tId <- myThreadId
...@@ -164,11 +168,19 @@ sendNotification throttleTChan ceMessage sub = do ...@@ -164,11 +168,19 @@ sendNotification throttleTChan ceMessage sub = do
-- | The "true" message sending to websocket. After it was withheld -- | The "true" message sending to websocket. After it was withheld
-- for a while (for throttling), it is finally sent here -- for a while (for throttling), it is finally sent here
sendDataMessageThrottled :: LogConfig -> (WS.Connection, WS.DataMessage) -> IO () sendDataMessageThrottled :: NotificationsConfig -> LogConfig -> (WS.Connection, WS.DataMessage) -> IO ()
sendDataMessageThrottled log_cfg (conn, msg) = do sendDataMessageThrottled (NotificationsConfig { _nc_dispatcher_send_timeout_ms }) log_cfg (conn, msg) = do
withLogger log_cfg $ \ioL -> withLogger log_cfg $ \ioL -> do
logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg logMsg ioL DEBUG $ "[sendDataMessageThrottled] dispatching notification: " <> show msg
WS.sendDataMessage conn msg -- | We need a timeout here for the following reason:
-- when a message is sent and the user disconnects the WS
-- connection (e.g. refreshes the page), it seems that this message sending hangs.
-- We don't want to block the thread indefinitely.
timeoutKickedIn <- timeout (_nc_dispatcher_send_timeout_ms * 1000) $ WS.sendDataMessage conn msg
case timeoutKickedIn of
Nothing ->
$(logLoc) ioL ERROR $ "[sendMessageThrottled] couldn't send msg in timely fashion."
Just _ -> pure ()
-- | Custom filtering of list of Subscriptions based on -- | Custom filtering of list of Subscriptions based on
......
...@@ -232,7 +232,6 @@ performAction env _state bm = do ...@@ -232,7 +232,6 @@ performAction env _state bm = do
-- | Uses temporary file to add documents into corpus -- | Uses temporary file to add documents into corpus
AddCorpusTempFileAsync { .. } -> runWorkerMonad env $ do AddCorpusTempFileAsync { .. } -> runWorkerMonad env $ do
-- TODO CES.filnally
$(logLocM) DEBUG "[performAction] add to corpus with temporary file" $(logLocM) DEBUG "[performAction] add to corpus with temporary file"
CES.finally (addToCorpusWithTempFile _actf_user _actf_cid _actf_args jh) CES.finally (addToCorpusWithTempFile _actf_user _actf_cid _actf_args jh)
(removeLargeObject $ _wtf_file_oid _actf_args) (removeLargeObject $ _wtf_file_oid _actf_args)
......
...@@ -15,6 +15,7 @@ module Gargantext.Core.Worker.Jobs where ...@@ -15,6 +15,7 @@ module Gargantext.Core.Worker.Jobs where
import Async.Worker qualified as W import Async.Worker qualified as W
import Async.Worker.Types qualified as WT
import Control.Lens (view) import Control.Lens (view)
import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging) import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging)
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..))
...@@ -44,25 +45,34 @@ sendJobWithCfg gcConfig job = do ...@@ -44,25 +45,34 @@ sendJobWithCfg gcConfig job = do
Just wd -> do Just wd -> do
b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws
let queueName = _wdQueue wd let queueName = _wdQueue wd
let job' = (updateJobData job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay } let job' = (updateJobData ws job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay }
withLogger (gcConfig ^. gc_logging) $ \ioL -> withLogger (gcConfig ^. gc_logging) $ \ioL ->
$(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")" $(logLoc) ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
W.sendJob' job' W.sendJob' job'
-- | We want to fine-tune job metadata parameters, for each job type -- | We want to fine-tune job metadata parameters, for each job type
updateJobData :: Job -> SendJob -> SendJob updateJobData :: WorkerSettings -> Job -> SendJob -> SendJob
updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 } updateJobData ws (AddCorpusTempFileAsync {}) sj = withLongTimeout ws $ sj { W.toStrat = WT.TSDelete
updateJobData (AddToAnnuaireWithForm {}) sj = sj { W.timeout = 3000 } , W.resendOnKill = False }
updateJobData (AddWithFile {}) sj = sj { W.timeout = 3000 } updateJobData ws (AddCorpusWithQuery {}) sj = withLongTimeout ws sj
updateJobData (DocumentsFromWriteNodes {}) sj = sj { W.timeout = 3000 } updateJobData ws (AddToAnnuaireWithForm {}) sj = withLongTimeout ws sj
updateJobData (FrameCalcUpload {}) sj = sj { W.timeout = 3000 } updateJobData ws (AddWithFile {}) sj = withLongTimeout ws $ sj { W.toStrat = WT.TSDelete
updateJobData (JSONPost {}) sj = sj { W.timeout = 3000 } , W.resendOnKill = False }
updateJobData (NgramsPostCharts {}) sj = sj { W.timeout = 3000 } updateJobData ws (DocumentsFromWriteNodes {}) sj = withLongTimeout ws sj
updateJobData (RecomputeGraph {}) sj = sj { W.timeout = 3000 } updateJobData ws (FrameCalcUpload {}) sj = withLongTimeout ws sj
updateJobData (UpdateNode {}) sj = sj { W.timeout = 3000 } updateJobData ws (JSONPost {}) sj = withLongTimeout ws $ sj { W.toStrat = WT.TSDelete
updateJobData (UploadDocument {}) sj = sj { W.timeout = 3000 } , W.resendOnKill = False }
updateJobData (ImportRemoteDocuments {}) sj = sj { W.timeout = 3000 } updateJobData ws (NgramsPostCharts {}) sj = withLongTimeout ws sj
updateJobData (ImportRemoteTerms {}) sj = sj { W.timeout = 3000 } updateJobData ws (RecomputeGraph {}) sj = withLongTimeout ws sj
updateJobData ws (UpdateNode {}) sj = withLongTimeout ws sj
updateJobData ws (UploadDocument {}) sj = withLongTimeout ws sj
updateJobData ws (ImportRemoteDocuments {}) sj = withLongTimeout ws sj
updateJobData ws (ImportRemoteTerms {}) sj = withLongTimeout ws sj
-- | ForgotPasswordAsync, PostNodeAsync -- | ForgotPasswordAsync, PostNodeAsync
updateJobData _ sj = sj { W.resendOnKill = False updateJobData ws _ sj = withDefaultTimeout ws $ sj { W.resendOnKill = False }
, W.timeout = 60 }
withDefaultTimeout :: WorkerSettings -> SendJob -> SendJob
withDefaultTimeout (WorkerSettings { _wsDefaultJobTimeout }) sj = sj { W.timeout = _wsDefaultJobTimeout }
withLongTimeout :: WorkerSettings -> SendJob -> SendJob
withLongTimeout (WorkerSettings { _wsLongJobTimeout }) sj = sj { W.timeout = _wsLongJobTimeout }
...@@ -32,7 +32,7 @@ import Gargantext.Database.GargDB qualified as GargDB ...@@ -32,7 +32,7 @@ import Gargantext.Database.GargDB qualified as GargDB
import Gargantext.Database.Prelude import Gargantext.Database.Prelude
import Gargantext.Database.Query.Table.Node (getNodeWith) import Gargantext.Database.Query.Table.Node (getNodeWith)
import Gargantext.Database.Query.Table.Node qualified as N (getNode, deleteNode) import Gargantext.Database.Query.Table.Node qualified as N (getNode, deleteNode)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError, errorWith) import Gargantext.Database.Query.Table.Node.Error (HasNodeError, nodeErrorWith)
import Gargantext.Database.Schema.Node import Gargantext.Database.Schema.Node
import Gargantext.Prelude import Gargantext.Prelude
...@@ -51,7 +51,7 @@ deleteNode u nodeId = do ...@@ -51,7 +51,7 @@ deleteNode u nodeId = do
(num, upd_node, cleanup) <- runDBTx $ do (num, upd_node, cleanup) <- runDBTx $ do
node' <- N.getNode nodeId node' <- N.getNode nodeId
(rows, clean_it) <- case view node_typename node' of (rows, clean_it) <- case view node_typename node' of
nt | nt == toDBid NodeUser -> errorWith "[G.D.A.D.deleteNode] Not allowed to delete NodeUser (yet)" nt | nt == toDBid NodeUser -> nodeErrorWith "[G.D.A.D.deleteNode] Not allowed to delete NodeUser (yet)"
nt | nt == toDBid NodeTeam -> do nt | nt == toDBid NodeTeam -> do
uId <- getUserId u uId <- getUserId u
if _node_user_id node' == uId if _node_user_id node' == uId
......
...@@ -55,6 +55,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list) ...@@ -55,6 +55,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
where where
import Conduit import Conduit
import Control.Exception.Safe qualified as CES
import Control.Lens ( to, view ) import Control.Lens ( to, view )
import Control.Exception.Safe (catch, MonadCatch) import Control.Exception.Safe (catch, MonadCatch)
import Data.Conduit qualified as C import Data.Conduit qualified as C
...@@ -173,7 +174,7 @@ flowDataText :: forall env err m. ...@@ -173,7 +174,7 @@ flowDataText :: forall env err m.
, HasTreeError err , HasTreeError err
, HasValidationError err , HasValidationError err
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env , HasCentralExchangeNotification env
) )
=> User => User
...@@ -208,7 +209,7 @@ flowAnnuaire :: ( IsDBCmd env err m ...@@ -208,7 +209,7 @@ flowAnnuaire :: ( IsDBCmd env err m
, HasTreeError err , HasTreeError err
, HasValidationError err , HasValidationError err
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env ) , HasCentralExchangeNotification env )
=> MkCorpusUser => MkCorpusUser
-> TermType Lang -> TermType Lang
...@@ -228,7 +229,7 @@ flowCorpusFile :: ( IsDBCmd env err m ...@@ -228,7 +229,7 @@ flowCorpusFile :: ( IsDBCmd env err m
, HasTreeError err , HasTreeError err
, HasValidationError err , HasValidationError err
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env ) , HasCentralExchangeNotification env )
=> MkCorpusUser => MkCorpusUser
-> TermType Lang -> TermType Lang
...@@ -479,7 +480,7 @@ extractNgramsFromDocument :: ( UniqParameters doc ...@@ -479,7 +480,7 @@ extractNgramsFromDocument :: ( UniqParameters doc
, ExtractNgrams m doc , ExtractNgrams m doc
, IsDBCmd err env m , IsDBCmd err env m
, MonadLogger m , MonadLogger m
, MonadCatch m , CES.MonadCatch m
) )
=> NLPServerConfig => NLPServerConfig
-> TermType Lang -> TermType Lang
...@@ -525,7 +526,7 @@ extractNgramsFromDocuments :: forall doc env err m. ...@@ -525,7 +526,7 @@ extractNgramsFromDocuments :: forall doc env err m.
, ExtractNgrams m doc , ExtractNgrams m doc
, IsDBCmd env err m , IsDBCmd env err m
, MonadLogger m , MonadLogger m
, MonadCatch m , CES.MonadCatch m
) )
=> NLPServerConfig => NLPServerConfig
-> TermType Lang -> TermType Lang
......
...@@ -26,7 +26,7 @@ import Gargantext.Database.Admin.Config (hasNodeType, isInNodeTypes) ...@@ -26,7 +26,7 @@ import Gargantext.Database.Admin.Config (hasNodeType, isInNodeTypes)
import Gargantext.Database.Admin.Types.Hyperdata.Any (HyperdataAny(..)) import Gargantext.Database.Admin.Types.Hyperdata.Any (HyperdataAny(..))
import Gargantext.Database.Admin.Types.Node import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Query.Table.Node (getNode, getNodesWith) import Gargantext.Database.Query.Table.Node (getNode, getNodesWith)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError, errorWith) import Gargantext.Database.Query.Table.Node.Error (HasNodeError, nodeErrorWith)
import Gargantext.Database.Query.Table.User import Gargantext.Database.Query.Table.User
import Gargantext.Database.Query.Tree.Root (getRootId) import Gargantext.Database.Query.Tree.Root (getRootId)
import Gargantext.Database.Schema.Node import Gargantext.Database.Schema.Node
...@@ -98,10 +98,10 @@ shareNodeWith (ShareNodeWith_User NodeFolderShared u) n = do ...@@ -98,10 +98,10 @@ shareNodeWith (ShareNodeWith_User NodeFolderShared u) n = do
nodeToCheck <- getNode n nodeToCheck <- getNode n
userIdCheck <- getUserId u userIdCheck <- getUserId u
if not (hasNodeType nodeToCheck NodeTeam) if not (hasNodeType nodeToCheck NodeTeam)
then errorWith "[G.D.A.S.shareNodeWith] Can share node Team only" then nodeErrorWith "[G.D.A.S.shareNodeWith] Can share node Team only"
else else
if (view node_user_id nodeToCheck == userIdCheck) if (view node_user_id nodeToCheck == userIdCheck)
then errorWith "[G.D.A.S.shareNodeWith] Can share to others only" then nodeErrorWith "[G.D.A.S.shareNodeWith] Can share to others only"
else do else do
folderSharedId <- getFolderId u NodeFolderShared folderSharedId <- getFolderId u NodeFolderShared
ret <- shareNode (SourceId folderSharedId) (TargetId n) ret <- shareNode (SourceId folderSharedId) (TargetId n)
...@@ -111,7 +111,7 @@ shareNodeWith (ShareNodeWith_User NodeFolderShared u) n = do ...@@ -111,7 +111,7 @@ shareNodeWith (ShareNodeWith_User NodeFolderShared u) n = do
shareNodeWith (ShareNodeWith_Node NodeFolderPublic nId) n = do shareNodeWith (ShareNodeWith_Node NodeFolderPublic nId) n = do
nodeToCheck <- getNode n nodeToCheck <- getNode n
if not (isInNodeTypes nodeToCheck publicNodeTypes) if not (isInNodeTypes nodeToCheck publicNodeTypes)
then errorWith $ "[G.D.A.S.shareNodeWith] Can share this nodesTypes only: " then nodeErrorWith $ "[G.D.A.S.shareNodeWith] Can share this nodesTypes only: "
<> (show publicNodeTypes) <> (show publicNodeTypes)
else do else do
folderToCheck <- getNode nId folderToCheck <- getNode nId
...@@ -120,9 +120,9 @@ shareNodeWith (ShareNodeWith_Node NodeFolderPublic nId) n = do ...@@ -120,9 +120,9 @@ shareNodeWith (ShareNodeWith_Node NodeFolderPublic nId) n = do
ret <- shareNode (SourceId nId) (TargetId n) ret <- shareNode (SourceId nId) (TargetId n)
let msgs = [CE.UpdateTreeFirstLevel nId, CE.UpdateTreeFirstLevel n] let msgs = [CE.UpdateTreeFirstLevel nId, CE.UpdateTreeFirstLevel n]
pure (ret, msgs) pure (ret, msgs)
else errorWith "[G.D.A.S.shareNodeWith] Can share NodeWith NodeFolderPublic only" else nodeErrorWith "[G.D.A.S.shareNodeWith] Can share NodeWith NodeFolderPublic only"
shareNodeWith _ _ = errorWith "[G.D.A.S.shareNodeWith] Not implemented for this NodeType" shareNodeWith _ _ = nodeErrorWith "[G.D.A.S.shareNodeWith] Not implemented for this NodeType"
------------------------------------------------------------------------ ------------------------------------------------------------------------
getFolderId :: HasNodeError err => User -> NodeType -> DBQuery err x NodeId getFolderId :: HasNodeError err => User -> NodeType -> DBQuery err x NodeId
...@@ -130,7 +130,7 @@ getFolderId u nt = do ...@@ -130,7 +130,7 @@ getFolderId u nt = do
rootId <- getRootId u rootId <- getRootId u
s <- getNodesWith rootId HyperdataAny (Just nt) Nothing Nothing s <- getNodesWith rootId HyperdataAny (Just nt) Nothing Nothing
case head s of case head s of
Nothing -> errorWith "[G.D.A.S.getFolderId] No folder shared found" Nothing -> nodeErrorWith "[G.D.A.S.getFolderId] No folder shared found"
Just f -> pure (_node_id f) Just f -> pure (_node_id f)
------------------------------------------------------------------------ ------------------------------------------------------------------------
......
...@@ -72,7 +72,7 @@ getUsername user@(UserDBId _) = do ...@@ -72,7 +72,7 @@ getUsername user@(UserDBId _) = do
users <- getUsersWithId user users <- getUsersWithId user
case head users of case head users of
Just u -> pure $ userLight_username u Just u -> pure $ userLight_username u
Nothing -> errorWith "G.D.A.U.getUserName: User not found with that id" Nothing -> nodeErrorWith "G.D.A.U.getUserName: User not found with that id"
getUsername (RootId rid) = do getUsername (RootId rid) = do
n <- getNode rid n <- getNode rid
getUsername (UserDBId $ _node_user_id n) getUsername (UserDBId $ _node_user_id n)
......
...@@ -2,15 +2,16 @@ ...@@ -2,15 +2,16 @@
module Gargantext.Database.Class where module Gargantext.Database.Class where
import Control.Exception.Safe (MonadCatch)
import Control.Lens (Getter) import Control.Lens (Getter)
import Control.Monad.Random ( MonadRandom ) import Control.Monad.Random ( MonadRandom )
import Control.Monad.Trans.Control (MonadBaseControl) import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Pool (Pool) import Data.Pool (Pool)
import Database.PostgreSQL.Simple (Connection) import Database.PostgreSQL.Simple (Connection)
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Config (HasConfig(..)) import Gargantext.Core.Config (HasConfig(..))
import Gargantext.Core.Mail.Types (HasMail) import Gargantext.Core.Mail.Types (HasMail)
import Gargantext.Core.NLP (HasNLPServer) import Gargantext.Core.NLP (HasNLPServer)
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Prelude import Gargantext.Prelude
-- $typesAndConstraints -- $typesAndConstraints
...@@ -61,6 +62,13 @@ type IsCmd env err m = ...@@ -61,6 +62,13 @@ type IsCmd env err m =
( MonadReader env m ( MonadReader env m
, MonadError err m , MonadError err m
, MonadBaseControl IO m , MonadBaseControl IO m
-- These 3 instances below are needed because in the transactional code
-- we can throw 'err' as an exception, which requires 'err' to be an 'Exception'
-- and thus have a 'Show' and 'Typeable' instances. The fact that we can catch
-- exceptions in the evaluator of the 'DBTx' monad code means we need a 'MonadCatch'.
, Typeable err
, Show err
, MonadCatch m
) )
-- | Only the /minimum/ amount of class constraints required -- | Only the /minimum/ amount of class constraints required
......
...@@ -89,7 +89,8 @@ withConn k = do ...@@ -89,7 +89,8 @@ withConn k = do
pool <- view connPool pool <- view connPool
liftBase $ withResource pool (liftBase . k) liftBase $ withResource pool (liftBase . k)
runCmd :: env runCmd :: (Show err, Typeable err)
=> env
-> CmdRandom env err a -> CmdRandom env err a
-> IO (Either err a) -> IO (Either err a)
runCmd env m = runExceptT $ runReaderT m env runCmd env m = runExceptT $ runReaderT m env
......
...@@ -19,7 +19,7 @@ module Gargantext.Database.Query.Table.Node.Error ( ...@@ -19,7 +19,7 @@ module Gargantext.Database.Query.Table.Node.Error (
, HasNodeError(..) , HasNodeError(..)
-- * Functions -- * Functions
, errorWith , nodeErrorWith
, nodeError , nodeError
, nodeCreationError , nodeCreationError
, nodeLookupError , nodeLookupError
...@@ -28,14 +28,15 @@ module Gargantext.Database.Query.Table.Node.Error ( ...@@ -28,14 +28,15 @@ module Gargantext.Database.Query.Table.Node.Error (
) where ) where
import Control.Lens (Prism', (#), (^?)) import Control.Lens (Prism', (#), (^?))
import Control.Lens qualified as L
import Data.Aeson (object) import Data.Aeson (object)
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.Core.Types.Individu ( Username ) import Gargantext.Core.Types.Individu ( Username )
import Gargantext.Database.Admin.Types.Node (ListId, NodeId(..), ContextId, UserId, ParentId) import Gargantext.Database.Admin.Types.Node (ListId, NodeId(..), ContextId, UserId, ParentId)
import Gargantext.Database.Transactional
import Gargantext.Prelude hiding (sum, head) import Gargantext.Prelude hiding (sum, head)
import Prelude hiding (null, id, map, sum, show) import Prelude hiding (null, id, map, sum, show)
import Prelude qualified import Prelude qualified
import Gargantext.Database.Transactional
data NodeCreationError data NodeCreationError
= UserParentAlreadyExists UserId ParentId = UserParentAlreadyExists UserId ParentId
...@@ -89,6 +90,9 @@ data NodeError = NoListFound ListId ...@@ -89,6 +90,9 @@ data NodeError = NoListFound ListId
| MoveError NodeId NodeId T.Text | MoveError NodeId NodeId T.Text
| NodeNotExportable NodeId T.Text | NodeNotExportable NodeId T.Text
instance HasNodeError NodeError where
_NodeError = L.prism' Prelude.id Just
instance Prelude.Show NodeError instance Prelude.Show NodeError
where where
show (NoListFound {}) = "No list found" show (NoListFound {}) = "No list found"
...@@ -106,6 +110,8 @@ instance Prelude.Show NodeError ...@@ -106,6 +110,8 @@ instance Prelude.Show NodeError
show (MoveError s t reason) = "Moving " <> show s <> " to " <> show t <> " failed: " <> T.unpack reason show (MoveError s t reason) = "Moving " <> show s <> " to " <> show t <> " failed: " <> T.unpack reason
show (NodeNotExportable nid reason) = "Node " <> show nid <> " is not exportable: " <> show reason show (NodeNotExportable nid reason) = "Node " <> show nid <> " is not exportable: " <> show reason
instance Exception NodeError
instance ToJSON NodeError where instance ToJSON NodeError where
toJSON (DoesNotExist n) = toJSON (DoesNotExist n) =
object [ ( "error", "Node does not exist" ) object [ ( "error", "Node does not exist" )
...@@ -135,8 +141,8 @@ instance ToJSON NodeError where ...@@ -135,8 +141,8 @@ instance ToJSON NodeError where
class HasNodeError e where class HasNodeError e where
_NodeError :: Prism' e NodeError _NodeError :: Prism' e NodeError
errorWith :: HasNodeError e => Text -> DBTx e r a nodeErrorWith :: HasNodeError e => Text -> DBTx e r a
errorWith x = nodeError (NodeError $ toException $ userError $ T.unpack x) nodeErrorWith x = nodeError (NodeError $ toException $ userError $ T.unpack x)
nodeError :: HasNodeError e => NodeError -> DBTx e r a nodeError :: HasNodeError e => NodeError -> DBTx e r a
nodeError ne = dbFail $ _NodeError # ne nodeError ne = dbFail $ _NodeError # ne
......
...@@ -37,7 +37,7 @@ getRootId :: (HasNodeError err) => User -> DBQuery err x NodeId ...@@ -37,7 +37,7 @@ getRootId :: (HasNodeError err) => User -> DBQuery err x NodeId
getRootId u = do getRootId u = do
maybeRoot <- head <$> getRoot u maybeRoot <- head <$> getRoot u
case maybeRoot of case maybeRoot of
Nothing -> errorWith "[G.D.Q.T.R.getRootId] No root id" Nothing -> nodeErrorWith "[G.D.Q.T.R.getRootId] No root id"
Just r -> pure (_node_id r) Just r -> pure (_node_id r)
getRoot :: User -> DBQuery err x [Node HyperdataUser] getRoot :: User -> DBQuery err x [Node HyperdataUser]
...@@ -115,7 +115,7 @@ mkCorpus :: (HasNodeError err, MkCorpus a) ...@@ -115,7 +115,7 @@ mkCorpus :: (HasNodeError err, MkCorpus a)
mkCorpus cName c rootId userId = do mkCorpus cName c rootId userId = do
c' <- mk (Just cName) c rootId userId c' <- mk (Just cName) c rootId userId
_tId <- case head c' of _tId <- case head c' of
Nothing -> errorWith "[G.D.Q.T.Root.getOrMk...] mk Corpus failed" Nothing -> nodeErrorWith "[G.D.Q.T.Root.getOrMk...] mk Corpus failed"
Just c'' -> insertDefaultNode NodeTexts c'' userId Just c'' -> insertDefaultNode NodeTexts c'' userId
corpusId <- maybe (nodeError NoCorpusFound) pure (head c') corpusId <- maybe (nodeError NoCorpusFound) pure (head c')
......
...@@ -33,8 +33,10 @@ module Gargantext.Database.Transactional ( ...@@ -33,8 +33,10 @@ module Gargantext.Database.Transactional (
, mkOpaInsert , mkOpaInsert
, mkOpaDelete , mkOpaDelete
-- * Throwing errors (which allows rollbacks) -- * Throwing and catching errors (which allows rollbacks)
, dbFail , dbFail
, catchDBTxError
, handleDBTxError
) where ) where
import Control.Exception.Safe qualified as Safe import Control.Exception.Safe qualified as Safe
...@@ -42,6 +44,7 @@ import Control.Lens ...@@ -42,6 +44,7 @@ import Control.Lens
import Control.Monad.Base import Control.Monad.Base
import Control.Monad.Error.Class import Control.Monad.Error.Class
import Control.Monad.Free import Control.Monad.Free
import Control.Monad.Free.Church
import Control.Monad.Trans.Control (MonadBaseControl, control) import Control.Monad.Trans.Control (MonadBaseControl, control)
import Data.Int (Int64) import Data.Int (Int64)
import Data.Pool (withResource, Pool) import Data.Pool (withResource, Pool)
...@@ -51,7 +54,12 @@ import Database.PostgreSQL.Simple.Transaction qualified as PG ...@@ -51,7 +54,12 @@ import Database.PostgreSQL.Simple.Transaction qualified as PG
import Gargantext.Database.Class import Gargantext.Database.Class
import Opaleye import Opaleye
import Prelude import Prelude
import Control.Monad.Free.Church
data DBTxException err
= RollbackRequested err
deriving (Show, Eq)
instance (Show err, Safe.Typeable err) => Safe.Exception (DBTxException err) where
data DBOperation = DBRead | DBWrite data DBOperation = DBRead | DBWrite
...@@ -133,7 +141,7 @@ type DBReadOnly err r a = DBTx err DBRead a ...@@ -133,7 +141,7 @@ type DBReadOnly err r a = DBTx err DBRead a
-- Strict constraints to perform transactional read and writes. -- Strict constraints to perform transactional read and writes.
-- Isomorphic to a DBCmd, but it doesn't impose a 'HasConfig' constraint, as -- Isomorphic to a DBCmd, but it doesn't impose a 'HasConfig' constraint, as
-- values can always be passed as parameters of a query or update. -- values can always be passed as parameters of a query or update.
type DBTxCmd err a = forall m env. (IsCmd env err m, HasConnectionPool env) => m a type DBTxCmd err a = forall m env. (IsCmd env err m, HasConnectionPool env, Safe.MonadCatch m) => m a
instance Functor (DBTransactionOp err r) where instance Functor (DBTransactionOp err r) where
fmap f = \case fmap f = \case
...@@ -179,23 +187,37 @@ withReadOnlyTransactionM conn action = ...@@ -179,23 +187,37 @@ withReadOnlyTransactionM conn action =
-- | Run a PostgreSQL transaction, suitable for operations that mixes read and writes, -- | Run a PostgreSQL transaction, suitable for operations that mixes read and writes,
-- and actually the only choice available to run 'DBUpdate' operations. -- and actually the only choice available to run 'DBUpdate' operations.
runDBTx :: DBUpdate err a -> DBTxCmd err a runDBTx :: (Show err, Safe.Typeable err) => DBUpdate err a -> DBTxCmd err a
runDBTx (DBTx m) = do runDBTx (DBTx m) = do
pool <- view connPool pool <- view connPool
withResourceM pool $ \conn -> withTransactionM conn $ foldF (evalOp conn) m withResourceM pool $ \conn ->
(withTransactionM conn $ foldF (evalOp conn) m)
-- IMPORTANT: We are catching the exception (after 'withTransactionM' has run, so rollback already
-- happened) and we are rethrowing this via 'throwError', such that application code can catch this
-- via 'catchDBTxError'.
-- /NOTA BENE/: the parenthesis around 'withTransactionM' ARE NOT OPTIONAL! If we remove them, we
-- would be catching this exception from 'foldF', meaning that we wouldn't let 'withTransactionM'
-- handle it, resulting in ROLLBACK NOT HAPPENING!
`Safe.catches`
[ Safe.Handler $ \(RollbackRequested err) -> throwError err ]
-- | Runs a DB query. -- | Runs a DB query.
-- /NOTE/ the input type is 'DBReadOnly', i.e. a transaction where /all/ -- /NOTE/ the input type is 'DBReadOnly', i.e. a transaction where /all/
-- the operations are 'DBRead'. This makes impossible to sneak in updates -- the operations are 'DBRead'. This makes impossible to sneak in updates
-- into otherwise read-only queries. -- into otherwise read-only queries.
runDBQuery :: DBReadOnly err r a -> DBTxCmd err a runDBQuery :: (Show err, Safe.Typeable err) => DBReadOnly err r a -> DBTxCmd err a
runDBQuery (DBTx m) = do runDBQuery (DBTx m) = do
pool <- view connPool pool <- view connPool
withResourceM pool $ \conn -> withReadOnlyTransactionM conn $ foldF (evalOp conn) m withResourceM pool $ \conn ->
(withReadOnlyTransactionM conn $ foldF (evalOp conn) m)
-- IMPORTANT: Same proviso as for 'runDBTx'. Technically speaking we wouldn't need
-- to throw and catch things for a query, but we are doing so for consistency with 'runDBTx'.
`Safe.catches`
[ Safe.Handler $ \(RollbackRequested err) -> throwError err ]
-- | The main evaluator, turns our pure operations into side-effects that run into the -- | The main evaluator, turns our pure operations into side-effects that run into the
-- 'DBCmd'. -- 'DBCmd'.
evalOp :: PG.Connection -> DBTransactionOp err r a -> DBTxCmd err a evalOp :: (Show err, Safe.Typeable err) => PG.Connection -> DBTransactionOp err r a -> DBTxCmd err a
evalOp conn = \case evalOp conn = \case
PGQuery qr q cc -> cc <$> liftBase (PG.query conn qr q) PGQuery qr q cc -> cc <$> liftBase (PG.query conn qr q)
PGUpdate qr a cc -> cc <$> liftBase (PG.execute conn qr a) PGUpdate qr a cc -> cc <$> liftBase (PG.execute conn qr a)
...@@ -206,7 +228,7 @@ evalOp conn = \case ...@@ -206,7 +228,7 @@ evalOp conn = \case
OpaInsert ins cc -> cc <$> liftBase (runInsert conn ins) OpaInsert ins cc -> cc <$> liftBase (runInsert conn ins)
OpaUpdate upd cc -> cc <$> liftBase (runUpdate conn upd) OpaUpdate upd cc -> cc <$> liftBase (runUpdate conn upd)
OpaDelete del cc -> cc <$> liftBase (runDelete conn del) OpaDelete del cc -> cc <$> liftBase (runDelete conn del)
DBFail err -> throwError err DBFail err -> liftBase (Safe.throwIO $ RollbackRequested err)
evalOpaCountQuery :: PG.Connection -> Select a -> IO Int evalOpaCountQuery :: PG.Connection -> Select a -> IO Int
evalOpaCountQuery conn sel = do evalOpaCountQuery conn sel = do
...@@ -228,6 +250,49 @@ queryOne conn q v = do ...@@ -228,6 +250,49 @@ queryOne conn q v = do
[ ] -> Safe.throwIO $ userError "queryOne: no result returned. Check your SQL!" [ ] -> Safe.throwIO $ userError "queryOne: no result returned. Check your SQL!"
_ -> Safe.throwIO $ userError "queryOne: more than one result returned. Have you used the 'RETURNING' directive?" _ -> Safe.throwIO $ userError "queryOne: more than one result returned. Have you used the 'RETURNING' directive?"
{-
Throwing and catching exceptions in a DBTx monad
================================================
It's /VERY/ important to understand the proper way to throw and catch exceptions in a DBTx monad,
as not doing so might lead to footguns.
We need to remember that when we are composing 'DBTx' operations, we are just writing a DSL which
won't get evaluated until we call either 'runDBTx' or 'runDBQuery', therefore if some parts of
our transaction throw an error, we wouldn't know until there.
There are two types of errors we might have, and it's important to be precise in terminology:
1. IO Exception: these are being thrown by the evaluators for SQL queries, i.e. we might have
IO errors being thrown by wrongly-formatted SQL queries or the Postgres DB dying on us for any reason;
These exceptions get caught by 'withTransactionM' which allows proper rollback behavior, but crucially
these kind of exceptions gets rethrown by 'withTransactionM' and must be caught via the classic
exception handlers in upstream code, but the crucial point is that even if we don't catch them, the
transaction has been rolled back successfully;
2. Domain-specific ERRORS (not exceptions, ERRORS!) being thrown within a transaction itself via things like
'nodeError' and friends. These are errors which can be thrown because our transaction code didn't go as
planned (look for the implementation of 'insertNodeWithHyperdata' for a concrete example). These errors
are translated into the evaluator as proper exception but then caught and rethrown via 'throwError', which
is crucial, because it means that them being thrown as an exception means 'withTransactionM' can rollback
as we expect to, but upstream application code can still handle these errors via 'catchError' and friends.
In order to facilitate the handling of this, we expose the 'catchDBTxError' and 'handleDBTxError', which are
just wrappers over 'catchError' -- this is what users should be using if they want to handle domain-specific errors.
But the crucial bit, and let's state this again, is that rollbacks will happen in both scenario, which is
what we want.
-}
catchDBTxError :: DBTxCmd err a
-> (err -> DBTxCmd err a)
-> DBTxCmd err a
catchDBTxError = catchError
handleDBTxError :: (err -> DBTxCmd err a)
-> DBTxCmd err a
-> DBTxCmd err a
handleDBTxError = flip catchError
-- --
-- Smart constructors -- Smart constructors
-- --
...@@ -235,9 +300,6 @@ queryOne conn q v = do ...@@ -235,9 +300,6 @@ queryOne conn q v = do
-- we are not exposing for information hiding purposes. -- we are not exposing for information hiding purposes.
-- --
dbFail :: err -> DBTx err r b
dbFail = DBTx . liftF . DBFail
mkPGQuery :: (PG.ToRow q, PG.FromRow a) mkPGQuery :: (PG.ToRow q, PG.FromRow a)
=> PG.Query => PG.Query
-> q -> q
...@@ -270,3 +332,6 @@ mkOpaInsert a = DBTx $ liftF (OpaInsert a id) ...@@ -270,3 +332,6 @@ mkOpaInsert a = DBTx $ liftF (OpaInsert a id)
mkOpaDelete :: Delete a -> DBUpdate err a mkOpaDelete :: Delete a -> DBUpdate err a
mkOpaDelete a = DBTx $ liftF (OpaDelete a id) mkOpaDelete a = DBTx $ liftF (OpaDelete a id)
dbFail :: err -> DBTx err r b
dbFail = DBTx . liftF . DBFail
...@@ -174,15 +174,15 @@ ...@@ -174,15 +174,15 @@
git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git" git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git"
subdirs: subdirs:
- "gargantext-graph-core" - "gargantext-graph-core"
- commit: 4a9c709613554eed0189b486de2126c18797088c - commit: 05c39e424d15149dc32097b3318cb6007e0e7052
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee-pgmq/" - "haskell-bee-pgmq/"
- commit: 4a9c709613554eed0189b486de2126c18797088c - commit: 05c39e424d15149dc32097b3318cb6007e0e7052
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee-tests/" - "haskell-bee-tests/"
- commit: 4a9c709613554eed0189b486de2126c18797088c - commit: 05c39e424d15149dc32097b3318cb6007e0e7052
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- "haskell-bee/" - "haskell-bee/"
...@@ -198,7 +198,7 @@ ...@@ -198,7 +198,7 @@
git: "https://gitlab.iscpif.fr/gargantext/haskell-infomap.git" git: "https://gitlab.iscpif.fr/gargantext/haskell-infomap.git"
subdirs: subdirs:
- . - .
- commit: 1dd92f0aa8e9f8096064e5656c336e562680f4e3 - commit: 9a869df2842eccc86a0f31a69fb8dc5e5ca218a8
git: "https://gitlab.iscpif.fr/gargantext/haskell-pgmq" git: "https://gitlab.iscpif.fr/gargantext/haskell-pgmq"
subdirs: subdirs:
- . - .
......
...@@ -66,11 +66,18 @@ from = "" ...@@ -66,11 +66,18 @@ from = ""
login_type = "Normal" login_type = "Normal"
[notifications]
# We do not hardcode the bind and connect here, because the test infrastructure # We do not hardcode the bind and connect here, because the test infrastructure
# will randomize the connection endpoints via IPC. # will randomize the connection endpoints via IPC.
central-exchange = { bind = "", connect = "" } [notifications.central-exchange]
dispatcher = { bind = "", connect = "" } bind = ""
connect = ""
send_timeout_ms = 200
[notifications.dispatcher]
bind = ""
connect = ""
send_timeout_ms = 500
throttle_ms = 500
[nlp] [nlp]
...@@ -85,6 +92,11 @@ default_visibility_timeout = 1 ...@@ -85,6 +92,11 @@ default_visibility_timeout = 1
# default delay before job is visible to the worker # default delay before job is visible to the worker
default_delay = 1 default_delay = 1
# default timeout (in seconds)
default_job_timeout = 60
# default timeout for "long" jobs (in seconds)
long_job_timeout = 3000
# NOTE This is overridden by Test.Database.Setup # NOTE This is overridden by Test.Database.Setup
[worker.database] [worker.database]
host = "127.0.0.1" host = "127.0.0.1"
......
...@@ -31,7 +31,7 @@ import Gargantext.Database.Prelude ...@@ -31,7 +31,7 @@ import Gargantext.Database.Prelude
import Gargantext.Database.Query.Facet import Gargantext.Database.Query.Facet
import Gargantext.Database.Query.Table.Node import Gargantext.Database.Query.Table.Node
import Gargantext.Database.Query.Table.Node.Error (HasNodeError) import Gargantext.Database.Query.Table.Node.Error (HasNodeError)
import Gargantext.Database.Query.Table.Node.Error (errorWith) import Gargantext.Database.Query.Table.Node.Error (nodeErrorWith)
import Gargantext.Database.Query.Tree.Root import Gargantext.Database.Query.Tree.Root
import Gargantext.Database.Query.Table.NodeContext (selectCountDocs) import Gargantext.Database.Query.Table.NodeContext (selectCountDocs)
import Gargantext.Database.Schema.Node (NodePoly(..)) import Gargantext.Database.Schema.Node (NodePoly(..))
...@@ -119,7 +119,7 @@ getCorporaWithParentIdOrFail parentId = do ...@@ -119,7 +119,7 @@ getCorporaWithParentIdOrFail parentId = do
xs <- getCorporaWithParentId parentId xs <- getCorporaWithParentId parentId
case xs of case xs of
[corpus] -> pure corpus [corpus] -> pure corpus
_ -> errorWith $ "getCorporaWithParentIdOrFail, impossible: " <> T.pack (show xs) _ -> nodeErrorWith $ "getCorporaWithParentIdOrFail, impossible: " <> T.pack (show xs)
addCorpusDocuments :: TestEnv -> IO TestEnv addCorpusDocuments :: TestEnv -> IO TestEnv
addCorpusDocuments env = runTestMonad env $ do addCorpusDocuments env = runTestMonad env $ do
......
...@@ -87,8 +87,11 @@ withTestNotificationConfig cfg action = do ...@@ -87,8 +87,11 @@ withTestNotificationConfig cfg action = do
action $ cfg & gc_notifications_config action $ cfg & gc_notifications_config
.~ NotificationsConfig { _nc_central_exchange_bind = "ipc://" <> ce_fp .~ NotificationsConfig { _nc_central_exchange_bind = "ipc://" <> ce_fp
, _nc_central_exchange_connect = "ipc://" <> ce_fp , _nc_central_exchange_connect = "ipc://" <> ce_fp
, _nc_ce_send_timeout_ms = 200
, _nc_dispatcher_bind = "ipc://" <> ds_fp , _nc_dispatcher_bind = "ipc://" <> ds_fp
, _nc_dispatcher_connect = "ipc://" <> ds_fp , _nc_dispatcher_connect = "ipc://" <> ds_fp
, _nc_dispatcher_send_timeout_ms = 500
, _nc_dispatcher_throttle_ms = 500
} }
setup :: IO TestEnv setup :: IO TestEnv
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
{-# LANGUAGE TupleSections #-} {-# LANGUAGE TupleSections #-}
{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-| Tests for the transactional DB API -} {-| Tests for the transactional DB API -}
...@@ -16,6 +17,7 @@ import Control.Exception.Safe ...@@ -16,6 +17,7 @@ import Control.Exception.Safe
import Control.Exception.Safe qualified as Safe import Control.Exception.Safe qualified as Safe
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Trans.Control import Control.Monad.Trans.Control
import Data.List.NonEmpty qualified as NE
import Data.Pool import Data.Pool
import Data.Profunctor.Product.TH (makeAdaptorAndInstance) import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
import Data.String import Data.String
...@@ -28,20 +30,24 @@ import Database.PostgreSQL.Simple.Options qualified as Client ...@@ -28,20 +30,24 @@ import Database.PostgreSQL.Simple.Options qualified as Client
import Database.PostgreSQL.Simple.SqlQQ (sql) import Database.PostgreSQL.Simple.SqlQQ (sql)
import Database.PostgreSQL.Simple.ToField import Database.PostgreSQL.Simple.ToField
import Database.Postgres.Temp qualified as Tmp import Database.Postgres.Temp qualified as Tmp
import Gargantext.API.Errors.Types (BackendInternalError) import Gargantext.Core.Types.Individu
import Gargantext.Database.Query.Table.Node.Error (errorWith) import Gargantext.Database.Query.Table.User
import Gargantext.Database.Schema.Prelude (Table (..)) import Gargantext.Database.Schema.Prelude (Table (..))
import Gargantext.Database.Transactional import Gargantext.Database.Transactional
import Gargantext.Prelude import Gargantext.Prelude hiding (throwIO, catch)
import Opaleye (selectTable, requiredTableField, SqlInt4) import Opaleye (selectTable, requiredTableField, SqlInt4)
import Opaleye qualified as O import Opaleye qualified as O
import Prelude qualified import Prelude qualified
import Shelly as SH import Shelly as SH
import System.Random.Stateful import System.Random.Stateful
import Test.API.Setup (setupEnvironment)
import Test.Database.Setup
import Test.Database.Types hiding (Counter) import Test.Database.Types hiding (Counter)
import Test.Hspec import Test.Hspec
import Test.Tasty.HUnit hiding (assert) import Test.Tasty.HUnit hiding (assert)
import Text.RawString.QQ import Text.RawString.QQ
import Gargantext.Database.Action.User
import Gargantext.Database.Query.Table.Node.Error
-- --
-- For these tests we do not want to test the normal GGTX database queries, but rather -- For these tests we do not want to test the normal GGTX database queries, but rather
...@@ -79,9 +85,9 @@ countersTable = ...@@ -79,9 +85,9 @@ countersTable =
) )
newtype TestDBTxMonad a = TestDBTxMonad { _TestDBTxMonad :: TestMonadM DBHandle BackendInternalError a } newtype TestDBTxMonad a = TestDBTxMonad { _TestDBTxMonad :: TestMonadM DBHandle IOException a }
deriving ( Functor, Applicative, Monad deriving ( Functor, Applicative, Monad
, MonadReader DBHandle, MonadError BackendInternalError , MonadReader DBHandle, MonadError IOException
, MonadBase IO , MonadBase IO
, MonadBaseControl IO , MonadBaseControl IO
, MonadFail , MonadFail
...@@ -91,8 +97,12 @@ newtype TestDBTxMonad a = TestDBTxMonad { _TestDBTxMonad :: TestMonadM DBHandle ...@@ -91,8 +97,12 @@ newtype TestDBTxMonad a = TestDBTxMonad { _TestDBTxMonad :: TestMonadM DBHandle
, MonadThrow , MonadThrow
) )
runTestDBTxMonad :: DBHandle -> TestMonadM DBHandle BackendInternalError a -> IO a runTestDBTxMonad :: DBHandle -> TestMonadM DBHandle IOException a -> IO a
runTestDBTxMonad env = flip runReaderT env . _TestMonad runTestDBTxMonad env m = do
res <- flip runReaderT env . runExceptT . _TestMonad $ m
case res of
Left err -> throwIO $ Prelude.userError ("runTestDBTxMonad: " <> displayException err)
Right x -> pure x
setup :: IO DBHandle setup :: IO DBHandle
setup = do setup = do
...@@ -163,23 +173,23 @@ teardown test_db = do ...@@ -163,23 +173,23 @@ teardown test_db = do
instance PG.FromRow Counter where instance PG.FromRow Counter where
fromRow = Counter <$> field <*> field fromRow = Counter <$> field <*> field
getCounterById :: CounterId -> DBQuery BackendInternalError r Counter getCounterById :: CounterId -> DBQuery IOException r Counter
getCounterById (CounterId cid) = do getCounterById (CounterId cid) = do
xs <- mkPGQuery [sql| SELECT * FROM public.ggtx_test_counter_table WHERE id = ?; |] (PG.Only cid) xs <- mkPGQuery [sql| SELECT * FROM public.ggtx_test_counter_table WHERE id = ?; |] (PG.Only cid)
case xs of case xs of
[c] -> pure c [c] -> pure c
rst -> errorWith $ "getCounterId returned more than one result: " <> T.pack (show rst) rst -> dbFail $ Prelude.userError $ "getCounterId returned more than one result: " <> show rst
insertCounter :: DBUpdate BackendInternalError Counter insertCounter :: DBUpdate IOException Counter
insertCounter = do insertCounter = do
mkPGUpdateReturningOne [sql| INSERT INTO public.ggtx_test_counter_table(counter_value) VALUES(0) RETURNING id, counter_value|] () mkPGUpdateReturningOne [sql| INSERT INTO public.ggtx_test_counter_table(counter_value) VALUES(0) RETURNING id, counter_value|] ()
updateCounter :: CounterId -> Int -> DBUpdate BackendInternalError Counter updateCounter :: CounterId -> Int -> DBUpdate IOException Counter
updateCounter cid x = do updateCounter cid x = do
mkPGUpdateReturningOne [sql| UPDATE public.ggtx_test_counter_table SET counter_value = ? WHERE id = ? RETURNING *|] (x, cid) mkPGUpdateReturningOne [sql| UPDATE public.ggtx_test_counter_table SET counter_value = ? WHERE id = ? RETURNING *|] (x, cid)
-- | We deliberately write this as a composite operation. -- | We deliberately write this as a composite operation.
stepCounter :: CounterId -> DBUpdate BackendInternalError Counter stepCounter :: CounterId -> DBUpdate IOException Counter
stepCounter cid = do stepCounter cid = do
Counter{..} <- getCounterById cid Counter{..} <- getCounterById cid
mkPGUpdateReturningOne [sql| UPDATE public.ggtx_test_counter_table SET counter_value = ? WHERE id = ? RETURNING *|] (counterValue + 1, cid) mkPGUpdateReturningOne [sql| UPDATE public.ggtx_test_counter_table SET counter_value = ? WHERE id = ? RETURNING *|] (counterValue + 1, cid)
...@@ -189,8 +199,15 @@ stepCounter cid = do ...@@ -189,8 +199,15 @@ stepCounter cid = do
-- --
tests :: Spec tests :: Spec
tests = parallel $ around withTestCounterDB $ tests = describe "Database Transactions" $ do
describe "Database Transactions" $ do counterDBTests
ggtxDBTests
-- | Testing the transactional behaviour outside the classic GGTX operations.
-- We test that throwing exceptions in IO leads to rollbacks.
counterDBTests :: Spec
counterDBTests = parallel $ around withTestCounterDB $
describe "Counter Transactions" $ do
describe "Opaleye count queries" $ do describe "Opaleye count queries" $ do
it "Supports counting rows" opaCountQueries it "Supports counting rows" opaCountQueries
describe "Pure PG Queries" $ do describe "Pure PG Queries" $ do
...@@ -206,6 +223,14 @@ tests = parallel $ around withTestCounterDB $ ...@@ -206,6 +223,14 @@ tests = parallel $ around withTestCounterDB $
describe "Read/Write Consistency" $ do describe "Read/Write Consistency" $ do
it "should return a consistent state to different actors" testConsistency it "should return a consistent state to different actors" testConsistency
-- | Testing the transactional behaviour inside the classic GGTX operations.
-- We test that throwing something like a 'NodeError' results in a proper rollback.
ggtxDBTests :: Spec
ggtxDBTests = parallel $ around withTestDB $ beforeWith (\ctx -> setupEnvironment ctx >>= (const $ pure ctx)) $
describe "GGTX Transactions" $ do
describe "Rollback support" $ do
it "can rollback if a ggtx error gets thrown" testGGTXErrorRollback
simplePGQueryWorks :: DBHandle -> Assertion simplePGQueryWorks :: DBHandle -> Assertion
simplePGQueryWorks env = runTestDBTxMonad env $ do simplePGQueryWorks env = runTestDBTxMonad env $ do
x <- runDBQuery $ getCounterById (CounterId 1) x <- runDBQuery $ getCounterById (CounterId 1)
...@@ -239,9 +264,9 @@ testRollback env = runTestDBTxMonad env $ do ...@@ -239,9 +264,9 @@ testRollback env = runTestDBTxMonad env $ do
liftIO $ counterValue initialCounter `shouldBe` 1 liftIO $ counterValue initialCounter `shouldBe` 1
-- Let's do another transaction where at the very last instruction we -- Let's do another transaction where at the very last instruction we
-- fail. -- fail.
Safe.handle (\(_ :: SomeException) -> pure ()) $ runDBTx $ do handleDBTxError (\(_ :: IOException) -> pure ()) $ runDBTx $ do
_x' <- stepCounter (counterId initialCounter) _x' <- stepCounter (counterId initialCounter)
errorWith "urgh" dbFail $ Prelude.userError "urgh"
-- Let's check that the second 'stepCounter' didn't actually modified the counter's value. -- Let's check that the second 'stepCounter' didn't actually modified the counter's value.
finalCounter <- runDBTx $ getCounterById (counterId initialCounter) finalCounter <- runDBTx $ getCounterById (counterId initialCounter)
...@@ -277,3 +302,26 @@ opaCountQueries env = runTestDBTxMonad env $ do ...@@ -277,3 +302,26 @@ opaCountQueries env = runTestDBTxMonad env $ do
_ <- insertCounter _ <- insertCounter
mkOpaCountQuery (selectTable countersTable) mkOpaCountQuery (selectTable countersTable)
liftIO $ num @?= 3 liftIO $ num @?= 3
-- | In this simple test we create a user node in GGTX, we try
-- to update it, and check that if we throw an error in the update
-- transaction, the changes are not propagated
testGGTXErrorRollback :: TestEnv -> Assertion
testGGTXErrorRollback env = runTestMonadM @NodeError env $ do
let ur = NewUser "alfredo" "alfredo@foo.com" (GargPassword "mypass")
let newUsers = ur NE.:| []
hashed <- liftIO $ mapM toUserHash newUsers
void $ runDBTx $ insertNewUsers hashed
-- Retrieve the user, check the details
insertedUr <- runDBQuery $ getUserLightDB (UserName "alfredo")
liftIO $ userLight_username insertedUr `shouldBe` "alfredo"
-- CRUCIAL bit: try to update the email, throw an exception in the same tx block
void $ (runDBTx $ do
void $ updateUserEmail (insertedUr { userLight_email = "alfredo@bar.com" })
nodeError $ NoRootFound -- it doesn't matter which exception
) `catchDBTxError` \(_e :: NodeError) -> pure () -- swallow it.
-- let's check that the email hasn't been changed.
insertedUr' <- runDBQuery $ getUserLightDB (UserName "alfredo")
liftIO $ userLight_email insertedUr' `shouldBe` "alfredo@foo.com"
...@@ -17,7 +17,6 @@ module Test.Database.Types where ...@@ -17,7 +17,6 @@ module Test.Database.Types where
import Control.Exception.Safe import Control.Exception.Safe
import Control.Lens import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader import Control.Monad.Reader
import Control.Monad.Trans.Control import Control.Monad.Trans.Control
import Data.IORef import Data.IORef
...@@ -25,8 +24,7 @@ import Data.Map qualified as Map ...@@ -25,8 +24,7 @@ import Data.Map qualified as Map
import Data.Pool import Data.Pool
import Database.PostgreSQL.Simple qualified as PG import Database.PostgreSQL.Simple qualified as PG
import Database.Postgres.Temp qualified as Tmp import Database.Postgres.Temp qualified as Tmp
import GHC.IO.Exception (userError) import Gargantext hiding (throwIO, to)
import Gargantext hiding (to)
import Gargantext.API.Admin.Orchestrator.Types import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Errors.Types import Gargantext.API.Errors.Types
import Gargantext.API.Prelude import Gargantext.API.Prelude
...@@ -41,6 +39,7 @@ import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..)) ...@@ -41,6 +39,7 @@ import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Network.URI (parseURI) import Network.URI (parseURI)
import Prelude qualified import Prelude qualified
import System.Log.FastLogger qualified as FL import System.Log.FastLogger qualified as FL
import System.IO.Error (userError)
newtype Counter = Counter { _Counter :: IORef Int } newtype Counter = Counter { _Counter :: IORef Int }
...@@ -64,10 +63,11 @@ data TestEnv = TestEnv { ...@@ -64,10 +63,11 @@ data TestEnv = TestEnv {
, test_worker_tid :: !ThreadId , test_worker_tid :: !ThreadId
} }
newtype TestMonadM env err a = TestMonad { _TestMonad :: ReaderT env IO a } newtype TestMonadM env err a = TestMonad { _TestMonad :: ExceptT err (ReaderT env IO) a }
deriving ( Functor, Applicative, Monad deriving ( Functor, Applicative, Monad
, MonadReader env , MonadReader env
, MonadBase IO , MonadBase IO
, MonadError err
, MonadBaseControl IO , MonadBaseControl IO
, MonadFail , MonadFail
, MonadIO , MonadIO
...@@ -76,10 +76,10 @@ newtype TestMonadM env err a = TestMonad { _TestMonad :: ReaderT env IO a } ...@@ -76,10 +76,10 @@ newtype TestMonadM env err a = TestMonad { _TestMonad :: ReaderT env IO a }
, MonadThrow , MonadThrow
) )
instance HasLogger (TestMonadM TestEnv BackendInternalError) where instance HasLogger (TestMonadM TestEnv err) where
data instance Logger (TestMonadM TestEnv BackendInternalError) = TestLogger { _IOLogger :: IOStdLogger } data instance Logger (TestMonadM TestEnv err) = TestLogger { _IOLogger :: IOStdLogger }
type instance LogInitParams (TestMonadM TestEnv BackendInternalError) = LogConfig type instance LogInitParams (TestMonadM TestEnv err) = LogConfig
type instance LogPayload (TestMonadM TestEnv BackendInternalError) = Prelude.String type instance LogPayload (TestMonadM TestEnv err) = Prelude.String
initLogger cfg = fmap TestLogger $ (liftIO $ ioStdLogger cfg) initLogger cfg = fmap TestLogger $ (liftIO $ ioStdLogger cfg)
destroyLogger = liftIO . _iosl_destroy . _IOLogger destroyLogger = liftIO . _iosl_destroy . _IOLogger
logMsg (TestLogger ioLogger) lvl msg = liftIO $ _iosl_log_msg ioLogger lvl msg logMsg (TestLogger ioLogger) lvl msg = liftIO $ _iosl_log_msg ioLogger lvl msg
...@@ -89,18 +89,19 @@ instance MonadLogger (TestMonadM TestEnv BackendInternalError) where ...@@ -89,18 +89,19 @@ instance MonadLogger (TestMonadM TestEnv BackendInternalError) where
getLogger = TestMonad $ do getLogger = TestMonad $ do
initLogger @(TestMonadM TestEnv BackendInternalError) (LogConfig Nothing ERROR) initLogger @(TestMonadM TestEnv BackendInternalError) (LogConfig Nothing ERROR)
runTestMonadM :: env -> TestMonadM env err a -> IO a runTestMonadM :: Show err => env -> TestMonadM env err a -> IO a
runTestMonadM env = flip runReaderT env . _TestMonad runTestMonadM env m = do
res <- flip runReaderT env . runExceptT . _TestMonad $ m
case res of
Left err -> throwIO $ userError (show err)
Right x -> pure x
runTestMonad :: TestEnv -> TestMonadM TestEnv BackendInternalError a -> IO a runTestMonad :: TestEnv -> TestMonadM TestEnv BackendInternalError a -> IO a
runTestMonad env = flip runReaderT env . _TestMonad runTestMonad env m = do
res <- flip runReaderT env . runExceptT . _TestMonad $ m
-- | Shoehorn a BackendInternalError into an IOException, suitable case res of
-- for testing. Left err -> throwIO $ userError ("runTestMonad: " <> show err)
instance MonadError BackendInternalError (TestMonadM env BackendInternalError) where Right x -> pure x
throwError e = TestMonad $ throwError (userError $ show e)
catchError (TestMonad m) hdl =
TestMonad $ ReaderT $ \e -> catchError (flip runReaderT e m) (\e' -> runTestMonadM e $ hdl (InternalWorkerError e'))
type TestMonad = TestMonadM TestEnv BackendInternalError type TestMonad = TestMonadM TestEnv BackendInternalError
data TestJobHandle = TestNoJobHandle data TestJobHandle = TestNoJobHandle
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment