[websockets] named routes compiles now, but /ws endpoint not reachable...

parent e67a7435
Pipeline #6218 failed with stages
in 66 minutes and 17 seconds
...@@ -37,7 +37,7 @@ import Gargantext.API.Admin.Types ...@@ -37,7 +37,7 @@ import Gargantext.API.Admin.Types
import Gargantext.API.Errors.Types import Gargantext.API.Errors.Types
import Gargantext.API.Job import Gargantext.API.Job
import Gargantext.API.Prelude (GargM, IsGargServer) import Gargantext.API.Prelude (GargM, IsGargServer)
import Gargantext.Core.AsyncUpdates.Dispatcher.Types (Dispatcher) import Gargantext.Core.AsyncUpdates.Dispatcher.Types (Dispatcher, HasDispatcher(..))
import Gargantext.Core.Mail.Types (HasMail, mailSettings) import Gargantext.Core.Mail.Types (HasMail, mailSettings)
import Gargantext.Core.NLP (NLPServerMap, HasNLPServer(..)) import Gargantext.Core.NLP (NLPServerMap, HasNLPServer(..))
import Gargantext.Core.NodeStory import Gargantext.Core.NodeStory
...@@ -156,6 +156,9 @@ instance HasMail Env where ...@@ -156,6 +156,9 @@ instance HasMail Env where
instance HasNLPServer Env where instance HasNLPServer Env where
nlpServer = env_nlp nlpServer = env_nlp
instance HasDispatcher Env where
hasDispatcher = env_dispatcher
instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where
_env = env_scrapers . Servant.Job.Core._env _env = env_scrapers . Servant.Job.Core._env
......
...@@ -205,7 +205,7 @@ newEnv logger port file = do ...@@ -205,7 +205,7 @@ newEnv logger port file = do
!central_exchange <- forkIO CE.gServer !central_exchange <- forkIO CE.gServer
!dispatcher <- D.dispatcher settings' !dispatcher <- D.dispatcher
{- An 'Env' by default doesn't have strict fields, but when constructing one in production {- An 'Env' by default doesn't have strict fields, but when constructing one in production
we want to force them to WHNF to avoid accumulating unnecessary thunks. we want to force them to WHNF to avoid accumulating unnecessary thunks.
......
...@@ -12,7 +12,7 @@ import Data.Text.Encoding qualified as TE ...@@ -12,7 +12,7 @@ import Data.Text.Encoding qualified as TE
import Data.Version (showVersion) import Data.Version (showVersion)
import Gargantext.API.Admin.Auth (auth, forgotPassword, forgotPasswordAsync) import Gargantext.API.Admin.Auth (auth, forgotPassword, forgotPasswordAsync)
import Gargantext.API.Admin.Auth.Types (AuthContext) import Gargantext.API.Admin.Auth.Types (AuthContext)
import Gargantext.API.Admin.EnvTypes (Env, env_dispatcher) import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Admin.FrontEnd (frontEndServer) import Gargantext.API.Admin.FrontEnd (frontEndServer)
import Gargantext.API.Auth.PolicyCheck () import Gargantext.API.Auth.PolicyCheck ()
import Gargantext.API.Errors import Gargantext.API.Errors
...@@ -62,7 +62,11 @@ server env = ...@@ -62,7 +62,11 @@ server env =
(transformJSONGQL errScheme) (transformJSONGQL errScheme)
GraphQL.api GraphQL.api
, frontendAPI = frontEndServer , frontendAPI = frontEndServer
, wsAPI = Dispatcher.d_ws_server $ env ^. env_dispatcher , wsAPI = hoistServerWithContext
(Proxy :: Proxy (NamedRoutes Dispatcher.WSAPI))
(Proxy :: Proxy AuthContext)
(transformJSON errScheme)
Dispatcher.wsServer
} }
where where
transformJSON :: forall a. GargErrorScheme -> GargM Env BackendInternalError a -> Handler a transformJSON :: forall a. GargErrorScheme -> GargM Env BackendInternalError a -> Handler a
......
...@@ -31,7 +31,7 @@ import DeferredFolds.UnfoldlM qualified as UnfoldlM ...@@ -31,7 +31,7 @@ import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Data.UUID.V4 as UUID import Data.UUID.V4 as UUID
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id)) import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id))
import Gargantext.API.Admin.EnvTypes (env_dispatcher) import Gargantext.API.Admin.EnvTypes (env_dispatcher)
import Gargantext.API.Admin.Types (jwtSettings, Settings, jwtSettings) import Gargantext.API.Admin.Types (jwtSettings, Settings, jwtSettings, HasSettings(settings))
import Gargantext.API.Prelude (IsGargServer) import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes
import Gargantext.Core.AsyncUpdates.Constants as AUConstants import Gargantext.Core.AsyncUpdates.Constants as AUConstants
...@@ -47,6 +47,7 @@ import Servant ...@@ -47,6 +47,7 @@ import Servant
import Servant.API.WebSocket qualified as WS import Servant.API.WebSocket qualified as WS
import Servant.Auth.Server (verifyJWT) import Servant.Auth.Server (verifyJWT)
import Servant.Server.Generic (AsServer, AsServerT) import Servant.Server.Generic (AsServer, AsServerT)
import Servant.Swagger.UI
import StmContainers.Set as SSet import StmContainers.Set as SSet
{- {-
...@@ -59,8 +60,8 @@ Dispatcher is a service, which provides couple of functionalities: ...@@ -59,8 +60,8 @@ Dispatcher is a service, which provides couple of functionalities:
-} -}
dispatcher :: Settings -> IO Dispatcher dispatcher :: IO Dispatcher
dispatcher authSettings = do dispatcher = do
subscriptions <- SSet.newIO subscriptions <- SSet.newIO
-- let server = wsServer authSettings subscriptions -- let server = wsServer authSettings subscriptions
...@@ -104,17 +105,18 @@ removeSubscriptionsForWSKey subscriptions ws = do ...@@ -104,17 +105,18 @@ removeSubscriptionsForWSKey subscriptions ws = do
-- pure ss -- pure ss
newtype WSAPI mode = WSAPI { newtype WSAPI mode = WSAPI {
wsAPI :: mode :- "ws" :> WS.WebSocketPending wsAPIServer :: mode :- "ws" :> Summary "WebSocket endpoint" :> WS.WebSocketPending
} deriving Generic } deriving Generic
-- wsServer :: IsGargServer env err m => Settings -> SSet.Set Subscription -> WSAPI (AsServerT m) -- wsServer :: IsGargServer env err m => Settings -> SSet.Set Subscription -> WSAPI (AsServerT m)
-- wsServer authSettings subscriptions = WSAPI { wsAPI = streamData } -- wsServer authSettings subscriptions = WSAPI { wsAPI = streamData }
wsServer :: IsGargServer env err m => Settings -> WSAPI (AsServerT m) wsServer :: ( IsGargServer env err m, HasDispatcher env, HasSettings env ) => WSAPI (AsServerT m)
wsServer authSettings = WSAPI { wsAPI = streamData } wsServer = WSAPI { wsAPIServer = streamData }
where where
streamData :: IsGargServer env err m => WS.PendingConnection -> m () streamData :: ( IsGargServer env err m, HasDispatcher env, HasSettings env ) => WS.PendingConnection -> m ()
streamData pc = do streamData pc = do
d <- view env_dispatcher authSettings <- view settings
d <- view hasDispatcher
let subscriptions = d_subscriptions d let subscriptions = d_subscriptions d
let reqHead = WS.pendingRequest pc let reqHead = WS.pendingRequest pc
-- WebSocket specification says that a pending request should send -- WebSocket specification says that a pending request should send
...@@ -129,7 +131,7 @@ wsServer authSettings = WSAPI { wsAPI = streamData } ...@@ -129,7 +131,7 @@ wsServer authSettings = WSAPI { wsAPI = streamData }
liftBase $ putText $ show $ WS.requestHeaders reqHead liftBase $ putText $ show $ WS.requestHeaders reqHead
c <- liftBase $ WS.acceptRequest pc c <- liftBase $ WS.acceptRequest pc
let ws = WSKeyConnection (key, c) let ws = WSKeyConnection (key, c)
_ <- liftBase $ Async.concurrently (wsLoop subscriptions ws) (pingLoop ws) _ <- liftBase $ Async.concurrently (wsLoop authSettings subscriptions ws) (pingLoop ws)
-- _ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws) -- _ <- liftIO $ Async.withAsync (pure ()) (\_ -> wsLoop ws)
pure () pure ()
...@@ -144,7 +146,7 @@ wsServer authSettings = WSAPI { wsAPI = streamData } ...@@ -144,7 +146,7 @@ wsServer authSettings = WSAPI { wsAPI = streamData }
WS.sendPing (wsConn ws) ("" :: Text) WS.sendPing (wsConn ws) ("" :: Text)
threadDelay $ 10 * 1000000 threadDelay $ 10 * 1000000
wsLoop subscriptions ws = flip finally disconnect $ do wsLoop authSettings subscriptions ws = flip finally disconnect $ do
putText "[wsLoop] connecting" putText "[wsLoop] connecting"
wsLoop' CUPublic wsLoop' CUPublic
......
{-|
Module : Gargantext.Core.AsyncUpdates.Dispatcher.Types
Description : Dispatcher (handles websocket connections, accepts message from central exchange)
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918
-}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -fno-warn-unused-matches -fno-warn-unused-imports #-}
module Gargantext.Core.AsyncUpdates.Dispatcher.Types where
import Control.Concurrent.Async qualified as Async
import Control.Lens (Getter, view)
import Data.Aeson ((.:), (.=))
import Data.Aeson qualified as Aeson
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.ByteString.Char8 qualified as C
import Data.ByteString.Lazy qualified as BSL
import Data.List (nubBy)
import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Data.UUID.V4 as UUID
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id))
import Gargantext.API.Admin.Types (jwtSettings, Settings, jwtSettings)
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes
import Gargantext.Core.AsyncUpdates.Constants as AUConstants
import Gargantext.Core.Types (NodeId, UserId)
import Gargantext.Prelude
import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar)
import Nanomsg
import Network.WebSockets qualified as WS
import Protolude.Base (Show(showsPrec))
import Servant
-- import Servant.API.NamedRoutes ((:-))
import Servant.API.WebSocket qualified as WS
import Servant.Auth.Server (verifyJWT)
import Servant.Server.Generic (AsServer, AsServerT)
import StmContainers.Set as SSet
data Topic =
-- | Update given Servant Job (we currently send a request every
-- | second to get job status).
-- UpdateJob JobID
-- | Given parent node id, trigger update of the node and its
-- children (e.g. list is automatically created in a corpus)
UpdateTree NodeId
deriving (Eq, Show)
instance Hashable Topic where
hashWithSalt salt (UpdateTree nodeId) = hashWithSalt salt ("update-tree" :: Text, nodeId)
instance FromJSON Topic where
parseJSON = Aeson.withObject "Topic" $ \o -> do
type_ <- o .: "type"
case type_ of
"update_tree" -> do
node_id <- o .: "node_id"
pure $ UpdateTree node_id
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON Topic where
toJSON (UpdateTree node_id) = Aeson.object [
"type" .= toJSON ("update_tree" :: Text)
, "node_id" .= toJSON node_id
]
data ConnectedUser =
CUUser UserId
| CUPublic
deriving (Eq, Show)
instance Hashable ConnectedUser where
hashWithSalt salt (CUUser userId) = hashWithSalt salt ("cuuser" :: Text, userId)
hashWithSalt salt CUPublic = hashWithSalt salt ("cupublic" :: Text)
newtype WSKeyConnection = WSKeyConnection (ByteString, WS.Connection)
instance Hashable WSKeyConnection where
hashWithSalt salt (WSKeyConnection (key, _conn)) = hashWithSalt salt key
instance Eq WSKeyConnection where
(==) (WSKeyConnection (key1, _conn1)) (WSKeyConnection (key2, _conn2)) = key1 == key2
instance Show WSKeyConnection where
showsPrec d (WSKeyConnection (key, _conn)) = showsPrec d $ "WSKeyConnection " <> key
showWSKeyConnection :: WSKeyConnection -> Text
showWSKeyConnection ws = "WSKeyConnection " <> show (wsKey ws)
wsKey :: WSKeyConnection -> ByteString
wsKey (WSKeyConnection (key, _conn)) = key
wsConn :: WSKeyConnection -> WS.Connection
wsConn (WSKeyConnection (_key, conn)) = conn
data Subscription =
Subscription {
s_connected_user :: ConnectedUser
, s_ws_key_connection :: WSKeyConnection
, s_topic :: Topic }
deriving (Eq, Show)
instance Hashable Subscription where
hashWithSalt salt (Subscription { .. }) =
hashWithSalt salt ( s_connected_user, s_ws_key_connection, s_topic )
subKey :: Subscription -> ByteString
subKey sub = wsKey $ s_ws_key_connection sub
type Token = Text
{-
We accept requests for subscription/unsubscription via websocket.
We could instead handle 1 websocket connection per every topic
subscription (e.g. parse headers in WS.PendingConnection. However, WS
by default can handle 65k concurrent connections. With multiple users
having multiple components open, we could exhaust that limit quickly.
Hence, we architect this to have 1 websocket connection per web
browser.
-}
data WSRequest =
WSSubscribe Topic
| WSUnsubscribe Topic
| WSAuthorize Token
| WSDeauthorize
deriving (Eq, Show)
instance FromJSON WSRequest where
parseJSON = Aeson.withObject "WSRequest" $ \o -> do
request <- o .: "request"
case request of
"subscribe" -> do
topic <- o .: "topic"
pure $ WSSubscribe topic
"unsubscribe" -> do
topic <- o .: "topic"
pure $ WSUnsubscribe topic
"authorize" -> do
token <- o .: "token"
pure $ WSAuthorize token
"deauthorize" -> pure $ WSDeauthorize
s -> prependFailure "parsing request type failed, " (typeMismatch "request" s)
data Dispatcher =
Dispatcher { d_subscriptions :: SSet.Set Subscription
-- , d_ws_server :: WSAPI AsServer
, d_ce_listener :: ThreadId
}
class HasDispatcher env where
hasDispatcher :: Getter env Dispatcher
data Notification =
Notification Topic
deriving (Eq, Show)
instance ToJSON Notification where
toJSON (Notification topic) = Aeson.object [
"notification" .= toJSON topic
]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment