Commit c41ca75b authored by Przemyslaw Kaminski

[nodeStory] save archive immediately, delay saving of node story state

We had a conflict here: the archive history is used to resolve new
patches, but it was being saved with a delay and only cleared after
that save. We should instead save the archive immediately and clear it
in the MVar right away, so that it is ready for new patches.
parent 5efae317
Pipeline #3477 canceled with stage
......@@ -82,6 +82,9 @@ instance HasNodeStorySaver Env where
-- | 'Env' exposes its immediate node-story saver by composing
-- 'hasNodeStory' with the 'nse_saver_immediate' accessor.
instance HasNodeStoryImmediateSaver Env where
hasNodeStoryImmediateSaver = hasNodeStory . nse_saver_immediate
-- | 'Env' exposes its immediate archive saver by composing
-- 'hasNodeStory' with the 'nse_archive_saver_immediate' accessor.
instance HasNodeArchiveStoryImmediateSaver Env where
hasNodeArchiveStoryImmediateSaver = hasNodeStory . nse_archive_saver_immediate
instance HasSettings Env where
settings = env_settings
......@@ -138,5 +141,8 @@ instance HasNodeStorySaver DevEnv where
-- | 'DevEnv' exposes its immediate node-story saver by composing
-- 'hasNodeStory' with the 'nse_saver_immediate' accessor.
instance HasNodeStoryImmediateSaver DevEnv where
hasNodeStoryImmediateSaver = hasNodeStory . nse_saver_immediate
-- | 'DevEnv' exposes its immediate archive saver by composing
-- 'hasNodeStory' with the 'nse_archive_saver_immediate' accessor.
instance HasNodeArchiveStoryImmediateSaver DevEnv where
hasNodeArchiveStoryImmediateSaver = hasNodeStory . nse_archive_saver_immediate
instance HasMail DevEnv where
mailSettings = dev_env_mail
......@@ -201,7 +201,6 @@ saveNodeStoryImmediate = do
--Gargantext.Prelude.putStrLn "---- Node story immediate saver finished ----"
-- | Resolve a conflict between two 'ListType' values.
--
-- NOTE(review): this is still a stub. Both arguments are ignored and the
-- result is 'undefined', so evaluating the result fails at runtime.
listTypeConflictResolution :: ListType -> ListType -> ListType
listTypeConflictResolution _ _ = undefined -- TODO Use Map User ListType
......@@ -293,13 +292,17 @@ newNgramsFromNgramsStatePatch p =
commitStatePatch :: (HasNodeStory env err m, HasMail env)
commitStatePatch :: ( HasNodeStory env err m
, HasNodeStoryImmediateSaver env
, HasNodeArchiveStoryImmediateSaver env
, HasMail env)
=> ListId
-> Versioned NgramsStatePatch'
-> m (Versioned NgramsStatePatch')
commitStatePatch listId (Versioned _p_version p) = do
-- printDebug "[commitStatePatch]" listId
var <- getNodeStoryVar [listId]
archiveSaver <- view hasNodeArchiveStoryImmediateSaver
vq' <- liftBase $ modifyMVar var $ \ns -> do
a = ns ^. unNodeStory . at listId . _Just
......@@ -329,10 +332,22 @@ commitStatePatch listId (Versioned _p_version p) = do
-- printDebug "[commitStatePatch] a version" (a ^. a_version)
-- printDebug "[commitStatePatch] a' version" (a' ^. a_version)
pure ( ns & unNodeStory . at listId .~ (Just a')
let newNs = ( ns & unNodeStory . at listId .~ (Just a')
, Versioned (a' ^. a_version) q'
-- NOTE Now is the only good time to save the archive history. We
-- have the handle to the MVar and we need to save its exact
-- snapshot. Node Story archive is a linear table, so it's only
-- a couple of inserts; it shouldn't take long...
--newNs' <- saveNodeArchiveStoryImmediate $ fst newNs
newNs' <- archiveSaver $ fst newNs
pure (newNs', snd newNs)
-- NOTE State (i.e. `NodeStory`) can be saved asynchronously, i.e. with debounce
-- Save new ngrams
_ <- insertNgrams (newNgramsFromNgramsStatePatch p)
......@@ -366,6 +381,8 @@ tableNgramsPull listId ngramsType p_version = do
-- client.
-- TODO-ACCESS check
tableNgramsPut :: ( HasNodeStory env err m
, HasNodeStoryImmediateSaver env
, HasNodeArchiveStoryImmediateSaver env
, HasInvalidError err
, HasSettings env
, HasMail env
......@@ -56,6 +56,8 @@ module Gargantext.Core.NodeStory
, hasNodeStorySaver
, HasNodeStoryImmediateSaver
, hasNodeStoryImmediateSaver
, HasNodeArchiveStoryImmediateSaver
, hasNodeArchiveStoryImmediateSaver
, NodeStory(..)
, NgramsStatePatch'
, NodeListStory
......@@ -65,14 +67,12 @@ module Gargantext.Core.NodeStory
, nse_getter
, nse_saver
, nse_saver_immediate
, nse_archive_saver_immediate
, nse_var
, unNodeStory
, getNodeArchiveHistory
, Archive(..)
, initArchive
, insertArchiveList
, deleteArchiveList
, updateArchiveList
, a_history
, a_state
, a_version
......@@ -86,7 +86,9 @@ module Gargantext.Core.NodeStory
, upsertNodeStories
, getNodeStory
, nodeStoriesQuery
, currentVersion )
, currentVersion
, archiveStateFromList
, archiveStateToList )
-- import Debug.Trace (traceShow)
......@@ -129,6 +131,7 @@ data NodeStoryEnv = NodeStoryEnv
{ _nse_var :: !(MVar NodeListStory)
, _nse_saver :: !(IO ())
, _nse_saver_immediate :: !(IO ())
, _nse_archive_saver_immediate :: !(NodeListStory -> IO NodeListStory)
, _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
--, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
-- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
......@@ -157,6 +160,9 @@ class HasNodeStorySaver env where
-- | Environments from which the immediate node-story save action
-- (an @IO ()@) can be read through a 'Getter'.
class HasNodeStoryImmediateSaver env where
hasNodeStoryImmediateSaver :: Getter env (IO ())
-- | Environments from which the immediate archive saver can be read
-- through a 'Getter'. The saver takes a 'NodeListStory' and yields a
-- 'NodeListStory' in 'IO'.
class HasNodeArchiveStoryImmediateSaver env where
hasNodeArchiveStoryImmediateSaver :: Getter env (NodeListStory -> IO NodeListStory)
{- | Node Story for each NodeType where the Key of the Map is NodeId
......@@ -401,6 +407,10 @@ getNodeStory c nId@(NodeId nodeId) = do
, _a_history = []
, _a_state = Map.singleton ngramsType $ Map.singleton ngrams ngrams_repo_element }) res
-- NOTE Sanity check: all versions in the DB should be the same
-- TODO Maybe redesign the DB so that `node_stories` has only
-- `node_id`, `version` and there is a M2M table
-- `node_stories_ngrams` without the `version` column? Then we would
-- have `version` in only one place.
let versionsS = Set.fromList $ map (\a -> a ^. a_version) dbData
if Set.size versionsS > 1 then
panic $ Text.pack $ "[getNodeStory] versions for " <> show nodeId <> " differ! " <> show versionsS
......@@ -424,8 +434,8 @@ type ArchiveStateSet = Set.Set (TableNgrams.NgramsType, NgramsTerm)
-- | Functions to convert the archive state (which is a
-- @Map NgramsType (Map NgramsTerm NgramsRepoElement)@) to/from a flat
-- list. This one flattens the nested maps into
-- @(ngrams type, term, repo element)@ triples, in ascending key order.
archiveStateAsList :: NgramsState' -> ArchiveStateList
archiveStateAsList s =
  [ (nt, n, nre)
  | (nt, ntm) <- Map.toList s
  , (n, nre)  <- Map.toList ntm
  ]
-- | Flatten the nested archive state into a list of
-- @(ngrams type, term, repo element)@ triples, visiting the outer map
-- and then each inner map in ascending key order.
archiveStateToList :: NgramsState' -> ArchiveStateList
archiveStateToList s = do
  (nt, ntm) <- Map.toList s
  (n, nre)  <- Map.toList ntm
  pure (nt, n, nre)
-- | Rebuild the nested archive state from a flat list of
-- @(ngrams type, term, repo element)@ triples. Entries that share an
-- ngrams type have their single-entry inner maps merged with '<>'.
archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l =
  Map.fromListWith (<>)
    [ (nt, Map.singleton t nre) | (nt, t, nre) <- l ]
......@@ -444,13 +454,14 @@ insertNodeStory c (NodeId nId) a = do
termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
case headMay termIdM of
Nothing -> pure 0
Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateAsList $ a ^. a_state
-- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateAsList _a_state
Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateToList $ a ^. a_state
-- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateToList _a_state
pure ()
query :: PGS.Query
query = [sql| INSERT INTO node_stories(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element) VALUES (?, ?, ?, ?) |]
query = [sql| INSERT INTO node_stories(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element)
VALUES (?, ?, ?, ?) |]
-- insert ngramsType ngrams ngramsRepoElement =
-- Insert { iTable = nodeStoryTable
-- , iRows = [NodeStoryDB { node_id = sqlInt4 nId
......@@ -462,10 +473,9 @@ insertNodeStory c (NodeId nId) a = do
-- , iReturning = rCount
-- , iOnConflict = Nothing }
insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchiveList c nodeId a = do
_ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, a ^. a_version, nt, nre, n)) (archiveStateAsList $ a ^. a_state)
--_ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nodeId, a ^. a_version, nt, nre, n)) <$> (archiveStateAsList $ a ^. a_state)
insertArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
insertArchiveStateList c nodeId version as = do
_ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, version, nt, nre, n)) as
pure ()
query :: PGS.Query
......@@ -474,19 +484,18 @@ insertArchiveList c nodeId a = do
SELECT s.sid, s.sversion, s.sngrams_type_id, s.sngrams_id, s.srepo from s s join nodes n on s.sid =
deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchiveList c nodeId a = do
_ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) (archiveStateAsList $ a ^. a_state)
--_ <- runPGSExecuteMany c query $ (\(nt, n, _) -> (nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
-- | Delete the given archive state entries of a node from @node_stories@.
--
-- One @DELETE@ statement is executed per element of @as@, matching on the
-- node id, the ngrams type and the term (the term is resolved to ids
-- through the @ngrams@ table). The repo element of each triple is ignored.
deleteArchiveStateList :: PGS.Connection -> NodeId -> ArchiveStateList -> IO ()
deleteArchiveStateList c nodeId as = do
_ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) as
pure ()
-- Query parameters, in order: node id, ngrams type id, term text.
query :: PGS.Query
query = [sql| DELETE FROM node_stories
WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchiveList c nodeId a = do
let params = (\(nt, n, nre) -> (nre, nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
updateArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
updateArchiveStateList c nodeId version as = do
let params = (\(nt, n, nre) -> (nre, version, nodeId, nt, n)) <$> as
--q <- PGS.format c query params
--printDebug "[updateArchiveList] query" q
_ <- mapM (\p -> runPGSExecute c query p) params
......@@ -494,7 +503,7 @@ updateArchiveList c nodeId a = do
query :: PGS.Query
query = [sql| UPDATE node_stories
SET ngrams_repo_element = ?
SET ngrams_repo_element = ?, version = ?
WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
-- | This function updates the node story and archive for given node_id.
......@@ -505,18 +514,18 @@ updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
-- 0. We assume we're inside an advisory lock
-- 1. Find differences (inserts/updates/deletes)
let currentList = archiveStateAsList $ currentArchive ^. a_state
let newList = archiveStateAsList $ newArchive ^. a_state
let currentList = archiveStateToList $ currentArchive ^. a_state
let newList = archiveStateToList $ newArchive ^. a_state
let currentSet = archiveStateSet currentList
let newSet = archiveStateSet newList
printDebug "[updateNodeStory] new - current = " $ Set.difference newSet currentSet
let inserts = archiveStateListFilterFromSet (Set.difference newSet currentSet) newList
printDebug "[updateNodeStory] inserts" inserts
-- printDebug "[updateNodeStory] inserts" inserts
printDebug "[updateNodeStory] current - new" $ Set.difference currentSet newSet
let deletes = archiveStateListFilterFromSet (Set.difference currentSet newSet) currentList
printDebug "[updateNodeStory] deletes" deletes
-- printDebug "[updateNodeStory] deletes" deletes
-- updates are the things that are in new but not in current
let commonSet = Set.intersection currentSet newSet
......@@ -527,20 +536,14 @@ updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
-- 2. Perform inserts/deletes/updates
--printDebug "[updateNodeStory] applying insert" ()
insertArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList inserts }
insertArchiveStateList c nodeId (newArchive ^. a_version) inserts
--printDebug "[updateNodeStory] insert applied" ()
--TODO Use currentArchive ^. a_version in delete and report error
-- if entries with (node_id, ngrams_type_id, ngrams_id) but
-- different version are found.
deleteArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList deletes }
deleteArchiveStateList c nodeId deletes
--printDebug "[updateNodeStory] delete applied" ()
updateArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList updates }
updateArchiveStateList c nodeId (newArchive ^. a_version) updates
--printDebug "[updateNodeStory] update applied" ()
pure ()
......@@ -567,11 +570,6 @@ upsertNodeStories c nodeId@(NodeId nId) newArchive = do
printDebug "[upsertNodeStories] locking nId" nId
runPGSAdvisoryXactLock c nId
-- whether it's insert or update, we can insert node archive history already
-- NOTE: It is assumed that the most recent change is the first in the
-- list, so we save these in reverse order
insertNodeArchiveHistory c nodeId (newArchive ^. a_version) $ reverse $ newArchive ^. a_history
(NodeStory m) <- getNodeStory c nodeId
case Map.lookup nodeId m of
Nothing -> do
......@@ -639,7 +637,12 @@ readNodeStoryEnv pool = do
withResource pool $ \c -> do
--printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
writeNodeStories c ns
pure $ clearHistory ns
pure ns
let archive_saver_immediate ns@(NodeStory nls) = withResource pool $ \c -> do
_ <- mapM (\(nId, a) -> do
insertNodeArchiveHistory c nId (a ^. a_version) $ reverse $ a ^. a_history
) $ Map.toList nls
pure $ clearHistory ns
saver <- mkNodeStorySaver saver_immediate
-- let saver = modifyMVar_ mvar $ \mv -> do
-- writeNodeStories pool mv
......@@ -650,6 +653,7 @@ readNodeStoryEnv pool = do
pure $ NodeStoryEnv { _nse_var = mvar
, _nse_saver = saver
, _nse_saver_immediate = saver_immediate
, _nse_archive_saver_immediate = archive_saver_immediate
, _nse_getter = nodeStoryVar pool (Just mvar)
......@@ -69,9 +69,11 @@ readNodeStoryEnv nsd = do
mvar <- nodeStoryVar nsd Nothing []
saver <- mkNodeStorySaver nsd mvar
let saver_immediate = withMVar mvar (writeNodeStories nsd)
let archive_saver_immediate ns = pure ns
pure $ NodeStoryEnv { _nse_var = mvar
, _nse_saver = saver
, _nse_saver_immediate = saver_immediate
, _nse_archive_saver_immediate = archive_saver_immediate
, _nse_getter = nodeStoryVar nsd (Just mvar) }
