[websockets] implement CE typeclass, fixes to haskell-nanomsg

parent ea87bb15
......@@ -25,14 +25,16 @@ import Options.Applicative
data Command =
Server
CEServer
| SimpleServer
| WSServer
| Client
parser :: Parser (IO ())
parser = subparser
( command "server" (info (pure gServer) idm)
( command "ce-server" (info (pure gServer) idm)
<> command "simple-server" (info (pure simpleServer) idm)
<> command "ws-server" (info (pure wsServer) idm)
<> command "client" (info (pure gClient) idm) )
......@@ -40,10 +42,23 @@ parser = subparser
main :: IO ()
main = join $ execParser (info parser idm)
simpleServer :: IO ()
simpleServer = do
withSocket Pull $ \s -> do
_ <- bind s "tcp://*:5560"
putText "[simpleServer] receiving"
forever $ do
mr <- recvMalloc s 1024
C.putStrLn mr
-- case mr of
-- Nothing -> pure ()
-- Just r -> C.putStrLn r
-- threadDelay 10000
wsServer :: IO ()
wsServer = do
withSocket Pull $ \ws -> do
_ <- connect ws "ws://localhost:5566"
_ <- bind ws "ws://*:5560"
forever $ do
putText "[wsServer] receiving"
r <- recv ws
......
......@@ -169,7 +169,7 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/garganscript/nanomsg-haskell
tag: 5e4e119881d81b8a8f77a79b3caaebb1bb304790
tag: 23be4130804d86979eaee5caffe323a1c7f2b0d6
-- source-repository-package
-- type: git
......
......@@ -172,6 +172,7 @@ library
Gargantext.Core.AsyncUpdates.Constants
Gargantext.Core.AsyncUpdates.Dispatcher
Gargantext.Core.AsyncUpdates.Dispatcher.Types
Gargantext.Core.AsyncUpdates.Nanomsg
Gargantext.Core.Mail.Types
Gargantext.Core.Methods.Similarities
Gargantext.Core.Methods.Similarities.Conditional
......@@ -294,6 +295,7 @@ library
Gargantext.API.GraphQL.Team
Gargantext.API.GraphQL.TreeFirstLevel
Gargantext.API.GraphQL.Types
Gargantext.API.GraphQL.UnPrefix
Gargantext.API.GraphQL.User
Gargantext.API.GraphQL.UserInfo
Gargantext.API.GraphQL.Utils
......
......@@ -37,6 +37,8 @@ import Gargantext.API.Admin.Types
import Gargantext.API.Errors.Types
import Gargantext.API.Job
import Gargantext.API.Prelude (GargM, IsGargServer)
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CET
import Gargantext.Core.AsyncUpdates.Dispatcher.Types (Dispatcher, HasDispatcher(..))
import Gargantext.Core.Mail.Types (HasMail, mailSettings)
import Gargantext.Core.NLP (NLPServerMap, HasNLPServer(..))
......@@ -168,6 +170,9 @@ instance HasJobEnv Env JobLog JobLog where
instance Jobs.MonadJob (GargM Env err) GargJob (Seq JobLog) JobLog where
getJobEnv = asks (view env_jobs)
instance CET.HasCentralExchangeNotification Env where
ce_notify m = liftBase $ CE.notify m
-- | The /concrete/ 'JobHandle' in use with our 'GargM' (production) monad. Its
-- constructor it's not exported, to not leak internal details of its implementation.
data ConcreteJobHandle err =
......@@ -187,8 +192,15 @@ mkJobHandle jId = JobHandle jId
-- | Updates the status of a 'JobHandle' by using the input 'updateJobStatus' function.
updateJobProgress :: ConcreteJobHandle err -> (JobLog -> JobLog) -> GargM Env err ()
updateJobProgress ConcreteNullHandle _ = pure ()
updateJobProgress hdl@(JobHandle _ logStatus) updateJobStatus =
Jobs.getLatestJobStatus hdl >>= logStatus . updateJobStatus
updateJobProgress hdl@(JobHandle jId logStatus) updateJobStatus = do
jobLog <- Jobs.getLatestJobStatus hdl
let jobLogNew = updateJobStatus jobLog
logStatus jobLogNew
CET.ce_notify $ CET.UpdateJobProgress jId jobLogNew
-- mJob <- Jobs.findJob jId
-- case mJob of
-- Nothing -> pure ()
-- Just job -> liftBase $ CE.ce_notify $ CET.UpdateJobProgress jId job
instance Jobs.MonadJobStatus (GargM Env err) where
......
......@@ -11,7 +11,7 @@ import Data.Morpheus.Types ( GQLType, typeOptions )
import Data.Proxy
import Data.Swagger hiding (URL, url, port)
import GHC.Generics hiding (to)
import Gargantext.API.GraphQL.Utils qualified as GQLU
import Gargantext.API.GraphQL.UnPrefix qualified as GQLU
import Gargantext.Core.Types (TODO(..))
import Gargantext.Prelude
import Servant
......
{-|
Module : Gargantext.API.GraphQL.UnPrefix
Description : Un-prefix for GraphQL API
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# OPTIONS_GHC -Wno-deprecations #-} -- FIXME(adn) GraphQL will need updating.
module Gargantext.API.GraphQL.UnPrefix where
import Data.Morpheus.Types (GQLTypeOptions, fieldLabelModifier)
import Data.Text qualified as T
import Gargantext.Core.Utils.Prefix (unCapitalize, dropPrefix)
import Gargantext.Prelude
unPrefix :: T.Text -> GQLTypeOptions -> GQLTypeOptions
unPrefix prefix options = options { fieldLabelModifier = nflm }
where
nflm label = unCapitalize $ dropPrefix (T.unpack prefix) $ ( fieldLabelModifier options ) label
......@@ -13,21 +13,13 @@ Portability : POSIX
module Gargantext.API.GraphQL.Utils where
import Control.Lens (view)
import Data.Morpheus.Types (GQLTypeOptions, fieldLabelModifier)
import Data.Text qualified as T
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser (..), auth_node_id)
import Gargantext.API.Admin.Types (jwtSettings, HasSettings (settings))
import Gargantext.Core.Utils.Prefix (unCapitalize, dropPrefix)
import Gargantext.Database.Admin.Types.Node (NodeId)
import Gargantext.Database.Prelude (Cmd')
import Gargantext.Prelude
import Servant.Auth.Server (verifyJWT, JWTSettings)
unPrefix :: T.Text -> GQLTypeOptions -> GQLTypeOptions
unPrefix prefix options = options { fieldLabelModifier = nflm }
where
nflm label = unCapitalize $ dropPrefix (T.unpack prefix) $ ( fieldLabelModifier options ) label
data AuthStatus = Valid | Invalid
authUser :: (HasSettings env) => NodeId -> Text -> Cmd' env err AuthStatus
......
......@@ -28,7 +28,6 @@ import Gargantext.API.Errors.Types
import Gargantext.API.Node.New.Types
import Gargantext.API.Prelude
import Gargantext.API.Routes.Named.Node qualified as Named
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CE
import Gargantext.Database.Action.Flow.Types
import Gargantext.Database.Action.Node
......@@ -50,9 +49,8 @@ postNode authenticatedUser pId (PostNode nodeName nt) = do
let userId = authenticatedUser ^. auth_user_id
nodeIds <- mkNodeWithParent nt (Just pId) userId nodeName
liftBase $ do
-- mapM_ (CE.notify . CE.UpdateTreeFirstLevel) nodeIds
CE.notify $ CE.UpdateTreeFirstLevel pId
-- mapM_ (CE.ce_notify . CE.UpdateTreeFirstLevel) nodeIds
CE.ce_notify $ CE.UpdateTreeFirstLevel pId
return nodeIds
......@@ -66,7 +64,7 @@ postNodeAsyncAPI authenticatedUser nId = Named.PostNodeAsyncAPI $ AsyncJobs $
serveJobsAPI NewNodeJob $ \jHandle p -> postNodeAsync authenticatedUser nId p jHandle
------------------------------------------------------------------------
postNodeAsync :: (FlowCmdM env err m, MonadJobStatus m)
postNodeAsync :: (FlowCmdM env err m, MonadJobStatus m, CE.HasCentralExchangeNotification env)
=> AuthenticatedUser
-- ^ The logged in user
-> NodeId
......@@ -85,8 +83,7 @@ postNodeAsync authenticatedUser nId (PostNode nodeName tn) jobHandle = do
let userId = authenticatedUser ^. auth_user_id
_nodeIds <- mkNodeWithParent tn (Just nId) userId nodeName
liftBase $ do
-- mapM_ (CE.notify . CE.UpdateTreeFirstLevel) nodeIds
CE.notify $ CE.UpdateTreeFirstLevel nId
-- mapM_ (CE.ce_notify . CE.UpdateTreeFirstLevel) nodeIds
CE.ce_notify $ CE.UpdateTreeFirstLevel nId
markComplete jobHandle
......@@ -20,6 +20,7 @@ import Data.Text qualified as Text
import Gargantext.API.Node.Share.Types
import Gargantext.API.Prelude
import Gargantext.API.Routes.Named.Share qualified as Named
import Gargantext.Core.AsyncUpdates.CentralExchange.Types (HasCentralExchangeNotification)
import Gargantext.Core.NLP (HasNLPServer)
import Gargantext.Core.Types.Individu (User(..), arbitraryUsername)
import Gargantext.Database.Action.Share (ShareNodeWith(..))
......@@ -37,7 +38,10 @@ import Servant.Server.Generic (AsServerT)
-- TODO permission
-- TODO refactor userId which is used twice
-- TODO change return type for better warning/info/success/error handling on the front
api :: (HasNodeError err, HasNLPServer env, CmdRandom env err m)
api :: ( HasNodeError err
, HasNLPServer env
, CmdRandom env err m
, HasCentralExchangeNotification env )
=> User
-> NodeId
-> ShareNodeParams
......
......@@ -26,6 +26,7 @@ import Gargantext.API.Admin.Auth.Types (AuthenticationError)
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Admin.Types
import Gargantext.API.Errors.Class
import Gargantext.Core.AsyncUpdates.CentralExchange.Types (HasCentralExchangeNotification)
import Gargantext.Core.Mail.Types (HasMail)
import Gargantext.Core.NLP (HasNLPServer)
import Gargantext.Core.NodeStory
......@@ -53,6 +54,7 @@ type EnvC env =
, HasNodeStoryEnv env
, HasMail env
, HasNLPServer env
, HasCentralExchangeNotification env
)
type ErrC err =
......
......@@ -18,57 +18,82 @@ module Gargantext.Core.AsyncUpdates.CentralExchange where
-- import Control.Concurrent (threadDelay)
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM.TChan qualified as TChan
import Data.Aeson qualified as Aeson
import Data.ByteString.Char8 qualified as C
import Data.ByteString.Lazy qualified as BSL
import Gargantext.Core.AsyncUpdates.CentralExchange.Types
import Gargantext.Core.AsyncUpdates.Constants (cePort, dispatcherInternalPort)
-- import Gargantext.Core.AsyncUpdates.Nanomsg (withSafeSocket)
import Gargantext.Prelude
import Nanomsg
import Nanomsg (Pull(..), Push(..), bind, connect, recvMalloc, send, withSocket)
{-
Central exchange is a service, which gathers messages from various
places and informs the Dispatcher (which will then inform users about
various events).
The primary goal is to be able to read as many messages as possible
and then send them to the Dispatcher. Although nanomsg does some
message buffering, we don't want these messages to pile up, especially
with many users having updates.
-}
gServer :: IO ()
gServer = do
withSocket Pull $ \s -> do
withSocket Push $ \s_dispatcher -> do
_ <- bind s ("tcp://*:" <> show cePort)
_ <- connect s_dispatcher ("tcp://localhost:" <> show dispatcherInternalPort)
forever $ do
putText "[central_exchange] receiving"
r <- recv s
C.putStrLn r
case Aeson.decode (BSL.fromStrict r) of
Just (UpdateTreeFirstLevel node_id) -> do
putText $ "[central_exchange] update tree: " <> show node_id
putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id
-- To make this more robust, use withAsync so we don't
-- block the main thread (send is blocking)
-- NOTE: If we're flooded with messages, and send is
-- slow, we might be spawning many threads...
withSocket Pull $ \s -> do
withSocket Push $ \s_dispatcher -> do
_ <- bind s ("tcp://*:" <> show cePort)
_ <- connect s_dispatcher ("tcp://localhost:" <> show dispatcherInternalPort)
-- NOTE: Currently we just forward the message that we
-- got. So in theory central exchange isn't needed (we
-- could ping dispatcher directly). However, I think
-- it's better to have this as a separate
-- component. Currently I built this inside
-- gargantext-server but maybe it can be a separate
-- process, independent of the server.
Async.withAsync (pure ()) $ \_ -> do
send s_dispatcher r
_ -> putText "[central_exchange] unknown"
tChan <- TChan.newTChanIO
-- | We have 2 threads: one that listens for nanomsg messages
-- | and puts them on the 'tChan' and the second one that reads
-- | the 'tChan' and calls Dispatcher accordingly. This is to
-- | make reading nanomsg as fast as possible.
void $ Async.concurrently (worker s_dispatcher tChan) $ do
forever $ do
-- putText "[central_exchange] receiving"
r <- recvMalloc s 1024
C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r
where
worker s_dispatcher tChan = do
forever $ do
r <- atomically $ TChan.readTChan tChan
case Aeson.decode (BSL.fromStrict r) of
Just ujp@(UpdateJobProgress _jId _jobLog) -> do
putText $ "[central_exchange] " <> show ujp
-- send the same message that we received
send s_dispatcher r
Just (UpdateTreeFirstLevel node_id) -> do
putText $ "[central_exchange] update tree: " <> show node_id
-- putText $ "[central_exchange] sending that to the dispatcher: " <> show node_id
-- To make this more robust, use withAsync so we don't
-- block the main thread (send is blocking)
-- NOTE: If we're flooded with messages, and send is
-- slow, we might be spawning many threads...
-- NOTE: Currently we just forward the message that we
-- got. So in theory central exchange isn't needed (we
-- could ping dispatcher directly). However, I think
-- it's better to have this as a separate
-- component. Currently I built this inside
-- gargantext-server but maybe it can be a separate
-- process, independent of the server.
-- send the same message that we received
send s_dispatcher r
_ -> putText $ "[central_exchange] unknown message"
notify :: CEMessage -> IO ()
notify ceMessage = do
withSocket Push $ \s -> do
_ <- connect s ("tcp://localhost:" <> show cePort)
let str = Aeson.encode ceMessage
send s $ BSL.toStrict str
Async.withAsync (pure ()) $ \_ -> do
withSocket Push $ \s -> do
_ <- connect s ("tcp://localhost:" <> show cePort)
let str = Aeson.encode ceMessage
send s $ BSL.toStrict str
......@@ -15,10 +15,18 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
module Gargantext.Core.AsyncUpdates.CentralExchange.Types where
import Codec.Binary.UTF8.String qualified as CBUTF8
import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson qualified as Aeson
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.ByteString.Lazy qualified as BSL
import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.Core.Types (NodeId)
import Gargantext.Prelude
-- import Gargantext.Utils.Jobs.Map qualified as JM
import Prelude qualified
import Servant.Job.Core (Safety(Safe))
import Servant.Job.Types (JobID)
{-
......@@ -30,21 +38,37 @@ various events).
-- INTERNAL MESSAGES
data CEMessage =
UpdateTreeFirstLevel NodeId
deriving (Show, Eq)
-- UpdateJobProgress (JobID 'Safe) (JM.JobEntry (JobID 'Safe) (Seq JobLog) JobLog)
UpdateJobProgress (JobID 'Safe) JobLog
| UpdateTreeFirstLevel NodeId
-- deriving (Eq)
instance Prelude.Show CEMessage where
show (UpdateJobProgress jId jobLog) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId) <> " " <> show jobLog
show (UpdateTreeFirstLevel nodeId) = "UpdateTreeFirstLevel " <> show nodeId
instance FromJSON CEMessage where
parseJSON = withObject "CEMessage" $ \o -> do
type_ <- o .: "type"
case type_ of
"update_job_progress" -> do
jId <- o .: "j_id"
jobLog <- o .: "job_log"
pure $ UpdateJobProgress jId jobLog
"update_tree_first_level" -> do
node_id <- o .: "node_id"
pure $ UpdateTreeFirstLevel node_id
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON CEMessage where
toJSON (UpdateJobProgress jId jobLog) = object [
"type" .= toJSON ("update_job_progress" :: Text)
, "j_id" .= toJSON jId
, "job_log" .= toJSON jobLog
]
toJSON (UpdateTreeFirstLevel node_id) = object [
"type" .= toJSON ("update_tree_first_level" :: Text)
, "node_id" .= toJSON node_id
]
class HasCentralExchangeNotification env where
ce_notify :: (MonadReader env m, MonadBase IO m) => CEMessage -> m ()
......@@ -20,6 +20,7 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
module Gargantext.Core.AsyncUpdates.Dispatcher where
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM.TChan qualified as TChan
import Control.Lens (view)
import Data.Aeson ((.:), (.=))
import Data.Aeson qualified as Aeson
......@@ -38,8 +39,9 @@ import Gargantext.Core.AsyncUpdates.Constants as AUConstants
import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Core.Types (NodeId, UserId)
import Gargantext.Prelude
-- import Gargantext.Utils.Jobs.Monad (MonadJobStatus(getLatestJobStatus))
import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar)
import Nanomsg
import Nanomsg (Pull(..), bind, recvMalloc, withSocket)
import Network.WebSockets qualified as WS
import Protolude.Base (Show(showsPrec))
import Servant
......@@ -158,7 +160,7 @@ wsServer = WSAPI { wsAPIServer = streamData }
WS.Text dm' _ -> do
case Aeson.decode dm' of
Nothing -> do
putText "[wsLoop] unknown message"
putText $ "[wsLoop] unknown message: " <> show dm'
return user
Just (WSSubscribe topic) -> do
-- TODO Fix s_connected_user based on header
......@@ -199,37 +201,66 @@ wsServer = WSAPI { wsAPIServer = streamData }
-- putText $ "[wsLoop] subscriptions: " <> show (show <$> ss)
return ()
-- | This is a nanomsg socket listener. We want to read the messages
-- | as fast as possible and then process them gradually in a separate
-- | thread.
dispatcher_listener :: SSet.Set Subscription -> IO ()
dispatcher_listener subscriptions = do
withSocket Pull $ \s -> do
_ <- bind s ("tcp://*:" <> show AUConstants.dispatcherInternalPort)
forever $ do
putText "[ce_listener] receiving"
r <- recv s
C.putStrLn r
case Aeson.decode (BSL.fromStrict r) of
Nothing -> putText "[ce_listener] unknown message from central exchange"
Just ceMessage -> do
-- subs <- atomically $ readTVar subscriptions
filteredSubs <- atomically $ do
let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions
UnfoldlM.foldlM' (\acc sub -> pure $ acc <> [sub]) [] subs'
-- NOTE This isn't safe: we atomically fetch subscriptions,
-- then send notifications one by one. In the meantime, a
-- subscription could end or new ones could appear (but is
-- this really a problem? I new subscription comes up, then
-- probably they already fetch new tree anyways, and if old
-- one drops in the meantime, it won't listen to what we
-- send...)
-- let filteredSubs = filterCEMessageSubs ceMessage subs
mapM_ (sendNotification ceMessage) filteredSubs
tChan <- TChan.newTChanIO
-- NOTE I'm not sure that we need more than 1 worker here, but in
-- theory, the worker can perform things like user authentication,
-- DB queries etc so it can be slow sometimes.
void $ Async.concurrently (Async.replicateConcurrently 5 $ worker s tChan) $ do
forever $ do
-- putText "[dispatcher_listener] receiving"
r <- recvMalloc s 1024
-- C.putStrLn $ "[dispatcher_listener] " <> r
atomically $ TChan.writeTChan tChan r
where
worker s tChan = do
-- tId <- myThreadId
forever $ do
r <- atomically $ TChan.readTChan tChan
-- putText $ "[" <> show tId <> "] received a message: " <> decodeUtf8 r
case Aeson.decode (BSL.fromStrict r) of
Nothing -> putText "[dispatcher_listener] unknown message from central exchange"
Just ceMessage -> do
-- putText $ "[dispatcher_listener] received message: " <> show ceMessage
-- subs <- atomically $ readTVar subscriptions
filteredSubs <- atomically $ do
let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions
UnfoldlM.foldlM' (\acc sub -> pure $ acc <> [sub]) [] subs'
-- NOTE This isn't safe: we atomically fetch subscriptions,
-- then send notifications one by one. In the meantime, a
-- subscription could end or new ones could appear (but is
-- this really a problem? I new subscription comes up, then
-- probably they already fetch new tree anyways, and if old
-- one drops in the meantime, it won't listen to what we
-- send...)
-- let filteredSubs = filterCEMessageSubs ceMessage subs
mapM_ (sendNotification ceMessage) filteredSubs
sendNotification :: CETypes.CEMessage -> Subscription -> IO ()
sendNotification ceMessage sub = do
let ws = s_ws_key_connection sub
-- send the same thing to everyone for now
WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode $ Notification $ s_topic sub) Nothing)
let topic = s_topic sub
notification <-
case ceMessage of
CETypes.UpdateJobProgress jId jobLog -> do
-- js <- getLatestJobStatus jId
-- putText $ "[sendNotification] latestJobStatus" js
pure $ Notification topic (MJobProgress jobLog)
CETypes.UpdateTreeFirstLevel nodeId -> pure $ Notification topic MEmpty
-- TODO send the same thing to everyone for now, this should be
-- converted to notifications
WS.sendDataMessage (wsConn ws) (WS.Text (Aeson.encode notification) Nothing)
-- Custom filtering of list of Subscriptions based on
......@@ -240,5 +271,7 @@ filterCEMessageSubs :: CETypes.CEMessage -> [Subscription] -> [Subscription]
filterCEMessageSubs ceMessage subscriptions = filter (ceMessageSubPred ceMessage) subscriptions
ceMessageSubPred :: CETypes.CEMessage -> Subscription -> Bool
ceMessageSubPred (CETypes.UpdateJobProgress jId _jobLog) (Subscription { s_topic }) =
s_topic == UpdateJobProgress jId
ceMessageSubPred (CETypes.UpdateTreeFirstLevel node_id) (Subscription { s_topic }) =
s_topic == UpdateTree node_id
......@@ -19,6 +19,7 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
module Gargantext.Core.AsyncUpdates.Dispatcher.Types where
import Codec.Binary.UTF8.String qualified as CBUTF8
import Control.Concurrent.Async qualified as Async
import Control.Lens (Getter, view)
import Data.Aeson ((.:), (.=))
......@@ -30,6 +31,7 @@ import Data.List (nubBy)
import DeferredFolds.UnfoldlM qualified as UnfoldlM
import Data.UUID.V4 as UUID
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id))
import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.API.Admin.Types (jwtSettings, Settings, jwtSettings)
import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CETypes
......@@ -39,39 +41,73 @@ import Gargantext.Prelude
import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar)
import Nanomsg
import Network.WebSockets qualified as WS
import Prelude qualified
import Protolude.Base (Show(showsPrec))
import Servant
-- import Servant.API.NamedRoutes ((:-))
import Servant.API.WebSocket qualified as WS
import Servant.Auth.Server (verifyJWT)
import Servant.Job.Core (Safety(Safe))
import Servant.Job.Types (JobID)
import Servant.Server.Generic (AsServer, AsServerT)
import StmContainers.Set as SSet
-- | A topic is sent, when a client wants to subscribe to specific
-- | types of notifications
data Topic =
-- | Update given Servant Job (we currently send a request every
-- | second to get job status).
-- UpdateJob JobID
UpdateJobProgress (JobID 'Safe)
-- | Given parent node id, trigger update of the node and its
-- children (e.g. list is automatically created in a corpus)
UpdateTree NodeId
deriving (Eq, Show)
| UpdateTree NodeId
deriving (Eq)
instance Prelude.Show Topic where
show (UpdateJobProgress jId) = "UpdateJobProgress " <> (CBUTF8.decode $ BSL.unpack $ Aeson.encode jId)
show (UpdateTree nodeId) = "UpdateTree " <> show nodeId
instance Hashable Topic where
hashWithSalt salt (UpdateJobProgress jId) = hashWithSalt salt ("update-job-progress" :: Text, Aeson.encode jId)
hashWithSalt salt (UpdateTree nodeId) = hashWithSalt salt ("update-tree" :: Text, nodeId)
instance FromJSON Topic where
parseJSON = Aeson.withObject "Topic" $ \o -> do
type_ <- o .: "type"
case type_ of
"update_job_progress" -> do
jId <- o .: "j_id"
pure $ UpdateJobProgress jId
"update_tree" -> do
node_id <- o .: "node_id"
pure $ UpdateTree node_id
s -> prependFailure "parsing type failed, " (typeMismatch "type" s)
instance ToJSON Topic where
toJSON (UpdateJobProgress jId) = Aeson.object [
"type" .= toJSON ("update_job_progress" :: Text)
, "j_id" .= toJSON jId
]
toJSON (UpdateTree node_id) = Aeson.object [
"type" .= toJSON ("update_tree" :: Text)
, "node_id" .= toJSON node_id
]
-- | A message to be sent inside a Notification
data Message =
MJobProgress JobLog
| MEmpty
deriving (Eq)
instance Prelude.Show Message where
show (MJobProgress jobProgress) = "MJobProgress " <> show jobProgress
show MEmpty = "MEmpty"
instance ToJSON Message where
toJSON (MJobProgress jobProgress) = Aeson.object [
"type" .= toJSON ("MJobProgress" :: Text)
, "job_progress" .= toJSON jobProgress
]
toJSON MEmpty = Aeson.object [
"type" .= toJSON ("MEmpty" :: Text)
]
data ConnectedUser =
CUUser UserId
......@@ -155,13 +191,16 @@ class HasDispatcher env where
hasDispatcher :: Getter env Dispatcher
-- | A notification is sent to clients who subscribed to specific topics
data Notification =
Notification Topic
Notification Topic Message
deriving (Eq, Show)
instance ToJSON Notification where
toJSON (Notification topic) = Aeson.object [
"notification" .= toJSON topic
toJSON (Notification topic message) = Aeson.object [
"notification" .= toJSON (Aeson.object [
"topic" .= toJSON topic
, "message" .= toJSON message
])
]
{-|
Module : Gargantext.Core.AsyncUpdates.Nanomsg
Description : Nanomsg utils
Copyright : (c) CNRS, 2017-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918
-}
module Gargantext.Core.AsyncUpdates.Nanomsg where
import Gargantext.Prelude
import Nanomsg
withSafeSocket :: SocketType a => Text -> a -> (Socket a -> IO c) -> IO c
withSafeSocket socketName t =
bracket onOpen onClose
where
onOpen = do
s <- socket t
setRcvBuf s 1
setSndBuf s 1
rcvBufInt <- rcvBuf s
sndBufInt <- sndBuf s
putText $ "[" <> socketName <> "] rcvBuf: " <> show rcvBufInt <> ", sndBuf: " <> show sndBufInt
pure s
onClose s = do
close s
panicTrace $ "[withSafeSocket] " <> socketName <> " closed"
......@@ -20,8 +20,7 @@ module Gargantext.Database.Action.Delete
import Control.Lens (view)
import Data.Text (unpack)
import Gargantext.Core (HasDBid(..))
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types (ce_notify, CEMessage(..))
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Database.Action.Share (delFolderTeam)
import Gargantext.Database.Action.User (getUserId)
......@@ -60,11 +59,10 @@ deleteNode u nodeId = do
_ -> N.deleteNode nodeId
-- | Node was deleted, refresh its parent (if exists)
liftBase $ do
-- mapM_ (CE.notify . CE.UpdateTreeFirstLevel) nodeIds
case view node_parent_id node' of
Nothing -> return ()
Just pId -> CE.notify $ CE.UpdateTreeFirstLevel pId
-- mapM_ (CE.ce_notify . CE.UpdateTreeFirstLevel) nodeIds
case view node_parent_id node' of
Nothing -> return ()
Just pId -> ce_notify $ UpdateTreeFirstLevel pId
return num
......
......@@ -65,8 +65,7 @@ import Data.Text qualified as T
import EPO.API.Client.Types qualified as EPO
import Gargantext.API.Ngrams.Tools (getTermsWith)
import Gargantext.Core (Lang(..), NLPServerConfig, withDefaultLanguage)
import Gargantext.Core.AsyncUpdates.CentralExchange qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CE
import Gargantext.Core.AsyncUpdates.CentralExchange.Types (HasCentralExchangeNotification(ce_notify), CEMessage(..))
import Gargantext.Core.Ext.IMTUser (readFile_Annuaire)
import Gargantext.Core.NLP (HasNLPServer, nlpServerGet)
import Gargantext.Core.NodeStory.Types (HasNodeStory)
......@@ -167,6 +166,7 @@ flowDataText :: forall env err m.
, HasTreeError err
, HasValidationError err
, MonadJobStatus m
, HasCentralExchangeNotification env
)
=> User
-> DataText
......@@ -195,7 +195,8 @@ flowAnnuaire :: ( DbCmd' env err m
, HasNLPServer env
, HasTreeError err
, HasValidationError err
, MonadJobStatus m )
, MonadJobStatus m
, HasCentralExchangeNotification env )
=> MkCorpusUser
-> TermType Lang
-> FilePath
......@@ -213,7 +214,8 @@ flowCorpusFile :: ( DbCmd' env err m
, HasNLPServer env
, HasTreeError err
, HasValidationError err
, MonadJobStatus m )
, MonadJobStatus m
, HasCentralExchangeNotification env )
=> MkCorpusUser
-> Limit -- Limit the number of docs (for dev purpose)
-> TermType Lang
......@@ -242,7 +244,8 @@ flowCorpus :: ( DbCmd' env err m
, HasTreeError err
, HasValidationError err
, FlowCorpus a
, MonadJobStatus m )
, MonadJobStatus m
, HasCentralExchangeNotification env )
=> MkCorpusUser
-> TermType Lang
-> Maybe FlowSocialListWith
......@@ -262,6 +265,7 @@ flow :: forall env err m a c.
, FlowCorpus a
, MkCorpus c
, MonadJobStatus m
, HasCentralExchangeNotification env
)
=> Maybe c
-> MkCorpusUser
......@@ -275,7 +279,7 @@ flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do
-- TODO if public insertMasterDocs else insertUserDocs
nlpServer <- view $ nlpServerGet (_tt_lang la)
runConduit $ zipSources (yieldMany ([1..] :: [Int])) docsC
.| CList.chunksOf 100
.| CList.chunksOf 2
.| mapM_C (addDocumentsWithProgress nlpServer userCorpusId)
.| sinkNull
......@@ -313,6 +317,7 @@ addDocumentsToHyperCorpus ncs mb_hyper la corpusId docs = do
------------------------------------------------------------------------
createNodes :: ( DbCmd' env err m, HasNodeError err
, MkCorpus c
, HasCentralExchangeNotification env
)
=> MkCorpusUser
-> Maybe c
......@@ -331,9 +336,8 @@ createNodes mkCorpusUser ctype = do
_ <- insertDefaultNodeIfNotExists NodeGraph userCorpusId userId
-- _ <- insertDefaultNodeIfNotExists NodeDashboard userCorpusId userId
liftBase $ do
CE.notify $ CE.UpdateTreeFirstLevel listId
CE.notify $ CE.UpdateTreeFirstLevel userCorpusId
ce_notify $ UpdateTreeFirstLevel listId
ce_notify $ UpdateTreeFirstLevel userCorpusId
pure (userId, userCorpusId, listId)
......
......@@ -27,7 +27,7 @@ module Gargantext.Database.Admin.Types.Hyperdata.Contact
import Data.Morpheus.Types (GQLType(..))
import Data.Time.Segment (jour)
import Gargantext.API.GraphQL.Utils qualified as GAGU
import Gargantext.API.GraphQL.UnPrefix qualified as GAGU
import Gargantext.Core.Text (HasText(..))
import Gargantext.Database.Admin.Types.Hyperdata.Prelude
import Gargantext.Prelude
......
......@@ -25,7 +25,7 @@ module Gargantext.Database.Admin.Types.Hyperdata.User
where
import Data.Morpheus.Types (GQLType(typeOptions))
import qualified Gargantext.API.GraphQL.Utils as GAGU
import qualified Gargantext.API.GraphQL.UnPrefix as GAGU
import Gargantext.Core (Lang(..))
import Gargantext.Database.Admin.Types.Hyperdata.Prelude
import Gargantext.Database.Admin.Types.Hyperdata.Contact
......
......@@ -29,6 +29,7 @@ import Database.PostgreSQL.Simple qualified as PGS
import Database.PostgreSQL.Simple.FromField ( Conversion, ResultError(ConversionFailed), fromField, returnError)
import Database.PostgreSQL.Simple.Internal (Field)
import Database.PostgreSQL.Simple.Types (Query(..))
import Gargantext.Core.AsyncUpdates.CentralExchange.Types qualified as CET
import Gargantext.Core.Mail.Types (HasMail)
import Gargantext.Core.NLP (HasNLPServer)
import Gargantext.Prelude
......@@ -81,7 +82,8 @@ type CmdCommon env =
( DbCommon env
, HasConfig env
, HasMail env
, HasNLPServer env )
, HasNLPServer env
, CET.HasCentralExchangeNotification env )
type CmdM env err m =
( CmdM' env err m
......
......@@ -24,7 +24,7 @@ module Gargantext.Database.Schema.User where
import Data.Morpheus.Types (GQLType(typeOptions))
import Data.Time (UTCTime)
import Database.PostgreSQL.Simple.FromField (FromField, fromField)
import Gargantext.API.GraphQL.Utils qualified as GAGU
import Gargantext.API.GraphQL.UnPrefix qualified as GAGU
import Gargantext.Core.Types.Individu (GargPassword, toGargPassword)
import Gargantext.Core.Utils.Prefix (unPrefix)
import Gargantext.Database.Prelude (fromField')
......
......@@ -31,27 +31,26 @@ module Gargantext.Utils.Jobs.Monad (
, markFailureNoErr
) where
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Queue
import Gargantext.Utils.Jobs.Settings
import Gargantext.Utils.Jobs.State
import Control.Concurrent.STM
import Control.Exception
import Control.Monad.Except
import Control.Monad.Reader
import Data.Kind (Type)
import Data.Map.Strict (Map)
import Data.Proxy
import Data.Text qualified as T
import Data.Time.Clock
import Data.Void (Void)
import qualified Data.Text as T
import Gargantext.Utils.Jobs.Error
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Queue
import Gargantext.Utils.Jobs.Settings
import Gargantext.Utils.Jobs.State
import Network.HTTP.Client (Manager)
import Prelude
import Servant.Job.Core qualified as SJ
import Servant.Job.Types qualified as SJ
import qualified Servant.Job.Core as SJ
import qualified Servant.Job.Types as SJ
import Data.Proxy
data JobEnv t w a = JobEnv
{ jeSettings :: JobSettings
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment