[worker] tests pass now

parent dc45dd45
Pipeline #6943 failed with stages
in 59 minutes and 4 seconds
...@@ -79,7 +79,7 @@ wosToDocs limit patterns time path = do ...@@ -79,7 +79,7 @@ wosToDocs limit patterns time path = do
tsvToDocs :: CorpusParser -> Patterns -> TimeUnit -> FilePath -> IO [Document] tsvToDocs :: CorpusParser -> Patterns -> TimeUnit -> FilePath -> IO [Document]
tsvToDocs parser patterns time path = tsvToDocs parser patterns time path =
case parser of case parser of
Wos _ -> Prelude.error "tsvToDocs: unimplemented" Wos _ -> errorTrace "tsvToDocs: unimplemented"
Tsv limit -> Vector.toList Tsv limit -> Vector.toList
<$> Vector.take limit <$> Vector.take limit
<$> Vector.map (\row -> Document (toPhyloDate (Tsv.fromMIntOrDec Tsv.defaultYear $ tsv_publication_year row) (fromMaybe Tsv.defaultMonth $ tsv_publication_month row) (fromMaybe Tsv.defaultDay $ tsv_publication_day row) time) <$> Vector.map (\row -> Document (toPhyloDate (Tsv.fromMIntOrDec Tsv.defaultYear $ tsv_publication_year row) (fromMaybe Tsv.defaultMonth $ tsv_publication_month row) (fromMaybe Tsv.defaultDay $ tsv_publication_day row) time)
...@@ -136,7 +136,7 @@ readListV4 path = do ...@@ -136,7 +136,7 @@ readListV4 path = do
case listJson of case listJson of
Left err -> do Left err -> do
putStrLn err putStrLn err
Prelude.error "readListV4 unimplemented" errorTrace "readListV4 unimplemented"
Right listV4 -> pure listV4 Right listV4 -> pure listV4
...@@ -173,7 +173,7 @@ seaToLabel config = case (seaElevation config) of ...@@ -173,7 +173,7 @@ seaToLabel config = case (seaElevation config) of
sensToLabel :: PhyloConfig -> [Char] sensToLabel :: PhyloConfig -> [Char]
sensToLabel config = case (similarity config) of sensToLabel config = case (similarity config) of
Hamming _ _ -> Prelude.error "sensToLabel: unimplemented" Hamming _ _ -> errorTrace "sensToLabel: unimplemented"
WeightedLogJaccard s _ -> ("WeightedLogJaccard_" <> show s) WeightedLogJaccard s _ -> ("WeightedLogJaccard_" <> show s)
WeightedLogSim s _ -> ( "WeightedLogSim-sens_" <> show s) WeightedLogSim s _ -> ( "WeightedLogSim-sens_" <> show s)
......
...@@ -18,7 +18,7 @@ fi ...@@ -18,7 +18,7 @@ fi
# with the `sha256sum` result calculated on the `cabal.project` and # with the `sha256sum` result calculated on the `cabal.project` and
# `cabal.project.freeze`. This ensures the files stay deterministic so that CI # `cabal.project.freeze`. This ensures the files stay deterministic so that CI
# cache can kick in. # cache can kick in.
expected_cabal_project_hash="32e003b7964ba9de82aed8c09b290b089f0f205f76c5f18169aee2ed38cf518b" expected_cabal_project_hash="3b00795e0b1c97372e72a3ef464aa809ca90d8c3f1ab580d6a956526c94c160c"
expected_cabal_project_freeze_hash="30dd1cf2cb2015351dd0576391d22b187443b1935c2be23599b821ad1ab95f23" expected_cabal_project_freeze_hash="30dd1cf2cb2015351dd0576391d22b187443b1935c2be23599b821ad1ab95f23"
......
...@@ -61,11 +61,11 @@ services: ...@@ -61,11 +61,11 @@ services:
# volumes: # volumes:
# - pgadmin:/var/lib/pgadmin # - pgadmin:/var/lib/pgadmin
# corenlp: corenlp:
# #image: 'cgenie/corenlp-garg:latest' #image: 'cgenie/corenlp-garg:latest'
# image: 'cgenie/corenlp-garg:4.5.4' image: 'cgenie/corenlp-garg:4.5.4'
# ports: ports:
# - 9000:9000 - 9000:9000
# johnsnownlp: # johnsnownlp:
# image: 'johnsnowlabs/nlp-server:latest' # image: 'johnsnowlabs/nlp-server:latest'
......
...@@ -878,6 +878,7 @@ test-suite garg-test-hspec ...@@ -878,6 +878,7 @@ test-suite garg-test-hspec
Test.API.Routes Test.API.Routes
Test.API.Setup Test.API.Setup
Test.API.UpdateList Test.API.UpdateList
Test.API.Worker
Test.Database.Operations Test.Database.Operations
Test.Database.Operations.DocumentSearch Test.Database.Operations.DocumentSearch
Test.Database.Operations.NodeStory Test.Database.Operations.NodeStory
......
...@@ -18,6 +18,7 @@ import Gargantext.Core.Worker.Jobs (sendJob) ...@@ -18,6 +18,7 @@ import Gargantext.Core.Worker.Jobs (sendJob)
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId) import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId)
import Gargantext.Core.Worker.Types (JobInfo(..)) import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (logM, LogLevel(..))
import Servant.API ((:>), (:-), JSON, Post, ReqBody) import Servant.API ((:>), (:-), JSON, Post, ReqBody)
import Servant.Server.Generic (AsServerT) import Servant.Server.Generic (AsServerT)
...@@ -35,6 +36,7 @@ serveWorkerAPI f = WorkerAPI { workerAPIPost } ...@@ -35,6 +36,7 @@ serveWorkerAPI f = WorkerAPI { workerAPIPost }
where where
workerAPIPost i = do workerAPIPost i = do
let job = f i let job = f i
logM DDEBUG $ "[serveWorkerAPI] sending job " <> show job
mId <- sendJob job mId <- sendJob job
pure $ JobInfo { _ji_message_id = mId pure $ JobInfo { _ji_message_id = mId
, _ji_mNode_id = getWorkerMNodeId job } , _ji_mNode_id = getWorkerMNodeId job }
......
...@@ -98,6 +98,8 @@ gServer (NotificationsConfig { .. }) = do ...@@ -98,6 +98,8 @@ gServer (NotificationsConfig { .. }) = do
Just (UpdateWorkerProgress _ji _jl) -> do Just (UpdateWorkerProgress _ji _jl) -> do
-- logMsg ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl -- logMsg ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
void $ timeout 100_000 $ send s_dispatcher r void $ timeout 100_000 $ send s_dispatcher r
Just Ping -> do
void $ timeout 100_000 $ send s_dispatcher r
Nothing -> Nothing ->
logMsg ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r logMsg ioLogger ERROR $ "[central_exchange] cannot decode message: " <> show r
......
...@@ -40,10 +40,12 @@ data CEMessage = ...@@ -40,10 +40,12 @@ data CEMessage =
UpdateWorkerProgress JobInfo JobLog UpdateWorkerProgress JobInfo JobLog
-- | Update tree for given nodeId -- | Update tree for given nodeId
| UpdateTreeFirstLevel NodeId | UpdateTreeFirstLevel NodeId
| Ping
instance Prelude.Show CEMessage where instance Prelude.Show CEMessage where
-- show (UpdateWorkerProgress ji nodeId jl) = "UpdateWorkerProgress " <> show ji <> " " <> show nodeId <> " " <> show jl -- show (UpdateWorkerProgress ji nodeId jl) = "UpdateWorkerProgress " <> show ji <> " " <> show nodeId <> " " <> show jl
show (UpdateWorkerProgress ji jl) = "UpdateWorkerProgress " <> show ji <> " " <> show jl show (UpdateWorkerProgress ji jl) = "UpdateWorkerProgress " <> show ji <> " " <> show jl
show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId
show Ping = "Ping"
instance FromJSON CEMessage where instance FromJSON CEMessage where
parseJSON = withObject "CEMessage" $ \o -> do parseJSON = withObject "CEMessage" $ \o -> do
type_ <- o .: "type" type_ <- o .: "type"
...@@ -57,18 +59,20 @@ instance FromJSON CEMessage where ...@@ -57,18 +59,20 @@ instance FromJSON CEMessage where
"update_tree_first_level" -> do "update_tree_first_level" -> do
node_id <- o .: "node_id" node_id <- o .: "node_id"
pure $ UpdateTreeFirstLevel node_id pure $ UpdateTreeFirstLevel node_id
"ping" -> pure Ping
s -> prependFailure "parsing type failed, " (typeMismatch "type" s) s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON CEMessage where instance ToJSON CEMessage where
toJSON (UpdateWorkerProgress ji jl) = object [ toJSON (UpdateWorkerProgress ji jl) = object [
"type" .= toJSON ("update_worker_progress" :: Text) "type" .= ("update_worker_progress" :: Text)
, "ji" .= toJSON ji , "ji" .= ji
, "jl" .= toJSON jl , "jl" .= jl
-- , "node_id" .= toJSON nodeId -- , "node_id" .= toJSON nodeId
] ]
toJSON (UpdateTreeFirstLevel nodeId) = object [ toJSON (UpdateTreeFirstLevel nodeId) = object [
"type" .= toJSON ("update_tree_first_level" :: Text) "type" .= ("update_tree_first_level" :: Text)
, "node_id" .= toJSON nodeId , "node_id" .= nodeId
] ]
toJSON Ping = object [ "type" .= ("ping" :: Text) ]
class HasCentralExchangeNotification env where class HasCentralExchangeNotification env where
......
...@@ -102,7 +102,7 @@ dispatcherListener (NotificationsConfig { _nc_dispatcher_bind }) subscriptions = ...@@ -102,7 +102,7 @@ dispatcherListener (NotificationsConfig { _nc_dispatcher_bind }) subscriptions =
logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange" logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange"
Just ceMessage -> do Just ceMessage -> do
withLogger () $ \ioL -> withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[dispatcher_listener] received " <> show ceMessage logMsg ioL DDEBUG $ "[dispatcher_listener] received " <> show ceMessage
-- subs <- atomically $ readTVar subscriptions -- subs <- atomically $ readTVar subscriptions
filteredSubs <- atomically $ do filteredSubs <- atomically $ do
let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions
...@@ -148,6 +148,8 @@ sendNotification throttleTChan ceMessage sub = do ...@@ -148,6 +148,8 @@ sendNotification throttleTChan ceMessage sub = do
if nodeId == nodeId' if nodeId == nodeId'
then Just $ NUpdateTree nodeId then Just $ NUpdateTree nodeId
else Nothing else Nothing
(Ping, CETypes.Ping) ->
Just NPing
_ -> Nothing _ -> Nothing
case mNotification of case mNotification of
...@@ -183,3 +185,5 @@ ceMessageSubPred (CETypes.UpdateWorkerProgress ji _jl) (Subscription { s_topic } ...@@ -183,3 +185,5 @@ ceMessageSubPred (CETypes.UpdateWorkerProgress ji _jl) (Subscription { s_topic }
|| Just s_topic == (UpdateTree <$> _ji_mNode_id ji) || Just s_topic == (UpdateTree <$> _ji_mNode_id ji)
ceMessageSubPred (CETypes.UpdateTreeFirstLevel nodeId) (Subscription { s_topic }) = ceMessageSubPred (CETypes.UpdateTreeFirstLevel nodeId) (Subscription { s_topic }) =
s_topic == UpdateTree nodeId s_topic == UpdateTree nodeId
ceMessageSubPred CETypes.Ping (Subscription { s_topic }) =
s_topic == Ping
...@@ -60,13 +60,16 @@ data Topic = ...@@ -60,13 +60,16 @@ data Topic =
-- | Given parent node id, trigger update of the node and its -- | Given parent node id, trigger update of the node and its
-- children (e.g. list is automatically created in a corpus) -- children (e.g. list is automatically created in a corpus)
| UpdateTree NodeId | UpdateTree NodeId
| Ping
deriving (Eq, Ord) deriving (Eq, Ord)
instance Prelude.Show Topic where instance Prelude.Show Topic where
show (UpdateWorkerProgress ji) = "UpdateWorkerProgress " <> show ji show (UpdateWorkerProgress ji) = "UpdateWorkerProgress " <> show ji
show (UpdateTree nodeId) = "UpdateTree " <> show nodeId show (UpdateTree nodeId) = "UpdateTree " <> show nodeId
show Ping = "Ping"
instance Hashable Topic where instance Hashable Topic where
hashWithSalt salt (UpdateWorkerProgress ji) = hashWithSalt salt ("update-worker-progress" :: Text, Aeson.encode ji) hashWithSalt salt (UpdateWorkerProgress ji) = hashWithSalt salt ("update-worker-progress" :: Text, Aeson.encode ji)
hashWithSalt salt (UpdateTree nodeId) = hashWithSalt salt ("update-tree" :: Text, nodeId) hashWithSalt salt (UpdateTree nodeId) = hashWithSalt salt ("update-tree" :: Text, nodeId)
hashWithSalt salt Ping = hashWithSalt salt ("ping" :: Text)
instance FromJSON Topic where instance FromJSON Topic where
parseJSON = Aeson.withObject "Topic" $ \o -> do parseJSON = Aeson.withObject "Topic" $ \o -> do
type_ <- o .: "type" type_ <- o .: "type"
...@@ -77,16 +80,18 @@ instance FromJSON Topic where ...@@ -77,16 +80,18 @@ instance FromJSON Topic where
"update_tree" -> do "update_tree" -> do
node_id <- o .: "node_id" node_id <- o .: "node_id"
pure $ UpdateTree node_id pure $ UpdateTree node_id
"ping" -> pure Ping
s -> prependFailure "parsing type failed, " (typeMismatch "type" s) s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON Topic where instance ToJSON Topic where
toJSON (UpdateWorkerProgress ji) = Aeson.object [ toJSON (UpdateWorkerProgress ji) = Aeson.object [
"type" .= toJSON ("update_worker_progress" :: Text) "type" .= ("update_worker_progress" :: Text)
, "ji" .= toJSON ji , "ji" .= ji
] ]
toJSON (UpdateTree node_id) = Aeson.object [ toJSON (UpdateTree node_id) = Aeson.object [
"type" .= toJSON ("update_tree" :: Text) "type" .= ("update_tree" :: Text)
, "node_id" .= toJSON node_id , "node_id" .= node_id
] ]
toJSON Ping = Aeson.object [ "type" .= ("ping" :: Text) ]
-- | A job status message -- | A job status message
-- newtype MJobStatus = MJobStatus (JobStatus 'Safe JobLog) -- newtype MJobStatus = MJobStatus (JobStatus 'Safe JobLog)
...@@ -208,12 +213,14 @@ data Notification = ...@@ -208,12 +213,14 @@ data Notification =
| NUpdateTree NodeId | NUpdateTree NodeId
| NWorkerJobStarted NodeId JobInfo | NWorkerJobStarted NodeId JobInfo
| NWorkerJobFinished NodeId JobInfo | NWorkerJobFinished NodeId JobInfo
| NPing
instance Prelude.Show Notification where instance Prelude.Show Notification where
-- show (NUpdateWorkerProgress jobInfo nodeId mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show nodeId <> ", " <> show mJobLog -- show (NUpdateWorkerProgress jobInfo nodeId mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show nodeId <> ", " <> show mJobLog
show (NUpdateWorkerProgress jobInfo mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show mJobLog show (NUpdateWorkerProgress jobInfo mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show mJobLog
show (NUpdateTree nodeId) = "NUpdateTree " <> show nodeId show (NUpdateTree nodeId) = "NUpdateTree " <> show nodeId
show (NWorkerJobStarted nodeId ji) = "NWorkerJobStarted " <> show nodeId <> ", " <> show ji show (NWorkerJobStarted nodeId ji) = "NWorkerJobStarted " <> show nodeId <> ", " <> show ji
show (NWorkerJobFinished nodeId ji) = "NWorkerJobFinished " <> show nodeId <> ", " <> show ji show (NWorkerJobFinished nodeId ji) = "NWorkerJobFinished " <> show nodeId <> ", " <> show ji
show NPing = "NPing"
instance ToJSON Notification where instance ToJSON Notification where
-- toJSON (NUpdateWorkerProgress jobInfo nodeId mJobLog) = Aeson.object [ -- toJSON (NUpdateWorkerProgress jobInfo nodeId mJobLog) = Aeson.object [
toJSON (NUpdateWorkerProgress jobInfo mJobLog) = Aeson.object [ toJSON (NUpdateWorkerProgress jobInfo mJobLog) = Aeson.object [
...@@ -236,6 +243,7 @@ instance ToJSON Notification where ...@@ -236,6 +243,7 @@ instance ToJSON Notification where
, "node_id" .= toJSON nodeId , "node_id" .= toJSON nodeId
, "ji" .= toJSON ji , "ji" .= toJSON ji
] ]
toJSON NPing = Aeson.object [ "type" .= ("ping" :: Text) ]
-- We don't need to decode notifications, this is for tests only -- We don't need to decode notifications, this is for tests only
instance FromJSON Notification where instance FromJSON Notification where
parseJSON = Aeson.withObject "Notification" $ \o -> do parseJSON = Aeson.withObject "Notification" $ \o -> do
...@@ -258,4 +266,5 @@ instance FromJSON Notification where ...@@ -258,4 +266,5 @@ instance FromJSON Notification where
nodeId <- o .: "node_id" nodeId <- o .: "node_id"
ji <- o .: "ji" ji <- o .: "ji"
pure $ NWorkerJobFinished nodeId ji pure $ NWorkerJobFinished nodeId ji
"ping" -> pure NPing
s -> prependFailure "parsing type failed, " (typeMismatch "type" s) s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
...@@ -14,11 +14,13 @@ https://dev.sub.gargantext.org/#/share/Notes/187918 ...@@ -14,11 +14,13 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
-} -}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-} {-# LANGUAGE TypeOperators #-}
module Gargantext.Core.Notifications.Dispatcher.WebSocket where module Gargantext.Core.Notifications.Dispatcher.WebSocket where
import Control.Concurrent.Async qualified as Async import Control.Concurrent.Async qualified as Async
import Control.Exception.Safe qualified as Exc
import Control.Lens (view) import Control.Lens (view)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.UUID.V4 as UUID import Data.UUID.V4 as UUID
...@@ -29,7 +31,7 @@ import Gargantext.Core.Notifications.Dispatcher.Types ...@@ -29,7 +31,7 @@ import Gargantext.Core.Notifications.Dispatcher.Types
import Gargantext.Core.Notifications.Dispatcher (Dispatcher, dispatcherSubscriptions) import Gargantext.Core.Notifications.Dispatcher (Dispatcher, dispatcherSubscriptions)
import Gargantext.Core.Config (HasJWTSettings(jwtSettings)) import Gargantext.Core.Config (HasJWTSettings(jwtSettings))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), logMsg, withLogger) import Gargantext.System.Logging (LogLevel(DEBUG), logMsg, withLogger, logM)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import Servant import Servant
import Servant.API.WebSocket qualified as WS (WebSocketPending) import Servant.API.WebSocket qualified as WS (WebSocketPending)
...@@ -43,12 +45,15 @@ newtype WSAPI mode = WSAPI { ...@@ -43,12 +45,15 @@ newtype WSAPI mode = WSAPI {
} deriving Generic } deriving Generic
wsServer :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasJWTSettings env ) => WSAPI (AsServerT m) wsServer :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasJWTSettings env, Exc.MonadCatch m ) => WSAPI (AsServerT m)
wsServer = WSAPI { wsAPIServer = streamData } wsServer = WSAPI { wsAPIServer = streamData }
where where
streamData :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasJWTSettings env ) -- NOTE Exc.catches is required by tests, otherwise disconnectin
-- via ws doesn't work. But it does work "normally" when the
-- server is running...
streamData :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasJWTSettings env, Exc.MonadCatch m )
=> WS.PendingConnection -> m () => WS.PendingConnection -> m ()
streamData pc = do streamData pc = Exc.catches (do
jwtS <- view jwtSettings jwtS <- view jwtSettings
d <- view hasDispatcher d <- view hasDispatcher
let subscriptions = dispatcherSubscriptions d let subscriptions = dispatcherSubscriptions d
...@@ -58,6 +63,10 @@ wsServer = WSAPI { wsAPIServer = streamData } ...@@ -58,6 +63,10 @@ wsServer = WSAPI { wsAPIServer = streamData }
_ <- liftBase $ Async.concurrently (wsLoop jwtS subscriptions ws) (pingLoop ws) _ <- liftBase $ Async.concurrently (wsLoop jwtS subscriptions ws) (pingLoop ws)
-- _ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws) -- _ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws)
pure () pure ()
) [ Exc.Handler $ \(err :: WS.ConnectionException) ->
case err of
WS.ConnectionClosed -> logM DEBUG $ "[wsServer] connection closed"
_ -> Exc.throw err ]
-- | Send a ping control frame periodically, otherwise the -- | Send a ping control frame periodically, otherwise the
......
...@@ -27,7 +27,6 @@ import Data.Conduit.Combinators qualified as CC ...@@ -27,7 +27,6 @@ import Data.Conduit.Combinators qualified as CC
import Data.XML.Types qualified as XML import Data.XML.Types qualified as XML
import Gargantext.Core.Viz.Graph.Types qualified as G import Gargantext.Core.Viz.Graph.Types qualified as G
import Gargantext.Prelude import Gargantext.Prelude
import Prelude qualified
import Servant (MimeRender(..), MimeUnrender(..)) import Servant (MimeRender(..), MimeUnrender(..))
import Servant.XML.Conduit (XML) import Servant.XML.Conduit (XML)
import Text.XML.Stream.Render qualified as XML import Text.XML.Stream.Render qualified as XML
...@@ -90,4 +89,4 @@ instance MimeRender XML G.Graph where ...@@ -90,4 +89,4 @@ instance MimeRender XML G.Graph where
-- just to be able to derive a client for the entire gargantext API, -- just to be able to derive a client for the entire gargantext API,
-- we however want to avoid sollicitating this instance -- we however want to avoid sollicitating this instance
instance MimeUnrender XML G.Graph where instance MimeUnrender XML G.Graph where
mimeUnrender _ _ = Prelude.error "MimeUnrender Graph: not defined, just a placeholder" mimeUnrender _ _ = errorTrace "MimeUnrender Graph: not defined, just a placeholder"
...@@ -20,6 +20,7 @@ module Gargantext.Core.Worker where ...@@ -20,6 +20,7 @@ module Gargantext.Core.Worker where
import Async.Worker.Broker.Types (toA, getMessage, messageId) import Async.Worker.Broker.Types (toA, getMessage, messageId)
import Async.Worker qualified as W import Async.Worker qualified as W
import Async.Worker.Types qualified as W import Async.Worker.Types qualified as W
import Control.Lens (to)
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.API.Admin.Auth (forgotUserPassword) import Gargantext.API.Admin.Auth (forgotUserPassword)
import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..)) import Gargantext.API.Admin.Auth.Types (ForgotPasswordAsyncParams(..))
...@@ -34,9 +35,11 @@ import Gargantext.API.Node.File (addWithFile) ...@@ -34,9 +35,11 @@ import Gargantext.API.Node.File (addWithFile)
import Gargantext.API.Node.New (postNode') import Gargantext.API.Node.New (postNode')
import Gargantext.API.Node.Update (updateNode) import Gargantext.API.Node.Update (updateNode)
import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync) import Gargantext.API.Server.Named.Ngrams (tableNgramsPostChartsAsync)
import Gargantext.Core.Config (hasConfig, gc_jobs) import Gargantext.Core.Config (hasConfig, gc_database_config, gc_jobs, gc_notifications_config, gc_worker)
import Gargantext.Core.Config.Types (jc_max_docs_scrapers) import Gargantext.Core.Config.Types (jc_max_docs_scrapers)
import Gargantext.Core.Config.Worker (WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Viz.Graph.API (graphRecompute) import Gargantext.Core.Viz.Graph.API (graphRecompute)
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate) import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Env import Gargantext.Core.Worker.Env
...@@ -44,7 +47,7 @@ import Gargantext.Core.Worker.PGMQTypes (BrokerMessage, HasWorkerBroker, WState) ...@@ -44,7 +47,7 @@ import Gargantext.Core.Worker.PGMQTypes (BrokerMessage, HasWorkerBroker, WState)
import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId) import Gargantext.Core.Worker.Jobs.Types (Job(..), getWorkerMNodeId)
import Gargantext.Core.Worker.Types (JobInfo(..)) import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Database.Query.Table.User (getUsersWithEmail) import Gargantext.Database.Query.Table.User (getUsersWithEmail)
import Gargantext.Prelude import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging ( logLocM, LogLevel(..), logMsg, withLogger ) import Gargantext.System.Logging ( logLocM, LogLevel(..), logMsg, withLogger )
import Gargantext.Utils.Jobs.Error (HumanFriendlyErrorText(..)) import Gargantext.Utils.Jobs.Error (HumanFriendlyErrorText(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(markStarted, markComplete, markFailed)) import Gargantext.Utils.Jobs.Monad (MonadJobStatus(markStarted, markComplete, markFailed))
...@@ -57,7 +60,7 @@ initWorkerState :: HasWorkerBroker ...@@ -57,7 +60,7 @@ initWorkerState :: HasWorkerBroker
-> IO WState -> IO WState
initWorkerState env (WorkerDefinition { .. }) = do initWorkerState env (WorkerDefinition { .. }) = do
let gargConfig = env ^. hasConfig let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig broker <- initBrokerWithDBCreate (gargConfig ^. gc_database_config) (gargConfig ^. gc_worker)
pure $ W.State { broker pure $ W.State { broker
, queueName = _wdQueue , queueName = _wdQueue
...@@ -216,6 +219,7 @@ performAction env _state bm = do ...@@ -216,6 +219,7 @@ performAction env _state bm = do
case job of case job of
Ping -> runWorkerMonad env $ do Ping -> runWorkerMonad env $ do
$(logLocM) DEBUG "[performAction] ping" $(logLocM) DEBUG "[performAction] ping"
liftIO $ CE.notify (env ^. (to _w_env_config) . gc_notifications_config) CET.Ping
AddContact { .. } -> runWorkerMonad env $ do AddContact { .. } -> runWorkerMonad env $ do
$(logLocM) DEBUG $ "[performAction] add contact" $(logLocM) DEBUG $ "[performAction] add contact"
addContact _ac_user _ac_node_id _ac_args jh addContact _ac_user _ac_node_id _ac_args jh
......
...@@ -18,7 +18,6 @@ import Async.Worker.Broker.Types (initBroker) ...@@ -18,7 +18,6 @@ import Async.Worker.Broker.Types (initBroker)
import Data.Text qualified as T import Data.Text qualified as T
import Data.Text.Encoding qualified as TE import Data.Text.Encoding qualified as TE
import Database.PostgreSQL.Simple qualified as PSQL import Database.PostgreSQL.Simple qualified as PSQL
import Gargantext.Core.Config (GargConfig(..), gc_worker)
import Gargantext.Core.Config.Worker (WorkerSettings(..)) import Gargantext.Core.Config.Worker (WorkerSettings(..))
import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, Broker) import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, Broker)
import Gargantext.Database.Prelude (createDBIfNotExists) import Gargantext.Database.Prelude (createDBIfNotExists)
...@@ -29,13 +28,18 @@ import Gargantext.Prelude ...@@ -29,13 +28,18 @@ import Gargantext.Prelude
-- | Create DB if not exists, then run 'initBroker' (which, in -- | Create DB if not exists, then run 'initBroker' (which, in
-- particular, creates the pgmq extension, if needed) -- particular, creates the pgmq extension, if needed)
initBrokerWithDBCreate :: HasWorkerBroker initBrokerWithDBCreate :: HasWorkerBroker
=> GargConfig => PSQL.ConnectInfo
-> WorkerSettings
-> IO Broker -> IO Broker
initBrokerWithDBCreate gc@(GargConfig { _gc_database_config }) = do initBrokerWithDBCreate pivotDb ws = do
-- By using gargantext db credentials, we create pgmq db (if needed) -- By using gargantext db credentials, we create pgmq db (if needed)
let WorkerSettings { .. } = gc ^. gc_worker let psqlDB = TE.decodeUtf8 $ PSQL.postgreSQLConnectionString pivotDb
let psqlDB = TE.decodeUtf8 $ PSQL.postgreSQLConnectionString _gc_database_config
createDBIfNotExists psqlDB (T.pack $ PSQL.connectDatabase _wsDatabase) let brokerDb = _wsDatabase ws
-- Using the pivotDb credentials, create ws Db (if this is the same db host/port)
when (PSQL.connectHost pivotDb == PSQL.connectHost brokerDb &&
PSQL.connectPort pivotDb == PSQL.connectPort brokerDb) $ do
createDBIfNotExists psqlDB (T.pack $ PSQL.connectDatabase brokerDb)
initBroker $ PGMQBrokerInitParams _wsDatabase _wsDefaultVisibilityTimeout initBroker $ PGMQBrokerInitParams brokerDb $ _wsDefaultVisibilityTimeout ws
...@@ -13,39 +13,43 @@ Portability : POSIX ...@@ -13,39 +13,43 @@ Portability : POSIX
module Gargantext.Core.Worker.Jobs where module Gargantext.Core.Worker.Jobs where
import Async.Worker.Broker.Types (MessageId)
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker qualified as W import Async.Worker qualified as W
import Async.Worker.Types (HasWorkerBroker)
import Control.Lens (view) import Control.Lens (view)
import Gargantext.Core.Config (gc_worker, HasConfig(..)) import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig)
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..))
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate) import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
import Gargantext.Core.Worker.Jobs.Types (Job(..)) import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Core.Worker.PGMQTypes (HasWorkerBroker, MessageId, SendJob)
import Gargantext.Database.Prelude (Cmd') import Gargantext.Database.Prelude (Cmd')
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (logMsg, withLogger, LogLevel(..))
sendJob :: (HasWorkerBroker PGMQBroker Job, HasConfig env) sendJob :: (HasWorkerBroker, HasConfig env)
=> Job => Job
-> Cmd' env err (MessageId PGMQBroker) -> Cmd' env err MessageId
sendJob job = do sendJob job = do
gcConfig <- view $ hasConfig gcConfig <- view $ hasConfig
let WorkerSettings { _wsDefinitions, _wsDefaultDelay } = gcConfig ^. gc_worker liftBase $ sendJobCfg gcConfig job
sendJobCfg :: GargConfig -> Job -> IO MessageId
sendJobCfg gcConfig job = do
let ws@WorkerSettings { _wsDefinitions, _wsDefaultDelay } = gcConfig ^. gc_worker
-- TODO Try to guess which worker should get this job -- TODO Try to guess which worker should get this job
-- let mWd = findDefinitionByName ws workerName -- let mWd = findDefinitionByName ws workerName
let mWd = head _wsDefinitions let mWd = head _wsDefinitions
case mWd of case mWd of
Nothing -> panicTrace "No worker definitions available" Nothing -> panicTrace "No worker definitions available"
Just wd -> liftBase $ do Just wd -> do
b <- initBrokerWithDBCreate gcConfig b <- initBrokerWithDBCreate (gcConfig ^. gc_database_config) ws
let queueName = _wdQueue wd let queueName = _wdQueue wd
let job' = (updateJobData job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay } let job' = (updateJobData job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay }
putText $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")" withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
W.sendJob' job' W.sendJob' job'
-- | We want to fine-tune job metadata parameters, for each job type -- | We want to fine-tune job metadata parameters, for each job type
updateJobData :: Job -> W.SendJob PGMQBroker Job -> W.SendJob PGMQBroker Job updateJobData :: Job -> SendJob -> SendJob
updateJobData (AddCorpusFormAsync {}) sj = sj { W.timeout = 300 } updateJobData (AddCorpusFormAsync {}) sj = sj { W.timeout = 300 }
updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 } updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 }
updateJobData _ sj = sj { W.resendOnKill = False } updateJobData _ sj = sj { W.resendOnKill = False }
...@@ -16,6 +16,7 @@ module Gargantext.Core.Worker.PGMQTypes where ...@@ -16,6 +16,7 @@ module Gargantext.Core.Worker.PGMQTypes where
import Async.Worker.Broker.PGMQ (PGMQBroker) import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker.Broker.Types qualified as BT import Async.Worker.Broker.Types qualified as BT
import Async.Worker qualified as W
import Async.Worker.Types qualified as W import Async.Worker.Types qualified as W
import Gargantext.Core.Worker.Jobs.Types (Job) import Gargantext.Core.Worker.Jobs.Types (Job)
...@@ -23,4 +24,6 @@ import Gargantext.Core.Worker.Jobs.Types (Job) ...@@ -23,4 +24,6 @@ import Gargantext.Core.Worker.Jobs.Types (Job)
type HasWorkerBroker = W.HasWorkerBroker PGMQBroker Job type HasWorkerBroker = W.HasWorkerBroker PGMQBroker Job
type Broker = BT.Broker PGMQBroker (W.Job Job) type Broker = BT.Broker PGMQBroker (W.Job Job)
type BrokerMessage = BT.BrokerMessage PGMQBroker (W.Job Job) type BrokerMessage = BT.BrokerMessage PGMQBroker (W.Job Job)
type MessageId = BT.MessageId PGMQBroker
type SendJob = W.SendJob PGMQBroker Job
type WState = W.State PGMQBroker Job type WState = W.State PGMQBroker Job
...@@ -260,7 +260,7 @@ ...@@ -260,7 +260,7 @@
git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git" git: "https://gitlab.iscpif.fr/gargantext/gargantext-graph.git"
subdirs: subdirs:
- . - .
- commit: 239a5eca1f11f802f4ae3cc1c80c390f7c6896ac - commit: d3c0b658aae5dedce04f4f1605e4a6605efebd31
git: "https://gitlab.iscpif.fr/gargantext/haskell-bee" git: "https://gitlab.iscpif.fr/gargantext/haskell-bee"
subdirs: subdirs:
- . - .
......
...@@ -67,6 +67,8 @@ login_type = "Normal" ...@@ -67,6 +67,8 @@ login_type = "Normal"
[notifications] [notifications]
central-exchange = { bind = "tcp://*:15560", connect = "tcp://localhost:15560" } central-exchange = { bind = "tcp://*:15560", connect = "tcp://localhost:15560" }
dispatcher = { bind = "tcp://*:15561", connect = "tcp://localhost:15561" } dispatcher = { bind = "tcp://*:15561", connect = "tcp://localhost:15561" }
# central-exchange = { bind = "ipc:///tmp/ce.ipc", connect = "ipc:///tmp/ce.ipc" }
# dispatcher = { bind = "ipc:///tmp/d.ipc", connect = "ipc:///tmp/d.ipc" }
[nlp] [nlp]
...@@ -78,6 +80,9 @@ All = "corenlp://localhost:9000" ...@@ -78,6 +80,9 @@ All = "corenlp://localhost:9000"
default_visibility_timeout = 1 default_visibility_timeout = 1
# default delay before job is visible to the worker
default_delay = 1
# NOTE This is overridden by Test.Database.Setup # NOTE This is overridden by Test.Database.Setup
[worker.database] [worker.database]
host = "127.0.0.1" host = "127.0.0.1"
......
module Test.API where module Test.API where
import Gargantext.Core.Notifications.Dispatcher qualified as D
import Gargantext.Core.Config.Types (NotificationsConfig)
import Prelude import Prelude
import Test.Hspec import Test.Hspec
import qualified Test.API.Authentication as Auth import qualified Test.API.Authentication as Auth
...@@ -11,14 +9,14 @@ import qualified Test.API.GraphQL as GraphQL ...@@ -11,14 +9,14 @@ import qualified Test.API.GraphQL as GraphQL
import qualified Test.API.Notifications as Notifications import qualified Test.API.Notifications as Notifications
import qualified Test.API.Private as Private import qualified Test.API.Private as Private
import qualified Test.API.UpdateList as UpdateList import qualified Test.API.UpdateList as UpdateList
import qualified Test.API.Worker as Worker
tests :: NotificationsConfig -> D.Dispatcher -> Spec tests :: Spec
tests nc dispatcher = describe "API" $ do tests = describe "API" $ do
Auth.tests Auth.tests
Private.tests Private.tests
GraphQL.tests GraphQL.tests
Errors.tests Errors.tests
UpdateList.tests UpdateList.tests
-- | TODO This would work if I managed to get forking dispatcher & Notifications.tests
-- exchange listeners properly Worker.tests
Notifications.tests nc dispatcher
{-# LANGUAGE BangPatterns #-} {-# LANGUAGE BangPatterns #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wno-orphans #-} {-# OPTIONS_GHC -Wno-orphans #-}
...@@ -9,22 +10,26 @@ module Test.API.Authentication ( ...@@ -9,22 +10,26 @@ module Test.API.Authentication (
) where ) where
import Control.Lens import Control.Lens
import Data.Aeson qualified as Aeson
import Data.Aeson.QQ
import Data.Text as T import Data.Text as T
import Gargantext.API.Admin.Auth.Types import Gargantext.API.Admin.Auth.Types
import Gargantext.API.Routes.Named
import Gargantext.Core.Types import Gargantext.Core.Types
import Gargantext.Core.Types.Individu import Gargantext.Core.Types.Individu
import Gargantext.Database.Action.User.New import Gargantext.Database.Action.User.New
import Gargantext.Prelude import Gargantext.Prelude
import Network.HTTP.Client hiding (Proxy) import Network.HTTP.Client hiding (Proxy)
import Network.HTTP.Types.Status (status403)
import Prelude qualified import Prelude qualified
import Servant.Auth.Client () import Servant.Auth.Client ()
import Servant.Client import Servant.Client
import Servant.Client.Core.Response qualified as SR
import Servant.Client.Generic (genericClient)
import Test.API.Routes (auth_api) import Test.API.Routes (auth_api)
import Test.API.Setup (withTestDBAndPort, setupEnvironment, SpecContext (..)) import Test.API.Setup (withTestDBAndPort, setupEnvironment, SpecContext (..))
import Test.Database.Types import Test.Database.Types
import Test.Hspec import Test.Hspec
import Gargantext.API.Routes.Named
import Servant.Client.Generic (genericClient)
cannedToken :: T.Text cannedToken :: T.Text
cannedToken = "eyJhbGciOiJIUzUxMiJ9.eyJkYXQiOnsiaWQiOjF9fQ.t49zZSqkPAulEkYEh4pW17H2uwrkyPTdZKwHyG3KUJ0hzU2UUoPBNj8vdv087RCVBJ4tXgxNbP4j0RBv3gxdqg" cannedToken = "eyJhbGciOiJIUzUxMiJ9.eyJkYXQiOnsiaWQiOjF9fQ.t49zZSqkPAulEkYEh4pW17H2uwrkyPTdZKwHyG3KUJ0hzU2UUoPBNj8vdv087RCVBJ4tXgxNbP4j0RBv3gxdqg"
...@@ -69,5 +74,22 @@ tests = sequential $ aroundAll withTestDBAndPort $ do ...@@ -69,5 +74,22 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
it "denies login for user 'alice' if password is invalid" $ \(SpecContext _testEnv port _app _) -> do it "denies login for user 'alice' if password is invalid" $ \(SpecContext _testEnv port _app _) -> do
let authPayload = AuthRequest "alice" (GargPassword "wrong") let authPayload = AuthRequest "alice" (GargPassword "wrong")
result <- runClientM (auth_api authPayload) (clientEnv port) result <- runClientM (auth_api authPayload) (clientEnv port)
putText $ "result: " <> show result -- putText $ "result: " <> show result
-- result `shouldBe` (Left $ InvalidUsernameOrPassword) -- result `shouldBe` (Left $ InvalidUsernameOrPassword)
result `shouldSatisfy` isLeft
{-
Left (FailureResponse (Request {requestPath = (BaseUrl {baseUrlScheme = Http, baseUrlHost = "localhost", baseUrlPort = 43009, baseUrlPath = ""},"/api/v1.0/auth"), requestQueryString = fromList [], requestBody = Just ((),application/json;charset=utf-8), requestAccept = fromList [application/json;charset=utf-8,application/json], requestHeaders = fromList [("X-Garg-Error-Scheme","new")], requestHttpVersion = HTTP/1.1, requestMethod = "POST"}) (Response {responseStatusCode = Status {statusCode = 403, statusMessage = "Invalid username or password."}, responseHeaders = fromList [("Transfer-Encoding","chunked"),("Date","Tue, 05 Nov 2024 09:40:35 GMT"),("Server","Warp/3.3.31")], responseHttpVersion = HTTP/1.1, responseBody = "{\"data\":{},\"diagnostic\":\"Invalid username or password.\",\"type\":\"EC_403__login_failed_invalid_username_or_password\"}"}))
-}
let (Left result') = result
result' `shouldSatisfy` isFailureResponse
let (FailureResponse _ res) = result'
SR.responseStatusCode res `shouldBe` status403
SR.responseBody res `shouldBe`
(Aeson.encode [aesonQQ| { "data": {}
, "diagnostic": "Invalid username or password."
, "type": "EC_403__login_failed_invalid_username_or_password" } |])
isFailureResponse :: ClientError -> Bool
isFailureResponse (FailureResponse _ _) = True
isFailureResponse _ = False
{-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeApplications #-}
module Test.API.Errors (tests) where module Test.API.Errors (tests) where
import Gargantext.API.Routes.Named.Node import Gargantext.API.Routes.Named.Node
......
...@@ -21,37 +21,42 @@ module Test.API.Notifications ( ...@@ -21,37 +21,42 @@ module Test.API.Notifications (
import Control.Concurrent (threadDelay) import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (withAsync) import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
import Control.Exception.Safe qualified as Exc import Control.Lens ((^.))
import Control.Monad (void)
import Control.Monad.STM (atomically) import Control.Monad.STM (atomically)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.ByteString qualified as BS
import Data.Maybe (isJust) import Data.Maybe (isJust)
import Gargantext.Core.Config (gc_notifications_config)
import Gargantext.Core.Notifications.CentralExchange qualified as CE import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET import Gargantext.Core.Notifications.CentralExchange.Types qualified as CET
import Gargantext.Core.Notifications.Dispatcher qualified as D
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.Core.Config.Types (NotificationsConfig(..)) import Gargantext.System.Logging (logMsg, LogLevel(DEBUG), withLogger)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import Prelude import Prelude
import Test.API.Setup (withTestDBAndNotifications) import System.Timeout qualified as Timeout
import Test.API.Setup (SpecContext(..), withTestDBAndPort)
import Test.Database.Types (test_config)
import Test.Hspec import Test.Hspec
import Test.Instances () import Test.Instances ()
import Test.Utils.Notifications import Test.Utils.Notifications
tests :: NotificationsConfig -> D.Dispatcher -> Spec -- tests :: D.Dispatcher -> Spec
tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatcher) $ do tests :: Spec
describe "Dispatcher, Central Exchange, WebSockets" $ do tests = sequential $ aroundAll withTestDBAndPort $ do
it "simple WS notification works" $ \((testEnv, port), _) -> do describe "Notifications" $ do
it "simple WS notification works" $ \(SpecContext testEnv port _app _) -> do
let nc = (test_config testEnv) ^. gc_notifications_config
let topic = DT.UpdateTree 0 let topic = DT.UpdateTree 0
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification)) tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection -- setup a websocket connection
let wsConnect = let wsConnect =
withWSConnection ("127.0.0.1", port) $ \conn -> do withWSConnection ("127.0.0.1", port) $ \conn -> do
-- We wait a bit before the server settles -- We wait a bit before the server settles
threadDelay (100 * millisecond) -- threadDelay (100 * millisecond)
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[wsConnect] subscribing topic: " <> show topic
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic) WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
d <- WS.receiveData conn d <- WS.receiveData conn
let dec = Aeson.decode d :: Maybe DT.Notification let dec = Aeson.decode d :: Maybe DT.Notification
...@@ -66,7 +71,10 @@ tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatc ...@@ -66,7 +71,10 @@ tests nc dispatcher = sequential $ aroundAll (withTestDBAndNotifications dispatc
let nodeId = 0 let nodeId = 0
CE.notify nc $ CET.UpdateTreeFirstLevel nodeId CE.notify nc $ CET.UpdateTreeFirstLevel nodeId
mTimeout <- Timeout.timeout (5 * 1000000) $ do
md <- atomically $ readTChan tchan
md <- atomically $ readTChan tchan md `shouldBe` Just (DT.NUpdateTree nodeId)
md `shouldBe` Just (DT.NUpdateTree nodeId) mTimeout `shouldSatisfy` isJust
...@@ -9,8 +9,8 @@ module Test.API.Private ( ...@@ -9,8 +9,8 @@ module Test.API.Private (
import Gargantext.API.Routes.Named.Node import Gargantext.API.Routes.Named.Node
import Gargantext.API.Routes.Named.Private import Gargantext.API.Routes.Named.Private
import Gargantext.Core.Types.Individu
import Gargantext.Core.Types (Node) import Gargantext.Core.Types (Node)
import Gargantext.Core.Types.Individu
import Gargantext.Database.Admin.Types.Hyperdata (HyperdataUser) import Gargantext.Database.Admin.Types.Hyperdata (HyperdataUser)
import Gargantext.Prelude hiding (get) import Gargantext.Prelude hiding (get)
import Network.HTTP.Client hiding (Proxy) import Network.HTTP.Client hiding (Proxy)
...@@ -27,6 +27,7 @@ import Test.Hspec.Wai.Internal (withApplication) ...@@ -27,6 +27,7 @@ import Test.Hspec.Wai.Internal (withApplication)
import Test.Hspec.Wai.JSON (json) import Test.Hspec.Wai.JSON (json)
import Test.Utils (protected, shouldRespondWithFragment, withValidLogin) import Test.Utils (protected, shouldRespondWithFragment, withValidLogin)
privateTests :: SpecWith (SpecContext a) privateTests :: SpecWith (SpecContext a)
privateTests = privateTests =
describe "Private API" $ do describe "Private API" $ do
......
...@@ -5,12 +5,12 @@ module Test.API.Private.Table ( ...@@ -5,12 +5,12 @@ module Test.API.Private.Table (
) where ) where
import Gargantext.API.HashedResponse import Gargantext.API.HashedResponse
import Gargantext.API.Ngrams.Types qualified as APINgrams
import Gargantext.Core.Text.Corpus.Query import Gargantext.Core.Text.Corpus.Query
import Gargantext.Core.Types import Gargantext.Core.Types
import Gargantext.Core.Types.Individu import Gargantext.Core.Types.Individu
import Gargantext.Database.Query.Facet qualified as Facet
import Gargantext.Prelude import Gargantext.Prelude
import qualified Gargantext.API.Ngrams.Types as APINgrams
import qualified Gargantext.Database.Query.Facet as Facet
import Servant.Client import Servant.Client
import Test.API.Routes import Test.API.Routes
import Test.API.Setup import Test.API.Setup
......
...@@ -8,11 +8,12 @@ module Test.API.Routes where ...@@ -8,11 +8,12 @@ module Test.API.Routes where
import Data.Text.Encoding qualified as TE import Data.Text.Encoding qualified as TE
import Fmt (Builder, (+|), (|+)) import Fmt (Builder, (+|), (|+))
import Gargantext.API.Admin.Auth.Types (AuthRequest, AuthResponse, Token) import Gargantext.API.Admin.Auth.Types (AuthRequest, AuthResponse, Token)
import Gargantext.API.Admin.Orchestrator.Types (JobLog, asyncJobsAPI')
import Gargantext.API.Errors import Gargantext.API.Errors
import Gargantext.API.HashedResponse (HashedResponse) import Gargantext.API.HashedResponse (HashedResponse)
import Gargantext.API.Ngrams.List.Types (WithJsonFile, WithTextFile)
import Gargantext.API.Ngrams.Types ( NgramsTable, NgramsTablePatch, OrderBy, TabType, Versioned, VersionedWithCount ) import Gargantext.API.Ngrams.Types ( NgramsTable, NgramsTablePatch, OrderBy, TabType, Versioned, VersionedWithCount )
import Gargantext.API.Routes.Named import Gargantext.API.Routes.Named
import Gargantext.API.Routes.Named.List (updateListJSONEp, updateListTSVEp)
import Gargantext.API.Routes.Named.Node import Gargantext.API.Routes.Named.Node
import Gargantext.API.Routes.Named.Private hiding (tableNgramsAPI) import Gargantext.API.Routes.Named.Private hiding (tableNgramsAPI)
import Gargantext.API.Routes.Named.Table import Gargantext.API.Routes.Named.Table
...@@ -28,13 +29,11 @@ import Gargantext.Database.Query.Facet qualified as Facet ...@@ -28,13 +29,11 @@ import Gargantext.Database.Query.Facet qualified as Facet
import Gargantext.Prelude import Gargantext.Prelude
import Network.HTTP.Types qualified as H import Network.HTTP.Types qualified as H
import Network.Wai.Handler.Warp (Port) import Network.Wai.Handler.Warp (Port)
import Servant ((:<|>)(..))
import Servant.API.WebSocket qualified as WS import Servant.API.WebSocket qualified as WS
import Servant.Auth.Client qualified as S import Servant.Auth.Client qualified as S
import Servant.Client (ClientM) import Servant.Client (ClientM)
import Servant.Client.Core (RunClient, HasClient(..), Request) import Servant.Client.Core (RunClient, HasClient(..), Request)
import Servant.Client.Generic ( genericClient, AsClientT ) import Servant.Client.Generic ( genericClient, AsClientT )
import Servant.Job.Async
instance RunClient m => HasClient m WS.WebSocketPending where instance RunClient m => HasClient m WS.WebSocketPending where
...@@ -101,6 +100,47 @@ update_node (toServantToken -> token) nodeId params = ...@@ -101,6 +100,47 @@ update_node (toServantToken -> token) nodeId params =
& workerAPIPost & workerAPIPost
& (\submitForm -> submitForm params) & (\submitForm -> submitForm params)
add_form_to_list :: Token
-> ListId
-> WithJsonFile
-> ClientM JobInfo
add_form_to_list (toServantToken -> token) listId params =
clientRoutes & apiWithCustomErrorScheme
& ($ GES_new)
& backendAPI
& backendAPI'
& mkBackEndAPI
& gargAPIVersion
& gargPrivateAPI
& mkPrivateAPI
& ($ token)
& listJsonAPI
& updateListJSONEp
& ($ listId)
& workerAPIPost
& (\submitForm -> submitForm params)
add_tsv_to_list :: Token
-> ListId
-> WithTextFile
-> ClientM JobInfo
add_tsv_to_list (toServantToken -> token) listId params =
clientRoutes & apiWithCustomErrorScheme
& ($ GES_new)
& backendAPI
& backendAPI'
& mkBackEndAPI
& gargAPIVersion
& gargPrivateAPI
& mkPrivateAPI
& ($ token)
& listTsvAPI
& updateListTSVEp
& ($ listId)
& workerAPIPost
& (\submitForm -> submitForm params)
get_table_ngrams :: Token get_table_ngrams :: Token
-> NodeId -> NodeId
-> TabType -> TabType
......
...@@ -4,7 +4,6 @@ ...@@ -4,7 +4,6 @@
module Test.API.Setup ( module Test.API.Setup (
SpecContext(..) SpecContext(..)
, withTestDBAndPort , withTestDBAndPort
, withTestDBAndNotifications
, withBackendServerAndProxy , withBackendServerAndProxy
, setupEnvironment , setupEnvironment
, createAliceAndBob , createAliceAndBob
...@@ -22,9 +21,9 @@ import Gargantext.API (makeApp) ...@@ -22,9 +21,9 @@ import Gargantext.API (makeApp)
import Gargantext.API.Admin.EnvTypes (Mode(Mock), Env (..), env_dispatcher) import Gargantext.API.Admin.EnvTypes (Mode(Mock), Env (..), env_dispatcher)
import Gargantext.API.Errors.Types import Gargantext.API.Errors.Types
import Gargantext.API.Prelude import Gargantext.API.Prelude
import Gargantext.Core.Notifications.Dispatcher qualified as D import Gargantext.Core.Notifications (withNotifications)
import Gargantext.Core.Config (_gc_secrets, gc_frontend_config, hasConfig) import Gargantext.Core.Config (_gc_secrets, gc_frontend_config)
import Gargantext.Core.Config.Types (SettingsFile(..), jc_js_job_timeout, jc_js_id_timeout, fc_appPort, jwtSettings) import Gargantext.Core.Config.Types (NotificationsConfig(..), fc_appPort, jwtSettings)
import Gargantext.Core.Types.Individu import Gargantext.Core.Types.Individu
import Gargantext.Database.Action.Flow import Gargantext.Database.Action.Flow
import Gargantext.Database.Action.User.New import Gargantext.Database.Action.User.New
...@@ -35,8 +34,8 @@ import Gargantext.Database.Prelude () ...@@ -35,8 +34,8 @@ import Gargantext.Database.Prelude ()
import Gargantext.Database.Query.Table.Node (getOrMkList) import Gargantext.Database.Query.Table.Node (getOrMkList)
import Gargantext.Database.Query.Tree.Root (MkCorpusUser(..)) import Gargantext.Database.Query.Tree.Root (MkCorpusUser(..))
import Gargantext.MicroServices.ReverseProxy (microServicesProxyApp) import Gargantext.MicroServices.ReverseProxy (microServicesProxyApp)
import Gargantext.Prelude hiding (catches, Handler)
import Gargantext.System.Logging import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Monad qualified as Jobs
import Network.HTTP.Client.TLS (newTlsManager) import Network.HTTP.Client.TLS (newTlsManager)
import Network.HTTP.Types import Network.HTTP.Types
import Network.Wai (Application, responseLBS) import Network.Wai (Application, responseLBS)
...@@ -45,11 +44,8 @@ import Network.Wai.Handler.Warp (runSettingsSocket) ...@@ -45,11 +44,8 @@ import Network.Wai.Handler.Warp (runSettingsSocket)
import Network.Wai.Handler.Warp qualified as Warp import Network.Wai.Handler.Warp qualified as Warp
import Network.Wai.Handler.Warp.Internal import Network.Wai.Handler.Warp.Internal
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import Prelude
import Servant.Auth.Client () import Servant.Auth.Client ()
import Servant.Client import Test.Database.Setup (withTestDB)
import Servant.Job.Async qualified as ServantAsync
import Test.Database.Setup (withTestDB, fakeTomlPath)
import Test.Database.Types import Test.Database.Types
import UnliftIO qualified import UnliftIO qualified
...@@ -71,78 +67,43 @@ instance Functor SpecContext where ...@@ -71,78 +67,43 @@ instance Functor SpecContext where
newTestEnv :: TestEnv -> Logger (GargM Env BackendInternalError) -> Warp.Port -> IO Env newTestEnv :: TestEnv -> Logger (GargM Env BackendInternalError) -> Warp.Port -> IO Env
newTestEnv testEnv logger port = do newTestEnv testEnv logger port = do
SettingsFile sf <- fakeTomlPath
!manager_env <- newTlsManager !manager_env <- newTlsManager
let config_env = test_config testEnv & (gc_frontend_config . fc_appPort) .~ port let config_env = test_config testEnv & (gc_frontend_config . fc_appPort) .~ port
!self_url_env <- parseBaseUrl $ "http://0.0.0.0:" <> show port
-- dbParam <- pure $ testEnvToPgConnectionInfo testEnv -- dbParam <- pure $ testEnvToPgConnectionInfo testEnv
-- !pool <- newPool dbParam -- !pool <- newPool dbParam
let pool = _DBHandle $ test_db testEnv
-- !nodeStory_env <- fromDBNodeStoryEnv pool -- !nodeStory_env <- fromDBNodeStoryEnv pool
!scrapers_env <- ServantAsync.newJobEnv ServantAsync.defaultSettings manager_env
!_env_jwt_settings <- jwtSettings (_gc_secrets config_env) !_env_jwt_settings <- jwtSettings (_gc_secrets config_env)
-- !central_exchange <- forkIO CE.gServer
-- !dispatcher <- D.dispatcher
pure $ Env pure $ Env
{ _env_logger = logger { _env_logger = logger
-- , _env_pool = pool
-- , _env_pool = Prelude.error "[Test.API.Setup.Env] pool not needed, but forced somewhere" -- , _env_pool = Prelude.error "[Test.API.Setup.Env] pool not needed, but forced somewhere"
, _env_pool = _DBHandle $ test_db testEnv , _env_pool = pool
-- , _env_nodeStory = nodeStory_env -- , _env_nodeStory = nodeStory_env
-- , _env_nodeStory = Prelude.error "[Test.API.Setup.Env] env nodeStory not needed, but forced somewhere" -- , _env_nodeStory = Prelude.error "[Test.API.Setup.Env] env nodeStory not needed, but forced somewhere"
, _env_nodeStory = test_nodeStory testEnv , _env_nodeStory = test_nodeStory testEnv
, _env_manager = manager_env , _env_manager = manager_env
, _env_scrapers = scrapers_env
, _env_self_url = self_url_env
, _env_config = config_env , _env_config = config_env
, _env_central_exchange = Prelude.error "[Test.API.Setup.Env] central exchange not needed, but forced somewhere (check StrictData)"
, _env_dispatcher = Prelude.error "[Test.API.Setup.Env] dispatcher not needed, but forced somewhere (check StrictData)"
-- , _env_central_exchange = central_exchange -- , _env_central_exchange = central_exchange
-- , _env_dispatcher = dispatcher , _env_dispatcher = errorTrace "[Test.API.Setup.newTestEnv] dispatcher not needed, but forced somewhere"
, _env_jwt_settings , _env_jwt_settings
} }
nc :: NotificationsConfig
nc = NotificationsConfig { _nc_central_exchange_bind = "tcp://*:15560"
, _nc_central_exchange_connect = "tcp://localhost:15560"
, _nc_dispatcher_bind = "tcp://*:15561"
, _nc_dispatcher_connect = "tcp://localhost:15561" }
-- | Run the gargantext server on a random port, picked by Warp, which allows -- | Run the gargantext server on a random port, picked by Warp, which allows
-- for concurrent tests to be executed in parallel, if we need to. -- for concurrent tests to be executed in parallel, if we need to.
withTestDBAndPort :: (SpecContext () -> IO ()) -> IO () withTestDBAndPort :: (SpecContext () -> IO ()) -> IO ()
withTestDBAndPort action = withTestDBAndPort action = withNotifications nc $ \dispatcher -> do
withTestDB $ \testEnv -> do
-- TODO Despite being cautious here only to start/kill dispatcher
-- & exchange, I still get nanomsg bind errors, which means these
-- are spawned before previous ones are killed. I guess one could
-- randomize ports for nanomsg...
-- let setup = do
-- withLoggerHoisted Mock $ \ioLogger -> do
-- env <- newTestEnv testEnv ioLogger 8080
-- !central_exchange <- forkIO CE.gServer
-- !dispatcher <- D.dispatcher
-- let env' = env { _env_central_exchange = central_exchange
-- , _env_dispatcher = dispatcher }
-- app <- makeApp env'
-- pure (app, env')
-- let teardown (_app, env) = do
-- killThread (DT.d_ce_listener $ _env_dispatcher env)
-- killThread (_env_central_exchange env)
-- bracket setup teardown $ \(app, _env) -> do
-- withGargApp app $ \port ->
-- action ((testEnv, port), app)
app <- withLoggerHoisted Mock $ \ioLogger -> do
env <- newTestEnv testEnv ioLogger 8080
makeApp env
let stgs = Warp.defaultSettings { settingsOnExceptionResponse = showDebugExceptions }
Warp.testWithApplicationSettings stgs (pure app) $ \port -> action (SpecContext testEnv port app ())
withTestDBAndNotifications :: D.Dispatcher -> (((TestEnv, Warp.Port), Application) -> IO ()) -> IO ()
withTestDBAndNotifications dispatcher action = do
withTestDB $ \testEnv -> do withTestDB $ \testEnv -> do
withLoggerHoisted Mock $ \ioLogger -> do withLoggerHoisted Mock $ \ioLogger -> do
env <- newTestEnv testEnv ioLogger 8080 env <- newTestEnv testEnv ioLogger 8080
...@@ -153,13 +114,19 @@ withTestDBAndNotifications dispatcher action = do ...@@ -153,13 +114,19 @@ withTestDBAndNotifications dispatcher action = do
-- An exception can be thrown by the websocket server (when client closes connection) -- An exception can be thrown by the websocket server (when client closes connection)
-- TODO I don't quite understand why the exception has to be caught here -- TODO I don't quite understand why the exception has to be caught here
-- and not under 'WS.runClient' -- and not under 'WS.runClient'
catches (Warp.testWithApplicationSettings stgs (pure app) $ \port -> action ((testEnv, port), app)) catches (Warp.testWithApplicationSettings stgs (pure app) $ \port -> action (SpecContext testEnv port app ()))
[ Handler $ \(err :: WS.ConnectionException) -> [ Handler $ \(err :: WS.ConnectionException) ->
case err of case err of
WS.CloseRequest _ _ -> WS.CloseRequest _ _ ->
withLogger () $ \ioLogger' -> withLogger () $ \ioLogger' ->
logTxt ioLogger' DEBUG "[withTestDBAndNotifications] closeRequest caught" logTxt ioLogger' DEBUG "[withTestDBAndPort] CloseRequest caught"
_ -> throw err WS.ConnectionClosed ->
withLogger () $ \ioLogger' ->
logTxt ioLogger' DEBUG "[withTestDBAndPort] ConnectionClosed caught"
_ -> do
withLogger () $ \ioLogger' ->
logTxt ioLogger' ERROR $ "[withTestDBAndPort] unknown exception: " <> show err
throw err
-- re-throw any other exceptions -- re-throw any other exceptions
, Handler $ \(err :: SomeException) -> throw err ] , Handler $ \(err :: SomeException) -> throw err ]
......
This diff is collapsed.
{-|
Module : Test.API.Worker
Description : Basic tests for the async worker
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# OPTIONS_GHC -Wno-orphans #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test.API.Worker (
tests
) where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM.TChan
import Control.Monad.STM (atomically)
import Data.Aeson qualified as Aeson
import Data.Maybe (isJust)
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.Core.Worker.Jobs (sendJobCfg)
import Gargantext.Core.Worker.Jobs.Types (Job(Ping))
import Network.WebSockets qualified as WS
import Prelude
import System.Timeout qualified as Timeout
import Test.API.Setup (SpecContext(..), withTestDBAndPort)
import Test.Database.Types (test_config)
import Test.Hspec
import Test.Instances ()
import Test.Utils.Notifications
tests :: Spec
tests = sequential $ aroundAll withTestDBAndPort $ do
describe "Worker" $ do
it "simple Ping job works" $ \(SpecContext testEnv port _app _) -> do
let cfg = test_config testEnv
let topic = DT.Ping
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection
let wsConnect =
withWSConnection ("127.0.0.1", port) $ \conn -> do
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
d <- WS.receiveData conn
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
-- wait a bit to settle
threadDelay (100 * millisecond)
withAsync wsConnect $ \_a -> do
-- wait a bit to connect
threadDelay (500 * millisecond)
_ <- sendJobCfg cfg Ping
mTimeout <- Timeout.timeout (5 * 1000000) $ do
md <- atomically $ readTChan tchan
md `shouldBe` Just DT.NPing
mTimeout `shouldSatisfy` isJust
...@@ -7,6 +7,7 @@ module Test.Database.Setup ( ...@@ -7,6 +7,7 @@ module Test.Database.Setup (
) where ) where
import Async.Worker qualified as Worker import Async.Worker qualified as Worker
import Control.Concurrent.STM.TVar (newTVarIO)
import Data.Maybe (fromJust) import Data.Maybe (fromJust)
import Data.Pool hiding (withResource) import Data.Pool hiding (withResource)
import Data.Pool qualified as Pool import Data.Pool qualified as Pool
...@@ -22,11 +23,12 @@ import Gargantext.Core.Config ...@@ -22,11 +23,12 @@ import Gargantext.Core.Config
import Gargantext.Core.Config.Types (SettingsFile(..)) import Gargantext.Core.Config.Types (SettingsFile(..))
import Gargantext.Core.Config.Utils (readConfig) import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.Config.Worker (wsDatabase, wsDefinitions) import Gargantext.Core.Config.Worker (wsDatabase, wsDefinitions)
import Gargantext.Core.NLP (nlpServerMap)
import Gargantext.Core.NodeStory (fromDBNodeStoryEnv) import Gargantext.Core.NodeStory (fromDBNodeStoryEnv)
import Gargantext.Core.Worker (initWorkerState) import Gargantext.Core.Worker (initWorkerState)
import Gargantext.Core.Worker.Env (WorkerEnv(..)) import Gargantext.Core.Worker.Env (WorkerEnv(..))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (withLoggerHoisted) import Gargantext.System.Logging (withLogger, withLoggerHoisted, logMsg, LogLevel(..))
import Paths_gargantext import Paths_gargantext
import Prelude qualified import Prelude qualified
import Shelly hiding (FilePath, run) import Shelly hiding (FilePath, run)
...@@ -82,7 +84,10 @@ setup = do ...@@ -82,7 +84,10 @@ setup = do
gargConfig <- fakeTomlPath >>= readConfig gargConfig <- fakeTomlPath >>= readConfig
-- fix db since we're using tmp-postgres -- fix db since we're using tmp-postgres
<&> (gc_database_config .~ connInfo) <&> (gc_database_config .~ connInfo)
-- <&> (gc_worker . wsDatabase .~ connInfo)
<&> (gc_worker . wsDatabase .~ (connInfo { PG.connectDatabase = "pgmq_test" })) <&> (gc_worker . wsDatabase .~ (connInfo { PG.connectDatabase = "pgmq_test" }))
-- putText $ "[setup] database: " <> show (gargConfig ^. gc_database_config)
-- putText $ "[setup] worker db: " <> show (gargConfig ^. gc_worker . wsDatabase)
let idleTime = 60.0 let idleTime = 60.0
let maxResources = 2 let maxResources = 2
let poolConfig = defaultPoolConfig (PG.connectPostgreSQL (Tmp.toConnectionString db)) let poolConfig = defaultPoolConfig (PG.connectPostgreSQL (Tmp.toConnectionString db))
...@@ -97,29 +102,31 @@ setup = do ...@@ -97,29 +102,31 @@ setup = do
let idleTime = 60.0 let idleTime = 60.0
let maxResources = 2 let maxResources = 2
let wPoolConfig = defaultPoolConfig (PG.connect $ gargConfig ^. gc_worker . wsDatabase) let wPoolConfig = defaultPoolConfig (PG.connectPostgreSQL (Tmp.toConnectionString db))
PG.close PG.close
idleTime idleTime
maxResources maxResources
wPool <- newPool (setNumStripes (Just 2) wPoolConfig) wPool <- newPool (setNumStripes (Just 2) wPoolConfig)
wNodeStory <- fromDBNodeStoryEnv wPool
_w_env_job_state <- newTVarIO Nothing
withLoggerHoisted Mock $ \wioLogger -> do withLoggerHoisted Mock $ \wioLogger -> do
let wEnv = WorkerEnv { _w_env_config = gargConfig let wEnv = WorkerEnv { _w_env_config = gargConfig
, _w_env_logger = wioLogger , _w_env_logger = wioLogger
, _w_env_pool = wPool , _w_env_pool = wPool
, _w_env_nodeStory = test_nodeStory , _w_env_nodeStory = wNodeStory
, _w_env_mail = Prelude.error "[wEnv] w_env_mail requested but not available" , _w_env_mail = errorTrace "[wEnv] w_env_mail requested but not available"
, _w_env_nlp = Prelude.error "[wEnv] w_env_nlp requested but not available" } , _w_env_nlp = nlpServerMap $ gargConfig ^. gc_nlp_config
, _w_env_job_state }
wState <- initWorkerState wEnv (fromJust $ head $ gargConfig ^. gc_worker . wsDefinitions) wState <- initWorkerState wEnv (fromJust $ head $ gargConfig ^. gc_worker . wsDefinitions)
test_worker_tid <- forkIO (Worker.run wState) test_worker_tid <- forkIO $ Worker.run wState
pure $ TestEnv { test_db = DBHandle pool db pure $ TestEnv { test_db = DBHandle pool db
, test_config = gargConfig , test_config = gargConfig
, test_nodeStory , test_nodeStory
, test_usernameGen = ugen , test_usernameGen = ugen
, test_logger = logger , test_logger = logger
, test_worker_tid , test_worker_tid
} }
withTestDB :: (TestEnv -> IO ()) -> IO () withTestDB :: (TestEnv -> IO ()) -> IO ()
withTestDB = bracket setup teardown withTestDB = bracket setup teardown
......
...@@ -36,6 +36,7 @@ import Gargantext.Core.Mail.Types (HasMail(..)) ...@@ -36,6 +36,7 @@ import Gargantext.Core.Mail.Types (HasMail(..))
import Gargantext.Core.NLP (HasNLPServer(..)) import Gargantext.Core.NLP (HasNLPServer(..))
import Gargantext.Core.NodeStory import Gargantext.Core.NodeStory
import Gargantext.Database.Prelude (HasConnectionPool(..)) import Gargantext.Database.Prelude (HasConnectionPool(..))
import Gargantext.Prelude hiding (to)
import Gargantext.System.Logging (HasLogger(..), Logger, MonadLogger(..)) import Gargantext.System.Logging (HasLogger(..), Logger, MonadLogger(..))
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..)) import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Network.URI (parseURI) import Network.URI (parseURI)
...@@ -123,7 +124,7 @@ instance HasNodeArchiveStoryImmediateSaver TestEnv where ...@@ -123,7 +124,7 @@ instance HasNodeArchiveStoryImmediateSaver TestEnv where
coreNLPConfig :: NLPServerConfig coreNLPConfig :: NLPServerConfig
coreNLPConfig = coreNLPConfig =
let uri = parseURI "http://localhost:9000" let uri = parseURI "http://localhost:9000"
in NLPServerConfig CoreNLP (fromMaybe (Prelude.error "parseURI for nlpServerConfig failed") uri) in NLPServerConfig CoreNLP (fromMaybe (errorTrace "parseURI for nlpServerConfig failed") uri)
instance HasNLPServer TestEnv where instance HasNLPServer TestEnv where
......
{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE ScopedTypeVariables #-}
module Test.Ngrams.Query.PaginationCorpus where
import Prelude
import Data.Aeson module Test.Ngrams.Query.PaginationCorpus where
import Data.Map.Strict (Map)
import Gargantext.API.Ngrams
import Gargantext.API.Ngrams.Types
import Gargantext.Core.Types.Main
import Gargantext.Database.Admin.Types.Node
import System.IO.Unsafe
import qualified Data.ByteString as B
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Aeson
import Data.ByteString qualified as B
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Set qualified as Set
import Gargantext.API.Ngrams
import Gargantext.API.Ngrams.Types
import Gargantext.Core.Types.Main
import Gargantext.Database.Admin.Types.Node
import Paths_gargantext import Paths_gargantext
import Prelude
import System.IO.Unsafe
implementationElem :: NgramsElement implementationElem :: NgramsElement
......
...@@ -24,6 +24,7 @@ import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT ...@@ -24,6 +24,7 @@ import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.Core.Types.Individu (Username, GargPassword) import Gargantext.Core.Types.Individu (Username, GargPassword)
import Gargantext.Core.Worker.Types (JobInfo(..)) import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (withLogger, logMsg, LogLevel(..))
import Network.HTTP.Client (defaultManagerSettings, newManager) import Network.HTTP.Client (defaultManagerSettings, newManager)
import Network.HTTP.Client qualified as HTTP import Network.HTTP.Client qualified as HTTP
import Network.HTTP.Types (Header, Method, status200) import Network.HTTP.Types (Header, Method, status200)
...@@ -247,11 +248,12 @@ pollUntilWorkFinished :: HasCallStack ...@@ -247,11 +248,12 @@ pollUntilWorkFinished :: HasCallStack
-> JobInfo -> JobInfo
-> WaiSession () JobInfo -> WaiSession () JobInfo
pollUntilWorkFinished tkn port ji = do pollUntilWorkFinished tkn port ji = do
let waitSecs = 10
isFinishedTVar <- liftIO $ newTVarIO False isFinishedTVar <- liftIO $ newTVarIO False
let wsConnect = let wsConnect =
withWSConnection ("localhost", port) $ \conn -> do withWSConnection ("127.0.0.1", port) $ \conn -> do
-- We wait a bit before the server settles -- We wait a bit before the server settles
threadDelay (100 * millisecond) -- threadDelay (100 * millisecond)
-- subscribe to notifications about this job -- subscribe to notifications about this job
let topic = DT.UpdateWorkerProgress ji let topic = DT.UpdateWorkerProgress ji
WS.sendTextData conn $ JSON.encode (DT.WSSubscribe topic) WS.sendTextData conn $ JSON.encode (DT.WSSubscribe topic)
...@@ -260,22 +262,29 @@ pollUntilWorkFinished tkn port ji = do ...@@ -260,22 +262,29 @@ pollUntilWorkFinished tkn port ji = do
let dec = JSON.decode d :: Maybe DT.Notification let dec = JSON.decode d :: Maybe DT.Notification
case dec of case dec of
Nothing -> pure () Nothing -> pure ()
Just (DT.NUpdateWorkerProgress ji' jl) -> Just (DT.NUpdateWorkerProgress ji' jl) -> do
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[pollUntilWorkFinished] received " <> show ji' <> ", " <> show jl
if ji' == ji && isFinished jl if ji' == ji && isFinished jl
then do then do
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[pollUntilWorkFinished] FINISHED! " <> show ji'
atomically $ writeTVar isFinishedTVar True atomically $ writeTVar isFinishedTVar True
else else
pure () pure ()
_ -> pure () _ -> pure ()
liftIO $ withAsync wsConnect $ \a -> do liftIO $ withAsync wsConnect $ \a -> do
mRet <- Timeout.timeout (60 * 1000 * millisecond) $ do mRet <- Timeout.timeout (waitSecs * 1000 * millisecond) $ do
let go = do let go = do
isFinished <- readTVarIO isFinishedTVar isFinished <- readTVarIO isFinishedTVar
if isFinished if isFinished
then return True then do
withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[pollUntilWorkFinished] JOB FINISHED: " <> show ji
return True
else do else do
threadDelay (1000 * millisecond) threadDelay (50 * millisecond)
go go
go go
case mRet of case mRet of
......
...@@ -20,7 +20,7 @@ import Async.Worker.Broker.Types (toA, getMessage) ...@@ -20,7 +20,7 @@ import Async.Worker.Broker.Types (toA, getMessage)
import Async.Worker.Types qualified as WT import Async.Worker.Types qualified as WT
import Control.Concurrent.STM import Control.Concurrent.STM
import Data.Text qualified as T import Data.Text qualified as T
import Gargantext.Core.Config (hasConfig) import Gargantext.Core.Config (hasConfig, gc_database_config, gc_worker)
import Gargantext.Core.Config.Worker (WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerDefinition(..))
import Gargantext.Core.Worker (performAction) import Gargantext.Core.Worker (performAction)
import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate) import Gargantext.Core.Worker.Broker (initBrokerWithDBCreate)
...@@ -57,7 +57,7 @@ initTestWorkerState :: HasWorkerBroker ...@@ -57,7 +57,7 @@ initTestWorkerState :: HasWorkerBroker
-> IO WState -> IO WState
initTestWorkerState jobTVar env (WorkerDefinition { .. }) = do initTestWorkerState jobTVar env (WorkerDefinition { .. }) = do
let gargConfig = env ^. hasConfig let gargConfig = env ^. hasConfig
broker <- initBrokerWithDBCreate gargConfig broker <- initBrokerWithDBCreate (gargConfig ^. gc_database_config) (gargConfig ^. gc_worker)
pure $ WT.State { broker pure $ WT.State { broker
, queueName = _wdQueue , queueName = _wdQueue
......
...@@ -28,18 +28,21 @@ withWSConnection (host, port) = withWSConnection' (host, port, "/ws") ...@@ -28,18 +28,21 @@ withWSConnection (host, port) = withWSConnection' (host, port, "/ws")
-- | Wrap the logic of asynchronous connection closing -- | Wrap the logic of asynchronous connection closing
-- https://hackage.haskell.org/package/websockets-0.13.0.0/docs/Network-WebSockets-Connection.html#v:sendClose -- https://hackage.haskell.org/package/websockets-0.13.0.0/docs/Network-WebSockets-Connection.html#v:sendClose
withWSConnection' :: (String, Int, String) -> WS.ClientApp () -> IO () withWSConnection' :: (String, Int, String) -> WS.ClientApp () -> IO ()
withWSConnection' (host, port, path) cb = withWSConnection' (host, port, path) cb = Exc.catches (
WS.runClient host port path $ \conn -> do WS.runClient host port path $ \conn -> do
cb conn cb conn
-- shut down gracefully, otherwise a 'ConnectionException' is thrown
-- shut down gracefully, otherwise a 'ConnectionException' is thrown WS.sendClose conn ("" :: BS.ByteString)
WS.sendClose conn ("" :: BS.ByteString)
-- wait for close response, should throw a 'CloseRequest' exception -- wait for close response, should throw a 'CloseRequest' exception
Exc.catches (void $ WS.receiveDataMessage conn) void $ WS.receiveDataMessage conn
[ Exc.Handler $ \(err :: WS.ConnectionException) -> ) [ Exc.Handler $ \(err :: WS.ConnectionException) ->
case err of case err of
WS.CloseRequest _ _ -> putStrLn "[withWSConnection] closeRequest caught" WS.CloseRequest _ _ -> putStrLn $ "[withWSConnection] CloseRequest caught"
_ -> Exc.throw err -- WS.ConnectionClosed -> putStrLn $ "[withWSConnection] ConnectionClosed caught"
_ -> do
putStrLn $ "[withWSConnection] unexpected: " <> show err
Exc.throw err
-- re-throw any other exceptions -- re-throw any other exceptions
, Exc.Handler $ \(err :: Exc.SomeException) -> Exc.throw err ] , Exc.Handler $ \(err :: Exc.SomeException) -> Exc.throw err ]
...@@ -2,22 +2,17 @@ ...@@ -2,22 +2,17 @@
module Main where module Main where
import Gargantext.Prelude hiding (isInfixOf)
import Control.Concurrent.Async (asyncThreadId, withAsync)
import Control.Monad import Control.Monad
import Data.Text (isInfixOf) import Data.Text (isInfixOf)
import Gargantext.Core.Notifications.CentralExchange qualified as CE import Data.Text qualified as T
import Gargantext.Core.Notifications.Dispatcher qualified as D import Gargantext.Prelude hiding (isInfixOf)
import Gargantext.Core.Config.Types (NotificationsConfig(..))
import Shelly hiding (FilePath) import Shelly hiding (FilePath)
import System.IO import System.IO
import System.Process import System.Process
import Test.API qualified as API
import Test.Database.Operations qualified as DB
import Test.Hspec import Test.Hspec
import qualified Data.Text as T import Test.Server.ReverseProxy qualified as ReverseProxy
import qualified Test.API as API
import qualified Test.Database.Operations as DB
import qualified Test.Server.ReverseProxy as ReverseProxy
startCoreNLPServer :: IO ProcessHandle startCoreNLPServer :: IO ProcessHandle
...@@ -46,19 +41,6 @@ stopCoreNLPServer ph = do ...@@ -46,19 +41,6 @@ stopCoreNLPServer ph = do
putText "calling stop core nlp - done" putText "calling stop core nlp - done"
withNotifications :: ((NotificationsConfig, ThreadId, D.Dispatcher) -> IO a) -> IO a
withNotifications cb = do
withAsync (CE.gServer nc) $ \ceA -> do
D.withDispatcher nc $ \d -> do
cb (nc, asyncThreadId ceA, d)
nc :: NotificationsConfig
nc = NotificationsConfig { _nc_central_exchange_bind = "tcp://*:15560"
, _nc_central_exchange_connect = "tcp://localhost:15560"
, _nc_dispatcher_bind = "tcp://*:15561"
, _nc_dispatcher_connect = "tcp://localhost:15561" }
-- It's especially important to use Hspec for DB tests, because, -- It's especially important to use Hspec for DB tests, because,
-- unlike 'tasty', 'Hspec' has explicit control over parallelism, -- unlike 'tasty', 'Hspec' has explicit control over parallelism,
-- and it's important that DB tests are run according to a very -- and it's important that DB tests are run according to a very
...@@ -75,10 +57,8 @@ main = do ...@@ -75,10 +57,8 @@ main = do
hSetBuffering stdout NoBuffering hSetBuffering stdout NoBuffering
-- TODO Ideally remove start/stop notifications and use -- TODO Ideally remove start/stop notifications and use
-- Test/API/Setup to initialize this in env -- Test/API/Setup to initialize this in env
bracket startCoreNLPServer stopCoreNLPServer $ \_ -> do bracket startCoreNLPServer stopCoreNLPServer $ \_ -> hspec $ do
withNotifications $ \(nc, _ce, dispatcher) -> hspec $ do API.tests
API.tests nc dispatcher
hspec $ do
ReverseProxy.tests ReverseProxy.tests
DB.tests DB.tests
DB.nodeStoryTests DB.nodeStoryTests
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment