[worker] test fixes

Start worker in tests
parent 7e3fe8f6
Pipeline #6855 failed with stages
in 111 minutes and 29 seconds
......@@ -861,6 +861,7 @@ test-suite garg-test-hspec
import:
defaults
, testDependencies
build-depends: haskell-bee
type: exitcode-stdio-1.0
main-is: drivers/hspec/Main.hs
other-modules:
......@@ -885,6 +886,7 @@ test-suite garg-test-hspec
Test.Server.ReverseProxy
Test.Types
Test.Utils
Test.Utils.Db
hs-source-dirs:
test
ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N
......@@ -903,3 +905,4 @@ benchmark garg-bench
ghc-options: "-with-rtsopts=-T -A32m"
if impl(ghc >= 8.6)
ghc-options: "-with-rtsopts=--nonmoving-gc"
......@@ -18,6 +18,8 @@ module Gargantext.API.Admin.EnvTypes (
, env_central_exchange
, env_dispatcher
, env_jwt_settings
, env_pool
, env_nodeStory
, menv_firewall
, dev_env_logger
......
{-# LANGUAGE TemplateHaskell #-}
{-|
Module : Gargantext.Core.Config.Worker
Description : Worker TOML file config
......@@ -84,3 +86,4 @@ findDefinitionByName (WorkerSettings { _wsDefinitions }) workerName =
-- wdToRedisBrokerInitParams wd = BRedis.RedisBrokerInitParams <$> (wdToRedisConnectInfo wd)
makeLenses 'WorkerSettings
......@@ -63,7 +63,7 @@ gServer (NotificationsConfig { .. }) = do
forever $ do
-- putText "[central_exchange] receiving"
r <- recv s
logMsg ioLogger INFO $ "[central_exchange] received: " <> show r
logMsg ioLogger DEBUG $ "[central_exchange] received: " <> show r
-- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r
where
......@@ -78,7 +78,7 @@ gServer (NotificationsConfig { .. }) = do
-- void $ sendNonblocking s_dispatcher r
void $ timeout 100_000 $ send s_dispatcher r
Just (UpdateTreeFirstLevel node_id) -> do
logMsg ioLogger INFO $ "[central_exchange] update tree: " <> show node_id
logMsg ioLogger DEBUG $ "[central_exchange] update tree: " <> show node_id
-- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id
-- To make this more robust, use withAsync so we don't
-- block the main thread (send is blocking)
......
......@@ -38,6 +38,24 @@ import Gargantext.Utils.Jobs.Monad ( MonadJobStatus(noJobHandle) )
initWorkerState :: (HasWorkerBroker PGMQBroker Job)
=> WorkerEnv
-> WorkerDefinition
-> IO (Worker.State PGMQBroker Job)
initWorkerState env (WorkerDefinition { .. }) = do
let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig
pure $ Worker.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Nothing
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing }
-- | Spawn a worker with PGMQ broker
-- TODO:
-- - reduce size of DB pool
......@@ -49,18 +67,8 @@ withPGMQWorker :: (HasWorkerBroker PGMQBroker Job)
-> WorkerDefinition
-> (Async () -> Worker.State PGMQBroker Job -> IO ())
-> IO ()
withPGMQWorker env (WorkerDefinition { .. }) cb = do
let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig
let state' = Worker.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Nothing
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing }
withPGMQWorker env wd cb = do
state' <- initWorkerState env wd
withAsync (Worker.run state') (\a -> cb a state')
......@@ -69,18 +77,8 @@ withPGMQWorkerSingle :: (HasWorkerBroker PGMQBroker Job)
-> WorkerDefinition
-> (Async () -> Worker.State PGMQBroker Job -> IO ())
-> IO ()
withPGMQWorkerSingle env (WorkerDefinition { .. }) cb = do
let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig
let state' = Worker.State { broker
, queueName = _wdQueue
, name = T.unpack _wdName
, performAction = performAction env
, onMessageReceived = Nothing
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing }
withPGMQWorkerSingle env wd cb = do
state' <- initWorkerState env wd
withAsync (Worker.runSingle state') (\a -> cb a state')
......
{-# LANGUAGE TupleSections #-}
{-|
Module : Gargantext.Core.Worker.Broker
Description : Broker utilities
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.Core.Worker.Broker
( initBrokerWithDBCreate )
......@@ -13,8 +22,8 @@ import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.Core.Config (GargConfig(..), gc_worker)
import Gargantext.Core.Config.Worker (WorkerSettings(..))
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Database.Prelude (createDBIfNotExists)
import Gargantext.Prelude
import Shelly qualified as SH
......@@ -27,11 +36,7 @@ initBrokerWithDBCreate gc@(GargConfig { _gc_database_config }) = do
-- By using gargantext db credentials, we create pgmq db (if needed)
let WorkerSettings { .. } = gc ^. gc_worker
let psqlDB = TE.decodeUtf8 $ PSQL.postgreSQLConnectionString _gc_database_config
-- For the \gexec trick, see:
-- https://stackoverflow.com/questions/18389124/simulate-create-database-if-not-exists-for-postgresql
(_res, _ec) <- SH.shelly $ SH.silently $ SH.escaping False $ do
let sql = "\"SELECT 'CREATE DATABASE " <> (T.pack $ PSQL.connectDatabase _wsDatabase) <> "' WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '" <> (T.pack $ PSQL.connectDatabase _wsDatabase) <> "')\\gexec\""
result <- SH.run "echo" [sql, "|", "psql", "-d", "\"" <> psqlDB <> "\""]
(result,) <$> SH.lastExitCode
createDBIfNotExists psqlDB (T.pack $ PSQL.connectDatabase _wsDatabase)
initBroker $ PGMQBrokerInitParams _wsDatabase _wsDefaultVisibilityTimeout
......@@ -48,12 +48,12 @@ import System.Log.FastLogger qualified as FL
data WorkerEnv = WorkerEnv
{ _w_env_config :: !GargConfig
, _w_env_logger :: !(Logger (GargM WorkerEnv IOException))
, _w_env_pool :: !(Pool Connection)
, _w_env_nodeStory :: !NodeStoryEnv
, _w_env_mail :: !Mail.MailConfig
, _w_env_nlp :: !NLPServerMap
{ _w_env_config :: ~GargConfig
, _w_env_logger :: ~(Logger (GargM WorkerEnv IOException))
, _w_env_pool :: ~(Pool Connection)
, _w_env_nodeStory :: ~NodeStoryEnv
, _w_env_mail :: ~Mail.MailConfig
, _w_env_nlp :: ~NLPServerMap
}
......
......@@ -9,8 +9,10 @@ Portability : POSIX
-}
{-# LANGUAGE ConstraintKinds, ScopedTypeVariables #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module Gargantext.Database.Prelude where
......@@ -37,6 +39,7 @@ import Opaleye (Unpackspec, showSql, FromFields, Select, runSelect, SqlJsonb, De
import Opaleye.Aggregate (countRows)
import Opaleye.Internal.Constant qualified
import Opaleye.Internal.Operators qualified
import Shelly qualified as SH
-------------------------------------------------------
class HasConnectionPool env where
......@@ -207,3 +210,17 @@ restrictMaybe :: ( Default Opaleye.Internal.Operators.IfPP b b
restrictMaybe v cond = matchMaybe v $ \case
Nothing -> toFields True
Just v' -> cond v'
-- | Creates a PostgreSQL DB if it doesn't exist.
-- Accepts a pg connection string and db name as argument.
createDBIfNotExists :: Text -> Text -> IO ()
createDBIfNotExists connStr dbName = do
-- For the \gexec trick, see:
-- https://stackoverflow.com/questions/18389124/simulate-create-database-if-not-exists-for-postgresql
(_res, _ec) <- SH.shelly $ SH.silently $ SH.escaping False $ do
let sql = "\"SELECT 'CREATE DATABASE " <> dbName <> "' WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '" <> dbName <> "')\\gexec\""
result <- SH.run "echo" [sql, "|", "psql", "-d", "\"" <> connStr <> "\""]
(result,) <$> SH.lastExitCode
return ()
......@@ -46,6 +46,7 @@ max_docs_scrapers = 10000
js_job_timeout = 1800
js_id_timeout = 1800
# NOTE This is overridden by Test.Database.Setup
[database]
host = "127.0.0.1"
port = 5432
......@@ -77,6 +78,7 @@ All = "corenlp://localhost:9000"
default_visibility_timeout = 1
# NOTE This is overridden by Test.Database.Setup
[worker.database]
host = "127.0.0.1"
port = 5432
......
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
module Test.API.Private.Table (
......
......@@ -18,15 +18,12 @@ import Data.ByteString.Lazy.Char8 qualified as C8L
import Data.Cache qualified as InMemory
import Data.Streaming.Network (bindPortTCP)
import Gargantext.API (makeApp)
import Gargantext.API.Admin.EnvTypes (Mode(Mock), Env (..))
import Gargantext.API.Admin.Settings
import Gargantext.API.Admin.EnvTypes (Mode(Mock), Env (..), env_dispatcher)
import Gargantext.API.Errors.Types
import Gargantext.API.Prelude
import Gargantext.Core.Notifications.Dispatcher qualified as D
import Gargantext.Core.Config (_gc_secrets, gc_frontend_config, gc_jobs, hasConfig)
import Gargantext.Core.Config.Types (SettingsFile(..), jc_js_job_timeout, jc_js_id_timeout, fc_appPort, jwtSettings)
import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.NodeStory
import Gargantext.Core.Types.Individu
import Gargantext.Database.Action.Flow
import Gargantext.Database.Action.User.New
......@@ -53,7 +50,7 @@ import Prelude
import Servant.Auth.Client ()
import Servant.Client
import Servant.Job.Async qualified as ServantAsync
import Test.Database.Setup (withTestDB, fakeTomlPath, testEnvToPgConnectionInfo)
import Test.Database.Setup (withTestDB, fakeTomlPath)
import Test.Database.Types
import UnliftIO qualified
......@@ -75,23 +72,23 @@ instance Functor SpecContext where
newTestEnv :: TestEnv -> Logger (GargM Env BackendInternalError) -> Warp.Port -> IO Env
newTestEnv testEnv logger port = do
tomlFile@(SettingsFile sf) <- fakeTomlPath
SettingsFile sf <- fakeTomlPath
!manager_env <- newTlsManager
!config_env <- readConfig tomlFile <&> (gc_frontend_config . fc_appPort) .~ port
let config_env = test_config testEnv & (gc_frontend_config . fc_appPort) .~ port
prios <- withLogger () $ \ioLogger -> Jobs.readPrios ioLogger (sf <> ".jobs")
let prios' = Jobs.applyPrios prios Jobs.defaultPrios
!self_url_env <- parseBaseUrl $ "http://0.0.0.0:" <> show port
dbParam <- pure $ testEnvToPgConnectionInfo testEnv
!pool <- newPool dbParam
-- dbParam <- pure $ testEnvToPgConnectionInfo testEnv
-- !pool <- newPool dbParam
!nodeStory_env <- fromDBNodeStoryEnv pool
-- !nodeStory_env <- fromDBNodeStoryEnv pool
!scrapers_env <- ServantAsync.newJobEnv ServantAsync.defaultSettings manager_env
secret <- Jobs.genSecret
let jobs_settings = (Jobs.defaultJobSettings 1 secret)
& Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_job_timeout)
& Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_id_timeout)
& Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_job_timeout)
& Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_id_timeout)
!jobs_env <- Jobs.newJobEnv jobs_settings prios' manager_env
!_env_jwt_settings <- jwtSettings (_gc_secrets config_env)
......@@ -100,8 +97,12 @@ newTestEnv testEnv logger port = do
pure $ Env
{ _env_logger = logger
, _env_pool = pool
, _env_nodeStory = nodeStory_env
-- , _env_pool = pool
-- , _env_pool = Prelude.error "[Test.API.Setup.Env] pool not needed, but forced somewhere"
, _env_pool = _DBHandle $ test_db testEnv
-- , _env_nodeStory = nodeStory_env
-- , _env_nodeStory = Prelude.error "[Test.API.Setup.Env] env nodeStory not needed, but forced somewhere"
, _env_nodeStory = test_nodeStory testEnv
, _env_manager = manager_env
, _env_scrapers = scrapers_env
, _env_jobs = jobs_env
......@@ -150,11 +151,13 @@ withTestDBAndPort action =
withTestDBAndNotifications :: D.Dispatcher -> (((TestEnv, Warp.Port), Application) -> IO ()) -> IO ()
withTestDBAndNotifications dispatcher action = do
withTestDB $ \testEnv -> do
app <- withLoggerHoisted Mock $ \ioLogger -> do
env <- newTestEnv testEnv ioLogger 8080
makeApp $ env { _env_dispatcher = dispatcher }
let stgs = Warp.defaultSettings { settingsOnExceptionResponse = showDebugExceptions }
Warp.testWithApplicationSettings stgs (pure app) $ \port -> action ((testEnv, port), app)
withLoggerHoisted Mock $ \ioLogger -> do
env <- newTestEnv testEnv ioLogger 8080
<&> env_dispatcher .~ dispatcher
app <- makeApp env
let stgs = Warp.defaultSettings { settingsOnExceptionResponse = showDebugExceptions }
Warp.testWithApplicationSettings stgs (pure app) $ \port -> action ((testEnv, port), app)
-- | Starts the backend server /and/ the microservices proxy, the former at
-- a random port, the latter at a predictable port.
......
......@@ -22,6 +22,7 @@ import Data.Aeson.Types
import Gargantext.Core
import Gargantext.Core.NLP (nlpServerGet)
import Gargantext.Core.Types.Individu
import Gargantext.Core.Worker.Env () -- instance HasNodeError
import Gargantext.Database.Action.Flow
import Gargantext.Database.Action.Search
import Gargantext.Database.Admin.Types.Hyperdata.Document
......
......@@ -12,7 +12,7 @@ Portability : POSIX
module Test.Database.Operations.NodeStory where
import Control.Lens ((^.), (.~), (?~), _2)
import Control.Lens ((?~), _2)
import Control.Monad.Reader
import Data.Map.Strict qualified as Map
import Data.Map.Strict.Patch qualified as PM
......@@ -26,6 +26,7 @@ import Gargantext.Core.NodeStory
import Gargantext.Core.Text.Ngrams (NgramsType(..))
import Gargantext.Core.Types.Individu
import Gargantext.Core.Types (ListType(..), ListId, NodeId, UserId)
import Gargantext.Core.Worker.Env () -- instance HasNodeError
import Gargantext.Database.Action.User (getUserId)
import Gargantext.Database.Admin.Config (userMaster)
import Gargantext.Database.Prelude (runPGSQuery)
......
{-# LANGUAGE TupleSections #-}
module Test.Database.Setup (
withTestDB
, fakeTomlPath
, testEnvToPgConnectionInfo
) where
import Async.Worker qualified as Worker
import Data.Maybe (fromJust)
import Data.Pool hiding (withResource)
import Data.Pool qualified as Pool
import Data.String (fromString)
......@@ -15,18 +18,22 @@ import Database.PostgreSQL.Simple.Options qualified as Client
import Database.PostgreSQL.Simple.Options qualified as Opts
import Database.Postgres.Temp qualified as Tmp
import Gargantext.API.Admin.EnvTypes (Mode(Mock))
import Gargantext.API.Admin.Settings
import Gargantext.Core.Config
import Gargantext.Core.Config.Types (SettingsFile(..))
import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.Config.Worker (wsDatabase, wsDefinitions)
import Gargantext.Core.NodeStory (fromDBNodeStoryEnv)
import Gargantext.Core.Worker (initWorkerState)
import Gargantext.Core.Worker.Env (WorkerEnv(..))
import Gargantext.Prelude
import Gargantext.Core.Config
import Gargantext.System.Logging (withLoggerHoisted)
import Paths_gargantext
import Prelude qualified
import Shelly hiding (FilePath, run)
import Shelly qualified as SH
import Test.Database.Types
import Test.Utils.Db (tmpDBToConnInfo)
-- | Test DB settings.
dbUser, dbPassword, dbName :: Prelude.String
......@@ -41,7 +48,8 @@ gargDBSchema :: IO FilePath
gargDBSchema = getDataFileName "devops/postgres/schema.sql"
teardown :: TestEnv -> IO ()
teardown TestEnv{..} = do
teardown TestEnv{ .. } = do
killThread test_worker_tid
destroyAllResources $ _DBHandle test_db
Tmp.stop $ _DBTmp test_db
......@@ -70,19 +78,48 @@ setup = do
case res of
Left err -> Prelude.fail $ show err
Right db -> do
let connInfo = tmpDBToConnInfo db
gargConfig <- fakeTomlPath >>= readConfig
pool <- createPool (PG.connectPostgreSQL (Tmp.toConnectionString db))
(PG.close) 2 60 2
-- fix db since we're using tmp-postgres
<&> (gc_database_config .~ connInfo)
<&> (gc_worker . wsDatabase .~ (connInfo { PG.connectDatabase = "pgmq_test" }))
let idleTime = 60.0
let maxResources = 2
let poolConfig = defaultPoolConfig (PG.connectPostgreSQL (Tmp.toConnectionString db))
PG.close
idleTime
maxResources
pool <- newPool (setNumStripes (Just 2) poolConfig)
bootstrapDB db pool gargConfig
ugen <- emptyCounter
test_nodeStory <- fromDBNodeStoryEnv pool
withLoggerHoisted Mock $ \logger -> do
pure $ TestEnv { test_db = DBHandle pool db
, test_config = gargConfig
, test_nodeStory
, test_usernameGen = ugen
, test_logger = logger
}
let idleTime = 60.0
let maxResources = 2
let wPoolConfig = defaultPoolConfig (PG.connect $ gargConfig ^. gc_worker . wsDatabase)
PG.close
idleTime
maxResources
wPool <- newPool (setNumStripes (Just 2) wPoolConfig)
withLoggerHoisted Mock $ \wioLogger -> do
let wEnv = WorkerEnv { _w_env_config = gargConfig
, _w_env_logger = wioLogger
, _w_env_pool = wPool
, _w_env_nodeStory = test_nodeStory
, _w_env_mail = Prelude.error "[wEnv] w_env_mail requested but not available"
, _w_env_nlp = Prelude.error "[wEnv] w_env_nlp requested but not available" }
wState <- initWorkerState wEnv (fromJust $ head $ gargConfig ^. gc_worker . wsDefinitions)
test_worker_tid <- forkIO (Worker.run wState)
pure $ TestEnv { test_db = DBHandle pool db
, test_config = gargConfig
, test_nodeStory
, test_usernameGen = ugen
, test_logger = logger
, test_worker_tid
}
withTestDB :: (TestEnv -> IO ()) -> IO ()
withTestDB = bracket setup teardown
......
......@@ -30,20 +30,19 @@ import Gargantext.API.Admin.EnvTypes
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Errors.Types
import Gargantext.API.Prelude
import Gargantext.Core.Config (HasConfig(..))
import Gargantext.Core.Config
import Gargantext.Core.Config.Mail (MailConfig(..), LoginType(NoAuth), SendEmailType(LogEmailToConsole))
import Gargantext.Core.Mail.Types (HasMail(..))
import Gargantext.Core.NLP (HasNLPServer(..))
import Gargantext.Core.NodeStory
import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.Database.Query.Table.Node.Error
import Gargantext.Core.Config
import Gargantext.Core.Config.Mail (MailConfig(..), LoginType(NoAuth), SendEmailType(LogEmailToConsole))
import Gargantext.System.Logging (HasLogger(..), Logger, MonadLogger(..))
import Gargantext.Utils.Jobs
import Network.URI (parseURI)
import Prelude qualified
import System.Log.FastLogger qualified as FL
newtype Counter = Counter { _Counter :: IORef Int }
deriving Eq
......@@ -62,6 +61,7 @@ data TestEnv = TestEnv {
, test_nodeStory :: !NodeStoryEnv
, test_usernameGen :: !Counter
, test_logger :: !(Logger (GargM TestEnv BackendInternalError))
, test_worker_tid :: !ThreadId
}
newtype TestMonad a = TestMonad { runTestMonad :: ReaderT TestEnv IO a }
......@@ -96,9 +96,6 @@ data DBHandle = DBHandle {
, _DBTmp :: Tmp.DB
}
instance HasNodeError IOException where
_NodeError = prism' (Prelude.userError . show) (const Nothing)
instance HasConnectionPool TestEnv where
connPool = to (_DBHandle . test_db)
......
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
module Test.Utils where
......
module Test.Utils.Db where
import Data.Maybe (fromJust)
import Database.PostgreSQL.Simple qualified as PSQL
import Database.PostgreSQL.Simple.Options qualified as PSOpts
import Database.Postgres.Temp qualified as Tmp
import Gargantext.Prelude
tmpDBToConnInfo :: Tmp.DB -> PSQL.ConnectInfo
tmpDBToConnInfo db =
PSQL.ConnectInfo { connectHost = fromJust $ getLast $ PSOpts.host opts
, connectPort = fromIntegral $ fromJust $ getLast $ PSOpts.port opts
, connectUser = fromJust $ getLast $ PSOpts.user opts
, connectPassword = fromJust $ getLast $ PSOpts.password opts
, connectDatabase = fromJust $ getLast $ PSOpts.dbname opts }
where
opts = Tmp.toConnectionOptions db
......@@ -8,7 +8,6 @@ import Control.Monad
import Data.Text (isInfixOf)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.Dispatcher qualified as D
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Shelly hiding (FilePath)
import System.IO
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment