[flow] small refactorings

I tried to analyze why there are no updates to tree after file upload,
but haven't found it yet.
parent d19839d8
Pipeline #7660 failed with stages
in 12 minutes and 43 seconds
...@@ -164,7 +164,7 @@ source-repository-package ...@@ -164,7 +164,7 @@ source-repository-package
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-throttle location: https://gitlab.iscpif.fr/gargantext/haskell-throttle
tag: 02f5ed9ee2d6cce45161addf945b88bc6adf9059 tag: e0d9a8b32afde9652dd99e99df611a8c2a88f2e9
allow-newer: MissingH:base allow-newer: MissingH:base
, *:base , *:base
......
...@@ -230,7 +230,6 @@ constraints: any.Boolean ==0.2.4, ...@@ -230,7 +230,6 @@ constraints: any.Boolean ==0.2.4,
any.haskell-pgmq ==0.1.0.0, any.haskell-pgmq ==0.1.0.0,
any.haskell-src-exts ==1.23.1, any.haskell-src-exts ==1.23.1,
any.haskell-src-meta ==0.8.15, any.haskell-src-meta ==0.8.15,
any.haskell-throttle ==0.1.0.0,
any.hedgehog ==1.5, any.hedgehog ==1.5,
any.hedis ==0.15.2, any.hedis ==0.15.2,
hedis -dev, hedis -dev,
......
...@@ -42,7 +42,7 @@ import Gargantext.Core.Config (gc_jobs, hasConfig) ...@@ -42,7 +42,7 @@ import Gargantext.Core.Config (gc_jobs, hasConfig)
import Gargantext.Core.Config.Types (jc_max_docs_parsers) import Gargantext.Core.Config.Types (jc_max_docs_parsers)
import Gargantext.Core.NodeStory (currentVersion, NgramsStatePatch', HasNodeStoryEnv (..)) import Gargantext.Core.NodeStory (currentVersion, NgramsStatePatch', HasNodeStoryEnv (..))
import Gargantext.Core.Text.Corpus.Parsers qualified as Parser (FileType(..), parseFormatC, _ParseFormatError) import Gargantext.Core.Text.Corpus.Parsers qualified as Parser (FileType(..), parseFormatC, _ParseFormatError)
import Gargantext.Core.Text.Corpus.Parsers.Types import Gargantext.Core.Text.Corpus.Parsers.Types ( ParseCorpusResult(ParseTsvRecordFailed, ParseRecordSucceeded), AtRow(..) )
import Gargantext.Core.Text.Corpus.Query qualified as API import Gargantext.Core.Text.Corpus.Query qualified as API
import Gargantext.Core.Types.Individu (User(..)) import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Core.Utils.Prefix (unPrefix) import Gargantext.Core.Utils.Prefix (unPrefix)
...@@ -55,7 +55,7 @@ import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument( ...@@ -55,7 +55,7 @@ import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(
import Gargantext.Database.Admin.Types.Hyperdata.File ( HyperdataFile(..) ) import Gargantext.Database.Admin.Types.Hyperdata.File ( HyperdataFile(..) )
import Gargantext.Database.Admin.Types.Node (CorpusId, NodeType(..), ParentId) import Gargantext.Database.Admin.Types.Node (CorpusId, NodeType(..), ParentId)
import Gargantext.Database.GargDB qualified as GargDB import Gargantext.Database.GargDB qualified as GargDB
import Gargantext.Database.Prelude import Gargantext.Database.Prelude ( IsDBCmd, runDBTx, readLargeObject )
import Gargantext.Database.Query.Table.Node (getNodeWith, getOrMkList) import Gargantext.Database.Query.Table.Node (getNodeWith, getOrMkList)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError) import Gargantext.Database.Query.Table.Node.Error (HasNodeError)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata) import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
......
...@@ -68,7 +68,7 @@ gServer cfg = do ...@@ -68,7 +68,7 @@ gServer cfg = do
-- | make reading nanomsg as fast as possible. -- | make reading nanomsg as fast as possible.
void $ Async.concurrently (worker s_dispatcher tChan) $ do void $ Async.concurrently (worker s_dispatcher tChan) $ do
forever $ do forever $ do
$(logLoc) ioLogger DEBUG $ "[central_exchange] receiving" -- $(logLoc) ioLogger DEBUG $ "[central_exchange] receiving"
r <- recv s r <- recv s
$(logLoc) ioLogger DEBUG $ "[central_exchange] received: " <> show r $(logLoc) ioLogger DEBUG $ "[central_exchange] received: " <> show r
-- C.putStrLn $ "[central_exchange] " <> r -- C.putStrLn $ "[central_exchange] " <> r
...@@ -121,7 +121,8 @@ sendTimeout ioLogger sock payload = withFrozenCallStack $ do ...@@ -121,7 +121,8 @@ sendTimeout ioLogger sock payload = withFrozenCallStack $ do
Nothing -> Nothing ->
$(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion." $(logLoc) ioLogger ERROR $ "[central_exchange] couldn't send msg in timely fashion."
Just () -> Just () ->
$(logLoc) ioLogger DEBUG $ "[central_exchange] message sent." pure ()
-- $(logLoc) ioLogger DEBUG $ "[central_exchange] message sent."
notify :: HasCallStack => GargConfig -> CEMessage -> IO () notify :: HasCallStack => GargConfig -> CEMessage -> IO ()
notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do notify cfg ceMessage = withLogger log_cfg $ \ioLogger -> do
......
...@@ -39,8 +39,8 @@ import Gargantext.Prelude ...@@ -39,8 +39,8 @@ import Gargantext.Prelude
import Nanomsg (Pull(..), bind, recv, withSocket) import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import StmContainers.Set qualified as SSet import StmContainers.Set qualified as SSet
import Gargantext.Core.Config import Gargantext.Core.Config ( GargConfig, gc_logging, gc_notifications_config, LogConfig )
import Gargantext.System.Logging import Gargantext.System.Logging ( LogLevel(DEBUG), HasLogger(logMsg), logLoc, withLogger )
{- {-
...@@ -110,13 +110,13 @@ dispatcherListener config subscriptions = do ...@@ -110,13 +110,13 @@ dispatcherListener config subscriptions = do
filteredSubs <- atomically $ do filteredSubs <- atomically $ do
let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions let subs' = UnfoldlM.filter (pure . ceMessageSubPred ceMessage) $ SSet.unfoldlM subscriptions
UnfoldlM.foldlM' (\acc sub -> pure $ acc <> [sub]) [] subs' UnfoldlM.foldlM' (\acc sub -> pure $ acc <> [sub]) [] subs'
-- NOTE This isn't safe: we atomically fetch subscriptions, -- NOTE This isn't safe: we atomically fetch
-- then send notifications one by one. In the meantime, a -- subscriptions, then send notifications one by one. In
-- subscription could end or new ones could appear (but is -- the meantime, a subscription could end or new ones
-- this really a problem? I new subscription comes up, then -- could appear (but is this really a problem? If a new
-- probably they already fetch new tree anyways, and if old -- subscription comes up, then probably they already fetch
-- one drops in the meantime, it won't listen to what we -- new tree anyways, and if old one drops in the meantime,
-- send...) -- it won't listen to what we send...)
-- let filteredSubs = filterCEMessageSubs ceMessage subs -- let filteredSubs = filterCEMessageSubs ceMessage subs
mapM_ (sendNotification throttleTChan ceMessage) filteredSubs mapM_ (sendNotification throttleTChan ceMessage) filteredSubs
...@@ -155,9 +155,15 @@ sendNotification throttleTChan ceMessage sub = do ...@@ -155,9 +155,15 @@ sendNotification throttleTChan ceMessage sub = do
Just NPing Just NPing
_ -> Nothing _ -> Nothing
case (topic, ceMessage) of
(UpdateTree nodeId, CETypes.UpdateTreeFirstLevel nodeId') ->
putText $ "[sendNotification] nodeId = " <> show nodeId <> ", nodeId' = " <> show nodeId'
_ -> pure ()
case mNotification of case mNotification of
Nothing -> pure () Nothing -> pure ()
Just notification -> do Just notification -> do
-- | The key upon which throttling is done.
let id' = (wsKey ws, topic) let id' = (wsKey ws, topic)
atomically $ do atomically $ do
TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing)) TChan.writeTChan throttleTChan (id', (wsConn ws, WS.Text (Aeson.encode notification) Nothing))
......
...@@ -28,7 +28,7 @@ import Control.Concurrent.Async qualified as Async ...@@ -28,7 +28,7 @@ import Control.Concurrent.Async qualified as Async
import Control.Exception.Safe qualified as Exc import Control.Exception.Safe qualified as Exc
import Control.Lens (view) import Control.Lens (view)
import Data.Aeson qualified as Aeson import Data.Aeson qualified as Aeson
import Data.UUID.V4 as UUID import Data.UUID.V4 as UUID ( nextRandom )
import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id)) import Gargantext.API.Admin.Auth.Types (AuthenticatedUser(_auth_user_id))
import Gargantext.API.Prelude (IsGargServer) import Gargantext.API.Prelude (IsGargServer)
import Gargantext.Core.Notifications.Dispatcher.Subscriptions import Gargantext.Core.Notifications.Dispatcher.Subscriptions
...@@ -38,7 +38,7 @@ import Gargantext.Core.Config (HasJWTSettings(jwtSettings), HasConfig (..), LogC ...@@ -38,7 +38,7 @@ import Gargantext.Core.Config (HasJWTSettings(jwtSettings), HasConfig (..), LogC
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(..), logMsg, withLogger, logM) import Gargantext.System.Logging (LogLevel(..), logMsg, withLogger, logM)
import Network.WebSockets qualified as WS import Network.WebSockets qualified as WS
import Servant import Servant ( type (:>), Summary, GenericMode(type (:-)) )
import Servant.API.WebSocket qualified as WS (WebSocketPending) import Servant.API.WebSocket qualified as WS (WebSocketPending)
import Servant.Auth.Server (JWTSettings, verifyJWT) import Servant.Auth.Server (JWTSettings, verifyJWT)
import Servant.Server.Generic (AsServerT) import Servant.Server.Generic (AsServerT)
......
...@@ -16,16 +16,15 @@ JSON parser for Gargantext corpus files. ...@@ -16,16 +16,15 @@ JSON parser for Gargantext corpus files.
module Gargantext.Core.Text.Corpus.Parsers.JSON where module Gargantext.Core.Text.Corpus.Parsers.JSON where
-- import Gargantext.Database.Schema.Node (NodePoly(..)) -- import Gargantext.Database.Schema.Node (NodePoly(..))
import Conduit import Conduit ( ConduitT, (.|), yieldMany, mapC )
import Data.Aeson import Data.Aeson ( eitherDecode )
import Data.ByteString.Lazy qualified as BL import Data.ByteString.Lazy qualified as BL
import Data.Text qualified as T import Data.Text qualified as T
import GHC.Generics
import Gargantext.Core (Lang) import Gargantext.Core (Lang)
import Gargantext.Core.Text.Corpus.Parsers.JSON.Istex qualified as Istex import Gargantext.Core.Text.Corpus.Parsers.JSON.Istex qualified as Istex
import Gargantext.Database.Admin.Types.Hyperdata (HyperdataDocument(..)) import Gargantext.Database.Admin.Types.Hyperdata (HyperdataDocument(..))
import Gargantext.Prelude hiding (length) import Gargantext.Prelude hiding (length)
import Protolude import Protolude ( Foldable(length) )
data JSONStruct = data JSONStruct =
......
...@@ -56,7 +56,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list) ...@@ -56,7 +56,7 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
import Conduit import Conduit
import Control.Lens ( to, view ) import Control.Lens ( to, view )
import Control.Monad.Catch import Control.Monad.Catch ( MonadCatch(..) )
import Data.Conduit qualified as C import Data.Conduit qualified as C
import Data.Conduit.Internal (zipSources) import Data.Conduit.Internal (zipSources)
import Data.Conduit.List qualified as CL import Data.Conduit.List qualified as CL
...@@ -97,7 +97,8 @@ import Gargantext.Database.Admin.Types.Hyperdata.Contact ( HyperdataContact ) ...@@ -97,7 +97,8 @@ import Gargantext.Database.Admin.Types.Hyperdata.Contact ( HyperdataContact )
import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataAnnuaire, HyperdataCorpus(_hc_lang) ) import Gargantext.Database.Admin.Types.Hyperdata.Corpus ( HyperdataAnnuaire, HyperdataCorpus(_hc_lang) )
import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(toHyperdataDocument), HyperdataDocument ) import Gargantext.Database.Admin.Types.Hyperdata.Document ( ToHyperdataDocument(toHyperdataDocument), HyperdataDocument )
import Gargantext.Database.Admin.Types.Node hiding (ERROR, DEBUG) -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId) import Gargantext.Database.Admin.Types.Node hiding (ERROR, DEBUG) -- (HyperdataDocument(..), NodeType(..), NodeId, UserId, ListId, CorpusId, RootId, MasterCorpusId, MasterUserId)
import Gargantext.Database.Prelude import Gargantext.Database.Class ( IsDBCmd, DBCmdWithEnv )
import Gargantext.Database.Transactional ( DBUpdate, runDBTx )
import Gargantext.Database.Query.Table.ContextNodeNgrams2 ( ContextNodeNgrams2Poly(..), insertContextNodeNgrams2 ) import Gargantext.Database.Query.Table.ContextNodeNgrams2 ( ContextNodeNgrams2Poly(..), insertContextNodeNgrams2 )
import Gargantext.Database.Query.Table.Node ( MkCorpus, insertDefaultNodeIfNotExists, getOrMkList, getNodeWith ) import Gargantext.Database.Query.Table.Node ( MkCorpus, insertDefaultNodeIfNotExists, getOrMkList, getNodeWith )
import Gargantext.Database.Query.Table.Node.Document.Add qualified as Doc (add) import Gargantext.Database.Query.Table.Node.Document.Add qualified as Doc (add)
...@@ -107,8 +108,8 @@ import Gargantext.Database.Query.Table.NodeContext (selectDocNodesOnlyId) ...@@ -107,8 +108,8 @@ import Gargantext.Database.Query.Table.NodeContext (selectDocNodesOnlyId)
import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId) import Gargantext.Database.Query.Table.NodeNgrams (listInsertDb , getCgramsId)
import Gargantext.Database.Query.Tree.Root (MkCorpusUser(..), getOrMkRoot, getOrMkRootWithCorpus, userFromMkCorpusUser) import Gargantext.Database.Query.Tree.Root (MkCorpusUser(..), getOrMkRoot, getOrMkRootWithCorpus, userFromMkCorpusUser)
import Gargantext.Database.Schema.Ngrams ( indexNgrams, NgramsId ) import Gargantext.Database.Schema.Ngrams ( indexNgrams, NgramsId )
import Gargantext.Database.Schema.Node import Gargantext.Database.Schema.Node ( NodePoly(_node_id, _node_hash_id), node_hyperdata )
import Gargantext.Database.Types import Gargantext.Database.Types ( Indexed(Indexed) )
import Gargantext.Prelude hiding (catch, onException, to) import Gargantext.Prelude hiding (catch, onException, to)
import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger ) import Gargantext.System.Logging ( logLocM, LogLevel(DEBUG, ERROR), MonadLogger )
import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..) ) import Gargantext.Utils.Jobs.Monad ( JobHandle, MonadJobStatus(..) )
...@@ -291,6 +292,7 @@ flow :: forall env err m a c. ...@@ -291,6 +292,7 @@ flow :: forall env err m a c.
flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do flow c mkCorpusUser la mfslw (count, docsC) jobHandle = do
cfg <- view hasConfig cfg <- view hasConfig
(_userId, userCorpusId, listId, msgs) <- runDBTx $ createNodes cfg mkCorpusUser c (_userId, userCorpusId, listId, msgs) <- runDBTx $ createNodes cfg mkCorpusUser c
-- liftBase $ putText $ "[flow] msgs: " <> show msgs
forM_ msgs ce_notify forM_ msgs ce_notify
-- TODO if public insertMasterDocs else insertUserDocs -- TODO if public insertMasterDocs else insertUserDocs
runConduit $ zipSources (yieldMany ([1..] :: [Int])) docsC runConduit $ zipSources (yieldMany ([1..] :: [Int])) docsC
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment