From c41ca75bb2745a1912e73a56e9e25d681977c6b7 Mon Sep 17 00:00:00 2001 From: Przemek Kaminski <pk@intrepidus.pl> Date: Fri, 16 Dec 2022 13:46:37 +0100 Subject: [PATCH] [nodeStory] save archive immediately, delay saving of node story state We had a conflict here: archive history is used to resolve new patches. However, they were saved with delay and after that were cleared. We should save archive immediately and immediately clear it in mvar so it's ready for new patches. --- src/Gargantext/API/Admin/EnvTypes.hs | 6 ++ src/Gargantext/API/Ngrams.hs | 23 +++++++- src/Gargantext/Core/NodeStory.hs | 84 +++++++++++++++------------- src/Gargantext/Core/NodeStoryFile.hs | 2 + 4 files changed, 72 insertions(+), 43 deletions(-) diff --git a/src/Gargantext/API/Admin/EnvTypes.hs b/src/Gargantext/API/Admin/EnvTypes.hs index f3ce8717..ab9a3ee4 100644 --- a/src/Gargantext/API/Admin/EnvTypes.hs +++ b/src/Gargantext/API/Admin/EnvTypes.hs @@ -82,6 +82,9 @@ instance HasNodeStorySaver Env where instance HasNodeStoryImmediateSaver Env where hasNodeStoryImmediateSaver = hasNodeStory . nse_saver_immediate +instance HasNodeArchiveStoryImmediateSaver Env where + hasNodeArchiveStoryImmediateSaver = hasNodeStory . nse_archive_saver_immediate + instance HasSettings Env where settings = env_settings @@ -138,5 +141,8 @@ instance HasNodeStorySaver DevEnv where instance HasNodeStoryImmediateSaver DevEnv where hasNodeStoryImmediateSaver = hasNodeStory . nse_saver_immediate +instance HasNodeArchiveStoryImmediateSaver DevEnv where + hasNodeArchiveStoryImmediateSaver = hasNodeStory . 
nse_archive_saver_immediate + instance HasMail DevEnv where mailSettings = dev_env_mail diff --git a/src/Gargantext/API/Ngrams.hs b/src/Gargantext/API/Ngrams.hs index 0a22dcd3..c7f52864 100644 --- a/src/Gargantext/API/Ngrams.hs +++ b/src/Gargantext/API/Ngrams.hs @@ -201,7 +201,6 @@ saveNodeStoryImmediate = do saver --Gargantext.Prelude.putStrLn "---- Node story immediate saver finished ----" - listTypeConflictResolution :: ListType -> ListType -> ListType listTypeConflictResolution _ _ = undefined -- TODO Use Map User ListType @@ -293,13 +292,17 @@ newNgramsFromNgramsStatePatch p = -commitStatePatch :: (HasNodeStory env err m, HasMail env) +commitStatePatch :: ( HasNodeStory env err m + , HasNodeStoryImmediateSaver env + , HasNodeArchiveStoryImmediateSaver env + , HasMail env) => ListId -> Versioned NgramsStatePatch' -> m (Versioned NgramsStatePatch') commitStatePatch listId (Versioned _p_version p) = do -- printDebug "[commitStatePatch]" listId var <- getNodeStoryVar [listId] + archiveSaver <- view hasNodeArchiveStoryImmediateSaver vq' <- liftBase $ modifyMVar var $ \ns -> do let a = ns ^. unNodeStory . at listId . _Just @@ -329,10 +332,22 @@ commitStatePatch listId (Versioned _p_version p) = do -} -- printDebug "[commitStatePatch] a version" (a ^. a_version) -- printDebug "[commitStatePatch] a' version" (a' ^. a_version) - pure ( ns & unNodeStory . at listId .~ (Just a') + let newNs = ( ns & unNodeStory . at listId .~ (Just a') , Versioned (a' ^. a_version) q' ) + + -- NOTE Now is the only good time to save the archive history. We + -- have the handle to the MVar and we need to save its exact + -- snapshot. Node Story archive is a linear table, so it's only + -- a couple of inserts, it shouldn't take long... + --newNs' <- saveNodeArchiveStoryImmediate $ fst newNs + newNs' <- archiveSaver $ fst newNs + + pure (newNs', snd newNs) + + -- NOTE State (i.e. `NodeStory`) can be saved asynchronously (i.e. 
with debounce) saveNodeStory + --saveNodeStoryImmediate -- Save new ngrams _ <- insertNgrams (newNgramsFromNgramsStatePatch p) @@ -366,6 +381,8 @@ tableNgramsPull listId ngramsType p_version = do -- client. -- TODO-ACCESS check tableNgramsPut :: ( HasNodeStory env err m + , HasNodeStoryImmediateSaver env + , HasNodeArchiveStoryImmediateSaver env , HasInvalidError err , HasSettings env , HasMail env diff --git a/src/Gargantext/Core/NodeStory.hs b/src/Gargantext/Core/NodeStory.hs index 69226602..ebff31cd 100644 --- a/src/Gargantext/Core/NodeStory.hs +++ b/src/Gargantext/Core/NodeStory.hs @@ -56,6 +56,8 @@ module Gargantext.Core.NodeStory , hasNodeStorySaver , HasNodeStoryImmediateSaver , hasNodeStoryImmediateSaver + , HasNodeArchiveStoryImmediateSaver + , hasNodeArchiveStoryImmediateSaver , NodeStory(..) , NgramsStatePatch' , NodeListStory @@ -65,14 +67,12 @@ module Gargantext.Core.NodeStory , nse_getter , nse_saver , nse_saver_immediate + , nse_archive_saver_immediate , nse_var , unNodeStory , getNodeArchiveHistory , Archive(..) 
, initArchive - , insertArchiveList - , deleteArchiveList - , updateArchiveList , a_history , a_state , a_version @@ -86,7 +86,9 @@ module Gargantext.Core.NodeStory , upsertNodeStories , getNodeStory , nodeStoriesQuery - , currentVersion ) + , currentVersion + , archiveStateFromList + , archiveStateToList ) where -- import Debug.Trace (traceShow) @@ -129,6 +131,7 @@ data NodeStoryEnv = NodeStoryEnv { _nse_var :: !(MVar NodeListStory) , _nse_saver :: !(IO ()) , _nse_saver_immediate :: !(IO ()) + , _nse_archive_saver_immediate :: !(NodeListStory -> IO NodeListStory) , _nse_getter :: [NodeId] -> IO (MVar NodeListStory) --, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories -- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only) @@ -157,6 +160,9 @@ class HasNodeStorySaver env where class HasNodeStoryImmediateSaver env where hasNodeStoryImmediateSaver :: Getter env (IO ()) +class HasNodeArchiveStoryImmediateSaver env where + hasNodeArchiveStoryImmediateSaver :: Getter env (NodeListStory -> IO NodeListStory) + ------------------------------------------------------------------------ {- | Node Story for each NodeType where the Key of the Map is NodeId @@ -401,6 +407,10 @@ getNodeStory c nId@(NodeId nodeId) = do , _a_history = [] , _a_state = Map.singleton ngramsType $ Map.singleton ngrams ngrams_repo_element }) res -- NOTE Sanity check: all versions in the DB should be the same + -- TODO Maybe redesign the DB so that `node_stories` has only + -- `node_id`, `version` and there is a M2M table + -- `node_stories_ngrams` without the `version` column? Then we would + -- have `version` in only one place. let versionsS = Set.fromList $ map (\a -> a ^. a_version) dbData if Set.size versionsS > 1 then panic $ Text.pack $ "[getNodeStory] versions for " <> show nodeId <> " differ! 
" <> show versionsS @@ -424,8 +434,8 @@ type ArchiveStateSet = Set.Set (TableNgrams.NgramsType, NgramsTerm) -- |Functions to convert archive state (which is a `Map NgramsType -- (Map NgramsTerm NgramsRepoElement`)) to/from a flat list -archiveStateAsList :: NgramsState' -> ArchiveStateList -archiveStateAsList s = mconcat $ (\(nt, ntm) -> (\(n, nre) -> (nt, n, nre)) <$> Map.toList ntm) <$> Map.toList s +archiveStateToList :: NgramsState' -> ArchiveStateList +archiveStateToList s = mconcat $ (\(nt, ntm) -> (\(n, nre) -> (nt, n, nre)) <$> Map.toList ntm) <$> Map.toList s archiveStateFromList :: ArchiveStateList -> NgramsState' archiveStateFromList l = Map.fromListWith (<>) $ (\(nt, t, nre) -> (nt, Map.singleton t nre)) <$> l @@ -444,13 +454,14 @@ insertNodeStory c (NodeId nId) a = do termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64] case headMay termIdM of Nothing -> pure 0 - Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateAsList $ a ^. a_state - -- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateAsList _a_state + Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateToList $ a ^. a_state + -- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateToList _a_state pure () where query :: PGS.Query - query = [sql| INSERT INTO node_stories(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element) VALUES (?, ?, ?, ?) |] + query = [sql| INSERT INTO node_stories(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element) + VALUES (?, ?, ?, ?) 
|] -- insert ngramsType ngrams ngramsRepoElement = -- Insert { iTable = nodeStoryTable -- , iRows = [NodeStoryDB { node_id = sqlInt4 nId @@ -462,10 +473,9 @@ insertNodeStory c (NodeId nId) a = do -- , iReturning = rCount -- , iOnConflict = Nothing } -insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO () -insertArchiveList c nodeId a = do - _ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, a ^. a_version, nt, nre, n)) (archiveStateAsList $ a ^. a_state) - --_ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nodeId, a ^. a_version, nt, nre, n)) <$> (archiveStateAsList $ a ^. a_state) +insertArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO () +insertArchiveStateList c nodeId version as = do + _ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, version, nt, nre, n)) as pure () where query :: PGS.Query @@ -474,19 +484,18 @@ insertArchiveList c nodeId a = do SELECT s.sid, s.sversion, s.sngrams_type_id, s.sngrams_id, s.srepo from s s join nodes n on s.sid = n.id |] -deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO () -deleteArchiveList c nodeId a = do - _ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) (archiveStateAsList $ a ^. a_state) - --_ <- runPGSExecuteMany c query $ (\(nt, n, _) -> (nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state) +deleteArchiveStateList :: PGS.Connection -> NodeId -> ArchiveStateList -> IO () +deleteArchiveStateList c nodeId as = do + _ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) as pure () where query :: PGS.Query query = [sql| DELETE FROM node_stories WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |] -updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO () -updateArchiveList c nodeId a = do - let params = (\(nt, n, nre) -> (nre, nodeId, nt, n)) <$> (archiveStateAsList $ a ^. 
a_state) +updateArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO () +updateArchiveStateList c nodeId version as = do + let params = (\(nt, n, nre) -> (nre, version, nodeId, nt, n)) <$> as --q <- PGS.format c query params --printDebug "[updateArchiveList] query" q _ <- mapM (\p -> runPGSExecute c query p) params @@ -494,7 +503,7 @@ updateArchiveList c nodeId a = do where query :: PGS.Query query = [sql| UPDATE node_stories - SET ngrams_repo_element = ? + SET ngrams_repo_element = ?, version = ? WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |] -- | This function updates the node story and archive for given node_id. @@ -505,18 +514,18 @@ updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do -- 0. We assume we're inside an advisory lock -- 1. Find differences (inserts/updates/deletes) - let currentList = archiveStateAsList $ currentArchive ^. a_state - let newList = archiveStateAsList $ newArchive ^. a_state + let currentList = archiveStateToList $ currentArchive ^. a_state + let newList = archiveStateToList $ newArchive ^. a_state let currentSet = archiveStateSet currentList let newSet = archiveStateSet newList printDebug "[updateNodeStory] new - current = " $ Set.difference newSet currentSet let inserts = archiveStateListFilterFromSet (Set.difference newSet currentSet) newList - printDebug "[updateNodeStory] inserts" inserts + -- printDebug "[updateNodeStory] inserts" inserts printDebug "[updateNodeStory] current - new" $ Set.difference currentSet newSet let deletes = archiveStateListFilterFromSet (Set.difference currentSet newSet) currentList - printDebug "[updateNodeStory] deletes" deletes + -- printDebug "[updateNodeStory] deletes" deletes -- updates are the things that are in new but not in current let commonSet = Set.intersection currentSet newSet @@ -527,20 +536,14 @@ updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do -- 2. 
Perform inserts/deletes/updates --printDebug "[updateNodeStory] applying insert" () - insertArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version - , _a_history = [] - , _a_state = archiveStateFromList inserts } + insertArchiveStateList c nodeId (newArchive ^. a_version) inserts --printDebug "[updateNodeStory] insert applied" () --TODO Use currentArchive ^. a_version in delete and report error -- if entries with (node_id, ngrams_type_id, ngrams_id) but -- different version are found. - deleteArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version - , _a_history = [] - , _a_state = archiveStateFromList deletes } + deleteArchiveStateList c nodeId deletes --printDebug "[updateNodeStory] delete applied" () - updateArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version - , _a_history = [] - , _a_state = archiveStateFromList updates } + updateArchiveStateList c nodeId (newArchive ^. a_version) updates --printDebug "[updateNodeStory] update applied" () pure () @@ -567,11 +570,6 @@ upsertNodeStories c nodeId@(NodeId nId) newArchive = do printDebug "[upsertNodeStories] locking nId" nId runPGSAdvisoryXactLock c nId - -- whether it's insert or update, we can insert node archive history already - -- NOTE: It is assumed that the most recent change is the first in the - -- list, so we save these in reverse order - insertNodeArchiveHistory c nodeId (newArchive ^. a_version) $ reverse $ newArchive ^. a_history - (NodeStory m) <- getNodeStory c nodeId case Map.lookup nodeId m of Nothing -> do @@ -639,7 +637,12 @@ readNodeStoryEnv pool = do withResource pool $ \c -> do --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns writeNodeStories c ns - pure $ clearHistory ns + pure ns + let archive_saver_immediate ns@(NodeStory nls) = withResource pool $ \c -> do + _ <- mapM (\(nId, a) -> do + insertNodeArchiveHistory c nId (a ^. a_version) $ reverse $ a ^. 
a_history + ) $ Map.toList nls + pure $ clearHistory ns saver <- mkNodeStorySaver saver_immediate -- let saver = modifyMVar_ mvar $ \mv -> do -- writeNodeStories pool mv @@ -650,6 +653,7 @@ readNodeStoryEnv pool = do pure $ NodeStoryEnv { _nse_var = mvar , _nse_saver = saver , _nse_saver_immediate = saver_immediate + , _nse_archive_saver_immediate = archive_saver_immediate , _nse_getter = nodeStoryVar pool (Just mvar) } diff --git a/src/Gargantext/Core/NodeStoryFile.hs b/src/Gargantext/Core/NodeStoryFile.hs index d2da42a0..1e2b66bd 100644 --- a/src/Gargantext/Core/NodeStoryFile.hs +++ b/src/Gargantext/Core/NodeStoryFile.hs @@ -69,9 +69,11 @@ readNodeStoryEnv nsd = do mvar <- nodeStoryVar nsd Nothing [] saver <- mkNodeStorySaver nsd mvar let saver_immediate = withMVar mvar (writeNodeStories nsd) + let archive_saver_immediate ns = pure ns pure $ NodeStoryEnv { _nse_var = mvar , _nse_saver = saver , _nse_saver_immediate = saver_immediate + , _nse_archive_saver_immediate = archive_saver_immediate , _nse_getter = nodeStoryVar nsd (Just mvar) } ------------------------------------------------------------------------ -- 2.21.0