[websockets] some refactoring (split to smaller modules)

parent 991c637c
Pipeline #6241 failed with stages
...@@ -390,3 +390,11 @@ Maybe you need to change the port to 5433 for database connection in your gargan ...@@ -390,3 +390,11 @@ Maybe you need to change the port to 5433 for database connection in your gargan
## `haskell-language-server`
If you want to use `haskell-language-server` for GHC 9.4.7, install it
with `ghcup`:
```shell
ghcup compile hls --version 2.7.0.0 --ghc 9.4.7
```
https://haskell-language-server.readthedocs.io/en/latest/installation.html
...@@ -171,7 +171,9 @@ library ...@@ -171,7 +171,9 @@ library
Gargantext.Core.AsyncUpdates.CentralExchange.Types Gargantext.Core.AsyncUpdates.CentralExchange.Types
Gargantext.Core.AsyncUpdates.Constants Gargantext.Core.AsyncUpdates.Constants
Gargantext.Core.AsyncUpdates.Dispatcher Gargantext.Core.AsyncUpdates.Dispatcher
Gargantext.Core.AsyncUpdates.Dispatcher.Subscriptions
Gargantext.Core.AsyncUpdates.Dispatcher.Types Gargantext.Core.AsyncUpdates.Dispatcher.Types
Gargantext.Core.AsyncUpdates.Dispatcher.WebSocket
Gargantext.Core.AsyncUpdates.Nanomsg Gargantext.Core.AsyncUpdates.Nanomsg
Gargantext.Core.Mail.Types Gargantext.Core.Mail.Types
Gargantext.Core.Methods.Similarities Gargantext.Core.Methods.Similarities
......
...@@ -28,7 +28,7 @@ import Gargantext.API.GraphQL ...@@ -28,7 +28,7 @@ import Gargantext.API.GraphQL
import Gargantext.API.Routes.Named.Private import Gargantext.API.Routes.Named.Private
import Gargantext.API.Routes.Named.Public import Gargantext.API.Routes.Named.Public
import Gargantext.API.Routes.Types import Gargantext.API.Routes.Types
import Gargantext.Core.AsyncUpdates.Dispatcher qualified as Dispatcher import Gargantext.Core.AsyncUpdates.Dispatcher.WebSocket qualified as Dispatcher
import Servant.API ((:>), (:-), JSON, ReqBody, Post, Get, QueryParam) import Servant.API ((:>), (:-), JSON, ReqBody, Post, Get, QueryParam)
import Servant.API.Description (Summary) import Servant.API.Description (Summary)
import Servant.API.NamedRoutes import Servant.API.NamedRoutes
......
...@@ -22,7 +22,7 @@ import Gargantext.API.Server.Named.Public (serverPublicGargAPI) ...@@ -22,7 +22,7 @@ import Gargantext.API.Server.Named.Public (serverPublicGargAPI)
import Gargantext.API.Routes.Named import Gargantext.API.Routes.Named
import Gargantext.API.Swagger (swaggerDoc) import Gargantext.API.Swagger (swaggerDoc)
import Gargantext.API.ThrowAll (serverPrivateGargAPI) import Gargantext.API.ThrowAll (serverPrivateGargAPI)
import Gargantext.Core.AsyncUpdates.Dispatcher qualified as Dispatcher import Gargantext.Core.AsyncUpdates.Dispatcher.WebSocket qualified as Dispatcher
import Gargantext.Database.Prelude (hasConfig) import Gargantext.Database.Prelude (hasConfig)
import Gargantext.Prelude hiding (Handler, catch) import Gargantext.Prelude hiding (Handler, catch)
import Gargantext.Prelude.Config (gc_url_backend_api) import Gargantext.Prelude.Config (gc_url_backend_api)
......
...@@ -16,16 +16,14 @@ https://dev.sub.gargantext.org/#/share/Notes/187918 ...@@ -16,16 +16,14 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
module Gargantext.Core.AsyncUpdates.CentralExchange where module Gargantext.Core.AsyncUpdates.CentralExchange where
-- import Control.Concurrent (threadDelay)
import Control.Concurrent.Async qualified as Async import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM.TChan qualified as TChan import Control.Concurrent.STM.TChan qualified as TChan
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.ByteString.Char8 qualified as C
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Gargantext.Core.AsyncUpdates.CentralExchange.Types import Gargantext.Core.AsyncUpdates.CentralExchange.Types
import Gargantext.Core.AsyncUpdates.Constants (ceBind, ceConnect, dispatcherConnect) import Gargantext.Core.AsyncUpdates.Constants (ceBind, ceConnect, dispatcherConnect)
-- import Gargantext.Core.AsyncUpdates.Nanomsg (withSafeSocket)
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
import Nanomsg (Pull(..), Push(..), bind, connect, recvMalloc, send, withSocket) import Nanomsg (Pull(..), Push(..), bind, connect, recvMalloc, send, withSocket)
{- {-
...@@ -58,36 +56,37 @@ gServer = do ...@@ -58,36 +56,37 @@ gServer = do
forever $ do forever $ do
-- putText "[central_exchange] receiving" -- putText "[central_exchange] receiving"
r <- recvMalloc s 1024 r <- recvMalloc s 1024
C.putStrLn $ "[central_exchange] " <> r -- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
worker s_dispatcher tChan = do worker s_dispatcher tChan = do
forever $ do withLogger () $ \ioLogger -> do
r <- atomically $ TChan.readTChan tChan forever $ do
case Aeson.decode (BSL.fromStrict r) of r <- atomically $ TChan.readTChan tChan
Just ujp@(UpdateJobProgress _jId _jobLog) -> do case Aeson.decode (BSL.fromStrict r) of
putText $ "[central_exchange] " <> show ujp Just ujp@(UpdateJobProgress _jId _jobLog) -> do
-- send the same message that we received logMsg ioLogger DEBUG $ "[central_exchange] " <> show ujp
send s_dispatcher r -- send the same message that we received
Just (UpdateTreeFirstLevel node_id) -> do send s_dispatcher r
putText $ "[central_exchange] update tree: " <> show node_id Just (UpdateTreeFirstLevel node_id) -> do
-- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id logMsg ioLogger DEBUG $ "[central_exchange] update tree: " <> show node_id
-- To make this more robust, use withAsync so we don't -- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id
-- block the main thread (send is blocking) -- To make this more robust, use withAsync so we don't
-- block the main thread (send is blocking)
-- NOTE: If we're flooded with messages, and send is
-- slow, we might be spawning many threads... -- NOTE: If we're flooded with messages, and send is
-- slow, we might be spawning many threads...
-- NOTE: Currently we just forward the message that we
-- got. So in theory central exchange isn't needed (we -- NOTE: Currently we just forward the message that we
-- could ping dispatcher directly). However, I think -- got. So in theory central exchange isn't needed (we
-- it's better to have this as a separate -- could ping dispatcher directly). However, I think
-- component. Currently I built this inside -- it's better to have this as a separate
-- gargantext-server but maybe it can be a separate -- component. Currently I built this inside
-- process, independent of the server. -- gargantext-server but maybe it can be a separate
-- send the same message that we received -- process, independent of the server.
send s_dispatcher r -- send the same message that we received
_ -> putText $ "[central_exchange] unknown message" send s_dispatcher r
_ -> logMsg ioLogger DEBUG $ "[central_exchange] unknown message"
notify :: CEMessage -> IO () notify :: CEMessage -> IO ()
......
...@@ -23,7 +23,6 @@ import Data.ByteString.Lazy qualified as BSL ...@@ -23,7 +23,6 @@ import Data.ByteString.Lazy qualified as BSL
import Gargantext.API.Admin.Orchestrator.Types (JobLog) import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.Core.Types (NodeId) import Gargantext.Core.Types (NodeId)
import Gargantext.Prelude import Gargantext.Prelude
-- import Gargantext.Utils.Jobs.Map qualified as JM
import Prelude qualified import Prelude qualified
import Servant.Job.Core (Safety(Safe)) import Servant.Job.Core (Safety(Safe))
import Servant.Job.Types (JobID) import Servant.Job.Types (JobID)
......
...@@ -15,42 +15,21 @@ https://dev.sub.gargantext.org/#/share/Notes/187918 ...@@ -15,42 +15,21 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
-} -}
{-# LANGUAGE TypeOperators #-} {-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -fno-warn-unused-matches -fno-warn-unused-imports #-}
module Gargantext.Core.AsyncUpdates.Dispatcher where module Gargantext.Core.AsyncUpdates.Dispatcher where
import Control.Concurrent.Async qualified as Async import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM.TChan qualified as TChan import Control.Concurrent.STM.TChan qualified as TChan
import Control.Lens (view)
import Data.Aeson ((.:), (.=))
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.ByteString.Char8 qualified as C
import Data.ByteString.Lazy qualified as BSL import Data.ByteString.Lazy qualified as BSL
import Data.List (nubBy)
import DeferredFolds.UnfoldlM qualified as UnfoldlM import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Data.UUID.V4 as UUID
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id))
import Gargantext.API.Admin.EnvTypes (env_dispatcher)
import Gargantext.API.Admin.Types (jwtSettings, Settings, jwtSettings, HasSettings(settings))
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes
import Gargantext.Core.AsyncUpdates.Constants as AUConstants import Gargantext.Core.AsyncUpdates.Constants as AUConstants
import Gargantext.Core.AsyncUpdates.Dispatcher.Types import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Core.Types (NodeId, UserId)
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg) import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
-- import Gargantext.Utils.Jobs.Monad (MonadJobStatus(getLatestJobStatus))
import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar)
import Nanomsg (Pull(..), bind, recvMalloc, withSocket) import Nanomsg (Pull(..), bind, recvMalloc, withSocket)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import Protolude.Base (Show(showsPrec))
import Servant
-- import Servant.API.NamedRoutes ((:-))
import Servant.API.WebSocket qualified as WS
import Servant.Auth.Server (verifyJWT)
import Servant.Server.Generic (AsServer, AsServerT)
import Servant.Swagger.UI
import StmContainers.Set as SSet import StmContainers.Set as SSet
{- {-
...@@ -76,135 +55,7 @@ dispatcher = do ...@@ -76,135 +55,7 @@ dispatcher = do
, d_ce_listener = d_ce_listener } , d_ce_listener = d_ce_listener }
-- | TODO Allow only 1 topic subscription per connection. It doesn't
-- | make sense to send multiple notifications of the same type to the
-- | same connection.
insertSubscription :: SSet.Set Subscription -> Subscription -> IO ()
insertSubscription subscriptions sub = do
atomically $ SSet.insert sub subscriptions
-- s <- readTVar subscriptions
-- let ss = nubBy eqSub $ s <> [sub]
-- writeTVar subscriptions ss
-- -- pure ss
-- pure ()
removeSubscription :: SSet.Set Subscription -> Subscription -> IO ()
removeSubscription subscriptions sub = do
atomically $ SSet.delete sub subscriptions
-- s <- readTVar subscriptions
-- let ss = filter (\sub' -> not $ sub `eqSub` sub') s
-- writeTVar subscriptions ss
-- pure ss
removeSubscriptionsForWSKey :: SSet.Set Subscription -> WSKeyConnection -> IO ()
removeSubscriptionsForWSKey subscriptions ws = do
atomically $ do
let toDelete = UnfoldlM.filter (\sub -> return $ subKey sub == wsKey ws) $ SSet.unfoldlM subscriptions
UnfoldlM.mapM_ (\sub -> SSet.delete sub subscriptions) toDelete
-- atomically $ do
-- s <- readTVar subscriptions
-- let ss = filter (\sub -> subKey sub /= wsKey ws) s
-- writeTVar subscriptions ss
-- pure ss
newtype WSAPI mode = WSAPI {
wsAPIServer :: mode :- "ws" :> Summary "WebSocket endpoint" :> WS.WebSocketPending
} deriving Generic
wsServer :: ( IsGargServer env err m, HasDispatcher env, HasSettings env ) => WSAPI (AsServerT m)
wsServer = WSAPI { wsAPIServer = streamData }
where
streamData :: ( IsGargServer env err m, HasDispatcher env, HasSettings env )
=> WS.PendingConnection -> m ()
streamData pc = do
authSettings <- view settings
d <- view hasDispatcher
let subscriptions = d_subscriptions d
let reqHead = WS.pendingRequest pc
-- WebSocket specification says that a pending request should send
-- some unique, Sec-WebSocket-Key string. We use this to compare
-- connections (WS.Connection doesn't implement an Eq instance).
let mKey = head $ filter (\(k, _) -> k == "Sec-WebSocket-Key") $ WS.requestHeaders reqHead
let key' = snd $ fromMaybe (panicTrace "Sec-WebSocket-Key not found!") mKey
-- Unfortunately, a single browsers sends the same
-- Sec-WebSocket-Key so we want to make that even more unique.
uuid <- liftBase $ UUID.nextRandom
let key = key' <> "-" <> show uuid
-- liftBase $ putText $ show $ WS.requestHeaders reqHead
c <- liftBase $ WS.acceptRequest pc
let ws = WSKeyConnection (key, c)
_ <- liftBase $ Async.concurrently (wsLoop authSettings subscriptions ws) (pingLoop ws)
-- _ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws)
pure ()
-- | Send a ping control frame periodically, otherwise the
-- | connection is dropped. NOTE that 'onPing' message is not
-- | supported in the JS API: either the browser supports this or
-- | not:
-- | https://stackoverflow.com/questions/10585355/sending-websocket-ping-pong-frame-from-browser
pingLoop ws = do
forever $ do
-- WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode Ping) Nothing)
WS.sendPing (wsConn ws) ("" :: Text)
threadDelay $ 10 * 1000000
wsLoop authSettings subscriptions ws = flip finally disconnect $ do
withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG "[wsLoop] connecting"
wsLoop' CUPublic ioLogger
where
wsLoop' user ioLogger = do
dm <- WS.receiveDataMessage (wsConn ws)
newUser <- case dm of
WS.Text dm' _ -> do
case Aeson.decode dm' of
Nothing -> do
logMsg ioLogger DEBUG $ "[wsLoop] unknown message: " <> show dm'
return user
Just (WSSubscribe topic) -> do
-- TODO Fix s_connected_user based on header
let sub = Subscription { s_connected_user = user
, s_ws_key_connection = ws
, s_topic = topic }
_ss <- insertSubscription subscriptions sub
-- putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
return user
Just (WSUnsubscribe topic) -> do
let sub = Subscription { s_connected_user = user
, s_ws_key_connection = ws
, s_topic = topic }
_ss <- removeSubscription subscriptions sub
-- putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
return user
Just (WSAuthorize token) -> do
let jwtS = authSettings ^. jwtSettings
mUser <- liftBase $ verifyJWT jwtS (encodeUtf8 token)
logMsg ioLogger DEBUG $ "[wsLoop] authorized user: " <> show mUser
-- TODO Update my subscriptions!
return $ fromMaybe user (CUUser . _auth_user_id <$> mUser)
Just WSDeauthorize -> do
-- TODO Update my subscriptions!
pure CUPublic
_ -> do
logMsg ioLogger DEBUG "[wsLoop] binary ws messages not supported"
return user
wsLoop' newUser ioLogger
disconnect = do
withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG "[wsLoop] disconnecting..."
_ss <- removeSubscriptionsForWSKey subscriptions ws
-- putText $ "[wsLoop] subscriptions: " <> show (show <$> ss)
return ()
-- | This is a nanomsg socket listener. We want to read the messages -- | This is a nanomsg socket listener. We want to read the messages
-- | as fast as possible and then process them gradually in a separate -- | as fast as possible and then process them gradually in a separate
-- | thread. -- | thread.
...@@ -218,14 +69,14 @@ dispatcher_listener subscriptions = do ...@@ -218,14 +69,14 @@ dispatcher_listener subscriptions = do
-- NOTE I'm not sure that we need more than 1 worker here, but in -- NOTE I'm not sure that we need more than 1 worker here, but in
-- theory, the worker can perform things like user authentication, -- theory, the worker can perform things like user authentication,
-- DB queries etc so it can be slow sometimes. -- DB queries etc so it can be slow sometimes.
void $ Async.concurrently (Async.replicateConcurrently 5 $ worker s tChan) $ do void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan) $ do
forever $ do forever $ do
-- putText "[dispatcher_listener] receiving" -- putText "[dispatcher_listener] receiving"
r <- recvMalloc s 1024 r <- recvMalloc s 1024
-- C.putStrLn $ "[dispatcher_listener] " <> r -- C.putStrLn $ "[dispatcher_listener] " <> r
atomically $ TChan.writeTChan tChan r atomically $ TChan.writeTChan tChan r
where where
worker s tChan = do worker tChan = do
-- tId <- myThreadId -- tId <- myThreadId
forever $ do forever $ do
...@@ -257,11 +108,11 @@ dispatcher_listener subscriptions = do ...@@ -257,11 +108,11 @@ dispatcher_listener subscriptions = do
let topic = s_topic sub let topic = s_topic sub
notification <- notification <-
case ceMessage of case ceMessage of
CETypes.UpdateJobProgress jId jobLog -> do CETypes.UpdateJobProgress _jId jobLog -> do
-- js <- getLatestJobStatus jId -- js <- getLatestJobStatus jId
-- putText $ "[sendNotification] latestJobStatus" js -- putText $ "[sendNotification] latestJobStatus" js
pure $ Notification topic (MJobProgress jobLog) pure $ Notification topic (MJobProgress jobLog)
CETypes.UpdateTreeFirstLevel nodeId -> pure $ Notification topic MEmpty CETypes.UpdateTreeFirstLevel _nodeId -> pure $ Notification topic MEmpty
-- TODO send the same thing to everyone for now, this should be -- TODO send the same thing to everyone for now, this should be
-- converted to notifications -- converted to notifications
WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode notification) Nothing) WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode notification) Nothing)
......
{-|
Module : Gargantext.Core.AsyncUpdates.Dispatcher.Subscriptions
Description : Dispatcher (manage websocket subscriptions)
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918
-}
module Gargantext.Core.AsyncUpdates.Dispatcher.Subscriptions where
import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Prelude
import StmContainers.Set as SSet
-- | TODO Allow only 1 topic subscription per connection. It doesn't
-- | make sense to send multiple notifications of the same type to the
-- | same connection.
insertSubscription :: SSet.Set Subscription -> Subscription -> IO ()
insertSubscription subscriptions sub = do
atomically $ SSet.insert sub subscriptions
-- s <- readTVar subscriptions
-- let ss = nubBy eqSub $ s <> [sub]
-- writeTVar subscriptions ss
-- -- pure ss
-- pure ()
removeSubscription :: SSet.Set Subscription -> Subscription -> IO ()
removeSubscription subscriptions sub = do
atomically $ SSet.delete sub subscriptions
-- s <- readTVar subscriptions
-- let ss = filter (\sub' -> not $ sub `eqSub` sub') s
-- writeTVar subscriptions ss
-- pure ss
removeSubscriptionsForWSKey :: SSet.Set Subscription -> WSKeyConnection -> IO ()
removeSubscriptionsForWSKey subscriptions ws = do
atomically $ do
let toDelete = UnfoldlM.filter (\sub -> return $ subKey sub == wsKey ws) $ SSet.unfoldlM subscriptions
UnfoldlM.mapM_ (\sub -> SSet.delete sub subscriptions) toDelete
-- atomically $ do
-- s <- readTVar subscriptions
-- let ss = filter (\sub -> subKey sub /= wsKey ws) s
-- writeTVar subscriptions ss
-- pure ss
...@@ -14,7 +14,6 @@ https://dev.sub.gargantext.org/#/share/Notes/187918 ...@@ -14,7 +14,6 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
-} -}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -fno-warn-unused-matches -fno-warn-unused-imports #-} {-# OPTIONS_GHC -fno-warn-unused-matches -fno-warn-unused-imports #-}
module Gargantext.Core.AsyncUpdates.Dispatcher.Types where module Gargantext.Core.AsyncUpdates.Dispatcher.Types where
......
{-|
Module : Gargantext.Core.AsyncUpdates.Dispatcher.WebSocket
Description : Dispatcher websocket server
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918
-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.Core.AsyncUpdates.Dispatcher.WebSocket where
import Control.Concurrent.Async qualified as Async
import Control.Lens (view)
import Data.Aeson qualified as Aeson
import Data.UUID.V4 as UUID
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id))
import Gargantext.API.Admin.Types (HasSettings(settings), Settings, jwtSettings)
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.AsyncUpdates.Dispatcher.Subscriptions
import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), logMsg, withLogger)
import Network.WebSockets qualified as WS
import Servant
import Servant.API.WebSocket qualified as WS (WebSocketPending)
import Servant.Auth.Server (verifyJWT)
import Servant.Server.Generic (AsServerT)
import StmContainers.Set as SSet
newtype WSAPI mode = WSAPI {
wsAPIServer :: mode :- "ws" :> Summary "WebSocket endpoint" :> WS.WebSocketPending
} deriving Generic
wsServer :: ( IsGargServer env err m, HasDispatcher env, HasSettings env ) => WSAPI (AsServerT m)
wsServer = WSAPI { wsAPIServer = streamData }
where
streamData :: ( IsGargServer env err m, HasDispatcher env, HasSettings env )
=> WS.PendingConnection -> m ()
streamData pc = do
authSettings <- view settings
d <- view hasDispatcher
let subscriptions = d_subscriptions d
key <- getWSKey pc
c <- liftBase $ WS.acceptRequest pc
let ws = WSKeyConnection (key, c)
_ <- liftBase $ Async.concurrently (wsLoop authSettings subscriptions ws) (pingLoop ws)
-- _ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws)
pure ()
-- | Send a ping control frame periodically, otherwise the
-- | connection is dropped. NOTE that 'onPing' message is not
-- | supported in the JS API: either the browser supports this or
-- | not:
-- | https://stackoverflow.com/questions/10585355/sending-websocket-ping-pong-frame-from-browser
pingLoop :: WSKeyConnection -> IO ()
pingLoop ws = do
forever $ do
-- WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode Ping) Nothing)
WS.sendPing (wsConn ws) ("" :: Text)
threadDelay $ 10 * 1000000
wsLoop :: Settings -> SSet.Set Subscription -> WSKeyConnection -> IO a
wsLoop authSettings subscriptions ws = flip finally disconnect $ do
withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG "[wsLoop] connecting"
wsLoop' CUPublic ioLogger
where
wsLoop' user ioLogger = do
dm <- WS.receiveDataMessage (wsConn ws)
newUser <- case dm of
WS.Text dm' _ -> do
case Aeson.decode dm' of
Nothing -> do
logMsg ioLogger DEBUG $ "[wsLoop] unknown message: " <> show dm'
return user
Just (WSSubscribe topic) -> do
-- TODO Fix s_connected_user based on header
let sub = Subscription { s_connected_user = user
, s_ws_key_connection = ws
, s_topic = topic }
_ss <- insertSubscription subscriptions sub
-- putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
return user
Just (WSUnsubscribe topic) -> do
let sub = Subscription { s_connected_user = user
, s_ws_key_connection = ws
, s_topic = topic }
_ss <- removeSubscription subscriptions sub
-- putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
return user
Just (WSAuthorize token) -> do
let jwtS = authSettings ^. jwtSettings
mUser <- liftBase $ verifyJWT jwtS (encodeUtf8 token)
logMsg ioLogger DEBUG $ "[wsLoop] authorized user: " <> show mUser
-- TODO Update my subscriptions!
return $ fromMaybe user (CUUser . _auth_user_id <$> mUser)
Just WSDeauthorize -> do
-- TODO Update my subscriptions!
pure CUPublic
_ -> do
logMsg ioLogger DEBUG "[wsLoop] binary ws messages not supported"
return user
wsLoop' newUser ioLogger
disconnect = do
withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG "[wsLoop] disconnecting..."
_ss <- removeSubscriptionsForWSKey subscriptions ws
-- putText $ "[wsLoop] subscriptions: " <> show (show <$> ss)
return ()
getWSKey :: MonadBase IO m => WS.PendingConnection -> m ByteString
getWSKey pc = do
let reqHead = WS.pendingRequest pc
-- WebSocket specification says that a pending request should send
-- some unique, Sec-WebSocket-Key string. We use this to compare
-- connections (WS.Connection doesn't implement an Eq instance).
let mKey = head $ filter (\(k, _) -> k == "Sec-WebSocket-Key") $ WS.requestHeaders reqHead
let key' = snd $ fromMaybe (panicTrace "Sec-WebSocket-Key not found!") mKey
-- Unfortunately, a single browsers sends the same
-- Sec-WebSocket-Key so we want to make that even more unique.
uuid <- liftBase $ UUID.nextRandom
let key = key' <> "-" <> show uuid
-- liftBase $ putText $ show $ WS.requestHeaders reqHead
pure key
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment