Commit e1dbfd70 authored by Przemyslaw Kaminski's avatar Przemyslaw Kaminski

[nodeStory] insert/update/delete state handling

This doesn't work with locks yet.
parent 9166bb01
Pipeline #3129 passed with stage
in 93 minutes and 14 seconds
-- Start a new transaction. In case data migration goes wrong, we are
-- back to our original table.
BEGIN;
-- we will migrate data here
-- rename old table and create a new one
......@@ -56,3 +60,6 @@ INSERT INTO public.node_stories
FROM node_stories_old
CROSS JOIN jsonb_each(archive->'state'->'NgramsTerms') AS j
JOIN ngrams ON terms = j.key;
-- finally, write out the stuff
COMMIT;
......@@ -259,15 +259,6 @@ setListNgrams listId ngramsType ns = do
saveNodeStory
currentVersion :: HasNodeStory env err m
=> ListId -> m Version
currentVersion listId = do
--nls <- getRepo [listId]
pool <- view connPool
nls <- liftBase $ getNodeStory pool listId
pure $ nls ^. unNodeStory . at listId . _Just . a_version
newNgramsFromNgramsStatePatch :: NgramsStatePatch' -> [Ngrams]
newNgramsFromNgramsStatePatch p =
[ text2ngrams (unNgramsTerm n)
......
......@@ -19,6 +19,7 @@ import Control.Lens (_Just, (^.), at, view, At, Index, IxValue)
import Control.Monad.Reader
import Data.HashMap.Strict (HashMap)
import Data.Hashable (Hashable)
import Data.Pool (withResource)
import Data.Set (Set)
import Data.Validity
import Gargantext.API.Ngrams.Types
......@@ -202,15 +203,16 @@ migrateFromDirToDb :: (CmdM env err m, HasNodeStory env err m)
=> m ()
migrateFromDirToDb = do
pool <- view connPool
listIds <- liftBase $ getNodesIdWithType pool NodeList
printDebug "[migrateFromDirToDb] listIds" listIds
(NodeStory nls) <- NSF.getRepoReadConfig listIds
printDebug "[migrateFromDirToDb] nls" nls
_ <- mapM (\(nId, a) -> do
n <- liftBase $ nodeExists pool nId
case n of
False -> pure ()
True -> liftBase $ upsertNodeArchive pool nId a
) $ Map.toList nls
--_ <- nodeStoryIncs (Just $ NodeStory nls) listIds
pure ()
withResource pool $ \c -> do
listIds <- liftBase $ getNodesIdWithType c NodeList
printDebug "[migrateFromDirToDb] listIds" listIds
(NodeStory nls) <- NSF.getRepoReadConfig listIds
printDebug "[migrateFromDirToDb] nls" nls
_ <- mapM (\(nId, a) -> do
n <- liftBase $ nodeExists c nId
case n of
False -> pure ()
True -> liftBase $ upsertNodeStories c nId a
) $ Map.toList nls
--_ <- nodeStoryIncs (Just $ NodeStory nls) listIds
pure ()
......@@ -72,11 +72,13 @@ module Gargantext.Core.NodeStory
, a_version
, nodeExists
, runPGSQuery
, runPGSAdvisoryXactLock
, getNodesIdWithType
, readNodeStoryEnv
, upsertNodeArchive
, upsertNodeStories
, getNodeStory
, nodeStoriesQuery )
, nodeStoriesQuery
, currentVersion )
where
-- import Debug.Trace (traceShow)
......@@ -84,7 +86,7 @@ import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debo
import Codec.Serialise.Class
import Control.Concurrent (MVar(), withMVar, newMVar, modifyMVar_)
import Control.Exception (catch, throw, SomeException(..))
import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), traverse)
import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson hiding ((.=), decode)
......@@ -99,15 +101,17 @@ import Database.PostgreSQL.Simple.FromField (FromField(fromField), fromJSONField
import Data.Profunctor.Product.TH (makeAdaptorAndInstance)
import GHC.Generics (Generic)
import Gargantext.API.Ngrams.Types
import Gargantext.Core.Types (NodeId(..), NodeType)
import Gargantext.Core.Types (ListId, NodeId(..), NodeType)
import Gargantext.Core.Utils.Prefix (unPrefix)
import Gargantext.Database.Prelude (CmdM', HasConnectionPool, HasConfig)
import Gargantext.Database.Prelude (CmdM', HasConnectionPool(..), HasConfig)
import Gargantext.Database.Query.Table.Node.Error (HasNodeError())
import Gargantext.Prelude
import Opaleye (DefaultFromField(..), SqlJsonb, fromPGSFromField)
import System.IO (stderr)
import qualified Data.Map.Strict as Map
import qualified Data.Map.Strict.Patch as PM
import qualified Data.Set as Set
import qualified Data.Text as Text
import qualified Database.PostgreSQL.Simple as PGS
import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
......@@ -267,33 +271,38 @@ $(makeAdaptorAndInstance "pNodeArchiveStory" ''NodeStoryArchivePoly)
-- type NodeStoryArchiveWrite = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
-- type NodeStoryArchiveRead = NodeStoryArchivePoly (Column SqlInt4) (Column SqlJsonb)
type ArchiveQ = Archive NgramsState' NgramsStatePatch'
type ArchiveList = Archive NgramsState' NgramsStatePatch'
-- DB stuff
runPGSExecuteMany :: (PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany pool qs a = withResource pool $ \c -> catch (PGS.executeMany c qs a) (printError c)
runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
where
printError _c (SomeException e) = do
printError (SomeException e) = do
--q' <- PGS.formatQuery c qs a
--hPutStrLn stderr q'
throw (SomeException e)
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery pool q a = withResource pool $ \c -> catch (PGS.query c q a) (printError c)
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery c q a = catch (PGS.query c q a) printError
where
printError c (SomeException e) = do
printError (SomeException e) = do
q' <- PGS.formatQuery c q a
hPutStrLn stderr q'
throw (SomeException e)
nodeExists :: Pool PGS.Connection -> NodeId -> IO Bool
nodeExists pool nId = (== [PGS.Only True])
<$> runPGSQuery pool [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nId)
runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock c id = do
_ <- runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only id) :: IO [PGS.Only Bool]
pure ()
nodeExists :: PGS.Connection -> NodeId -> IO Bool
nodeExists c nId = (== [PGS.Only True])
<$> runPGSQuery c [sql| SELECT true FROM nodes WHERE id = ? LIMIT 1 |] (PGS.Only nId)
getNodesIdWithType :: Pool PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType pool nt = do
ns <- runPGSQuery pool query (PGS.Only nt)
getNodesIdWithType :: PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType c nt = do
ns <- runPGSQuery c query (PGS.Only nt)
pure $ map (\(PGS.Only nId) -> NodeId nId) ns
where
query :: PGS.Query
......@@ -321,9 +330,9 @@ getNodesIdWithType pool nt = do
-- nodeStorySelect = selectTable nodeStoryTable
-- TODO Check ordering, "first patch in the _a_history list is the most recent"
getNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory pool nodeId = do
as <- runPGSQuery pool query (PGS.Only nodeId) :: IO [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
getNodeArchiveHistory :: PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory c nodeId = do
as <- runPGSQuery c query (PGS.Only nodeId) :: IO [(TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
pure $ (\(ngramsType, terms, patch) -> fst $ PM.singleton ngramsType (NgramsTablePatch $ fst $ PM.singleton terms patch)) <$> as
where
query :: PGS.Query
......@@ -336,28 +345,28 @@ ngramsIdQuery :: PGS.Query
ngramsIdQuery = [sql| SELECT id FROM ngrams WHERE terms = ? |]
insertNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory :: PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ [] = pure ()
insertNodeArchiveHistory pool nodeId (h:hs) = do
insertNodeArchiveHistory c nodeId (h:hs) = do
let tuples = mconcat $ (\(nType, (NgramsTablePatch patch)) ->
(\(term, p) ->
(nodeId, nType, term, p)) <$> PM.toList patch) <$> PM.toList h :: [(NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
tuplesM <- mapM (\(nId, nType, term, patch) -> do
ngrams <- runPGSQuery pool ngramsIdQuery (PGS.Only term)
ngrams <- runPGSQuery c ngramsIdQuery (PGS.Only term)
pure $ (\(PGS.Only termId) -> (nId, nType, termId, term, patch)) <$> (headMay ngrams)
) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsTerm, NgramsPatch)]
_ <- runPGSExecuteMany pool query $ ((\(nId, nType, termId, _term, patch) -> (nId, nType, termId, patch)) <$> (catMaybes tuplesM))
_ <- insertNodeArchiveHistory pool nodeId hs
_ <- runPGSExecuteMany c query $ ((\(nId, nType, termId, _term, patch) -> (nId, nType, termId, patch)) <$> (catMaybes tuplesM))
_ <- insertNodeArchiveHistory c nodeId hs
pure ()
where
query :: PGS.Query
query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch) VALUES (?, ?, ?, ?) |]
getNodeStory :: Pool PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory pool nId@(NodeId nodeId) = do
getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
--res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId Version Int Int NgramsRepoElement]
res <- runPGSQuery pool nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
res <- runPGSQuery c nodeStoriesQuery (PGS.Only nodeId) :: IO [(Version, TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
-- We have multiple rows with same node_id and different (ngrams_type_id, ngrams_id).
-- Need to create a map: {<node_id>: {<ngrams_type_id>: {<ngrams_id>: <data>}}}
let dbData = map (\(version, ngramsType, ngrams, ngrams_repo_element) ->
......@@ -383,27 +392,30 @@ nodeStoriesQuery = [sql| SELECT version, ngrams_type_id, terms, ngrams_repo_elem
JOIN ngrams ON ngrams.id = ngrams_id
WHERE node_id = ? |]
type ArchiveStateList = [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
-- Functions to convert archive state (which is a Map NgramsType (Map
-- NgramsTerm NgramsRepoElement)) to/from a flat list
archiveStateAsList :: NgramsState' -> [(TableNgrams.NgramsType, NgramsTerm, NgramsRepoElement)]
archiveStateAsList :: NgramsState' -> ArchiveStateList
archiveStateAsList s = mconcat $ (\(nt, ntm) -> (\(n, nre) -> (nt, n, nre)) <$> Map.toList ntm) <$> Map.toList s
archiveStateFromList :: ArchiveStateList -> NgramsState'
archiveStateFromList l = Map.fromListWith (<>) $ (\(nt, t, nre) -> (nt, Map.singleton t nre)) <$> l
-- | This function inserts whole new node story and archive for given node_id.
-- | This function inserts whole new node story and archive for given node_id.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c nodeId@(NodeId nId) a = do
printDebug "[insertNodeStory] _a_state" $ a ^. a_state
_ <- mapM (\(ngramsType, ngrams, ngramsRepoElement) -> do
insertNodeStory pool nodeId@(NodeId nId) (Archive {..}) = do
termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
case headMay termIdM of
Nothing -> pure 0
case headMay termIdM of
Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateAsList $ a ^. a_state
-- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateAsList _a_state
-- NOTE: It is assumed that the most recent change is the first in the
-- list, so we save these in reverse order
-- NOTE: It is assumed that the most recent change is the first in the
insertNodeArchiveHistory c nodeId $ reverse $ a ^. a_history
pure ()
where
query :: PGS.Query
......@@ -419,19 +431,74 @@ insertNodeStory pool nodeId@(NodeId nId) (Archive {..}) = do
-- , iReturning = rCount
-- , iOnConflict = Nothing }
-- | This function updates the node story and archive for given node_id.
updateNodeStory :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO ()
updateNodeStory pool nodeId@(NodeId _nId) (Archive {..}) = do
-- TODO This requires updating current DB state (which is spanned
-- along many rows)
insertArchive :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchive c nodeId a = do
_ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nodeId, a ^. a_version, nt, nre, n)) <$> (archiveStateAsList $ a ^. a_state)
pure ()
where
query :: PGS.Query
query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
SELECT ?, ?, ?, ngrams.id, ? FROM ngrams WHERE terms = ? |]
deleteArchive :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchive c nodeId a = do
_ <- runPGSExecuteMany c query $ (\(nt, n, _) -> (nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
pure ()
where
query :: PGS.Query
query = [sql| WITH (SELECT id FROM ngrams WHERE terms = ?) AS ngrams
DELETE FROM node_stories
WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
-- The idea is this: fetch all current state data from the DB
-- (locking the rows), perform a diff and update what is necessary.
-- ret <- withResource pool $ \c -> runUpdate c update
updateArchive :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchive c nodeId a = do
_ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nre, nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
pure ()
where
query :: PGS.Query
query = [sql| UPDATE node_stories
SET ngrams_repo_element = ?
WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
-- | This function updates the node story and archive for given node_id.
updateNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> ArchiveList -> IO ()
updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
-- STEPS
-- 0. We assume we're inside an advisory lock
-- 1. Find differences (inserts/updates/deletes)
let currentList = archiveStateAsList $ currentArchive ^. a_state
let newList = archiveStateAsList $ newArchive ^. a_state
let currentSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> currentList
let newSet = Set.fromList $ (\(nt, n, _) -> (nt, n)) <$> newList
let inserts = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference newSet currentSet) newList
printDebug "[updateNodeStory] inserts" inserts
let deletes = filter (\(nt, n, _) -> Set.member (nt, n) $ Set.difference currentSet newSet) currentList
printDebug "[updateNodeStory] deletes" deletes
-- updates are the things that are in new but not in current
let updates = Set.toList $ Set.difference (Set.fromList newList) (Set.fromList currentList)
printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates
-- 2. Perform inserts/deletes/updates
insertArchive c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList inserts }
-- TODO Use currentArchive ^. a_version in delete and report error
-- if entries with (node_id, ngrams_type_id, ngrams_id) but
-- different version are found.
deleteArchive c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList deletes }
updateArchive c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList updates }
-- NOTE: It is assumed that the most recent change is the first in the
-- list, so we save these in reverse order
insertNodeArchiveHistory pool nodeId $ reverse _a_history
insertNodeArchiveHistory c nodeId $ reverse $ newArchive ^. a_history
pure ()
-- where
-- update = Update { uTable = nodeStoryTable
......@@ -449,38 +516,43 @@ updateNodeStory pool nodeId@(NodeId _nId) (Archive {..}) = do
-- , dWhere = (\row -> node_id row .== sqlInt4 nId)
-- , dReturning = rCount }
upsertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO ()
upsertNodeArchive pool nId a = do
(NodeStory m) <- getNodeStory pool nId
case Map.lookup nId m of
upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories c nodeId@(NodeId nId) newArchive = do
printDebug "[upsertNodeStories] START nId" nId
PGS.begin c
--runPGSAdvisoryXactLock c nId
(NodeStory m) <- getNodeStory c nodeId
case Map.lookup nodeId m of
Nothing -> do
_ <- insertNodeStory pool nId a
_ <- insertNodeStory c nodeId newArchive
pure ()
Just _ -> do
_ <- updateNodeStory pool nId a
Just currentArchive -> do
_ <- updateNodeStory c nodeId currentArchive newArchive
pure ()
PGS.commit c
printDebug "[upsertNodeStories] STOP nId" nId
writeNodeStories :: Pool PGS.Connection -> NodeListStory -> IO ()
writeNodeStories pool (NodeStory nls) = do
_ <- mapM (\(nId, a) -> upsertNodeArchive pool nId a) $ Map.toList nls
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) = do
_ <- mapM (\(nId, a) -> upsertNodeStories c nId a) $ Map.toList nls
pure ()
-- | Returns a `NodeListStory`, updating the given one for given `NodeId`
nodeStoryInc :: Pool PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc pool Nothing nId = getNodeStory pool nId
nodeStoryInc pool (Just ns@(NodeStory nls)) nId = do
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc c Nothing nId = getNodeStory c nId
nodeStoryInc c (Just ns@(NodeStory nls)) nId = do
case Map.lookup nId nls of
Nothing -> do
(NodeStory nls') <- getNodeStory pool nId
(NodeStory nls') <- getNodeStory c nId
pure $ NodeStory $ Map.union nls nls'
Just _ -> pure ns
nodeStoryIncs :: Pool PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs :: PGS.Connection -> Maybe NodeListStory -> [NodeId] -> IO NodeListStory
nodeStoryIncs _ Nothing [] = pure $ NodeStory $ Map.empty
nodeStoryIncs pool (Just nls) ns = foldM (\m n -> nodeStoryInc pool (Just m) n) nls ns
nodeStoryIncs pool Nothing (ni:ns) = do
m <- getNodeStory pool ni
nodeStoryIncs pool (Just m) ns
nodeStoryIncs c (Just nls) ns = foldM (\m n -> nodeStoryInc c (Just m) n) nls ns
nodeStoryIncs c Nothing (ni:ns) = do
m <- getNodeStory c ni
nodeStoryIncs c (Just m) ns
-- nodeStoryDec :: Pool PGS.Connection -> NodeListStory -> NodeId -> IO NodeListStory
-- nodeStoryDec pool ns@(NodeStory nls) ni = do
......@@ -510,10 +582,10 @@ readNodeStoryEnv pool = do
nodeStoryVar :: Pool PGS.Connection -> Maybe (MVar NodeListStory) -> [NodeId] -> IO (MVar NodeListStory)
nodeStoryVar pool Nothing nIds = do
state <- nodeStoryIncs pool Nothing nIds
state <- withResource pool $ \c -> nodeStoryIncs c Nothing nIds
newMVar state
nodeStoryVar pool (Just mv) nIds = do
_ <- modifyMVar_ mv $ \nsl -> (nodeStoryIncs pool (Just nsl) nIds)
_ <- withResource pool $ \c -> modifyMVar_ mv $ \nsl -> (nodeStoryIncs c (Just nsl) nIds)
pure mv
-- Debounce is useful since it could delay the saving to some later
......@@ -523,7 +595,10 @@ mkNodeStorySaver pool mvns = mkDebounce settings
where
settings = defaultDebounceSettings
{ debounceAction = do
withMVar mvns (\ns -> writeNodeStories pool ns)
withResource pool $ \c -> do
withMVar mvns $ \ns -> do
--printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
writeNodeStories c ns
withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
modifyMVar_ mvns $ \ns -> pure $ clearHistory ns
, debounceFreq = 1*minute
......@@ -536,6 +611,13 @@ clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHi
where
emptyHistory = [] :: [NgramsStatePatch']
currentVersion :: (HasNodeStory env err m) => ListId -> m Version
currentVersion listId = do
pool <- view connPool
nls <- withResource pool $ \c -> liftBase $ getNodeStory c listId
pure $ nls ^. unNodeStory . at listId . _Just . a_version
-- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
-- mkNodeStorySaver mvns = mkDebounce settings
-- where
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment