Commit f12b9df7 authored by Alfredo Di Napoli's avatar Alfredo Di Napoli Committed by Alfredo Di Napoli

refactor(logging): Add more debug logs for dispatcher/workers

They can be enabled with `GGTX_LOG_LEVEL` during tests.
parent 68dbf45c
...@@ -14,6 +14,8 @@ https://dev.sub.gargantext.org/#/share/Notes/187918 ...@@ -14,6 +14,8 @@ https://dev.sub.gargantext.org/#/share/Notes/187918
-} -}
{-# LANGUAGE TemplateHaskell #-}
module Gargantext.Core.Notifications.Dispatcher ( module Gargantext.Core.Notifications.Dispatcher (
Dispatcher -- opaque Dispatcher -- opaque
, withDispatcher , withDispatcher
...@@ -34,11 +36,11 @@ import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes ...@@ -34,11 +36,11 @@ import Gargantext.Core.Notifications.CentralExchange.Types qualified as CETypes
import Gargantext.Core.Notifications.Dispatcher.Types import Gargantext.Core.Notifications.Dispatcher.Types
import Gargantext.Core.Worker.Types (JobInfo(..)) import Gargantext.Core.Worker.Types (JobInfo(..))
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(..), withLogger, logMsg)
import Nanomsg (Pull(..), bind, recv, withSocket) import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import StmContainers.Set qualified as SSet import StmContainers.Set qualified as SSet
import Gargantext.Core.Config import Gargantext.Core.Config
import Gargantext.System.Logging
{- {-
...@@ -92,20 +94,18 @@ dispatcherListener config subscriptions = do ...@@ -92,20 +94,18 @@ dispatcherListener config subscriptions = do
where where
NotificationsConfig { _nc_dispatcher_bind } = config ^. gc_notifications_config NotificationsConfig { _nc_dispatcher_bind } = config ^. gc_notifications_config
log_cfg = config ^. gc_logging log_cfg = config ^. gc_logging
worker tChan throttleTChan = do worker tChan throttleTChan = withLogger log_cfg $ \ioL -> do
-- tId <- myThreadId tId <- myThreadId
forever $ do forever $ do
r <- atomically $ TChan.readTChan tChan r <- atomically $ TChan.readTChan tChan
-- putText $ "[" <> show tId <> "] received a message: " <> decodeUtf8 r $(logLoc) ioL DEBUG $ "[" <> show tId <> "] received a message: " <> decodeUtf8 r
case Aeson.decode (BSL.fromStrict r) of case Aeson.decode (BSL.fromStrict r) of
Nothing -> Nothing ->
withLogger log_cfg $ \ioL ->
logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange" logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange"
Just ceMessage -> do Just ceMessage -> do
withLogger log_cfg $ \ioL -> logMsg ioL DEBUG $ "[dispatcher_listener] received " <> show ceMessage
logMsg ioL DEBUG $ "[dispatcher_listener] received " <> show ceMessage
-- subs <- atomically $ readTVar subscriptions -- subs <- atomically $ readTVar subscriptions
filteredSubs <- atomically $ do filteredSubs <- atomically $ do
let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions
......
...@@ -11,11 +11,11 @@ https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341 ...@@ -11,11 +11,11 @@ https://gitlab.iscpif.fr/gargantext/haskell-gargantext/issues/341
Docs: Docs:
https://dev.sub.gargantext.org/#/share/Notes/187918 https://dev.sub.gargantext.org/#/share/Notes/187918
-} -}
{-# OPTIONS_GHC -fno-warn-unused-matches -fno-warn-unused-imports #-} {-# OPTIONS_GHC -fno-warn-unused-matches -fno-warn-unused-imports #-}
module Gargantext.Core.Notifications.Dispatcher.Types where module Gargantext.Core.Notifications.Dispatcher.Types where
import Codec.Binary.UTF8.String qualified as CBUTF8 import Codec.Binary.UTF8.String qualified as CBUTF8
...@@ -120,7 +120,7 @@ instance ToJSON Topic where ...@@ -120,7 +120,7 @@ instance ToJSON Topic where
-- pure $ MJobLog jl -- pure $ MJobLog jl
data ConnectedUser = data ConnectedUser =
CUUser UserId CUUser UserId
| CUPublic | CUPublic
...@@ -128,7 +128,7 @@ data ConnectedUser = ...@@ -128,7 +128,7 @@ data ConnectedUser =
instance Hashable ConnectedUser where instance Hashable ConnectedUser where
hashWithSalt salt (CUUser userId) = hashWithSalt salt ("cuuser" :: Text, userId) hashWithSalt salt (CUUser userId) = hashWithSalt salt ("cuuser" :: Text, userId)
hashWithSalt salt CUPublic = hashWithSalt salt ("cupublic" :: Text) hashWithSalt salt CUPublic = hashWithSalt salt ("cupublic" :: Text)
newtype WSKeyConnection = WSKeyConnection (ByteString, WS.Connection) newtype WSKeyConnection = WSKeyConnection (ByteString, WS.Connection)
instance Hashable WSKeyConnection where instance Hashable WSKeyConnection where
hashWithSalt salt (WSKeyConnection (key, _conn)) = hashWithSalt salt key hashWithSalt salt (WSKeyConnection (key, _conn)) = hashWithSalt salt key
...@@ -142,7 +142,7 @@ wsKey :: WSKeyConnection -> ByteString ...@@ -142,7 +142,7 @@ wsKey :: WSKeyConnection -> ByteString
wsKey (WSKeyConnection (key, _conn)) = key wsKey (WSKeyConnection (key, _conn)) = key
wsConn :: WSKeyConnection -> WS.Connection wsConn :: WSKeyConnection -> WS.Connection
wsConn (WSKeyConnection (_key, conn)) = conn wsConn (WSKeyConnection (_key, conn)) = conn
data Subscription = data Subscription =
Subscription { Subscription {
s_connected_user :: ConnectedUser s_connected_user :: ConnectedUser
...@@ -158,7 +158,7 @@ subKey sub = wsKey $ s_ws_key_connection sub ...@@ -158,7 +158,7 @@ subKey sub = wsKey $ s_ws_key_connection sub
type Token = Text type Token = Text
{- {-
We accept requests for subscription/unsubscription via websocket. We accept requests for subscription/unsubscription via websocket.
...@@ -200,7 +200,7 @@ instance ToJSON WSRequest where ...@@ -200,7 +200,7 @@ instance ToJSON WSRequest where
toJSON (WSAuthorize token) = Aeson.object [ "request" .= ( "authorize" :: Text ) toJSON (WSAuthorize token) = Aeson.object [ "request" .= ( "authorize" :: Text )
, "token" .= token ] , "token" .= token ]
toJSON WSDeauthorize = Aeson.object [ "request" .= ( "deauthorize" :: Text ) ] toJSON WSDeauthorize = Aeson.object [ "request" .= ( "deauthorize" :: Text ) ]
class HasDispatcher env dispatcher where class HasDispatcher env dispatcher where
hasDispatcher :: Getter env dispatcher hasDispatcher :: Getter env dispatcher
......
...@@ -100,6 +100,7 @@ wsLoop log_cfg jwtS subscriptions ws = flip finally disconnect $ do ...@@ -100,6 +100,7 @@ wsLoop log_cfg jwtS subscriptions ws = flip finally disconnect $ do
where where
wsLoop' user ioLogger = do wsLoop' user ioLogger = do
dm <- WS.receiveDataMessage (wsConn ws) dm <- WS.receiveDataMessage (wsConn ws)
logMsg ioLogger DEBUG $ "[wsLoop'] handling new message.."
newUser <- case dm of newUser <- case dm of
WS.Text dm' _ -> do WS.Text dm' _ -> do
...@@ -113,8 +114,8 @@ wsLoop log_cfg jwtS subscriptions ws = flip finally disconnect $ do ...@@ -113,8 +114,8 @@ wsLoop log_cfg jwtS subscriptions ws = flip finally disconnect $ do
let sub = Subscription { s_connected_user = user let sub = Subscription { s_connected_user = user
, s_ws_key_connection = ws , s_ws_key_connection = ws
, s_topic = topic } , s_topic = topic }
_ss <- insertSubscription subscriptions sub insertSubscription subscriptions sub
-- putText $ "[wsLoop] subscriptions: " <> show (showSub <$> ss) logMsg ioLogger DEBUG $ "[wsLoop] added subscription: " <> show sub
return user return user
Just (WSUnsubscribe topic) -> do Just (WSUnsubscribe topic) -> do
logMsg ioLogger DEBUG $ "[wsLoop'] unsubscribe topic " <> show topic logMsg ioLogger DEBUG $ "[wsLoop'] unsubscribe topic " <> show topic
......
...@@ -12,6 +12,7 @@ Portability : POSIX ...@@ -12,6 +12,7 @@ Portability : POSIX
{-# OPTIONS_GHC -Wno-orphans #-} {-# OPTIONS_GHC -Wno-orphans #-}
{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
module Test.API.Worker ( module Test.API.Worker (
...@@ -20,9 +21,11 @@ module Test.API.Worker ( ...@@ -20,9 +21,11 @@ module Test.API.Worker (
import Control.Concurrent.Async (withAsync) import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
import Control.Lens
import Control.Monad.STM (atomically) import Control.Monad.STM (atomically)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.Maybe (isJust) import Data.Maybe (isJust)
import Gargantext.Core.Config
import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT import Gargantext.Core.Notifications.Dispatcher.Types qualified as DT
import Gargantext.Core.Worker.Jobs (sendJobWithCfg) import Gargantext.Core.Worker.Jobs (sendJobWithCfg)
import Gargantext.Core.Worker.Jobs.Types (Job(Ping)) import Gargantext.Core.Worker.Jobs.Types (Job(Ping))
...@@ -34,6 +37,9 @@ import Test.Database.Types (test_config) ...@@ -34,6 +37,9 @@ import Test.Database.Types (test_config)
import Test.Hspec import Test.Hspec
import Test.Instances () import Test.Instances ()
import Test.Utils.Notifications import Test.Utils.Notifications
import Gargantext.System.Logging
import qualified Data.Text.Encoding as TE
import qualified Data.ByteString as BL
...@@ -42,18 +48,12 @@ tests = sequential $ aroundAll withTestDBAndPort $ do ...@@ -42,18 +48,12 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
describe "Worker" $ do describe "Worker" $ do
it "simple Ping job works" $ \(SpecContext testEnv port _app _) -> do it "simple Ping job works" $ \(SpecContext testEnv port _app _) -> do
let cfg = test_config testEnv let cfg = test_config testEnv
let log_cfg = (test_config testEnv) ^. gc_logging
let topic = DT.Ping let topic = DT.Ping
tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification)) tchan <- newTChanIO :: IO (TChan (Maybe DT.Notification))
-- setup a websocket connection
let wsConnect =
withWSConnection ("127.0.0.1", port) $ \conn -> do
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
d <- WS.receiveData conn
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
withAsync wsConnect $ \_a -> do withAsync (setupWsThread log_cfg topic tchan port) $ \_a -> do
_ <- sendJobWithCfg cfg Ping _ <- sendJobWithCfg cfg Ping
mTimeout <- Timeout.timeout (5 * 1_000_000) $ do mTimeout <- Timeout.timeout (5 * 1_000_000) $ do
...@@ -62,3 +62,14 @@ tests = sequential $ aroundAll withTestDBAndPort $ do ...@@ -62,3 +62,14 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
md `shouldBe` Just DT.NPing md `shouldBe` Just DT.NPing
mTimeout `shouldSatisfy` isJust mTimeout `shouldSatisfy` isJust
setupWsThread :: LogConfig -> DT.Topic -> TChan (Maybe DT.Notification) -> Int -> IO ()
setupWsThread log_cfg topic tchan port = withLogger log_cfg $ \ioL -> do
withWSConnection ("127.0.0.1", port) $ \conn -> do
let payload = Aeson.encode (DT.WSSubscribe topic)
$(logLoc) ioL DEBUG $ "Sending payload: " <> (TE.decodeUtf8 $ BL.toStrict $ payload)
WS.sendTextData conn $ Aeson.encode (DT.WSSubscribe topic)
d <- WS.receiveData conn
$(logLoc) ioL DEBUG $ "Received: " <> (TE.decodeUtf8 $ BL.toStrict d)
let dec = Aeson.decode d :: Maybe DT.Notification
atomically $ writeTChan tchan dec
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment