[ws] add ping/pong, add notification function, unique subscriptions

parent 9de83328
Pipeline #6139 failed with stages
in 74 minutes and 48 seconds
......@@ -42,12 +42,12 @@ gClient = do
withSocket Push $ \s -> do
_ <- connect s "tcp://localhost:5560"
-- let str = C.unwords (take 10 $ repeat "hello")
let str = "{\"type\": \"update_tree_first_level\", \"node_id\": 15}"
let str = "{\"type\": \"update_tree_first_level\", \"node_id\": -1}"
C.putStrLn $ C.pack "sending: " <> str
send s str
withSocket Push $ \s -> do
_ <- connect s "tcp://localhost:5560"
let str2 = "{\"type\": \"update_tree_first_level\", \"node_id\": 16}"
let str2 = "{\"type\": \"update_tree_first_level\", \"node_id\": -2}"
C.putStrLn $ C.pack "sending: " <> str2
send s str2
......@@ -28,6 +28,8 @@ import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Errors.Types
import Gargantext.API.Prelude
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CE
import Gargantext.Database.Action.Flow.Types
import Gargantext.Database.Action.Node
import Gargantext.Database.Admin.Types.Node
......@@ -63,7 +65,13 @@ postNode :: HasNodeError err
-> Cmd err [NodeId]
postNode authenticatedUser pId (PostNode nodeName nt) = do
let userId = authenticatedUser ^. auth_user_id
mkNodeWithParent nt (Just pId) userId nodeName
nodeIds <- mkNodeWithParent nt (Just pId) userId nodeName
liftBase $ do
-- mapM_ (CE.notify . CE.UpdateTreeFirstLevel) nodeIds
CE.notify $ CE.UpdateTreeFirstLevel pId
return nodeIds
------------------------------------------------------------------------
type PostNodeAsync = Summary "Post Node"
......@@ -98,6 +106,10 @@ postNodeAsync authenticatedUser nId (PostNode nodeName tn) jobHandle = do
markProgress 1 jobHandle
let userId = authenticatedUser ^. auth_user_id
_ <- mkNodeWithParent tn (Just nId) userId nodeName
_nodeIds <- mkNodeWithParent tn (Just nId) userId nodeName
liftBase $ do
-- mapM_ (CE.notify . CE.UpdateTreeFirstLevel) nodeIds
CE.notify $ CE.UpdateTreeFirstLevel nId
markComplete jobHandle
......@@ -64,3 +64,10 @@ gServer = do
send s_dispatcher r
_ -> putText "[central_exchange] unknown"
notify :: CEMessage -> IO ()
notify ceMessage = do
withSocket Push $ \s -> do
_ <- connect s "tcp://localhost:5560"
let str = Aeson.encode ceMessage
send s $ BSL.toStrict str
......@@ -25,6 +25,7 @@ import Data.Aeson qualified as Aeson
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.ByteString.Char8 qualified as C
import Data.ByteString.Lazy qualified as BSL
import Data.List (nubBy)
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes
import Gargantext.Core.Types (NodeId, UserId)
import Gargantext.Prelude
......@@ -114,6 +115,8 @@ browser.
data WSRequest =
WSSubscribe Topic
| WSUnsubscribe Topic
| WSPing
| WSPong
deriving (Eq, Show)
instance FromJSON WSRequest where
parseJSON = Aeson.withObject "WSRequest" $ \o -> do
......@@ -125,6 +128,8 @@ instance FromJSON WSRequest where
"unsubscribe" -> do
topic <- o .: "topic"
pure $ WSUnsubscribe topic
"ping" -> pure WSPing
"pong" -> pure WSPong
s -> prependFailure "parsing request type failed, " (typeMismatch "request" s)
data Dispatcher =
......@@ -154,7 +159,7 @@ insertSubscription :: TVar [Subscription] -> Subscription -> IO [Subscription]
insertSubscription subscriptions sub =
atomically $ do
s <- readTVar subscriptions
let ss = s <> [sub]
let ss = nubBy eqSub $ s <> [sub]
writeTVar subscriptions ss
pure ss
......@@ -190,9 +195,15 @@ wsServer subscriptions = streamData
putText $ show $ WS.requestHeaders reqHead
c <- liftIO $ WS.acceptRequest pc
let ws = WSKeyConnection (key, c)
_ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws)
_ <- liftIO $ Async.concurrently (wsLoop ws) (pingLoop ws)
-- _ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws)
pure ()
pingLoop ws = do
forever $ do
WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode Ping) Nothing)
threadDelay $ 10 * 1000000
wsLoop ws = flip finally disconnect $ do
putText "[wsLoop] connecting"
forever $ do
......@@ -215,6 +226,10 @@ wsServer subscriptions = streamData
, s_topic = topic }
ss <- removeSubscription subscriptions sub
putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
Just WSPing -> do
WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode Pong) Nothing)
Just WSPong -> do
putText $ "[wsLoop] pong received"
_ -> putText "[wsLoop] binary ws messages not supported"
where
disconnect = do
......@@ -223,13 +238,18 @@ wsServer subscriptions = streamData
putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
data Notification = Notification Topic
data Notification =
Notification Topic
| Ping
| Pong
deriving (Eq, Show)
instance ToJSON Notification where
toJSON (Notification topic) = Aeson.object [
"notification" .= toJSON topic
]
toJSON Ping = toJSON ("ping" :: Text)
toJSON Pong = toJSON ("pong" :: Text)
ce_listener :: TVar [Subscription] -> IO ()
......
......@@ -20,6 +20,8 @@ module Gargantext.Database.Action.Delete
import Control.Lens (view)
import Data.Text (unpack)
import Gargantext.Core (HasDBid(..))
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CE
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Database.Action.Share (delFolderTeam)
import Gargantext.Database.Action.User (getUserId)
......@@ -43,7 +45,7 @@ deleteNode :: (CmdCommon env, HasNodeError err)
-> Cmd' env err Int
deleteNode u nodeId = do
node' <- N.getNode nodeId
case (view node_typename node') of
num <- case (view node_typename node') of
nt | nt == toDBid NodeUser -> panicTrace "[G.D.A.D.deleteNode] Not allowed to delete NodeUser (yet)"
nt | nt == toDBid NodeTeam -> do
uId <- getUserId u
......@@ -57,6 +59,15 @@ deleteNode u nodeId = do
N.deleteNode nodeId
_ -> N.deleteNode nodeId
-- | Node was deleted, refresh its parent (if exists)
liftBase $ do
-- mapM_ (CE.notify . CE.UpdateTreeFirstLevel) nodeIds
case view node_parent_id node' of
Nothing -> return ()
Just pId -> CE.notify $ CE.UpdateTreeFirstLevel pId
return num
-- if hasNodeType node' NodeUser
-- then panic "Not allowed to delete NodeUser (yet)"
-- else if hasNodeType node' NodeTeam
......
......@@ -65,6 +65,8 @@ import Data.Text qualified as T
import EPO.API.Client.Types qualified as EPO
import Gargantext.API.Ngrams.Tools (getTermsWith)
import Gargantext.Core (Lang(..), NLPServerConfig, withDefaultLanguage)
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CE
import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
import Gargantext.Core.NLP (HasNLPServer, nlpServerGet)
import Gargantext.Core.NodeStory.Types (HasNodeStory)
......@@ -329,6 +331,10 @@ createNodes mkCorpusUser ctype = do
_ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
-- _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
liftBase $ do
CE.notify $ CE.UpdateTreeFirstLevel listId
CE.notify $ CE.UpdateTreeFirstLevel userCorpusId
pure (userId, userCorpusId, listId)
......
......@@ -117,6 +117,20 @@ getNodesWithParentId n = runOpaQuery $ selectNodesWithParentID n'
Just n'' -> n''
Nothing -> 0
-- | Given a node id, find it's parent node id (if exists)
getParentId :: NodeId -> DBCmd err (Maybe NodeId)
getParentId nId = do
result <- runPGSQuery query (PGS.Only nId)
case result of
[PGS.Only parentId] -> pure $ Just $ UnsafeMkNodeId parentId
_ -> pure Nothing
where
query :: PGS.Query
query = [sql|
SELECT parent_id
FROM nodes
WHERE id = ?;
|]
-- | Given a node id, find it's closest parent of given type
-- NOTE: This isn't too optimal: can make successive queries depending on how
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment