{-|
Module      : Gargantext.Core.Notifications.Dispatcher.WebSocket
Description : Dispatcher websocket server
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX

https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341

Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918

-}

{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}

module Gargantext.Core.Notifications.Dispatcher.WebSocket (
    -- * Types
    WSAPI(..)
    -- * Functions
  , wsServer
  ) where

import Control.Concurrent.Async qualified as Async
import Control.Exception.Safe qualified as Exc
import Control.Lens (view)
import Data.Aeson qualified as Aeson
import Data.UUID.V4 as UUID
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id))
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.Notifications.Dispatcher.Subscriptions
import Gargantext.Core.Notifications.Dispatcher.Types
import Gargantext.Core.Notifications.Dispatcher (Dispatcher, dispatcherSubscriptions)
import Gargantext.Core.Config (HasJWTSettings(jwtSettings), HasConfig (..), LogConfig, gc_logging)
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(..), logMsg, withLogger, logM)
import Network.WebSockets qualified as WS
import Servant
import Servant.API.WebSocket qualified as WS (WebSocketPending)
import Servant.Auth.Server (JWTSettings, verifyJWT)
import Servant.Server.Generic (AsServerT)
import StmContainers.Set as SSet


-- | Servant API record exposing a single @\/ws@ route that hands the
-- pending (not yet accepted) WebSocket connection to the handler.
newtype WSAPI mode = WSAPI {
    wsAPIServer :: mode :- "ws" :> Summary "WebSocket endpoint" :> WS.WebSocketPending
  } deriving Generic


-- | Server implementation of 'WSAPI'.
--
-- For every incoming connection the handler reads the JWT settings,
-- logging configuration and dispatcher subscriptions from the
-- environment, derives a key for the connection ('getWSKey'), accepts
-- the request and then runs 'wsLoop' and 'pingLoop' concurrently.
wsServer :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasJWTSettings env, Exc.MonadCatch m )
         => WSAPI (AsServerT m)
wsServer = WSAPI { wsAPIServer = streamData }
  where
    -- NOTE Exc.catches is required by tests, otherwise disconnecting
    -- via ws doesn't work. But it does work "normally" when the
    -- server is running...
    streamData :: ( IsGargServer env err m, HasDispatcher env Dispatcher, HasJWTSettings env, Exc.MonadCatch m )
               => WS.PendingConnection -> m ()
    streamData pc = Exc.catches (do
        jwtS <- view jwtSettings
        log_cfg <- view (hasConfig . gc_logging)
        d <- view hasDispatcher
        let subscriptions = dispatcherSubscriptions d
        -- The key is computed before the request is accepted; a request
        -- without a Sec-WebSocket-Key header fails here.
        key <- getWSKey log_cfg pc
        c <- liftBase $ WS.acceptRequest pc
        let ws = WSKeyConnection (key, c)
        -- Both loops run forever; 'Async.concurrently' stops the other
        -- one and re-throws as soon as either of them throws.
        _ <- liftBase $ Async.concurrently (wsLoop log_cfg jwtS subscriptions ws) (pingLoop ws)
        -- _ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws)
        pure ()
      ) [ Exc.Handler $ \(err :: WS.ConnectionException) ->
            case err of
              -- Regular ways for a connection to end: log and finish.
              WS.ConnectionClosed -> logM DEBUG "[wsServer] connection closed"
              WS.CloseRequest _ _ -> logM DEBUG "[wsServer] close request"
              -- Any other connection exception is logged and re-thrown.
              _ -> do
                logM ERROR $ "[wsServer] error: " <> show err
                Exc.throw err
        ]


-- | Send a ping control frame periodically, otherwise the
-- connection is dropped. NOTE that 'onPing' message is not
-- supported in the JS API: either the browser supports this or
-- not:
-- https://stackoverflow.com/questions/10585355/sending-websocket-ping-pong-frame-from-browser
--
-- This action never returns normally: it sends an empty ping, waits
-- 10 seconds and repeats.
pingLoop :: WSKeyConnection -> IO ()
pingLoop ws = forever $ do
  -- WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode Ping) Nothing)
  WS.sendPing (wsConn ws) ("" :: Text)
  threadDelay $ 10 * 1000000


-- | Receive loop for one WebSocket connection.
--
-- Starts with the connection treated as 'CUPublic' and then, for every
-- received data message:
--
-- * 'WSSubscribe' / 'WSUnsubscribe' add or remove a 'Subscription' for
--   the current user, this connection and the given topic;
-- * 'WSAuthorize' verifies the JWT and, on success, switches the
--   current user to the authenticated one (on failure the previous
--   user is kept);
-- * 'WSDeauthorize' switches back to 'CUPublic';
-- * text messages that do not decode and non-text messages are only
--   logged.
--
-- The loop has no exit of its own (hence @IO a@). Whenever it is left,
-- @disconnect@ runs and removes all subscriptions registered for this
-- connection key.
wsLoop :: LogConfig -> JWTSettings -> SSet.Set Subscription -> WSKeyConnection -> IO a
wsLoop log_cfg jwtS subscriptions ws = flip finally disconnect $ do
  withLogger log_cfg $ \ioLogger -> do
    logMsg ioLogger DEBUG "[wsLoop] connecting"
    wsLoop' CUPublic ioLogger

  where
    wsLoop' user ioLogger = do
      dm <- WS.receiveDataMessage (wsConn ws)

      -- Subscription of the current user on this connection to a topic.
      -- TODO Fix s_connected_user based on header
      let subscriptionFor topic =
            Subscription { s_connected_user = user
                         , s_ws_key_connection = ws
                         , s_topic = topic }

      newUser <- case dm of
        WS.Text dm' _ -> do
          case Aeson.decode dm' of
            Nothing -> do
              logMsg ioLogger DEBUG $ "[wsLoop] unknown message: " <> show dm'
              return user
            Just (WSSubscribe topic) -> do
              logMsg ioLogger DEBUG $ "[wsLoop'] subscribe topic " <> show topic
              _ss <- insertSubscription subscriptions (subscriptionFor topic)
              -- putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
              return user
            Just (WSUnsubscribe topic) -> do
              logMsg ioLogger DEBUG $ "[wsLoop'] unsubscribe topic " <> show topic
              _ss <- removeSubscription subscriptions (subscriptionFor topic)
              -- putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss)
              return user
            Just (WSAuthorize token) -> do
              mUser <- liftBase $ verifyJWT jwtS (encodeUtf8 token)
              logMsg ioLogger DEBUG $ "[wsLoop] authorized user: " <> show mUser
              -- TODO Update my subscriptions!
              -- A token that fails verification leaves the user unchanged.
              return $ fromMaybe user (CUUser . _auth_user_id <$> mUser)
            Just WSDeauthorize -> do
              -- TODO Update my subscriptions!
              pure CUPublic
        _ -> do
          logMsg ioLogger DEBUG "[wsLoop] binary ws messages not supported"
          return user

      wsLoop' newUser ioLogger

    -- Cleanup: drop every subscription registered for this connection key.
    disconnect = do
      withLogger log_cfg $ \ioLogger -> do
        logMsg ioLogger DEBUG "[wsLoop] disconnecting..."
      _ss <- removeSubscriptionsForWSKey subscriptions ws
      -- putText $ "[wsLoop] subscriptions: " <> show (show <$> ss)
      return ()


-- | Build the key used to identify a WebSocket connection.
--
-- The key is the value of the request's @Sec-WebSocket-Key@ header,
-- followed by @-@ and a freshly generated random UUID.
--
-- All request headers are logged at 'DEBUG' level.
--
-- If the header is absent this panics with
-- @"Sec-WebSocket-Key not found!"@. The lookup is resolved inside the
-- action so that the panic is raised here, when the header is found to
-- be missing, instead of being hidden in a lazily evaluated key that
-- would only fail once somebody inspects it.
getWSKey :: MonadBase IO m => LogConfig -> WS.PendingConnection -> m ByteString
getWSKey log_cfg pc = do
  let reqHead = WS.pendingRequest pc
  -- WebSocket specification says that a pending request should send
  -- some unique, Sec-WebSocket-Key string. We use this to compare
  -- connections (WS.Connection doesn't implement an Eq instance).
  liftBase $ withLogger log_cfg $ \ioLogger -> do
    logMsg ioLogger DEBUG $ "[wsLoop, getWSKey] headers: " <> show (WS.requestHeaders reqHead)
  let mKey = head $ filter (\(k, _) -> k == "Sec-WebSocket-Key") $ WS.requestHeaders reqHead
  -- Fail right away when the header is missing (see the Haddock above).
  key' <- liftBase $ case mKey of
    Just (_, k) -> pure k
    Nothing     -> panicTrace "Sec-WebSocket-Key not found!"
  -- Unfortunately, a single browser sends the same
  -- Sec-WebSocket-Key so we want to make that even more unique.
  uuid <- liftBase $ UUID.nextRandom
  let key = key' <> "-" <> show uuid
  pure key