Commit d18e6db2 authored by Alexandre Delanoë's avatar Alexandre Delanoë

Merge remote-tracking branch 'origin/476-dev-fix-node-story-versions' into dev-merge

parents f9dca095 90131b53
......@@ -82,6 +82,9 @@ instance HasNodeStorySaver Env where
instance HasNodeStoryImmediateSaver Env where
hasNodeStoryImmediateSaver = hasNodeStory . nse_saver_immediate
instance HasNodeArchiveStoryImmediateSaver Env where
hasNodeArchiveStoryImmediateSaver = hasNodeStory . nse_archive_saver_immediate
instance HasSettings Env where
settings = env_settings
......@@ -138,5 +141,8 @@ instance HasNodeStorySaver DevEnv where
instance HasNodeStoryImmediateSaver DevEnv where
hasNodeStoryImmediateSaver = hasNodeStory . nse_saver_immediate
instance HasNodeArchiveStoryImmediateSaver DevEnv where
hasNodeArchiveStoryImmediateSaver = hasNodeStory . nse_archive_saver_immediate
instance HasMail DevEnv where
mailSettings = dev_env_mail
......@@ -201,7 +201,6 @@ saveNodeStoryImmediate = do
saver
--Gargantext.Prelude.putStrLn "---- Node story immediate saver finished ----"
listTypeConflictResolution :: ListType -> ListType -> ListType
listTypeConflictResolution _ _ = undefined -- TODO Use Map User ListType
......@@ -293,13 +292,17 @@ newNgramsFromNgramsStatePatch p =
commitStatePatch :: (HasNodeStory env err m, HasMail env)
commitStatePatch :: ( HasNodeStory env err m
, HasNodeStoryImmediateSaver env
, HasNodeArchiveStoryImmediateSaver env
, HasMail env)
=> ListId
-> Versioned NgramsStatePatch'
-> m (Versioned NgramsStatePatch')
commitStatePatch listId (Versioned _p_version p) = do
-- printDebug "[commitStatePatch]" listId
var <- getNodeStoryVar [listId]
archiveSaver <- view hasNodeArchiveStoryImmediateSaver
vq' <- liftBase $ modifyMVar var $ \ns -> do
let
a = ns ^. unNodeStory . at listId . _Just
......@@ -329,10 +332,28 @@ commitStatePatch listId (Versioned _p_version p) = do
-}
-- printDebug "[commitStatePatch] a version" (a ^. a_version)
-- printDebug "[commitStatePatch] a' version" (a' ^. a_version)
pure ( ns & unNodeStory . at listId .~ (Just a')
let newNs = ( ns & unNodeStory . at listId .~ (Just a')
, Versioned (a' ^. a_version) q'
)
-- NOTE Now is the only good time to save the archive history. We
-- have the handle to the MVar and we need to save its exact
-- snapshot. Node Story archive is a linear table, so it's only
-- couple of inserts, it shouldn't take long...
-- If we postponed saving the archive to the debounce action, we
-- would have issues like
-- https://gitlab.iscpif.fr/gargantext/purescript-gargantext/issues/476
-- where the `q` computation from above (which uses the archive)
-- would cause incorrect patch application (before the previous
-- archive was saved and applied)
newNs' <- archiveSaver $ fst newNs
pure (newNs', snd newNs)
-- NOTE State (i.e. `NodeStory` can be saved asynchronously, i.e. with debounce)
saveNodeStory
--saveNodeStoryImmediate
-- Save new ngrams
_ <- insertNgrams (newNgramsFromNgramsStatePatch p)
......@@ -366,6 +387,8 @@ tableNgramsPull listId ngramsType p_version = do
-- client.
-- TODO-ACCESS check
tableNgramsPut :: ( HasNodeStory env err m
, HasNodeStoryImmediateSaver env
, HasNodeArchiveStoryImmediateSaver env
, HasInvalidError err
, HasSettings env
, HasMail env
......
......@@ -26,7 +26,7 @@ columns: `node_id`, `ngrams_type_id`, `patch`. This is because each
history item is in fact a map from `NgramsType` to `NgramsTablePatch`
(see the `NgramsStatePatch'` type).
Moreover, since in ~G.A.Ngrams.commitStatePatch~ we use current state
Moreover, since in `G.A.Ngrams.commitStatePatch` we use current state
only, with only recent history items, I concluded that it is not
necessary to load whole history into memory. Instead, it is kept in DB
(history is immutable) and only recent changes are added to
......@@ -56,6 +56,8 @@ module Gargantext.Core.NodeStory
, hasNodeStorySaver
, HasNodeStoryImmediateSaver
, hasNodeStoryImmediateSaver
, HasNodeArchiveStoryImmediateSaver
, hasNodeArchiveStoryImmediateSaver
, NodeStory(..)
, NgramsStatePatch'
, NodeListStory
......@@ -65,14 +67,12 @@ module Gargantext.Core.NodeStory
, nse_getter
, nse_saver
, nse_saver_immediate
, nse_archive_saver_immediate
, nse_var
, unNodeStory
, getNodeArchiveHistory
, Archive(..)
, initArchive
, insertArchiveList
, deleteArchiveList
, updateArchiveList
, a_history
, a_state
, a_version
......@@ -86,7 +86,9 @@ module Gargantext.Core.NodeStory
, upsertNodeStories
, getNodeStory
, nodeStoriesQuery
, currentVersion )
, currentVersion
, archiveStateFromList
, archiveStateToList )
where
-- import Debug.Trace (traceShow)
......@@ -129,6 +131,7 @@ data NodeStoryEnv = NodeStoryEnv
{ _nse_var :: !(MVar NodeListStory)
, _nse_saver :: !(IO ())
, _nse_saver_immediate :: !(IO ())
, _nse_archive_saver_immediate :: !(NodeListStory -> IO NodeListStory)
, _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
--, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
-- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
......@@ -157,6 +160,9 @@ class HasNodeStorySaver env where
class HasNodeStoryImmediateSaver env where
hasNodeStoryImmediateSaver :: Getter env (IO ())
class HasNodeArchiveStoryImmediateSaver env where
hasNodeArchiveStoryImmediateSaver :: Getter env (NodeListStory -> IO NodeListStory)
------------------------------------------------------------------------
{- | Node Story for each NodeType where the Key of the Map is NodeId
......@@ -386,7 +392,9 @@ insertNodeArchiveHistory c nodeId version (h:hs) = do
where
query :: PGS.Query
query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version) VALUES (?, ?, ?, ?, ?) |]
query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version)
VALUES (?, ?, ?, ?, ?) |]
getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
......@@ -398,9 +406,18 @@ getNodeStory c nId@(NodeId nodeId) = do
Archive { _a_version = version
, _a_history = []
, _a_state = Map.singleton ngramsType $ Map.singleton ngrams ngrams_repo_element }) res
-- NOTE When concatenating, check that the same version is for all states
-- NOTE Sanity check: all versions in the DB should be the same
-- TODO Maybe redesign the DB so that `node_stories` has only
-- `node_id`, `version` and there is a M2M table
-- `node_stories_ngrams` without the `version` colum? Then we would
-- have `version` in only one place.
let versionsS = Set.fromList $ map (\a -> a ^. a_version) dbData
if Set.size versionsS > 1 then
panic $ Text.pack $ "[getNodeStory] versions for " <> show nodeId <> " differ! " <> show versionsS
else
pure ()
pure $ NodeStory $ Map.singleton nId $ foldl combine mempty dbData
--pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
where
-- NOTE (<>) for Archive doesn't concatenate states, so we have to use `combine`
combine a1 a2 = a1 & a_state %~ combineState (a2 ^. a_state)
......@@ -413,15 +430,23 @@ nodeStoriesQuery = [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_elem
WHERE node_id = ? |]
type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
type ArchiveStateSet = Set.Set (TableNgrams.NgramsType, NgramsTerm)
-- Functions to convert archive state (which is a Map NgramsType (Map
-- NgramsTerm NgramsRepoElement)) to/from a flat list
archiveStateAsList :: NgramsState' -> ArchiveStateList
archiveStateAsList s = mconcat $ (\(nt, ntm) -> (\(n, nre) -> (nt, n, nre)) <$> Map.toList ntm) <$> Map.toList s
-- |Functions to convert archive state (which is a `Map NgramsType
-- (Map NgramsTerm NgramsRepoElement`)) to/from a flat list
archiveStateToList :: NgramsState' -> ArchiveStateList
archiveStateToList s = mconcat $ (\(nt, ntm) -> (\(n, nre) -> (nt, n, nre)) <$> Map.toList ntm) <$> Map.toList s
archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l = Map.fromListWith (<>) $ (\(nt, t, nre) -> (nt, Map.singleton t nre)) <$> l
archiveStateSet :: ArchiveStateList -> ArchiveStateSet
archiveStateSet lst = Set.fromList $ (\(nt, term, _) -> (nt, term)) <$> lst
archiveStateListFilterFromSet :: ArchiveStateSet -> ArchiveStateList -> ArchiveStateList
archiveStateListFilterFromSet set =
filter (\(nt, term, _) -> Set.member (nt, term) set)
-- | This function inserts whole new node story and archive for given node_id.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a = do
......@@ -429,13 +454,14 @@ insertNodeStory c (NodeId nId) a = do
termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
case headMay termIdM of
Nothing -> pure 0
Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateAsList $ a ^. a_state
-- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateAsList _a_state
Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateToList $ a ^. a_state
-- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateToList _a_state
pure ()
where
query :: PGS.Query
query = [sql| INSERT INTO node_stories(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element) VALUES (?, ?, ?, ?) |]
query = [sql| INSERT INTO node_stories(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element)
VALUES (?, ?, ?, ?) |]
-- insert ngramsType ngrams ngramsRepoElement =
-- Insert { iTable = nodeStoryTable
-- , iRows = [NodeStoryDB { node_id = sqlInt4 nId
......@@ -447,10 +473,9 @@ insertNodeStory c (NodeId nId) a = do
-- , iReturning = rCount
-- , iOnConflict = Nothing }
insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchiveList c nodeId a = do
_ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, a ^. a_version, nt, nre, n)) (archiveStateAsList $ a ^. a_state)
--_ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nodeId, a ^. a_version, nt, nre, n)) <$> (archiveStateAsList $ a ^. a_state)
insertArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
insertArchiveStateList c nodeId version as = do
_ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, version, nt, nre, n)) as
pure ()
where
query :: PGS.Query
......@@ -459,19 +484,18 @@ insertArchiveList c nodeId a = do
SELECT s.sid, s.sversion, s.sngrams_type_id, s.sngrams_id, s.srepo from s s join nodes n on s.sid = n.id
|]
deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchiveList c nodeId a = do
_ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) (archiveStateAsList $ a ^. a_state)
--_ <- runPGSExecuteMany c query $ (\(nt, n, _) -> (nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
deleteArchiveStateList :: PGS.Connection -> NodeId -> ArchiveStateList -> IO ()
deleteArchiveStateList c nodeId as = do
_ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) as
pure ()
where
query :: PGS.Query
query = [sql| DELETE FROM node_stories
WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchiveList c nodeId a = do
let params = (\(nt, n, nre) -> (nre, nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
updateArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
updateArchiveStateList c nodeId version as = do
let params = (\(nt, n, nre) -> (nre, version, nodeId, nt, n)) <$> as
--q <- PGS.format c query params
--printDebug "[updateArchiveList] query" q
_ <- mapM (\p -> runPGSExecute c query p) params
......@@ -479,7 +503,7 @@ updateArchiveList c nodeId a = do
where
query :: PGS.Query
query = [sql| UPDATE node_stories
SET ngrams_repo_element = ?
SET ngrams_repo_element = ?, version = ?
WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
-- | This function updates the node story and archive for given node_id.
......@@ -490,36 +514,36 @@ updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
-- 0. We assume we're inside an advisory lock
-- 1. Find differences (inserts/updates/deletes)
let currentList = archiveStateAsList $ currentArchive ^. a_state
let newList = archiveStateAsList $ newArchive ^. a_state
let currentSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> currentList
let newSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> newList
let currentList = archiveStateToList $ currentArchive ^. a_state
let newList = archiveStateToList $ newArchive ^. a_state
let currentSet = archiveStateSet currentList
let newSet = archiveStateSet newList
let inserts = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference newSet currentSet) newList
--printDebug "[updateNodeStory] inserts" inserts
let deletes = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference currentSet newSet) currentList
--printDebug "[updateNodeStory] deletes" deletes
printDebug "[updateNodeStory] new - current = " $ Set.difference newSet currentSet
let inserts = archiveStateListFilterFromSet (Set.difference newSet currentSet) newList
-- printDebug "[updateNodeStory] inserts" inserts
printDebug "[updateNodeStory] current - new" $ Set.difference currentSet newSet
let deletes = archiveStateListFilterFromSet (Set.difference currentSet newSet) currentList
-- printDebug "[updateNodeStory] deletes" deletes
-- updates are the things that are in new but not in current
let updates = Set.toList $ Set.difference (Set.fromList newList) (Set.fromList currentList)
--printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates
let commonSet = Set.intersection currentSet newSet
let commonNewList = archiveStateListFilterFromSet commonSet newList
let commonCurrentList = archiveStateListFilterFromSet commonSet currentList
let updates = Set.toList $ Set.difference (Set.fromList commonNewList) (Set.fromList commonCurrentList)
printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates
-- 2. Perform inserts/deletes/updates
--printDebug "[updateNodeStory] applying insert" ()
insertArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList inserts }
insertArchiveStateList c nodeId (newArchive ^. a_version) inserts
--printDebug "[updateNodeStory] insert applied" ()
--TODO Use currentArchive ^. a_version in delete and report error
-- if entries with (node_id, ngrams_type_id, ngrams_id) but
-- different version are found.
deleteArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList deletes }
deleteArchiveStateList c nodeId deletes
--printDebug "[updateNodeStory] delete applied" ()
updateArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList updates }
updateArchiveStateList c nodeId (newArchive ^. a_version) updates
--printDebug "[updateNodeStory] update applied" ()
pure ()
......@@ -546,11 +570,6 @@ upsertNodeStories c nodeId@(NodeId nId) newArchive = do
printDebug "[upsertNodeStories] locking nId" nId
runPGSAdvisoryXactLock c nId
-- whether it's insert or update, we can insert node archive history already
-- NOTE: It is assumed that the most recent change is the first in the
-- list, so we save these in reverse order
insertNodeArchiveHistory c nodeId (newArchive ^. a_version) $ reverse $ newArchive ^. a_history
(NodeStory m) <- getNodeStory c nodeId
case Map.lookup nodeId m of
Nothing -> do
......@@ -560,8 +579,23 @@ upsertNodeStories c nodeId@(NodeId nId) newArchive = do
_ <- updateNodeStory c nodeId currentArchive newArchive
pure ()
-- 3. Now we need to set versions of all node state to be the same
fixNodeStoryVersion c nodeId newArchive
printDebug "[upsertNodeStories] STOP nId" nId
fixNodeStoryVersion :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
fixNodeStoryVersion c nodeId newArchive = do
let ngramsTypes = Map.keys $ newArchive ^. a_state
_ <- mapM_ (\nt -> runPGSExecute c query (newArchive ^. a_version, nodeId, nt)) ngramsTypes
pure ()
where
query :: PGS.Query
query = [sql|UPDATE node_stories
SET version = ?
WHERE node_id = ?
AND ngrams_type_id = ?|]
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) = do
_ <- mapM (\(nId, a) -> upsertNodeStories c nId a) $ Map.toList nls
......@@ -579,10 +613,10 @@ nodeStoryInc c (Just ns@(NodeStory nls)) nId = do
nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
nodeStoryIncs c (Just nls) ns = foldM (\m n -> nodeStoryInc c (Just m) n) nls ns
nodeStoryIncs c Nothing (ni:ns) = do
m <- getNodeStory c ni
nodeStoryIncs c (Just m) ns
nodeStoryIncs c (Just nls) ns = foldM (\m n -> nodeStoryInc c (Just m) n) nls ns
-- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
-- nodeStoryDec pool ns@(NodeStory nls) ni = do
......@@ -599,12 +633,17 @@ nodeStoryIncs c Nothing (ni:ns) = do
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
mvar <- nodeStoryVar pool Nothing []
saver <- mkNodeStorySaver pool mvar
let saver_immediate = modifyMVar_ mvar $ \ns -> do
withResource pool $ \c -> do
--printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
writeNodeStories c ns
pure ns
let archive_saver_immediate ns@(NodeStory nls) = withResource pool $ \c -> do
_ <- mapM (\(nId, a) -> do
insertNodeArchiveHistory c nId (a ^. a_version) $ reverse $ a ^. a_history
) $ Map.toList nls
pure $ clearHistory ns
saver <- mkNodeStorySaver saver_immediate
-- let saver = modifyMVar_ mvar $ \mv -> do
-- writeNodeStories pool mv
-- printDebug "[readNodeStoryEnv] saver" mv
......@@ -614,6 +653,7 @@ readNodeStoryEnv pool = do
pure $ NodeStoryEnv { _nse_var = mvar
, _nse_saver = saver
, _nse_saver_immediate = saver_immediate
, _nse_archive_saver_immediate = archive_saver_immediate
, _nse_getter = nodeStoryVar pool (Just mvar)
}
......@@ -632,19 +672,22 @@ nodeStoryVar pool (Just mv) nIds = do
-- Debounce is useful since it could delay the saving to some later
-- time, asynchronously and we keep operating on memory only.
mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
mkNodeStorySaver pool mvns = mkDebounce settings
-- mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
-- mkNodeStorySaver pool mvns = do
mkNodeStorySaver :: IO () -> IO (IO ())
mkNodeStorySaver saver = mkDebounce settings
where
settings = defaultDebounceSettings
{ debounceAction = do
-- NOTE: Lock MVar first, then use resource pool.
-- Otherwise we could wait for MVar, while
-- blocking the pool connection.
modifyMVar_ mvns $ \ns -> do
withResource pool $ \c -> do
--printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
writeNodeStories c ns
pure $ clearHistory ns
{ debounceAction = saver
-- do
-- -- NOTE: Lock MVar first, then use resource pool.
-- -- Otherwise we could wait for MVar, while
-- -- blocking the pool connection.
-- modifyMVar_ mvns $ \ns -> do
-- withResource pool $ \c -> do
-- --printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
-- writeNodeStories c ns
-- pure $ clearHistory ns
--withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
, debounceFreq = 1*minute
}
......
......@@ -69,9 +69,11 @@ readNodeStoryEnv nsd = do
mvar <- nodeStoryVar nsd Nothing []
saver <- mkNodeStorySaver nsd mvar
let saver_immediate = withMVar mvar (writeNodeStories nsd)
let archive_saver_immediate ns = pure ns
pure $ NodeStoryEnv { _nse_var = mvar
, _nse_saver = saver
, _nse_saver_immediate = saver_immediate
, _nse_archive_saver_immediate = archive_saver_immediate
, _nse_getter = nodeStoryVar nsd (Just mvar) }
------------------------------------------------------------------------
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment