Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
haskell-gargantext
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
199
Issues
199
List
Board
Labels
Milestones
Merge Requests
12
Merge Requests
12
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
gargantext
haskell-gargantext
Commits
9dd68abe
Commit
9dd68abe
authored
Sep 18, 2025
by
Fabien Maniere
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'dev-add-option-to-notify-users' into 'dev'
Dev add option to notify users See merge request
!438
parents
97441f3d
d641fd97
Pipeline
#7907
passed with stages
in 53 minutes and 56 seconds
Changes
5
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
59 additions
and
2 deletions
+59
-2
README.md
README.md
+10
-0
CentralExchange.hs
src/Gargantext/Core/Notifications/CentralExchange.hs
+2
-0
Types.hs
src/Gargantext/Core/Notifications/CentralExchange/Types.hs
+13
-0
Dispatcher.hs
src/Gargantext/Core/Notifications/Dispatcher.hs
+23
-2
Types.hs
src/Gargantext/Core/Notifications/Dispatcher/Types.hs
+11
-0
No files found.
README.md
View file @
9dd68abe
...
@@ -573,3 +573,13 @@ or `inproc` (this limits us to inter-process communication).
...
@@ -573,3 +573,13 @@ or `inproc` (this limits us to inter-process communication).
The `bind` part is for the server, the `connect` part is for the
The `bind` part is for the server, the `connect` part is for the
clients connecting to that server.
clients connecting to that server.
## Notifying users and debugging notifications
Since notifications are handled by nng, one can use `nngcat` to send
handcrafted messages for debugging purposes.
In particular, it is possible to notify individual users like this:
```
shell
nngcat --push --connect tcp://127.0.0.1:5560 --data '{"user_id": 2, "message": "hello user1","type":"notify_user"}'
```
src/Gargantext/Core/Notifications/CentralExchange.hs
View file @
9dd68abe
...
@@ -103,6 +103,8 @@ gServer cfg = do
...
@@ -103,6 +103,8 @@ gServer cfg = do
Just
(
UpdateWorkerProgress
_ji
_jl
)
->
do
Just
(
UpdateWorkerProgress
_ji
_jl
)
->
do
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] update worker progress: " <> show ji <> ", " <> show jl
sendTimeout
nc
ioLogger
s_dispatcher
r
sendTimeout
nc
ioLogger
s_dispatcher
r
Just
(
NotifyUser
_uid
_msg
)
->
do
sendTimeout
nc
ioLogger
s_dispatcher
r
Just
Ping
->
do
Just
Ping
->
do
sendTimeout
nc
ioLogger
s_dispatcher
r
sendTimeout
nc
ioLogger
s_dispatcher
r
Nothing
->
Nothing
->
...
...
src/Gargantext/Core/Notifications/CentralExchange/Types.hs
View file @
9dd68abe
...
@@ -20,6 +20,7 @@ import Data.Aeson.Types (prependFailure, typeMismatch)
...
@@ -20,6 +20,7 @@ import Data.Aeson.Types (prependFailure, typeMismatch)
import
Gargantext.API.Admin.Orchestrator.Types
(
JobLog
)
import
Gargantext.API.Admin.Orchestrator.Types
(
JobLog
)
import
Gargantext.Core.Types
(
NodeId
)
import
Gargantext.Core.Types
(
NodeId
)
import
Gargantext.Core.Worker.Types
(
JobInfo
)
import
Gargantext.Core.Worker.Types
(
JobInfo
)
import
Gargantext.Database.Admin.Types.Node
(
UserId
)
import
Gargantext.Prelude
import
Gargantext.Prelude
import
Prelude
qualified
import
Prelude
qualified
...
@@ -40,11 +41,13 @@ data CEMessage =
...
@@ -40,11 +41,13 @@ data CEMessage =
UpdateWorkerProgress
JobInfo
JobLog
UpdateWorkerProgress
JobInfo
JobLog
-- | Update tree for given nodeId
-- | Update tree for given nodeId
|
UpdateTreeFirstLevel
NodeId
|
UpdateTreeFirstLevel
NodeId
|
NotifyUser
UserId
Text
|
Ping
|
Ping
instance
Prelude
.
Show
CEMessage
where
instance
Prelude
.
Show
CEMessage
where
-- show (UpdateWorkerProgress ji nodeId jl) = "UpdateWorkerProgress " <> show ji <> " " <> show nodeId <> " " <> show jl
-- show (UpdateWorkerProgress ji nodeId jl) = "UpdateWorkerProgress " <> show ji <> " " <> show nodeId <> " " <> show jl
show
(
UpdateWorkerProgress
ji
jl
)
=
"UpdateWorkerProgress "
<>
show
ji
<>
" "
<>
show
jl
show
(
UpdateWorkerProgress
ji
jl
)
=
"UpdateWorkerProgress "
<>
show
ji
<>
" "
<>
show
jl
show
(
UpdateTreeFirstLevel
nodeId
)
=
"UpdateTreeFirstLevel "
<>
show
nodeId
show
(
UpdateTreeFirstLevel
nodeId
)
=
"UpdateTreeFirstLevel "
<>
show
nodeId
show
(
NotifyUser
userId
msg
)
=
"NotifyUser "
<>
show
userId
<>
", "
<>
show
msg
show
Ping
=
"Ping"
show
Ping
=
"Ping"
instance
FromJSON
CEMessage
where
instance
FromJSON
CEMessage
where
parseJSON
=
withObject
"CEMessage"
$
\
o
->
do
parseJSON
=
withObject
"CEMessage"
$
\
o
->
do
...
@@ -59,6 +62,10 @@ instance FromJSON CEMessage where
...
@@ -59,6 +62,10 @@ instance FromJSON CEMessage where
"update_tree_first_level"
->
do
"update_tree_first_level"
->
do
node_id
<-
o
.:
"node_id"
node_id
<-
o
.:
"node_id"
pure
$
UpdateTreeFirstLevel
node_id
pure
$
UpdateTreeFirstLevel
node_id
"notify_user"
->
do
user_id
<-
o
.:
"user_id"
msg
<-
o
.:
"message"
pure
$
NotifyUser
user_id
msg
"ping"
->
pure
Ping
"ping"
->
pure
Ping
s
->
prependFailure
"parsing type failed, "
(
typeMismatch
"type"
s
)
s
->
prependFailure
"parsing type failed, "
(
typeMismatch
"type"
s
)
instance
ToJSON
CEMessage
where
instance
ToJSON
CEMessage
where
...
@@ -72,8 +79,14 @@ instance ToJSON CEMessage where
...
@@ -72,8 +79,14 @@ instance ToJSON CEMessage where
"type"
.=
(
"update_tree_first_level"
::
Text
)
"type"
.=
(
"update_tree_first_level"
::
Text
)
,
"node_id"
.=
nodeId
,
"node_id"
.=
nodeId
]
]
toJSON
(
NotifyUser
userId
msg
)
=
object
[
"type"
.=
(
"notify_user"
::
Text
)
,
"user_id"
.=
userId
,
"message"
.=
msg
]
toJSON
Ping
=
object
[
"type"
.=
(
"ping"
::
Text
)
]
toJSON
Ping
=
object
[
"type"
.=
(
"ping"
::
Text
)
]
class
HasCentralExchangeNotification
env
where
class
HasCentralExchangeNotification
env
where
ce_notify
::
(
MonadReader
env
m
,
MonadBase
IO
m
)
=>
CEMessage
->
m
()
ce_notify
::
(
MonadReader
env
m
,
MonadBase
IO
m
)
=>
CEMessage
->
m
()
src/Gargantext/Core/Notifications/Dispatcher.hs
View file @
9dd68abe
...
@@ -29,6 +29,7 @@ import Control.Concurrent.STM.TChan qualified as TChan
...
@@ -29,6 +29,7 @@ import Control.Concurrent.STM.TChan qualified as TChan
import
Control.Concurrent.Throttle
(
throttle
)
import
Control.Concurrent.Throttle
(
throttle
)
import
Data.Aeson
qualified
as
Aeson
import
Data.Aeson
qualified
as
Aeson
import
Data.ByteString.Lazy
qualified
as
BSL
import
Data.ByteString.Lazy
qualified
as
BSL
import
Data.List
(
nubBy
)
import
Data.Text
qualified
as
T
import
Data.Text
qualified
as
T
import
DeferredFolds.UnfoldlM
qualified
as
UnfoldlM
import
DeferredFolds.UnfoldlM
qualified
as
UnfoldlM
import
Gargantext.Core.Config
import
Gargantext.Core.Config
...
@@ -107,7 +108,17 @@ dispatcherListener config subscriptions = do
...
@@ -107,7 +108,17 @@ dispatcherListener config subscriptions = do
case
Aeson
.
decode
(
BSL
.
fromStrict
r
)
of
case
Aeson
.
decode
(
BSL
.
fromStrict
r
)
of
Nothing
->
Nothing
->
logMsg
ioL
DEBUG
"[dispatcher_listener] unknown message from central exchange"
logMsg
ioL
DEBUG
"[dispatcher_listener] unknown message from central exchange"
-- Just n@(CETypes.NotifyUser _userId _msg) -> do
-- -- A single user could have multiple subcriptions. We only
-- -- want to send one notification to each of this user's
-- -- browsers. That's why we have the 'WSKeyConnection' type
-- logMsg ioL DEBUG $ "[dispatcher_listener] received " <> show n
-- -- subs <- atomically $ readTVar subscriptions
-- filteredSubs <- atomically $ do
-- let subs' = UnfoldlM.filter (pure . ceMessageSubPred n) $ SSet.unfoldlM subscriptions
-- UnfoldlM.foldlM' (\acc sub -> pure $ acc <> [sub]) [] subs'
-- pure ()
Just
ceMessage
->
do
Just
ceMessage
->
do
logMsg
ioL
DEBUG
$
"[dispatcher_listener] received "
<>
show
ceMessage
logMsg
ioL
DEBUG
$
"[dispatcher_listener] received "
<>
show
ceMessage
-- subs <- atomically $ readTVar subscriptions
-- subs <- atomically $ readTVar subscriptions
...
@@ -122,7 +133,13 @@ dispatcherListener config subscriptions = do
...
@@ -122,7 +133,13 @@ dispatcherListener config subscriptions = do
-- one drops in the meantime, it won't listen to what we
-- one drops in the meantime, it won't listen to what we
-- send...)
-- send...)
-- let filteredSubs = filterCEMessageSubs ceMessage subs
-- let filteredSubs = filterCEMessageSubs ceMessage subs
mapM_
(
sendNotification
throttleTChan
ceMessage
)
filteredSubs
-- We do 'nubBy' because we want to send only 1 such
-- message to each connection, even if there are more
-- subscriptions from the same user (multiple subcriptions
-- could have matched the above 'ceMessageSubPred').
let
uniqueConnections
=
nubBy
(
\
a
b
->
s_ws_key_connection
a
==
s_ws_key_connection
b
)
filteredSubs
mapM_
(
sendNotification
throttleTChan
ceMessage
)
uniqueConnections
-- | When processing tasks such as Flow, we can generate quite a few
-- | When processing tasks such as Flow, we can generate quite a few
-- notifications in a short time. We want to limit this with throttle
-- notifications in a short time. We want to limit this with throttle
...
@@ -157,6 +174,8 @@ sendNotification throttleTChan ceMessage sub = do
...
@@ -157,6 +174,8 @@ sendNotification throttleTChan ceMessage sub = do
else
Nothing
else
Nothing
(
Ping
,
CETypes
.
Ping
)
->
(
Ping
,
CETypes
.
Ping
)
->
Just
NPing
Just
NPing
(
_
,
CETypes
.
NotifyUser
userId
msg
)
->
Just
$
NNotifyUser
userId
msg
_
->
Nothing
_
->
Nothing
case
mNotification
of
case
mNotification
of
...
@@ -200,5 +219,7 @@ ceMessageSubPred (CETypes.UpdateWorkerProgress ji _jl) (Subscription { s_topic }
...
@@ -200,5 +219,7 @@ ceMessageSubPred (CETypes.UpdateWorkerProgress ji _jl) (Subscription { s_topic }
||
Just
s_topic
==
(
UpdateTree
<$>
_ji_mNode_id
ji
)
||
Just
s_topic
==
(
UpdateTree
<$>
_ji_mNode_id
ji
)
ceMessageSubPred
(
CETypes
.
UpdateTreeFirstLevel
nodeId
)
(
Subscription
{
s_topic
})
=
ceMessageSubPred
(
CETypes
.
UpdateTreeFirstLevel
nodeId
)
(
Subscription
{
s_topic
})
=
s_topic
==
UpdateTree
nodeId
s_topic
==
UpdateTree
nodeId
ceMessageSubPred
(
CETypes
.
NotifyUser
userId
_msg
)
(
Subscription
{
s_connected_user
})
=
s_connected_user
==
CUUser
userId
ceMessageSubPred
CETypes
.
Ping
(
Subscription
{
s_topic
})
=
ceMessageSubPred
CETypes
.
Ping
(
Subscription
{
s_topic
})
=
s_topic
==
Ping
s_topic
==
Ping
src/Gargantext/Core/Notifications/Dispatcher/Types.hs
View file @
9dd68abe
...
@@ -211,6 +211,7 @@ data Notification =
...
@@ -211,6 +211,7 @@ data Notification =
|
NUpdateTree
NodeId
|
NUpdateTree
NodeId
|
NWorkerJobStarted
NodeId
JobInfo
|
NWorkerJobStarted
NodeId
JobInfo
|
NWorkerJobFinished
NodeId
JobInfo
|
NWorkerJobFinished
NodeId
JobInfo
|
NNotifyUser
UserId
Text
|
NPing
|
NPing
instance
Prelude
.
Show
Notification
where
instance
Prelude
.
Show
Notification
where
-- show (NUpdateWorkerProgress jobInfo nodeId mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show nodeId <> ", " <> show mJobLog
-- show (NUpdateWorkerProgress jobInfo nodeId mJobLog) = "NUpdateWorkerProgress " <> show jobInfo <> ", " <> show nodeId <> ", " <> show mJobLog
...
@@ -218,6 +219,7 @@ instance Prelude.Show Notification where
...
@@ -218,6 +219,7 @@ instance Prelude.Show Notification where
show
(
NUpdateTree
nodeId
)
=
"NUpdateTree "
<>
show
nodeId
show
(
NUpdateTree
nodeId
)
=
"NUpdateTree "
<>
show
nodeId
show
(
NWorkerJobStarted
nodeId
ji
)
=
"NWorkerJobStarted "
<>
show
nodeId
<>
", "
<>
show
ji
show
(
NWorkerJobStarted
nodeId
ji
)
=
"NWorkerJobStarted "
<>
show
nodeId
<>
", "
<>
show
ji
show
(
NWorkerJobFinished
nodeId
ji
)
=
"NWorkerJobFinished "
<>
show
nodeId
<>
", "
<>
show
ji
show
(
NWorkerJobFinished
nodeId
ji
)
=
"NWorkerJobFinished "
<>
show
nodeId
<>
", "
<>
show
ji
show
(
NNotifyUser
userId
msg
)
=
"NNotifyUser "
<>
show
userId
<>
", "
<>
show
msg
show
NPing
=
"NPing"
show
NPing
=
"NPing"
instance
ToJSON
Notification
where
instance
ToJSON
Notification
where
-- toJSON (NUpdateWorkerProgress jobInfo nodeId mJobLog) = Aeson.object [
-- toJSON (NUpdateWorkerProgress jobInfo nodeId mJobLog) = Aeson.object [
...
@@ -241,6 +243,11 @@ instance ToJSON Notification where
...
@@ -241,6 +243,11 @@ instance ToJSON Notification where
,
"node_id"
.=
toJSON
nodeId
,
"node_id"
.=
toJSON
nodeId
,
"ji"
.=
toJSON
ji
,
"ji"
.=
toJSON
ji
]
]
toJSON
(
NNotifyUser
userId
msg
)
=
Aeson
.
object
[
"type"
.=
(
"notify_user"
::
Text
)
,
"user_id"
.=
toJSON
userId
,
"message"
.=
toJSON
msg
]
toJSON
NPing
=
Aeson
.
object
[
"type"
.=
(
"ping"
::
Text
)
]
toJSON
NPing
=
Aeson
.
object
[
"type"
.=
(
"ping"
::
Text
)
]
-- We don't need to decode notifications, this is for tests only
-- We don't need to decode notifications, this is for tests only
instance
FromJSON
Notification
where
instance
FromJSON
Notification
where
...
@@ -264,5 +271,9 @@ instance FromJSON Notification where
...
@@ -264,5 +271,9 @@ instance FromJSON Notification where
nodeId
<-
o
.:
"node_id"
nodeId
<-
o
.:
"node_id"
ji
<-
o
.:
"ji"
ji
<-
o
.:
"ji"
pure
$
NWorkerJobFinished
nodeId
ji
pure
$
NWorkerJobFinished
nodeId
ji
"notify_user"
->
do
userId
<-
o
.:
"user_id"
msg
<-
o
.:
"message"
pure
$
NNotifyUser
userId
msg
"ping"
->
pure
NPing
"ping"
->
pure
NPing
s
->
prependFailure
"parsing type failed, "
(
typeMismatch
"type"
s
)
s
->
prependFailure
"parsing type failed, "
(
typeMismatch
"type"
s
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment