[notifications] add option to notify users

Also, added Data.List.nubBy to limit to only 1 dispatched notification
per ws connection. This should fix a bug whereby a single user got
multiple notifications about the same thing.
parent a32547d8
Pipeline #7856 passed with stages
in 42 minutes
...@@ -573,3 +573,13 @@ or `inproc` (this limits us to inter-process communication). ...@@ -573,3 +573,13 @@ or `inproc` (this limits us to inter-process communication).
The `bind` part is for the server, the `connect` part is for the The `bind` part is for the server, the `connect` part is for the
clients connecting to that server. clients connecting to that server.
## Notifying users and debugging notifications
Since notifications are handled by nng, one can use `nngcat` to send
handcrafted messages for debugging purposes.
In particular, it is possible to notify individual users like this:
```shell
nngcat --push --connect tcp://127.0.0.1:5560 --data '{"user_id": 2, "message": "hello user1","type":"notify_user"}'
```
...@@ -103,6 +103,8 @@ gServer cfg = do ...@@ -103,6 +103,8 @@ gServer cfg = do
Just (UpdateWorkerProgress _ji _jl) -> do Just (UpdateWorkerProgress _ji _jl) -> do
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl -- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
sendTimeout nc ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Just (NotifyUser _uid _msg) -> do
sendTimeout nc ioLogger s_dispatcher r
Just Ping -> do Just Ping -> do
sendTimeout nc ioLogger s_dispatcher r sendTimeout nc ioLogger s_dispatcher r
Nothing -> Nothing ->
......
...@@ -20,6 +20,7 @@ import Data.Aeson.Types (prependFailure, typeMismatch) ...@@ -20,6 +20,7 @@ import Data.Aeson.Types (prependFailure, typeMismatch)
import Gargantext.API.Admin.Orchestrator.Types (JobLog) import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.Core.Types (NodeId) import Gargantext.Core.Types (NodeId)
import Gargantext.Core.Worker.Types (JobInfo) import Gargantext.Core.Worker.Types (JobInfo)
import Gargantext.Database.Admin.Types.Node (UserId)
import Gargantext.Prelude import Gargantext.Prelude
import Prelude qualified import Prelude qualified
...@@ -40,11 +41,13 @@ data CEMessage = ...@@ -40,11 +41,13 @@ data CEMessage =
UpdateWorkerProgress JobInfo JobLog UpdateWorkerProgress JobInfo JobLog
-- | Update tree for given nodeId -- | Update tree for given nodeId
| UpdateTreeFirstLevel NodeId | UpdateTreeFirstLevel NodeId
| NotifyUser UserId Text
| Ping | Ping
instance Prelude.Show CEMessage where instance Prelude.Show CEMessage where
-- show (UpdateWorkerProgress ji nodeId jl) = "UpdateWorkerProgress " <> show ji <> " " <> show nodeId <> " " <> show jl -- show (UpdateWorkerProgress ji nodeId jl) = "UpdateWorkerProgress " <> show ji <> " " <> show nodeId <> " " <> show jl
show (UpdateWorkerProgress ji jl) = "UpdateWorkerProgress " <> show ji <> " " <> show jl show (UpdateWorkerProgress ji jl) = "UpdateWorkerProgress " <> show ji <> " " <> show jl
show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId
show (NotifyUser userId msg) = "NotifyUser " <> show userId <> ", " <> show msg
show Ping = "Ping" show Ping = "Ping"
instance FromJSON CEMessage where instance FromJSON CEMessage where
parseJSON = withObject "CEMessage" $ \o -> do parseJSON = withObject "CEMessage" $ \o -> do
...@@ -59,6 +62,10 @@ instance FromJSON CEMessage where ...@@ -59,6 +62,10 @@ instance FromJSON CEMessage where
"update_tree_first_level" -> do "update_tree_first_level" -> do
node_id <- o .: "node_id" node_id <- o .: "node_id"
pure $ UpdateTreeFirstLevel node_id pure $ UpdateTreeFirstLevel node_id
"notify_user" -> do
user_id <- o .: "user_id"
msg <- o .: "message"
pure $ NotifyUser user_id msg
"ping" -> pure Ping "ping" -> pure Ping
s -> prependFailure "parsing type failed, " (typeMismatch "type" s) s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON CEMessage where instance ToJSON CEMessage where
...@@ -72,8 +79,14 @@ instance ToJSON CEMessage where ...@@ -72,8 +79,14 @@ instance ToJSON CEMessage where
"type" .= ("update_tree_first_level" :: Text) "type" .= ("update_tree_first_level" :: Text)
, "node_id" .= nodeId , "node_id" .= nodeId
] ]
toJSON (NotifyUser userId msg) = object [
"type" .= ("notify_user" :: Text)
, "user_id" .= userId
, "message" .= msg
]
toJSON Ping = object [ "type" .= ("ping" :: Text) ] toJSON Ping = object [ "type" .= ("ping" :: Text) ]
class HasCentralExchangeNotification env where class HasCentralExchangeNotification env where
ce_notify :: (MonadReader env m, MonadBase IO m) => CEMessage -> m () ce_notify :: (MonadReader env m, MonadBase IO m) => CEMessage -> m ()
...@@ -29,6 +29,7 @@ import Control.Concurrent.STM.TChan qualified as TChan ...@@ -29,6 +29,7 @@ import Control.Concurrent.STM.TChan qualified as TChan
import Control.Concurrent.Throttle (throttle) import Control.Concurrent.Throttle (throttle)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Data.List (nubBy)
import Data.Text qualified as T import Data.Text qualified as T
import DeferredFolds.UnfoldlM qualified as UnfoldlM import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Gargantext.Core.Config import Gargantext.Core.Config
...@@ -107,7 +108,17 @@ dispatcherListener config subscriptions = do ...@@ -107,7 +108,17 @@ dispatcherListener config subscriptions = do
case Aeson.decode (BSL.fromStrict r) of case Aeson.decode (BSL.fromStrict r) of
Nothing -> Nothing ->
logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange" logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange"
-- Just n@(CETypes.NotifyUser _userId _msg) -> do
-- -- A single user could have multiple subcriptions. We only
-- -- want to send one notification to each of this user's
-- -- browsers. That's why we have the 'WSKeyConnection' type
-- logMsg ioL DEBUG $ "[dispatcher_listener] received " <> show n
-- -- subs <- atomically $ readTVar subscriptions
-- filteredSubs <- atomically $ do
-- let subs' = UnfoldlM.filter (pure . ceMessageSubPred n) $ SSet.unfoldlM subscriptions
-- UnfoldlM.foldlM' (\acc sub -> pure $ acc <> [sub]) [] subs'
-- pure ()
Just ceMessage -> do Just ceMessage -> do
logMsg ioL DEBUG $ "[dispatcher_listener] received " <> show ceMessage logMsg ioL DEBUG $ "[dispatcher_listener] received " <> show ceMessage
-- subs <- atomically $ readTVar subscriptions -- subs <- atomically $ readTVar subscriptions
...@@ -122,7 +133,13 @@ dispatcherListener config subscriptions = do ...@@ -122,7 +133,13 @@ dispatcherListener config subscriptions = do
-- one drops in the meantime, it won't listen to what we -- one drops in the meantime, it won't listen to what we
-- send...) -- send...)
-- let filteredSubs = filterCEMessageSubs ceMessage subs -- let filteredSubs = filterCEMessageSubs ceMessage subs
mapM_ (sendNotification throttleTChan ceMessage) filteredSubs
-- We do 'nubBy' because we want to send only 1 such
-- message to each connection, even if there are more
-- subscriptions from the same user (multiple subcriptions
-- could have matched the above 'ceMessageSubPred').
let uniqueConnections = nubBy (\a b -> s_ws_key_connection a == s_ws_key_connection b) filteredSubs
mapM_ (sendNotification throttleTChan ceMessage) uniqueConnections
-- | When processing tasks such as Flow, we can generate quite a few -- | When processing tasks such as Flow, we can generate quite a few
-- notifications in a short time. We want to limit this with throttle -- notifications in a short time. We want to limit this with throttle
...@@ -157,6 +174,8 @@ sendNotification throttleTChan ceMessage sub = do ...@@ -157,6 +174,8 @@ sendNotification throttleTChan ceMessage sub = do
else Nothing else Nothing
(Ping, CETypes.Ping) -> (Ping, CETypes.Ping) ->
Just NPing Just NPing
(_, CETypes.NotifyUser userId msg) ->
Just $ NNotifyUser userId msg
_ -> Nothing _ -> Nothing
case mNotification of case mNotification of
...@@ -200,5 +219,7 @@ ceMessageSubPred (CETypes.UpdateWorkerProgress ji _jl) (Subscription { s_topic } ...@@ -200,5 +219,7 @@ ceMessageSubPred (CETypes.UpdateWorkerProgress ji _jl) (Subscription { s_topic }
|| Just s_topic == (UpdateTree <$> _ji_mNode_id ji) || Just s_topic == (UpdateTree <$> _ji_mNode_id ji)
ceMessageSubPred (CETypes.UpdateTreeFirstLevel nodeId) (Subscription { s_topic }) = ceMessageSubPred (CETypes.UpdateTreeFirstLevel nodeId) (Subscription { s_topic }) =
s_topic == UpdateTree nodeId s_topic == UpdateTree nodeId
ceMessageSubPred (CETypes.NotifyUser userId _msg) (Subscription { s_connected_user }) =
s_connected_user == CUUser userId
ceMessageSubPred CETypes.Ping (Subscription { s_topic }) = ceMessageSubPred CETypes.Ping (Subscription { s_topic }) =
s_topic == Ping s_topic == Ping
...@@ -211,6 +211,7 @@ data Notification = ...@@ -211,6 +211,7 @@ data Notification =
| NUpdateTree NodeId | NUpdateTree NodeId
| NWorkerJobStarted NodeId JobInfo | NWorkerJobStarted NodeId JobInfo
| NWorkerJobFinished NodeId JobInfo | NWorkerJobFinished NodeId JobInfo
| NNotifyUser UserId Text
| NPing | NPing
instance Prelude.Show Notification where instance Prelude.Show Notification where
-- show (NUpdateWorkerProgress jobInfo nodeId mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show nodeId <> ", " <> show mJobLog -- show (NUpdateWorkerProgress jobInfo nodeId mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show nodeId <> ", " <> show mJobLog
...@@ -218,6 +219,7 @@ instance Prelude.Show Notification where ...@@ -218,6 +219,7 @@ instance Prelude.Show Notification where
show (NUpdateTree nodeId) = "NUpdateTree " <> show nodeId show (NUpdateTree nodeId) = "NUpdateTree " <> show nodeId
show (NWorkerJobStarted nodeId ji) = "NWorkerJobStarted " <> show nodeId <> ", " <> show ji show (NWorkerJobStarted nodeId ji) = "NWorkerJobStarted " <> show nodeId <> ", " <> show ji
show (NWorkerJobFinished nodeId ji) = "NWorkerJobFinished " <> show nodeId <> ", " <> show ji show (NWorkerJobFinished nodeId ji) = "NWorkerJobFinished " <> show nodeId <> ", " <> show ji
show (NNotifyUser userId msg) = "NNotifyUser " <> show userId <> ", " <> show msg
show NPing = "NPing" show NPing = "NPing"
instance ToJSON Notification where instance ToJSON Notification where
-- toJSON (NUpdateWorkerProgress jobInfo nodeId mJobLog) = Aeson.object [ -- toJSON (NUpdateWorkerProgress jobInfo nodeId mJobLog) = Aeson.object [
...@@ -241,6 +243,11 @@ instance ToJSON Notification where ...@@ -241,6 +243,11 @@ instance ToJSON Notification where
, "node_id" .= toJSON nodeId , "node_id" .= toJSON nodeId
, "ji" .= toJSON ji , "ji" .= toJSON ji
] ]
toJSON (NNotifyUser userId msg) = Aeson.object [
"type" .= ("notify_user" :: Text)
, "user_id" .= toJSON userId
, "message" .= toJSON msg
]
toJSON NPing = Aeson.object [ "type" .= ("ping" :: Text) ] toJSON NPing = Aeson.object [ "type" .= ("ping" :: Text) ]
-- We don't need to decode notifications, this is for tests only -- We don't need to decode notifications, this is for tests only
instance FromJSON Notification where instance FromJSON Notification where
...@@ -264,5 +271,9 @@ instance FromJSON Notification where ...@@ -264,5 +271,9 @@ instance FromJSON Notification where
nodeId <- o .: "node_id" nodeId <- o .: "node_id"
ji <- o .: "ji" ji <- o .: "ji"
pure $ NWorkerJobFinished nodeId ji pure $ NWorkerJobFinished nodeId ji
"notify_user" -> do
userId <- o .: "user_id"
msg <- o .: "message"
pure $ NNotifyUser userId msg
"ping" -> pure NPing "ping" -> pure NPing
s -> prependFailure "parsing type failed, " (typeMismatch "type" s) s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment