[worker] rewrite env a bit, wrap everything in notifications

parent 2faac790
Pipeline #6941 failed with stages
in 15 minutes and 25 seconds
...@@ -85,6 +85,7 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo = ...@@ -85,6 +85,7 @@ convertConfigs ini@(Ini.GargConfig { .. }) iniMail nlpConfig connInfo =
, _ac_scrapyd_url } , _ac_scrapyd_url }
, _gc_worker = WorkerSettings { _wsDefinitions = [ wd ] , _gc_worker = WorkerSettings { _wsDefinitions = [ wd ]
, _wsDefaultVisibilityTimeout = 1 , _wsDefaultVisibilityTimeout = 1
, _wsDefaultDelay = 0
, _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} } , _wsDatabase = connInfo { PGS.connectDatabase = "pgmq"} }
, _gc_log_level = LevelDebug , _gc_log_level = LevelDebug
} }
......
...@@ -196,7 +196,7 @@ source-repository-package ...@@ -196,7 +196,7 @@ source-repository-package
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-bee location: https://gitlab.iscpif.fr/gargantext/haskell-bee
tag: 239a5eca1f11f802f4ae3cc1c80c390f7c6896ac tag: d3c0b658aae5dedce04f4f1605e4a6605efebd31
source-repository-package source-repository-package
type: git type: git
......
...@@ -146,6 +146,9 @@ FR = "spacy://localhost:8001" ...@@ -146,6 +146,9 @@ FR = "spacy://localhost:8001"
# preferred method over using defaultVt. # preferred method over using defaultVt.
default_visibility_timeout = 1 default_visibility_timeout = 1
# default delay before job is visible to the worker
default_delay = 0
# if you leave the same credentials as in [database] section above, # if you leave the same credentials as in [database] section above,
# workers will try to set up the `gargantext_pgmq` database # workers will try to set up the `gargantext_pgmq` database
# automatically # automatically
......
...@@ -51,7 +51,9 @@ import Gargantext.API.Routes.Named (API) ...@@ -51,7 +51,9 @@ import Gargantext.API.Routes.Named (API)
import Gargantext.API.Routes.Named.EKG import Gargantext.API.Routes.Named.EKG
import Gargantext.API.Server.Named (server) import Gargantext.API.Server.Named (server)
import Gargantext.Core.Config (gc_notifications_config, gc_frontend_config) import Gargantext.Core.Config (gc_notifications_config, gc_frontend_config)
import Gargantext.Core.Config.Types (CORSOrigin(..), CORSSettings, MicroServicesProxyStatus(..), NotificationsConfig(..), PortNumber, SettingsFile(..), corsAllowedOrigins, fc_cors, fc_cookie_settings, microServicesProxyStatus) import Gargantext.Core.Config.Types (CORSOrigin(..), CORSSettings, MicroServicesProxyStatus(..), NotificationsConfig(..), PortNumber, SettingsFile(..), corsAllowedOrigins, fc_appPort, fc_cors, fc_cookie_settings, microServicesProxyStatus)
import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.Notifications (withNotifications)
import Gargantext.Database.Prelude qualified as DB import Gargantext.Database.Prelude qualified as DB
import Gargantext.MicroServices.ReverseProxy (microServicesProxyApp) import Gargantext.MicroServices.ReverseProxy (microServicesProxyApp)
import Gargantext.Prelude hiding (putStrLn, to) import Gargantext.Prelude hiding (putStrLn, to)
...@@ -68,28 +70,33 @@ import System.Clock qualified as Clock ...@@ -68,28 +70,33 @@ import System.Clock qualified as Clock
import System.Cron.Schedule qualified as Cron import System.Cron.Schedule qualified as Cron
-- import System.FilePath -- import System.FilePath
-- | startGargantext takes as parameters port number and Ini file. -- | startGargantext takes as parameters port number and Toml file.
startGargantext :: Mode -> PortNumber -> SettingsFile -> IO () startGargantext :: Mode -> PortNumber -> SettingsFile -> IO ()
startGargantext mode port sf@(SettingsFile settingsFile) = withLoggerHoisted mode $ \logger -> do startGargantext mode port sf@(SettingsFile settingsFile) = withLoggerHoisted mode $ \logger -> do
env <- newEnv logger port sf config <- readConfig sf <&> (gc_frontend_config . fc_appPort) .~ port
let fc = env ^. env_config . gc_frontend_config when (port /= config ^. gc_frontend_config . fc_appPort) $
let proxyStatus = microServicesProxyStatus fc panicTrace "TODO: conflicting settings of port"
runDbCheck env let nc = config ^. gc_notifications_config
portRouteInfo (env ^. env_config . gc_notifications_config) port proxyStatus withNotifications nc $ \dispatcher -> do
app <- makeApp env env <- newEnv logger config dispatcher
mid <- makeGargMiddleware (fc ^. fc_cors) mode let fc = env ^. env_config . gc_frontend_config
periodicActions <- schedulePeriodicActions env let proxyStatus = microServicesProxyStatus fc
runDbCheck env
let runServer = run port (mid app) `finally` stopGargantext periodicActions portRouteInfo nc port proxyStatus
case proxyStatus of app <- makeApp env
PXY_disabled mid <- makeGargMiddleware (fc ^. fc_cors) mode
-> runServer -- the proxy is disabled, do not spawn the application periodicActions <- schedulePeriodicActions env
PXY_enabled proxyPort
-> do let runServer = run port (mid app) `finally` stopGargantext periodicActions
proxyCache <- InMemory.newCache (Just oneHour) case proxyStatus of
let runProxy = run proxyPort (mid (microServicesProxyApp proxyCache env)) PXY_disabled
Async.race_ runServer runProxy -> runServer -- the proxy is disabled, do not spawn the application
PXY_enabled proxyPort
-> do
proxyCache <- InMemory.newCache (Just oneHour)
let runProxy = run proxyPort (mid (microServicesProxyApp proxyCache env))
Async.race_ runServer runProxy
where runDbCheck env = do where runDbCheck env = do
r <- runExceptT (runReaderT DB.dbCheck env) `catch` r <- runExceptT (runReaderT DB.dbCheck env) `catch`
(\(err :: SomeException) -> pure $ Left err) (\(err :: SomeException) -> pure $ Left err)
......
...@@ -11,8 +11,6 @@ module Gargantext.API.Admin.EnvTypes ( ...@@ -11,8 +11,6 @@ module Gargantext.API.Admin.EnvTypes (
, env_config , env_config
, env_logger , env_logger
, env_manager , env_manager
, env_self_url
, env_central_exchange
, env_dispatcher , env_dispatcher
, env_jwt_settings , env_jwt_settings
, env_pool , env_pool
...@@ -51,7 +49,6 @@ import Gargantext.System.Logging ...@@ -51,7 +49,6 @@ import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..)) import Gargantext.Utils.Jobs.Monad (MonadJobStatus(..))
import Network.HTTP.Client (Manager) import Network.HTTP.Client (Manager)
import Servant.Auth.Server (JWTSettings) import Servant.Auth.Server (JWTSettings)
import Servant.Client (BaseUrl)
import System.Log.FastLogger qualified as FL import System.Log.FastLogger qualified as FL
data Mode = Dev | Mock | Prod data Mode = Dev | Mock | Prod
...@@ -98,9 +95,7 @@ data Env = Env ...@@ -98,9 +95,7 @@ data Env = Env
, _env_pool :: ~(Pool Connection) , _env_pool :: ~(Pool Connection)
, _env_nodeStory :: ~NodeStoryEnv , _env_nodeStory :: ~NodeStoryEnv
, _env_manager :: ~Manager , _env_manager :: ~Manager
, _env_self_url :: ~BaseUrl
, _env_config :: ~GargConfig , _env_config :: ~GargConfig
, _env_central_exchange :: ~ThreadId
, _env_dispatcher :: ~Dispatcher , _env_dispatcher :: ~Dispatcher
, _env_jwt_settings :: ~JWTSettings , _env_jwt_settings :: ~JWTSettings
} }
......
...@@ -18,9 +18,8 @@ TODO-SECURITY: Critical ...@@ -18,9 +18,8 @@ TODO-SECURITY: Critical
module Gargantext.API.Admin.Settings module Gargantext.API.Admin.Settings
where where
import Codec.Serialise (Serialise(), serialise) import Codec.Serialise (Serialise(), serialise)
import Control.Lens
import Control.Monad.Reader
import Data.ByteString.Lazy qualified as L import Data.ByteString.Lazy qualified as L
import Data.Pool (Pool) import Data.Pool (Pool)
import Data.Pool qualified as Pool import Data.Pool qualified as Pool
...@@ -28,16 +27,13 @@ import Database.PostgreSQL.Simple (Connection, connect, close, ConnectInfo) ...@@ -28,16 +27,13 @@ import Database.PostgreSQL.Simple (Connection, connect, close, ConnectInfo)
import Gargantext.API.Admin.EnvTypes import Gargantext.API.Admin.EnvTypes
import Gargantext.API.Errors.Types import Gargantext.API.Errors.Types
import Gargantext.API.Prelude import Gargantext.API.Prelude
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.Dispatcher qualified as D import Gargantext.Core.Notifications.Dispatcher qualified as D
import Gargantext.Core.Config (GargConfig(..), gc_frontend_config) import Gargantext.Core.Config (GargConfig(..))
import Gargantext.Core.Config.Types (PortNumber, SettingsFile(..), fc_appPort, jwtSettings) import Gargantext.Core.Config.Types (jwtSettings)
import Gargantext.Core.Config.Utils (readConfig)
import Gargantext.Core.NodeStory import Gargantext.Core.NodeStory
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging import Gargantext.System.Logging
import Network.HTTP.Client.TLS (newTlsManager) import Network.HTTP.Client.TLS (newTlsManager)
import Servant.Client (parseBaseUrl)
import System.Directory (renameFile) import System.Directory (renameFile)
import System.IO (hClose) import System.IO (hClose)
import System.IO.Temp (withTempFile) import System.IO.Temp (withTempFile)
...@@ -145,19 +141,15 @@ readRepoEnv repoDir = do ...@@ -145,19 +141,15 @@ readRepoEnv repoDir = do
pure $ RepoEnv { _renv_var = mvar, _renv_saver = saver, _renv_lock = lock } pure $ RepoEnv { _renv_var = mvar, _renv_saver = saver, _renv_lock = lock }
--} --}
newEnv :: Logger (GargM Env BackendInternalError) -> PortNumber -> SettingsFile -> IO Env newEnv :: Logger (GargM Env BackendInternalError) -> GargConfig -> D.Dispatcher -> IO Env
newEnv logger port settingsFile = do newEnv logger config dispatcher = do
!manager_env <- newTlsManager !manager_env <- newTlsManager
!config_env <- readConfig settingsFile <&> (gc_frontend_config . fc_appPort) .~ port -- TODO read from 'file'
when (port /= config_env ^. gc_frontend_config . fc_appPort) $
panicTrace "TODO: conflicting settings of port"
-- prios <- withLogger () $ \ioLogger -> Jobs.readPrios ioLogger (sf <> ".jobs") -- prios <- withLogger () $ \ioLogger -> Jobs.readPrios ioLogger (sf <> ".jobs")
-- let prios' = Jobs.applyPrios prios Jobs.defaultPrios -- let prios' = Jobs.applyPrios prios Jobs.defaultPrios
-- putStrLn ("Overrides: " <> show prios :: Text) -- putStrLn ("Overrides: " <> show prios :: Text)
-- putStrLn ("New priorities: " <> show prios' :: Text) -- putStrLn ("New priorities: " <> show prios' :: Text)
!self_url_env <- parseBaseUrl $ "http://0.0.0.0:" <> show port !pool <- newPool $ _gc_database_config config
!pool <- newPool $ _gc_database_config config_env
!nodeStory_env <- fromDBNodeStoryEnv pool !nodeStory_env <- fromDBNodeStoryEnv pool
-- secret <- Jobs.genSecret -- secret <- Jobs.genSecret
...@@ -165,22 +157,21 @@ newEnv logger port settingsFile = do ...@@ -165,22 +157,21 @@ newEnv logger port settingsFile = do
-- & Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_job_timeout) -- & Jobs.l_jsJobTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_job_timeout)
-- & Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_id_timeout) -- & Jobs.l_jsIDTimeout .~ (fromIntegral $ config_env ^. hasConfig ^. gc_jobs . jc_js_id_timeout)
!central_exchange <- forkIO $ CE.gServer (_gc_notifications_config config_env) !_env_jwt_settings <- jwtSettings (_gc_secrets config)
!dispatcher <- D.newDispatcher (_gc_notifications_config config_env)
!_env_jwt_settings <- jwtSettings (_gc_secrets config_env)
{- An 'Env' by default doesn't have strict fields, but when constructing one in production --_central_exchange <- forkIO $ CE.gServer (_gc_notifications_config config_env)
we want to force them to WHNF to avoid accumulating unnecessary thunks.
-}
{- An 'Env' by default doesn't have strict fields, but when constructing one in production
we want to force them to WHNF to avoid accumulating unnecessary thunks.
-}
pure $ Env pure $ Env
{ _env_logger = logger { _env_logger = logger
, _env_pool = pool , _env_pool = pool
, _env_nodeStory = nodeStory_env , _env_nodeStory = nodeStory_env
, _env_manager = manager_env , _env_manager = manager_env
, _env_self_url = self_url_env , _env_config = config
, _env_config = config_env
, _env_central_exchange = central_exchange
, _env_dispatcher = dispatcher , _env_dispatcher = dispatcher
, _env_jwt_settings , _env_jwt_settings
} }
......
{-|
Module : Gargantext.API.Node.Corpus.New.Types
Description :
Copyright : (c) CNRS, 2017
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
module Gargantext.API.Node.Corpus.New.Types where module Gargantext.API.Node.Corpus.New.Types where
import Data.Aeson import Data.Aeson
......
{-|
Module : Gargantext.API.Named.Corpus
Description :
Copyright : (c) CNRS, 2017
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE TypeOperators #-} {-# LANGUAGE TypeOperators #-}
module Gargantext.API.Routes.Named.Corpus ( module Gargantext.API.Routes.Named.Corpus (
......
{-|
Module : Gargantext.API.Routes.Named.List
Description :
Copyright : (c) CNRS, 2017
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE TypeOperators #-} {-# LANGUAGE TypeOperators #-}
module Gargantext.API.Routes.Named.List ( module Gargantext.API.Routes.Named.List (
......
{-|
Module : Gargantext.API.Routes.Named.Private
Description :
Copyright : (c) CNRS, 2017
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE TypeOperators #-} {-# LANGUAGE TypeOperators #-}
{-# LANGUAGE KindSignatures #-} {-# LANGUAGE KindSignatures #-}
......
...@@ -43,6 +43,9 @@ data WorkerSettings = ...@@ -43,6 +43,9 @@ data WorkerSettings =
-- You can set timeout for each job individually and this is the -- You can set timeout for each job individually and this is the
-- preferred method over using defaultVt. -- preferred method over using defaultVt.
, _wsDefaultVisibilityTimeout :: PGMQ.VisibilityTimeout , _wsDefaultVisibilityTimeout :: PGMQ.VisibilityTimeout
-- Default delay for jobs. This is useful in tests, so that we can
-- get a chance to set up proper watchers for job, given its id
, _wsDefaultDelay :: B.TimeoutS
, _wsDefinitions :: ![WorkerDefinition] , _wsDefinitions :: ![WorkerDefinition]
} deriving (Show, Eq) } deriving (Show, Eq)
instance FromValue WorkerSettings where instance FromValue WorkerSettings where
...@@ -50,15 +53,18 @@ instance FromValue WorkerSettings where ...@@ -50,15 +53,18 @@ instance FromValue WorkerSettings where
dbConfig <- reqKey "database" dbConfig <- reqKey "database"
_wsDefinitions <- reqKey "definitions" _wsDefinitions <- reqKey "definitions"
_wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout" _wsDefaultVisibilityTimeout <- reqKey "default_visibility_timeout"
defaultDelay <- reqKey "default_delay"
return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig return $ WorkerSettings { _wsDatabase = unTOMLConnectInfo dbConfig
, _wsDefinitions , _wsDefinitions
, _wsDefaultVisibilityTimeout } , _wsDefaultVisibilityTimeout
, _wsDefaultDelay = B.TimeoutS defaultDelay }
instance ToValue WorkerSettings where instance ToValue WorkerSettings where
toValue = defaultTableToValue toValue = defaultTableToValue
instance ToTable WorkerSettings where instance ToTable WorkerSettings where
toTable (WorkerSettings { .. }) = toTable (WorkerSettings { .. }) =
table [ "database" .= TOMLConnectInfo _wsDatabase table [ "database" .= TOMLConnectInfo _wsDatabase
, "default_visibility_timeout" .= _wsDefaultVisibilityTimeout , "default_visibility_timeout" .= _wsDefaultVisibilityTimeout
, "default_delay" .= B._TimeoutS _wsDefaultDelay
, "definitions" .= _wsDefinitions ] , "definitions" .= _wsDefinitions ]
data WorkerDefinition = data WorkerDefinition =
......
...@@ -8,136 +8,18 @@ Stability : experimental ...@@ -8,136 +8,18 @@ Stability : experimental
Portability : POSIX Portability : POSIX
-} -}
{-# OPTIONS_GHC -Wno-deprecations #-} -- FIXME(cgenie) undefined remains in code
module Gargantext.Core.Notifications module Gargantext.Core.Notifications
where where
import Gargantext.Core.Types (NodeId, UserId) import Gargantext.Core.Config.Types (NotificationsConfig)
import Gargantext.Core.Notifications.CentralExchange qualified as CE
import Gargantext.Core.Notifications.Dispatcher qualified as D
import Protolude import Protolude
{- withNotifications :: NotificationsConfig -> (D.Dispatcher -> IO a) -> IO a
withNotifications nc cb =
Please note that we have 2 different notification mechanisms: D.withDispatcher nc $ \dispatcher -> do
- external (i.e. WebSocket or SSE connection to the frontend) withAsync (CE.gServer nc) $ \_ce -> do
- internal (e.g. job workers would like to report either progress or cb dispatcher
that some node changed in the tree)
I imagine the workflow as follows (this is a mix of internal and
external communication):
- somewhere in the code (or in the async job worker) we decide to send
an update message to all interested users
- such an action (UserAction) can be associated with the triggering
user (but doesn't have to be)
- we compute interested users for given notification
- we broadcast (using our broker) these notifications to all
interested users
- the broadcast message is either simple (meaning: hey, we have new
data, if you want you can send an update request) or we could send
the changed data already
On the client side it looks more or less like this (external
communication):
- specific components decide to subscribe to specific
UserNotifications: task component is interested in running tasks (for
given node id), tree component is interested in the tree and its
first-level children (same for the children components)
- the components react to events accordingly (usually by pulling in
new data)
Thus, for example, the triple (user_id, node_id, "update_tree")
defines a "update tree for given user and given node" subscription to
this event, both for server and client. This triple is then the
"touching point" between client and server. Through that point, update
messages are sent from server.
Subscription to topics is important IMHO because it allows to target
clients directly rather than broadcasting messages to everyone. This
reduces latency and is more secure. At the same time it is a bit more
complicated because we need to agree on the topic schema both on
server and client.
As for internal communication, we don't need topics: we always want to
get all notifications and process them accordingly (send messages to
connected users, ignore any messages that would be sent to
non-connected users).
-}
-------------------------
-- EXTERNAL COMMUNICATION
-------------------------
-- | Various topics that users can subscribe to
data Topic =
-- | Update given Servant Job (we currently send a request every
-- | second to get job status).
-- UpdateJob JobID
-- | Given parent node id, trigger update of the node and its
-- children (e.g. list is automatically created in a corpus)
UpdateTree NodeId
deriving (Eq, Show)
-- TODO: I'm not sure if UserAction/UserSource is needed. I initially
-- created that to mark who initiated the action, but I think we don't
-- need it.
--
-- Suppose we send an 'UpdateTree node_id' message: from the DB we can
-- infer all users that are associated with that node (I do keep in
-- mind that we can share nodes to other users).
data UserSource =
USUser UserId
| USSystem
deriving (Eq, Show)
-- | Action possibly associated with user who triggered it (there can
-- be system actions as well)
data UserAction =
UserAction UserSource Topic
deriving (Eq, Show)
-- | Represents a notification that goes to a given user. This is
-- directly sent via WebSockets.
--
-- NOTE: Do we need public notifications? I.e. sent out to non-logged
-- in users?
data UserNotification =
UserNotification UserId UserAction
deriving (Eq, Show)
-- | What we want now is, given a UserAction action, generate all
-- interested users to which the notification will be sent.
-- This function lives in a monad because we have to fetch users
-- from DB.
notificationsForUserAction :: UserAction -> m [ UserNotification ]
notificationsForUserAction = undefined
-- | A connected user can be either associated with his UserId or
-- don't have it, since he's not logged in (for public messages).
data ConnectedUser =
CUUser UserId
| CUPublic
deriving (Eq, Show)
-- | Stores connection type associated with given user, subscribed to
-- | a given topic.
--
-- We probably should set conn = Servant.API.WebSocket.Connection
data Subscription conn =
Subscription ConnectedUser conn Topic
-- | Given a UserNotification and all subscriptions, send it to all
-- matching ones. Possibly we could make this function as part of a
-- typeclass so that we can decide how to send the notification
-- based on whether we choose pure WebSockets, NATS or something
-- else.
sendNotification :: UserNotification -> [ Subscription conn ] -> m ()
sendNotification = undefined
...@@ -49,7 +49,11 @@ gServer :: NotificationsConfig -> IO () ...@@ -49,7 +49,11 @@ gServer :: NotificationsConfig -> IO ()
gServer (NotificationsConfig { .. }) = do gServer (NotificationsConfig { .. }) = do
withSocket Pull $ \s -> do withSocket Pull $ \s -> do
withSocket Push $ \s_dispatcher -> do withSocket Push $ \s_dispatcher -> do
withLogger () $ \ioLogger -> do
logMsg ioLogger DDEBUG $ "[central_exchange] binding to " <> T.unpack _nc_central_exchange_bind
_ <- bind s $ T.unpack _nc_central_exchange_bind _ <- bind s $ T.unpack _nc_central_exchange_bind
withLogger () $ \ioLogger -> do
logMsg ioLogger DDEBUG $ "[central_exchange] connecting to " <> T.unpack _nc_dispatcher_bind
_ <- connect s_dispatcher $ T.unpack _nc_dispatcher_connect _ <- connect s_dispatcher $ T.unpack _nc_dispatcher_connect
tChan <- TChan.newTChanIO tChan <- TChan.newTChanIO
......
...@@ -16,8 +16,6 @@ https://dev.sub.gargantext.org/#/share/Notes/187918 ...@@ -16,8 +16,6 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
module Gargantext.Core.Notifications.Dispatcher ( module Gargantext.Core.Notifications.Dispatcher (
Dispatcher -- opaque Dispatcher -- opaque
, newDispatcher
, terminateDispatcher
, withDispatcher , withDispatcher
-- * Querying a dispatcher -- * Querying a dispatcher
...@@ -52,33 +50,17 @@ Dispatcher is a service, which provides couple of functionalities: ...@@ -52,33 +50,17 @@ Dispatcher is a service, which provides couple of functionalities:
data Dispatcher = data Dispatcher =
Dispatcher { d_subscriptions :: SSet.Set Subscription Dispatcher { d_subscriptions :: SSet.Set Subscription
, d_ce_listener :: ThreadId
} }
terminateDispatcher :: Dispatcher -> IO ()
terminateDispatcher = killThread . d_ce_listener
dispatcherSubscriptions :: Dispatcher -> SSet.Set Subscription dispatcherSubscriptions :: Dispatcher -> SSet.Set Subscription
dispatcherSubscriptions = d_subscriptions dispatcherSubscriptions = d_subscriptions
newDispatcher :: NotificationsConfig -> IO Dispatcher
newDispatcher nc = do
subscriptions <- SSet.newIO
-- let server = wsServer authSettings subscriptions
d_ce_listener <- forkIO (dispatcherListener nc subscriptions)
pure $ Dispatcher { d_subscriptions = subscriptions
, d_ce_listener = d_ce_listener }
withDispatcher :: NotificationsConfig -> (Dispatcher -> IO a) -> IO a withDispatcher :: NotificationsConfig -> (Dispatcher -> IO a) -> IO a
withDispatcher nc cb = do withDispatcher nc cb = do
subscriptions <- SSet.newIO subscriptions <- SSet.newIO
Async.withAsync (dispatcherListener nc subscriptions) $ \a -> do Async.withAsync (dispatcherListener nc subscriptions) $ \_a -> do
let dispatcher = Dispatcher { d_subscriptions = subscriptions let dispatcher = Dispatcher { d_subscriptions = subscriptions }
, d_ce_listener = Async.asyncThreadId a }
cb dispatcher cb dispatcher
...@@ -88,6 +70,8 @@ withDispatcher nc cb = do ...@@ -88,6 +70,8 @@ withDispatcher nc cb = do
dispatcherListener :: NotificationsConfig -> SSet.Set Subscription -> IO () dispatcherListener :: NotificationsConfig -> SSet.Set Subscription -> IO ()
dispatcherListener (NotificationsConfig { _nc_dispatcher_bind }) subscriptions = do dispatcherListener (NotificationsConfig { _nc_dispatcher_bind }) subscriptions = do
withSocket Pull $ \s -> do withSocket Pull $ \s -> do
withLogger () $ \ioLogger -> do
logMsg ioLogger DDEBUG $ "[dispatcherListener] binding to " <> T.unpack _nc_dispatcher_bind
_ <- bind s $ T.unpack _nc_dispatcher_bind _ <- bind s $ T.unpack _nc_dispatcher_bind
tChan <- TChan.newTChanIO tChan <- TChan.newTChanIO
...@@ -117,7 +101,8 @@ dispatcherListener (NotificationsConfig { _nc_dispatcher_bind }) subscriptions = ...@@ -117,7 +101,8 @@ dispatcherListener (NotificationsConfig { _nc_dispatcher_bind }) subscriptions =
withLogger () $ \ioL -> withLogger () $ \ioL ->
logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange" logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange"
Just ceMessage -> do Just ceMessage -> do
-- putText $ "[dispatcher_listener] received message: " <> show ceMessage withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[dispatcher_listener] received " <> show ceMessage
-- subs <- atomically $ readTVar subscriptions -- subs <- atomically $ readTVar subscriptions
filteredSubs <- atomically $ do filteredSubs <- atomically $ do
let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions
......
...@@ -90,6 +90,7 @@ wsLoop jwtS subscriptions ws = flip finally disconnect $ do ...@@ -90,6 +90,7 @@ wsLoop jwtS subscriptions ws = flip finally disconnect $ do
logMsg ioLogger DEBUG $ "[wsLoop] unknown message: " <> show dm' logMsg ioLogger DEBUG $ "[wsLoop] unknown message: " <> show dm'
return user return user
Just (WSSubscribe topic) -> do Just (WSSubscribe topic) -> do
logMsg ioLogger DEBUG $ "[wsLoop'] subscribe topic " <> show topic
-- TODO Fix s_connected_user based on header -- TODO Fix s_connected_user based on header
let sub = Subscription { s_connected_user = user let sub = Subscription { s_connected_user = user
, s_ws_key_connection = ws , s_ws_key_connection = ws
...@@ -98,6 +99,7 @@ wsLoop jwtS subscriptions ws = flip finally disconnect $ do ...@@ -98,6 +99,7 @@ wsLoop jwtS subscriptions ws = flip finally disconnect $ do
-- putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss) -- putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
return user return user
Just (WSUnsubscribe topic) -> do Just (WSUnsubscribe topic) -> do
logMsg ioLogger DEBUG $ "[wsLoop'] unsubscribe topic " <> show topic
let sub = Subscription { s_connected_user = user let sub = Subscription { s_connected_user = user
, s_ws_key_connection = ws , s_ws_key_connection = ws
, s_topic = topic } , s_topic = topic }
......
...@@ -75,10 +75,11 @@ notifyJobStarted :: HasWorkerBroker ...@@ -75,10 +75,11 @@ notifyJobStarted :: HasWorkerBroker
-> BrokerMessage -> BrokerMessage
-> IO () -> IO ()
notifyJobStarted env (W.State { name }) bm = do notifyJobStarted env (W.State { name }) bm = do
let mId = messageId bm
let j = toA $ getMessage bm let j = toA $ getMessage bm
let job = W.job j let job = W.job j
withLogger () $ \ioL -> withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[notifyJobStarted] [" <> name <> "] starting job: " <> show j logMsg ioL DEBUG $ "[notifyJobStarted] [" <> name <> " :: " <> show mId <> "] starting job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job } , _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji } let jh = WorkerJobHandle { _w_job_info = ji }
...@@ -90,10 +91,11 @@ notifyJobFinished :: HasWorkerBroker ...@@ -90,10 +91,11 @@ notifyJobFinished :: HasWorkerBroker
-> BrokerMessage -> BrokerMessage
-> IO () -> IO ()
notifyJobFinished env (W.State { name }) bm = do notifyJobFinished env (W.State { name }) bm = do
let mId = messageId bm
let j = toA $ getMessage bm let j = toA $ getMessage bm
let job = W.job j let job = W.job j
withLogger () $ \ioL -> withLogger () $ \ioL ->
logMsg ioL DEBUG $ "[notifyJobFinished] [" <> name <> "] finished job: " <> show j logMsg ioL DEBUG $ "[notifyJobFinished] [" <> name <> " :: " <> show mId <> "] finished job: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job } , _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji } let jh = WorkerJobHandle { _w_job_info = ji }
...@@ -105,10 +107,11 @@ notifyJobTimeout :: HasWorkerBroker ...@@ -105,10 +107,11 @@ notifyJobTimeout :: HasWorkerBroker
-> BrokerMessage -> BrokerMessage
-> IO () -> IO ()
notifyJobTimeout env (W.State { name }) bm = do notifyJobTimeout env (W.State { name }) bm = do
let mId = messageId bm
let j = toA $ getMessage bm let j = toA $ getMessage bm
let job = W.job j let job = W.job j
withLogger () $ \ioL -> withLogger () $ \ioL ->
logMsg ioL ERROR $ "[notifyJobTimeout] [" <> name <> "] job timed out: " <> show j logMsg ioL ERROR $ "[notifyJobTimeout] [" <> name <> " :: " <> show mId <> "] job timed out: " <> show j
let ji = JobInfo { _ji_message_id = messageId bm let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job } , _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji } let jh = WorkerJobHandle { _w_job_info = ji }
...@@ -121,10 +124,11 @@ notifyJobFailed :: (HasWorkerBroker, HasCallStack) ...@@ -121,10 +124,11 @@ notifyJobFailed :: (HasWorkerBroker, HasCallStack)
-> SomeException -> SomeException
-> IO () -> IO ()
notifyJobFailed env (W.State { name }) bm exc = do notifyJobFailed env (W.State { name }) bm exc = do
let mId = messageId bm
let j = toA $ getMessage bm let j = toA $ getMessage bm
let job = W.job j let job = W.job j
withLogger () $ \ioL -> withLogger () $ \ioL ->
logMsg ioL ERROR $ "[notifyJobFailed] [" <> name <> "] failed job: " <> show j <> " --- ERROR: " <> show exc logMsg ioL ERROR $ "[notifyJobFailed] [" <> name <> " :: " <> show mId <> "] failed job: " <> show j <> " --- ERROR: " <> show exc
let ji = JobInfo { _ji_message_id = messageId bm let ji = JobInfo { _ji_message_id = messageId bm
, _ji_mNode_id = getWorkerMNodeId job } , _ji_mNode_id = getWorkerMNodeId job }
let jh = WorkerJobHandle { _w_job_info = ji } let jh = WorkerJobHandle { _w_job_info = ji }
......
...@@ -53,6 +53,7 @@ import System.Log.FastLogger qualified as FL ...@@ -53,6 +53,7 @@ import System.Log.FastLogger qualified as FL
data WorkerEnv = WorkerEnv data WorkerEnv = WorkerEnv
{ _w_env_config :: ~GargConfig { _w_env_config :: ~GargConfig
, _w_env_logger :: ~(Logger (GargM WorkerEnv IOException)) , _w_env_logger :: ~(Logger (GargM WorkerEnv IOException))
-- the pool is a pool for gargantext db, not pgmq db!
, _w_env_pool :: ~(Pool.Pool PSQL.Connection) , _w_env_pool :: ~(Pool.Pool PSQL.Connection)
, _w_env_nodeStory :: ~NodeStoryEnv , _w_env_nodeStory :: ~NodeStoryEnv
, _w_env_mail :: ~Mail.MailConfig , _w_env_mail :: ~Mail.MailConfig
......
...@@ -31,7 +31,7 @@ sendJob :: (HasWorkerBroker PGMQBroker Job, HasConfig env) ...@@ -31,7 +31,7 @@ sendJob :: (HasWorkerBroker PGMQBroker Job, HasConfig env)
-> Cmd' env err (MessageId PGMQBroker) -> Cmd' env err (MessageId PGMQBroker)
sendJob job = do sendJob job = do
gcConfig <- view $ hasConfig gcConfig <- view $ hasConfig
let WorkerSettings { _wsDefinitions } = gcConfig ^. gc_worker let WorkerSettings { _wsDefinitions, _wsDefaultDelay } = gcConfig ^. gc_worker
-- TODO Try to guess which worker should get this job -- TODO Try to guess which worker should get this job
-- let mWd = findDefinitionByName ws workerName -- let mWd = findDefinitionByName ws workerName
let mWd = head _wsDefinitions let mWd = head _wsDefinitions
...@@ -40,7 +40,9 @@ sendJob job = do ...@@ -40,7 +40,9 @@ sendJob job = do
Just wd -> liftBase $ do Just wd -> liftBase $ do
b <- initBrokerWithDBCreate gcConfig b <- initBrokerWithDBCreate gcConfig
let queueName = _wdQueue wd let queueName = _wdQueue wd
W.sendJob' $ updateJobData job $ W.mkDefaultSendJob' b queueName job let job' = (updateJobData job $ W.mkDefaultSendJob' b queueName job) { W.delay = _wsDefaultDelay }
putText $ "[sendJob] sending job " <> show job <> " (delay " <> show (W.delay job') <> ")"
W.sendJob' job'
-- | We want to fine-tune job metadata parameters, for each job type -- | We want to fine-tune job metadata parameters, for each job type
updateJobData :: Job -> W.SendJob PGMQBroker Job -> W.SendJob PGMQBroker Job updateJobData :: Job -> W.SendJob PGMQBroker Job -> W.SendJob PGMQBroker Job
......
...@@ -223,4 +223,3 @@ getWorkerMNodeId (PostNodeAsync { _pna_node_id }) = Just _pna_node_id ...@@ -223,4 +223,3 @@ getWorkerMNodeId (PostNodeAsync { _pna_node_id }) = Just _pna_node_id
getWorkerMNodeId (RecomputeGraph { _rg_node_id }) = Just _rg_node_id getWorkerMNodeId (RecomputeGraph { _rg_node_id }) = Just _rg_node_id
getWorkerMNodeId (UpdateNode { _un_node_id }) = Just _un_node_id getWorkerMNodeId (UpdateNode { _un_node_id }) = Just _un_node_id
getWorkerMNodeId (UploadDocument { _ud_node_id }) = Just _ud_node_id getWorkerMNodeId (UploadDocument { _ud_node_id }) = Just _ud_node_id
...@@ -13,12 +13,13 @@ module Gargantext.System.Logging ( ...@@ -13,12 +13,13 @@ module Gargantext.System.Logging (
, withLoggerHoisted , withLoggerHoisted
) where ) where
import Language.Haskell.TH hiding (Type)
import Control.Exception.Safe (MonadMask, bracket) import Control.Exception.Safe (MonadMask, bracket)
import Control.Monad.IO.Class import Control.Monad.IO.Class
import Control.Monad.Trans.Control import Control.Monad.Trans.Control
import Data.Text qualified as T
import Data.Kind (Type) import Data.Kind (Type)
import Data.Text qualified as T
import Data.Time.Clock (getCurrentTime)
import Language.Haskell.TH hiding (Type)
import Language.Haskell.TH.Syntax qualified as TH import Language.Haskell.TH.Syntax qualified as TH
import Prelude import Prelude
import System.Environment (lookupEnv) import System.Environment (lookupEnv)
...@@ -140,9 +141,10 @@ instance HasLogger IO where ...@@ -140,9 +141,10 @@ instance HasLogger IO where
pure $ IOLogger lvl pure $ IOLogger lvl
destroyLogger _ = pure () destroyLogger _ = pure ()
logMsg (IOLogger minLvl) lvl msg = do logMsg (IOLogger minLvl) lvl msg = do
t <- getCurrentTime
if lvl < minLvl if lvl < minLvl
then pure () then pure ()
else do else do
let pfx = "[" <> show lvl <> "] " let pfx = "[" <> show t <> "] [" <> show lvl <> "] "
putStrLn $ pfx <> msg putStrLn $ pfx <> msg
logTxt lgr lvl msg = logMsg lgr lvl (T.unpack msg) logTxt lgr lvl msg = logMsg lgr lvl (T.unpack msg)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment