Commit 91a2d6e2 authored by Przemyslaw Kaminski's avatar Przemyslaw Kaminski

[nodeStory] insert/delete/update seems to work, needs more verification

parent e1dbfd70
Pipeline #3135 passed with stage
in 93 minutes and 41 seconds
......@@ -67,11 +67,16 @@ module Gargantext.Core.NodeStory
, getNodeArchiveHistory
, Archive(..)
, initArchive
, insertArchiveList
, deleteArchiveList
, updateArchiveList
, a_history
, a_state
, a_version
, nodeExists
, runPGSQuery
, runPGSAdvisoryLock
, runPGSAdvisoryUnlock
, runPGSAdvisoryXactLock
, getNodesIdWithType
, readNodeStoryEnv
......@@ -84,7 +89,7 @@ where
-- import Debug.Trace (traceShow)
import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
import Codec.Serialise.Class
import Control.Concurrent (MVar(), withMVar, newMVar, modifyMVar_)
import Control.Concurrent (MVar(), newMVar, modifyMVar_)
import Control.Exception (catch, throw, SomeException(..))
import Control.Lens (makeLenses, Getter, (^.), (.~), (%~), _Just, at, traverse, view)
import Control.Monad.Except
......@@ -275,6 +280,14 @@ type ArchiveList = Archive NgramsState' NgramsStatePatch'
-- DB stuff
runPGSExecute :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> q -> IO Int64
runPGSExecute c qs a = catch (PGS.execute c qs a) printError
where
printError (SomeException e) = do
--q' <- PGS.formatQuery c qs a
--hPutStrLn stderr q'
throw (SomeException e)
runPGSExecuteMany :: (PGS.ToRow q) => PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany c qs a = catch (PGS.executeMany c qs a) printError
where
......@@ -291,9 +304,19 @@ runPGSQuery c q a = catch (PGS.query c q a) printError
hPutStrLn stderr q'
throw (SomeException e)
runPGSAdvisoryLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryLock c id = do
_ <- runPGSQuery c [sql| SELECT pg_advisory_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
pure ()
runPGSAdvisoryUnlock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryUnlock c id = do
_ <- runPGSQuery c [sql| SELECT pg_advisory_unlock(?) |] (PGS.Only id) :: IO [PGS.Only Bool]
pure ()
runPGSAdvisoryXactLock :: PGS.Connection -> Int -> IO ()
runPGSAdvisoryXactLock c id = do
_ <- runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only id) :: IO [PGS.Only Bool]
_ <- runPGSQuery c [sql| SELECT pg_advisory_xact_lock(?) |] (PGS.Only id) :: IO [PGS.Only ()]
pure ()
nodeExists :: PGS.Connection -> NodeId -> IO Bool
......@@ -404,8 +427,7 @@ archiveStateFromList l = Map.fromListWith (<>) $ (\(nt, t, nre) -> (nt, Map.sing
-- | This function inserts whole new node story and archive for given node_id.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c nodeId@(NodeId nId) a = do
printDebug "[insertNodeStory] _a_state" $ a ^. a_state
insertNodeStory c (NodeId nId) a = do
_ <- mapM (\(ngramsType, ngrams, ngramsRepoElement) -> do
termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
case headMay termIdM of
......@@ -413,9 +435,6 @@ insertNodeStory c nodeId@(NodeId nId) a = do
Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateAsList $ a ^. a_state
-- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateAsList _a_state
-- NOTE: It is assumed that the most recent change is the first in the
-- list, so we save these in reverse order
insertNodeArchiveHistory c nodeId $ reverse $ a ^. a_history
pure ()
where
query :: PGS.Query
......@@ -431,8 +450,8 @@ insertNodeStory c nodeId@(NodeId nId) a = do
-- , iReturning = rCount
-- , iOnConflict = Nothing }
insertArchive :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchive c nodeId a = do
insertArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertArchiveList c nodeId a = do
_ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nodeId, a ^. a_version, nt, nre, n)) <$> (archiveStateAsList $ a ^. a_state)
pure ()
where
......@@ -440,8 +459,8 @@ insertArchive c nodeId a = do
query = [sql| INSERT INTO node_stories(node_id, version, ngrams_type_id, ngrams_id, ngrams_repo_element)
SELECT ?, ?, ?, ngrams.id, ? FROM ngrams WHERE terms = ? |]
deleteArchive :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchive c nodeId a = do
deleteArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
deleteArchiveList c nodeId a = do
_ <- runPGSExecuteMany c query $ (\(nt, n, _) -> (nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
pure ()
where
......@@ -450,9 +469,12 @@ deleteArchive c nodeId a = do
DELETE FROM node_stories
WHERE node_id = ? AND ngrams_type_id = ? AND ngrams_id IN (SELECT id FROM ngrams WHERE terms = ?) |]
updateArchive :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchive c nodeId a = do
_ <- runPGSExecuteMany c query $ (\(nt, n, nre) -> (nre, nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
updateArchiveList :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
updateArchiveList c nodeId a = do
let params = (\(nt, n, nre) -> (nre, nodeId, nt, n)) <$> (archiveStateAsList $ a ^. a_state)
--q <- PGS.format c query params
--printDebug "[updateArchiveList] query" q
_ <- mapM (\p -> runPGSExecute c query p) params
pure ()
where
query :: PGS.Query
......@@ -483,22 +505,22 @@ updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
printDebug "[updateNodeStory] updates" $ Text.unlines $ (Text.pack . show) <$> updates
-- 2. Perform inserts/deletes/updates
insertArchive c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList inserts }
insertArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList inserts }
printDebug "[updateNodeStory] insert applied" ()
-- TODO Use currentArchive ^. a_version in delete and report error
-- if entries with (node_id, ngrams_type_id, ngrams_id) but
-- different version are found.
deleteArchive c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList deletes }
updateArchive c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList updates }
-- NOTE: It is assumed that the most recent change is the first in the
-- list, so we save these in reverse order
insertNodeArchiveHistory c nodeId $ reverse $ newArchive ^. a_history
deleteArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList deletes }
printDebug "[updateNodeStory] delete applied" ()
updateArchiveList c nodeId $ Archive { _a_version = newArchive ^. a_version
, _a_history = []
, _a_state = archiveStateFromList updates }
printDebug "[updateNodeStory] update applied" ()
pure ()
-- where
-- update = Update { uTable = nodeStoryTable
......@@ -519,18 +541,25 @@ updateNodeStory c nodeId@(NodeId _nId) currentArchive newArchive = do
upsertNodeStories :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
upsertNodeStories c nodeId@(NodeId nId) newArchive = do
printDebug "[upsertNodeStories] START nId" nId
PGS.begin c
--runPGSAdvisoryXactLock c nId
(NodeStory m) <- getNodeStory c nodeId
case Map.lookup nodeId m of
Nothing -> do
_ <- insertNodeStory c nodeId newArchive
pure ()
Just currentArchive -> do
_ <- updateNodeStory c nodeId currentArchive newArchive
pure ()
PGS.commit c
printDebug "[upsertNodeStories] STOP nId" nId
PGS.withTransaction c $ do
printDebug "[upsertNodeStories] locking nId" nId
runPGSAdvisoryXactLock c nId
-- whether it's insert or update, we can insert node archive history already
-- NOTE: It is assumed that the most recent change is the first in the
-- list, so we save these in reverse order
insertNodeArchiveHistory c nodeId $ reverse $ newArchive ^. a_history
(NodeStory m) <- getNodeStory c nodeId
case Map.lookup nodeId m of
Nothing -> do
_ <- insertNodeStory c nodeId newArchive
pure ()
Just currentArchive -> do
_ <- updateNodeStory c nodeId currentArchive newArchive
pure ()
printDebug "[upsertNodeStories] STOP nId" nId
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) = do
......@@ -595,12 +624,15 @@ mkNodeStorySaver pool mvns = mkDebounce settings
where
settings = defaultDebounceSettings
{ debounceAction = do
withResource pool $ \c -> do
withMVar mvns $ \ns -> do
-- NOTE: Lock MVar first, then use resource pool.
-- Otherwise we could wait for MVar, while
-- blocking the pool connection.
modifyMVar_ mvns $ \ns -> do
withResource pool $ \c -> do
--printDebug "[mkNodeStorySaver] will call writeNodeStories, ns" ns
writeNodeStories c ns
withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
modifyMVar_ mvns $ \ns -> pure $ clearHistory ns
pure $ clearHistory ns
--withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
, debounceFreq = 1*minute
}
minute = 60*second
......
......@@ -211,14 +211,6 @@ pgContextId = pgNodeId
------------------------------------------------------------------------
newtype NodeId = NodeId Int
deriving (Read, Generic, Num, Eq, Ord, Enum, ToJSONKey, FromJSONKey, ToJSON, FromJSON, Hashable, Csv.ToField)
-- TODO make another type
type ContextId = NodeId
newtype NodeContextId = NodeContextId Int
deriving (Read, Generic, Num, Eq, Ord, Enum, ToJSONKey, FromJSONKey, ToJSON, FromJSON, Hashable, Csv.ToField)
instance GQLType NodeId
instance Show NodeId where
show (NodeId n) = "nodeId-" <> show n
......@@ -232,6 +224,14 @@ instance FromField NodeId where
then return $ NodeId n
else mzero
instance ToSchema NodeId
-- TODO make another type
type ContextId = NodeId
newtype NodeContextId = NodeContextId Int
deriving (Read, Generic, Num, Eq, Ord, Enum, ToJSONKey, FromJSONKey, ToJSON, FromJSON, Hashable, Csv.ToField)
--instance Csv.ToField NodeId where
-- toField (NodeId nodeId) = Csv.toField nodeId
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment