[refactor] openalex fix, refactoring, fix dispatcher logging

parent 5bf220f1
Pipeline #6239 failed with stages
......@@ -108,7 +108,7 @@ source-repository-package
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/crawlers/openalex.git
tag: ceb8f2cebd4890b6d9d151ab01ee14e925bc0499
tag: c2114adb0382770e419e5a7ae1b3a1ee5b09ee50
source-repository-package
type: git
......
......@@ -39,6 +39,7 @@ import Gargantext.Core.AsyncUpdates.Constants as AUConstants
import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Core.Types (NodeId, UserId)
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
-- import Gargantext.Utils.Jobs.Monad (MonadJobStatus(getLatestJobStatus))
import GHC.Conc (TVar, newTVarIO, readTVar, writeTVar)
import Nanomsg (Pull(..), bind, recvMalloc, withSocket)
......@@ -149,18 +150,19 @@ wsServer = WSAPI { wsAPIServer = streamData }
threadDelay $ 10 * 1000000
wsLoop authSettings subscriptions ws = flip finally disconnect $ do
putText "[wsLoop] connecting"
wsLoop' CUPublic
withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG "[wsLoop] connecting"
wsLoop' CUPublic ioLogger
where
wsLoop' user = do
wsLoop' user ioLogger = do
dm <- WS.receiveDataMessage (wsConn ws)
newUser <- case dm of
WS.Text dm' _ -> do
case Aeson.decode dm' of
Nothing -> do
putText $ "[wsLoop] unknown message: " <> show dm'
logMsg ioLogger DEBUG $ "[wsLoop] unknown message: " <> show dm'
return user
Just (WSSubscribe topic) -> do
-- TODO Fix s_connected_user based on header
......@@ -181,7 +183,7 @@ wsServer = WSAPI { wsAPIServer = streamData }
let jwtS = authSettings ^. jwtSettings
mUser <- liftBase $ verifyJWT jwtS (encodeUtf8 token)
putText $ "[wsLoop] authorized user: " <> show mUser
logMsg ioLogger DEBUG $ "[wsLoop] authorized user: " <> show mUser
-- TODO Update my subscriptions!
......@@ -190,16 +192,17 @@ wsServer = WSAPI { wsAPIServer = streamData }
-- TODO Update my subscriptions!
pure CUPublic
_ -> do
putText "[wsLoop] binary ws messages not supported"
logMsg ioLogger DEBUG "[wsLoop] binary ws messages not supported"
return user
wsLoop' newUser
wsLoop' newUser ioLogger
disconnect = do
putText "[wsLoop] disconnecting..."
_ss <- removeSubscriptionsForWSKey subscriptions ws
-- putText $ "[wsLoop] subscriptions: " <> show (show <$> ss)
return ()
withLogger () $ \ioLogger -> do
logMsg ioLogger DEBUG "[wsLoop] disconnecting..."
_ss <- removeSubscriptionsForWSKey subscriptions ws
-- putText $ "[wsLoop] subscriptions: " <> show (show <$> ss)
return ()
-- | This is a nanomsg socket listener. We want to read the messages
......@@ -230,7 +233,8 @@ dispatcher_listener subscriptions = do
-- putText $ "[" <> show tId <> "] received a message: " <> decodeUtf8 r
case Aeson.decode (BSL.fromStrict r) of
Nothing -> putText "[dispatcher_listener] unknown message from central exchange"
Nothing -> withLogger () $ \ioL ->
logMsg ioL DEBUG "[dispatcher_listener] unknown message from central exchange"
Just ceMessage -> do
-- putText $ "[dispatcher_listener] received message: " <> show ceMessage
-- subs <- atomically $ readTVar subscriptions
......
......@@ -36,7 +36,7 @@ multiterms nsc l txt = do
let txt' = cleanTextForNLP txt
if txt' == ""
then do
printDebug "[G.C.T.Terms.Multi] becomes empty after cleanTextForNLP" txt
-- printDebug "[G.C.T.Terms.Multi] becomes empty after cleanTextForNLP" txt
pure []
else do
ret <- multiterms' tokenTag2terms l txt'
......
......@@ -70,8 +70,8 @@ getTficf_withSample cId mId nt = do
<$> getOccByNgramsOnlyFast_withSample mId countGlobal nt
(HM.keys mapTextDoubleLocal)
printDebug "[getTficf_withSample] mapTextDoubleLocal: " mapTextDoubleLocal
printDebug "[getTficf_withSample] mapTextDoubleGlobal: " mapTextDoubleGlobal
-- printDebug "[getTficf_withSample] mapTextDoubleLocal: " mapTextDoubleLocal
-- printDebug "[getTficf_withSample] mapTextDoubleGlobal: " mapTextDoubleGlobal
--printDebug "getTficf_withSample" (mapTextDoubleLocal, mapTextDoubleGlobal, countLocal, countGlobal)
pure $ HM.mapWithKey (\t n ->
......
......@@ -24,7 +24,7 @@ module Gargantext.Utils.Jobs.Map (
) where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
......@@ -71,7 +71,7 @@ data QueuedJob w r where
-- | A running job points to the async computation for the job and provides a
-- function to peek at the current logs.
data RunningJob w a = RunningJob
{ rjAsync :: Async a
{ rjAsync :: Async.Async a
, rjGetLog :: IO w
}
......@@ -183,18 +183,18 @@ waitJobDone jid rj (JobMap mvar) = do
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob a f) = do
logs <- newTVarIO mempty
act <- async $ f a (jobLog logs)
act <- Async.async $ f a (jobLog logs)
let readLogs = readTVarIO logs
pure (RunningJob act readLogs)
-- | Wait for a running job to return (blocking).
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ (RunningJob act _) = waitCatch act
waitJ (RunningJob act _) = Async.waitCatch act
-- | Poll a running job to see if it's done.
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ (RunningJob act _) = poll act
pollJ (RunningJob act _) = Async.poll act
-- | Kill a running job by cancelling the action.
killJ :: RunningJob w a -> IO ()
killJ (RunningJob act _) = cancel act
killJ (RunningJob act _) = Async.cancel act
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment