[nodestory] fix INSERT function so that it is performed only...

...when node_id exists in the DB

Warning: this silently avoids insertion of patches when node_id is
missing for some reason in the history.
parent b3126623
Pipeline #3606 failed with stage
in 53 minutes and 43 seconds
......@@ -137,7 +137,7 @@ data NodeStoryEnv = NodeStoryEnv
, _nse_saver :: !(IO ())
, _nse_saver_immediate :: !(IO ())
, _nse_archive_saver_immediate :: !(NodeListStory -> IO NodeListStory)
, _nse_getter :: [NodeId] -> IO (MVar NodeListStory)
, _nse_getter :: !([NodeId] -> IO (MVar NodeListStory))
--, _nse_cleaner :: !(IO ()) -- every 12 hours: cleans the repos of unused NodeStories
-- , _nse_lock :: !FileLock -- TODO (it depends on the option: if with database or file only)
}
......@@ -265,16 +265,16 @@ makeLenses ''Archive
----------------------------------------------------------------------
data NodeStoryPoly nid v ngtid ngid nre =
NodeStoryDB { node_id :: nid
, version :: v
, ngrams_type_id :: ngtid
, ngrams_id :: ngid
, ngrams_repo_element :: nre }
NodeStoryDB { node_id :: !nid
, version :: !v
, ngrams_type_id :: !ngtid
, ngrams_id :: !ngid
, ngrams_repo_element :: !nre }
deriving (Eq)
data NodeStoryArchivePoly nid a =
NodeStoryArchiveDB { a_node_id :: nid
, archive :: a }
NodeStoryArchiveDB { a_node_id :: !nid
, archive :: !a }
deriving (Eq)
$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
......@@ -385,22 +385,30 @@ ngramsIdQuery = [sql| SELECT id FROM ngrams WHERE terms = ? |]
insertNodeArchiveHistory :: PGS.Connection -> NodeId -> Version -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ _ [] = pure ()
insertNodeArchiveHistory c nodeId version (h:hs) = do
let tuples = mconcat $ (\(nType, (NgramsTablePatch patch)) ->
let tuples = mconcat $ (\(nType, NgramsTablePatch patch) ->
(\(term, p) ->
(nodeId, nType, term, p)) <$> PM.toList patch) <$> PM.toList h :: [(NodeId, TableNgrams.NgramsType, NgramsTerm, NgramsPatch)]
tuplesM <- mapM (\(nId, nType, term, patch) -> do
ngrams <- runPGSQuery c ngramsIdQuery (PGS.Only term)
pure $ (\(PGS.Only termId) -> (nId, nType, termId, term, patch)) <$> (headMay ngrams)
) tuples :: IO [Maybe (NodeId, TableNgrams.NgramsType, Int, NgramsTerm, NgramsPatch)]
_ <- runPGSExecuteMany c query $ ((\(nId, nType, termId, _term, patch) -> (nId, nType, termId, patch, version)) <$> (catMaybes tuplesM))
_ <- runPGSExecuteMany c query $ ((\(nId, nType, termId, _term, patch) -> (nId, nType, termId, patch, version)) <$> catMaybes tuplesM)
_ <- insertNodeArchiveHistory c nodeId version hs
pure ()
where
-- https://dba.stackexchange.com/questions/265554/how-to-check-other-table-for-value-during-insert
query :: PGS.Query
query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, ngrams_id, patch, version)
VALUES (?, ?, ?, ?, ?) |]
SELECT node_id, ngrams_type_id, ngrams_id, patch::jsonb, version FROM (
VALUES (?, ?, ?, ?, ?)
) AS i(node_id, ngrams_type_id, ngrams_id, patch, version)
WHERE NOT EXISTS (
SELECT FROM ngrams
CROSS JOIN nodes
WHERE ngrams.id = ngrams_id
AND nodes.id = node_id
)|]
getNodeStory :: PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory c nId@(NodeId nodeId) = do
......@@ -459,18 +467,26 @@ archiveStateListFilterFromSet set =
-- | This function inserts whole new node story and archive for given node_id.
insertNodeStory :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
insertNodeStory c (NodeId nId) a = do
_ <- mapM (\(ngramsType, ngrams, ngramsRepoElement) -> do
termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
case headMay termIdM of
Nothing -> pure 0
Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateToList $ a ^. a_state
mapM_ (\(ngramsType, ngrams, ngramsRepoElement) -> do
termIdM <- runPGSQuery c ngramsIdQuery (PGS.Only ngrams) :: IO [PGS.Only Int64]
case headMay termIdM of
Nothing -> pure 0
Just (PGS.Only termId) -> runPGSExecuteMany c query [(nId, a ^. a_version, ngramsType, termId, ngramsRepoElement)]) $ archiveStateToList $ a ^. a_state
-- runInsert c $ insert ngramsType ngrams ngramsRepoElement) $ archiveStateToList _a_state
pure ()
where
-- https://dba.stackexchange.com/questions/265554/how-to-check-other-table-for-value-during-insert
query :: PGS.Query
query = [sql| INSERT INTO node_stories(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element)
VALUES (?, ?, ?, ?) |]
SELECT * FROM (
VALUES (?, ?, ?, ?)
) AS i(node_id, ngrams_type_id, ngrams_id, ngrams_repo_element)
WHERE NOT EXISTS (
SELECT FROM ngrams
CROSS JOIN nodes
WHERE ngrams.id = ngrams_id
AND nodes.id = node_id
)|]
-- insert ngramsType ngrams ngramsRepoElement =
-- Insert { iTable = nodeStoryTable
-- , iRows = [NodeStoryDB { node_id = sqlInt4 nId
......@@ -484,8 +500,7 @@ insertNodeStory c (NodeId nId) a = do
insertArchiveStateList :: PGS.Connection -> NodeId -> Version -> ArchiveStateList -> IO ()
insertArchiveStateList c nodeId version as = do
_ <- mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, version, nt, nre, n)) as
pure ()
mapM_ (\(nt, n, nre) -> runPGSExecute c query (nodeId, version, nt, nre, n)) as
where
query :: PGS.Query
query = [sql| WITH s as (SELECT ? as sid, ? sversion, ? sngrams_type_id, ngrams.id as sngrams_id, ?::jsonb as srepo FROM ngrams WHERE terms = ?)
......@@ -495,8 +510,7 @@ insertArchiveStateList c nodeId version as = do
deleteArchiveStateList :: PGS.Connection -> NodeId -> ArchiveStateList -> IO ()
deleteArchiveStateList c nodeId as = do
_ <- mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) as
pure ()
mapM_ (\(nt, n, _) -> runPGSExecute c query (nodeId, nt, n)) as
where
query :: PGS.Query
query = [sql| DELETE FROM node_stories
......@@ -507,8 +521,7 @@ updateArchiveStateList c nodeId version as = do
let params = (\(nt, n, nre) -> (nre, version, nodeId, nt, n)) <$> as
--q <- PGS.format c query params
--printDebug "[updateArchiveList] query" q
_ <- mapM (\p -> runPGSExecute c query p) params
pure ()
mapM_ (runPGSExecute c query) params
where
query :: PGS.Query
query = [sql| UPDATE node_stories
......@@ -596,8 +609,7 @@ upsertNodeStories c nodeId@(NodeId nId) newArchive = do
fixNodeStoryVersion :: PGS.Connection -> NodeId -> ArchiveList -> IO ()
fixNodeStoryVersion c nodeId newArchive = do
let ngramsTypes = Map.keys $ newArchive ^. a_state
_ <- mapM_ (\nt -> runPGSExecute c query (newArchive ^. a_version, nodeId, nt)) ngramsTypes
pure ()
mapM_ (\nt -> runPGSExecute c query (newArchive ^. a_version, nodeId, nt)) ngramsTypes
where
query :: PGS.Query
query = [sql|UPDATE node_stories
......@@ -607,8 +619,7 @@ fixNodeStoryVersion c nodeId newArchive = do
writeNodeStories :: PGS.Connection -> NodeListStory -> IO ()
writeNodeStories c (NodeStory nls) = do
_ <- mapM (\(nId, a) -> upsertNodeStories c nId a) $ Map.toList nls
pure ()
mapM_ (\(nId, a) -> upsertNodeStories c nId a) $ Map.toList nls
-- | Returns a `NodeListStory`, updating the given one for given `NodeId`
nodeStoryInc :: PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
......@@ -648,9 +659,9 @@ readNodeStoryEnv pool = do
writeNodeStories c ns
pure ns
let archive_saver_immediate ns@(NodeStory nls) = withResource pool $ \c -> do
_ <- mapM (\(nId, a) -> do
insertNodeArchiveHistory c nId (a ^. a_version) $ reverse $ a ^. a_history
) $ Map.toList nls
mapM_ (\(nId, a) -> do
insertNodeArchiveHistory c nId (a ^. a_version) $ reverse $ a ^. a_history
) $ Map.toList nls
pure $ clearHistory ns
saver <- mkNodeStorySaver saver_immediate
-- let saver = modifyMVar_ mvar $ \mv -> do
......@@ -676,7 +687,7 @@ nodeStoryVar pool Nothing nIds = do
nodeStoryVar pool (Just mv) nIds = do
_ <- withResource pool
$ \c -> modifyMVar_ mv
$ \nsl -> (nodeStoryIncs c (Just nsl) nIds)
$ \nsl -> nodeStoryIncs c (Just nsl) nIds
pure mv
-- Debounce is useful since it could delay the saving to some later
......@@ -735,19 +746,18 @@ fixNodeStoryVersions = do
_ <- withResource pool $ \c -> liftBase $ PGS.withTransaction c $ do
nIds <- runPGSQuery c [sql| SELECT id FROM nodes WHERE ? |] (PGS.Only True) :: IO [PGS.Only Int64]
printDebug "[fixNodeStoryVersions] nIds" nIds
_ <- mapM_ (\(PGS.Only nId) -> do
printDebug "[fixNodeStoryVersions] nId" nId
updateVer c TableNgrams.Authors nId
mapM_ (\(PGS.Only nId) -> do
printDebug "[fixNodeStoryVersions] nId" nId
updateVer c TableNgrams.Authors nId
updateVer c TableNgrams.Institutes nId
updateVer c TableNgrams.Institutes nId
updateVer c TableNgrams.Sources nId
updateVer c TableNgrams.Sources nId
updateVer c TableNgrams.NgramsTerms nId
updateVer c TableNgrams.NgramsTerms nId
pure ()
) nIds
pure ()
pure ()
) nIds
pure ()
where
maxVerQuery :: PGS.Query
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment