[websockets] send job progress in notification

Also, throttle messages to avoid flooding the client
parent 39f8f17d
Pipeline #6471 failed with stages
in 10 minutes and 23 seconds
...@@ -108,7 +108,7 @@ source-repository-package ...@@ -108,7 +108,7 @@ source-repository-package
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/crawlers/openalex.git location: https://gitlab.iscpif.fr/gargantext/crawlers/openalex.git
tag: c2114adb0382770e419e5a7ae1b3a1ee5b09ee50 tag: e805de664576030e687f4e72e14d2eb3a20dc8a1
source-repository-package source-repository-package
type: git type: git
...@@ -185,6 +185,12 @@ source-repository-package ...@@ -185,6 +185,12 @@ source-repository-package
location: https://github.com/fpringle/servant-routes.git location: https://github.com/fpringle/servant-routes.git
tag: 7694f62af6bc1596d754b42af16da131ac403b3a tag: 7694f62af6bc1596d754b42af16da131ac403b3a
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-throttle
tag: 02f5ed9ee2d6cce45161addf945b88bc6adf9059
allow-older: * allow-older: *
allow-newer: * allow-newer: *
......
...@@ -571,6 +571,7 @@ library ...@@ -571,6 +571,7 @@ library
, graphviz ^>= 2999.20.1.0 , graphviz ^>= 2999.20.1.0
, hashable ^>= 1.3.0.0 , hashable ^>= 1.3.0.0
, haskell-igraph ^>= 0.10.4 , haskell-igraph ^>= 0.10.4
, haskell-throttle
, hlcm ^>= 0.2.2 , hlcm ^>= 0.2.2
, hsinfomap ^>= 0.1 , hsinfomap ^>= 0.1
, hsparql ^>= 0.3.8 , hsparql ^>= 0.3.8
......
...@@ -49,6 +49,7 @@ import Gargantext.Prelude ...@@ -49,6 +49,7 @@ import Gargantext.Prelude
import Gargantext.Core.Config (GargConfig(..)) import Gargantext.Core.Config (GargConfig(..))
import Gargantext.Core.Config.Mail (MailConfig) import Gargantext.Core.Config.Mail (MailConfig)
import Gargantext.System.Logging import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Internal (pollJob)
import Gargantext.Utils.Jobs.Map (LoggerM, J(..), jTask, rjGetLog) import Gargantext.Utils.Jobs.Map (LoggerM, J(..), jTask, rjGetLog)
import Gargantext.Utils.Jobs.Monad qualified as Jobs import Gargantext.Utils.Jobs.Monad qualified as Jobs
import Network.HTTP.Client (Manager) import Network.HTTP.Client (Manager)
...@@ -56,6 +57,7 @@ import Servant.Client (BaseUrl) ...@@ -56,6 +57,7 @@ import Servant.Client (BaseUrl)
import Servant.Job.Async (HasJobEnv(..), Job) import Servant.Job.Async (HasJobEnv(..), Job)
import Servant.Job.Async qualified as SJ import Servant.Job.Async qualified as SJ
import Servant.Job.Core qualified import Servant.Job.Core qualified
import Servant.Job.Types qualified as SJ
import System.Log.FastLogger qualified as FL import System.Log.FastLogger qualified as FL
data Mode = Dev | Mock | Prod data Mode = Dev | Mock | Prod
...@@ -197,11 +199,15 @@ updateJobProgress hdl@(JobHandle jId logStatus) updateJobStatus = do ...@@ -197,11 +199,15 @@ updateJobProgress hdl@(JobHandle jId logStatus) updateJobStatus = do
jobLog <- Jobs.getLatestJobStatus hdl jobLog <- Jobs.getLatestJobStatus hdl
let jobLogNew = updateJobStatus jobLog let jobLogNew = updateJobStatus jobLog
logStatus jobLogNew logStatus jobLogNew
CET.ce_notify $ CET.UpdateJobProgress jId jobLogNew mJb <- Jobs.findJob jId
-- mJob <- Jobs.findJob jId case mJb of
-- case mJob of Nothing -> pure ()
-- Nothing -> pure () Just je -> do
-- Just job -> liftBase $ CE.ce_notify $ CET.UpdateJobProgress jId job -- We use the same endpoint as the one for polling jobs via
-- API. This way we can send the job status directly in the
-- notification
j <- pollJob (Just $ SJ.Limit 1) Nothing jId je
CET.ce_notify $ CET.UpdateJobProgress j
instance Jobs.MonadJobStatus (GargM Env err) where instance Jobs.MonadJobStatus (GargM Env err) where
......
...@@ -64,8 +64,8 @@ gServer = do ...@@ -64,8 +64,8 @@ gServer = do
forever $ do forever $ do
r <- atomically $ TChan.readTChan tChan r <- atomically $ TChan.readTChan tChan
case Aeson.decode (BSL.fromStrict r) of case Aeson.decode (BSL.fromStrict r) of
Just ujp@(UpdateJobProgress _jId _jobLog) -> do Just _ujp@(UpdateJobProgress _s) -> do
logMsg ioLogger DEBUG $ "[central_exchange] " <> show ujp -- logMsg ioLogger DEBUG $ "[central_exchange] " <> show ujp
-- send the same message that we received -- send the same message that we received
send s_dispatcher r send s_dispatcher r
Just (UpdateTreeFirstLevel node_id) -> do Just (UpdateTreeFirstLevel node_id) -> do
......
...@@ -25,7 +25,7 @@ import Gargantext.Core.Types (NodeId) ...@@ -25,7 +25,7 @@ import Gargantext.Core.Types (NodeId)
import Gargantext.Prelude import Gargantext.Prelude
import Prelude qualified import Prelude qualified
import Servant.Job.Core (Safety(Safe)) import Servant.Job.Core (Safety(Safe))
import Servant.Job.Types (JobID) import Servant.Job.Types (JobStatus)
{- {-
...@@ -37,31 +37,27 @@ various events). ...@@ -37,31 +37,27 @@ various events).
-- INTERNAL MESSAGES -- INTERNAL MESSAGES
data CEMessage = data CEMessage =
-- UpdateJobProgress (JobID 'Safe) (JM.JobEntry (JobID 'Safe) (Seq JobLog) JobLog) UpdateJobProgress (JobStatus 'Safe JobLog)
UpdateJobProgress (JobID 'Safe) JobLog
| UpdateTreeFirstLevel NodeId | UpdateTreeFirstLevel NodeId
deriving (Eq)
instance Prelude.Show CEMessage where instance Prelude.Show CEMessage where
show (UpdateJobProgress jId jobLog) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId) <> " " <> show jobLog show (UpdateJobProgress js) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode js)
show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId
instance FromJSON CEMessage where instance FromJSON CEMessage where
parseJSON = withObject "CEMessage" $ \o -> do parseJSON = withObject "CEMessage" $ \o -> do
type_ <- o .: "type" type_ <- o .: "type"
case type_ of case type_ of
"update_job_progress" -> do "update_job_progress" -> do
jId <- o .: "j_id" js <- o .: "js"
jobLog <- o .: "job_log" pure $ UpdateJobProgress js
pure $ UpdateJobProgress jId jobLog
"update_tree_first_level" -> do "update_tree_first_level" -> do
node_id <- o .: "node_id" node_id <- o .: "node_id"
pure $ UpdateTreeFirstLevel node_id pure $ UpdateTreeFirstLevel node_id
s -> prependFailure "parsing type failed, " (typeMismatch "type" s) s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON CEMessage where instance ToJSON CEMessage where
toJSON (UpdateJobProgress jId jobLog) = object [ toJSON (UpdateJobProgress js) = object [
"type" .= toJSON ("update_job_progress" :: Text) "type" .= toJSON ("update_job_progress" :: Text)
, "j_id" .= toJSON jId , "js" .= toJSON js
, "job_log" .= toJSON jobLog
] ]
toJSON (UpdateTreeFirstLevel node_id) = object [ toJSON (UpdateTreeFirstLevel node_id) = object [
"type" .= toJSON ("update_tree_first_level" :: Text) "type" .= toJSON ("update_tree_first_level" :: Text)
......
...@@ -20,6 +20,7 @@ module Gargantext.Core.AsyncUpdates.Dispatcher where ...@@ -20,6 +20,7 @@ module Gargantext.Core.AsyncUpdates.Dispatcher where
import Control.Concurrent.Async qualified as Async import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM.TChan qualified as TChan import Control.Concurrent.STM.TChan qualified as TChan
import Control.Concurrent.Throttle (throttle)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import DeferredFolds.UnfoldlM qualified as UnfoldlM import DeferredFolds.UnfoldlM qualified as UnfoldlM
...@@ -30,7 +31,8 @@ import Gargantext.Prelude ...@@ -30,7 +31,8 @@ import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg) import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
import Nanomsg (Pull(..), bind, recvMalloc, withSocket) import Nanomsg (Pull(..), bind, recvMalloc, withSocket)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import StmContainers.Set as SSet import Servant.Job.Types (JobStatus(_job_id))
import StmContainers.Set qualified as SSet
{- {-
...@@ -66,17 +68,20 @@ dispatcherListener subscriptions = do ...@@ -66,17 +68,20 @@ dispatcherListener subscriptions = do
tChan <- TChan.newTChanIO tChan <- TChan.newTChanIO
throttleTChan <- TChan.newTChanIO
-- NOTE I'm not sure that we need more than 1 worker here, but in -- NOTE I'm not sure that we need more than 1 worker here, but in
-- theory, the worker can perform things like user authentication, -- theory, the worker can perform things like user authentication,
-- DB queries etc so it can be slow sometimes. -- DB queries etc so it can be slow sometimes.
void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan) $ do Async.withAsync (throttle 500 throttleTChan sendDataMessageThrottled) $ \_ -> do
forever $ do void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do
-- putText "[dispatcher_listener] receiving" forever $ do
r <- recvMalloc s 1024 -- putText "[dispatcher_listener] receiving"
-- C.putStrLn $ "[dispatcher_listener] " <> r r <- recvMalloc s 1024
atomically $ TChan.writeTChan tChan r -- C.putStrLn $ "[dispatcher_listener] " <> r
atomically $ TChan.writeTChan tChan r
where where
worker tChan = do worker tChan throttleTChan = do
-- tId <- myThreadId -- tId <- myThreadId
forever $ do forever $ do
...@@ -100,22 +105,30 @@ dispatcherListener subscriptions = do ...@@ -100,22 +105,30 @@ dispatcherListener subscriptions = do
-- one drops in the meantime, it won't listen to what we -- one drops in the meantime, it won't listen to what we
-- send...) -- send...)
-- let filteredSubs = filterCEMessageSubs ceMessage subs -- let filteredSubs = filterCEMessageSubs ceMessage subs
mapM_ (sendNotification ceMessage) filteredSubs mapM_ (sendNotification throttleTChan ceMessage) filteredSubs
sendNotification :: CETypes.CEMessage -> Subscription -> IO () -- | When processing tasks such as Flow, we can generate quite a few
sendNotification ceMessage sub = do -- notifications in a short time. We want to limit this with throttle
let ws = s_ws_key_connection sub -- tchan.
let topic = s_topic sub sendNotification :: TChan.TChan ((ByteString, Topic), (WS.Connection, WS.DataMessage))
notification <- -> CETypes.CEMessage
case ceMessage of -> Subscription
CETypes.UpdateJobProgress _jId jobLog -> do -> IO ()
-- js <- getLatestJobStatus jId sendNotification throttleTChan ceMessage sub = do
-- putText $ "[sendNotification] latestJobStatus" js let ws = s_ws_key_connection sub
pure $ Notification topic (MJobProgress jobLog) let topic = s_topic sub
CETypes.UpdateTreeFirstLevel _nodeId -> pure $ Notification topic MEmpty notification <-
-- TODO send the same thing to everyone for now, this should be case ceMessage of
-- converted to notifications CETypes.UpdateJobProgress jobStatus -> do
WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode notification) Nothing) pure $ Notification topic (MJobProgress jobStatus)
CETypes.UpdateTreeFirstLevel _nodeId -> pure $ Notification topic MEmpty
let id' = (wsKey ws, topic)
atomically $ TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing))
sendDataMessageThrottled :: (WS.Connection, WS.DataMessage) -> IO ()
sendDataMessageThrottled (conn, msg) =
WS.sendDataMessage conn msg
-- Custom filtering of list of Subscriptions based on -- Custom filtering of list of Subscriptions based on
...@@ -126,7 +139,7 @@ filterCEMessageSubs :: CETypes.CEMessage -> [Subscription] -> [Subscription] ...@@ -126,7 +139,7 @@ filterCEMessageSubs :: CETypes.CEMessage -> [Subscription] -> [Subscription]
filterCEMessageSubs ceMessage subscriptions = filter (ceMessageSubPred ceMessage) subscriptions filterCEMessageSubs ceMessage subscriptions = filter (ceMessageSubPred ceMessage) subscriptions
ceMessageSubPred :: CETypes.CEMessage -> Subscription -> Bool ceMessageSubPred :: CETypes.CEMessage -> Subscription -> Bool
ceMessageSubPred (CETypes.UpdateJobProgress jId _jobLog) (Subscription { s_topic }) = ceMessageSubPred (CETypes.UpdateJobProgress js) (Subscription { s_topic }) =
s_topic == UpdateJobProgress jId s_topic == (UpdateJobProgress $ _job_id js)
ceMessageSubPred (CETypes.UpdateTreeFirstLevel node_id) (Subscription { s_topic }) = ceMessageSubPred (CETypes.UpdateTreeFirstLevel node_id) (Subscription { s_topic }) =
s_topic == UpdateTree node_id s_topic == UpdateTree node_id
...@@ -47,7 +47,7 @@ import Servant ...@@ -47,7 +47,7 @@ import Servant
import Servant.API.WebSocket qualified as WS import Servant.API.WebSocket qualified as WS
import Servant.Auth.Server (verifyJWT) import Servant.Auth.Server (verifyJWT)
import Servant.Job.Core (Safety(Safe)) import Servant.Job.Core (Safety(Safe))
import Servant.Job.Types (JobID) import Servant.Job.Types (JobID, JobStatus)
import Servant.Server.Generic (AsServer, AsServerT) import Servant.Server.Generic (AsServer, AsServerT)
import StmContainers.Set as SSet import StmContainers.Set as SSet
...@@ -62,7 +62,7 @@ data Topic = ...@@ -62,7 +62,7 @@ data Topic =
-- | Given parent node id, trigger update of the node and its -- | Given parent node id, trigger update of the node and its
-- children (e.g. list is automatically created in a corpus) -- children (e.g. list is automatically created in a corpus)
| UpdateTree NodeId | UpdateTree NodeId
deriving (Eq) deriving (Eq, Ord)
instance Prelude.Show Topic where instance Prelude.Show Topic where
show (UpdateJobProgress jId) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId) show (UpdateJobProgress jId) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId)
show (UpdateTree nodeId) = "UpdateTree " <> show nodeId show (UpdateTree nodeId) = "UpdateTree " <> show nodeId
...@@ -92,16 +92,15 @@ instance ToJSON Topic where ...@@ -92,16 +92,15 @@ instance ToJSON Topic where
-- | A message to be sent inside a Notification -- | A message to be sent inside a Notification
data Message = data Message =
MJobProgress JobLog MJobProgress (JobStatus 'Safe JobLog)
| MEmpty | MEmpty
deriving (Eq)
instance Prelude.Show Message where instance Prelude.Show Message where
show (MJobProgress jobProgress) = "MJobProgress " <> show jobProgress show (MJobProgress jobStatus) = "MJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jobStatus)
show MEmpty = "MEmpty" show MEmpty = "MEmpty"
instance ToJSON Message where instance ToJSON Message where
toJSON (MJobProgress jobProgress) = Aeson.object [ toJSON (MJobProgress jobStatus) = Aeson.object [
"type" .= toJSON ("MJobProgress" :: Text) "type" .= toJSON ("MJobProgress" :: Text)
, "job_progress" .= toJSON jobProgress , "job_status" .= toJSON jobStatus
] ]
toJSON MEmpty = Aeson.object [ toJSON MEmpty = Aeson.object [
"type" .= toJSON ("MEmpty" :: Text) "type" .= toJSON ("MEmpty" :: Text)
...@@ -202,7 +201,7 @@ class HasDispatcher env where ...@@ -202,7 +201,7 @@ class HasDispatcher env where
-- | A notification is sent to clients who subscribed to specific topics -- | A notification is sent to clients who subscribed to specific topics
data Notification = data Notification =
Notification Topic Message Notification Topic Message
deriving (Eq, Show) deriving (Show)
instance ToJSON Notification where instance ToJSON Notification where
toJSON (Notification topic message) = Aeson.object [ toJSON (Notification topic message) = Aeson.object [
......
...@@ -5,6 +5,7 @@ module Gargantext.Utils.Jobs.Internal ( ...@@ -5,6 +5,7 @@ module Gargantext.Utils.Jobs.Internal (
serveJobsAPI serveJobsAPI
-- * Internals for testing -- * Internals for testing
, newJob , newJob
, pollJob
) where ) where
import Control.Concurrent import Control.Concurrent
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment