Commit 6421aac1 authored by Przemyslaw Kaminski's avatar Przemyslaw Kaminski

[nodeStory] implement history in the DB

parent 1b2ff615
Pipeline #3039 failed with stage
in 60 minutes and 39 seconds
......@@ -117,9 +117,9 @@ makeMockApp env = do
blocking <- fireWall req (env ^. menv_firewall)
case blocking of
True -> app req resp
False -> resp ( responseLBS status401 []
False -> resp ( responseLBS status401 []
"Invalid Origin or Host header")
let corsMiddleware = cors $ \_ -> Just CorsResourcePolicy
-- { corsOrigins = Just ([env^.settings.allowedOrigin], False)
{ corsOrigins = Nothing -- == /*
......@@ -135,7 +135,7 @@ makeMockApp env = do
--let warpS = Warp.setPort (8008 :: Int) -- (env^.settings.appPort)
-- $ Warp.defaultSettings
--pure (warpS, logWare $ checkOriginAndHost $ corsMiddleware $ serverApp)
pure $ logStdoutDev $ checkOriginAndHost $ corsMiddleware $ serverApp
-}
......@@ -149,7 +149,7 @@ makeDevMiddleware mode = do
-- blocking <- fireWall req (env ^. menv_firewall)
-- case blocking of
-- True -> app req resp
-- False -> resp ( responseLBS status401 []
-- False -> resp ( responseLBS status401 []
-- "Invalid Origin or Host header")
--
let corsMiddleware = cors $ \_ -> Just CorsResourcePolicy
......
......@@ -11,7 +11,7 @@ Ngrams API
-- | TODO
get ngrams filtered by NgramsType
add get
add get
-}
......@@ -284,13 +284,16 @@ commitStatePatch :: (HasNodeStory env err m, HasMail env)
=> ListId
-> Versioned NgramsStatePatch'
-> m (Versioned NgramsStatePatch')
commitStatePatch listId (Versioned p_version p) = do
commitStatePatch listId (Versioned _p_version p) = do
-- printDebug "[commitStatePatch]" listId
var <- getNodeStoryVar [listId]
vq' <- liftBase $ modifyMVar var $ \ns -> do
let
a = ns ^. unNodeStory . at listId . _Just
q = mconcat $ take (a ^. a_version - p_version) (a ^. a_history)
-- apply patches from version p_version to a ^. a_version
-- TODO Check this
--q = mconcat $ take (a ^. a_version - p_version) (a ^. a_history)
q = mconcat $ a ^. a_history
(p', q') = transformWith ngramsStatePatchConflictResolution p q
a' = a & a_version +~ 1
& a_state %~ act p'
......@@ -810,5 +813,3 @@ listNgramsChangedSince listId ngramsType version
Versioned <$> currentVersion listId <*> pure True
| otherwise =
tableNgramsPull listId ngramsType version & mapped . v_data %~ (== mempty)
......@@ -28,7 +28,8 @@ import Data.String (IsString, fromString)
import Data.Swagger hiding (version, patch)
import Data.Text (Text, pack, strip)
import Data.Validity
import Database.PostgreSQL.Simple.FromField (FromField, fromField, ResultError(ConversionFailed), returnError)
import Database.PostgreSQL.Simple.FromField (FromField, fromField, fromJSONField, ResultError(ConversionFailed), returnError)
import Database.PostgreSQL.Simple.ToField (ToField, toJSONField, toField)
import GHC.Generics (Generic)
import Gargantext.Core.Text (size)
import Gargantext.Core.Types (ListType(..), ListId, NodeId, TODO)
......@@ -524,7 +525,11 @@ instance Serialise (PatchMap NgramsTerm NgramsPatch)
instance FromField NgramsTablePatch
where
fromField = fromField'
fromField = fromJSONField
--fromField = fromField'
instance ToField NgramsTablePatch
where
toField = toJSONField
instance FromField (PatchMap TableNgrams.NgramsType (PatchMap NodeId NgramsTablePatch))
where
......
......@@ -40,6 +40,7 @@ module Gargantext.Core.NodeStory
, nse_saver
, nse_var
, unNodeStory
, getNodeArchiveHistory
, Archive(..)
, initArchive
, a_history
......@@ -53,17 +54,18 @@ module Gargantext.Core.NodeStory
where
-- import Debug.Trace (traceShow)
import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
import Codec.Serialise.Class
--import Control.Debounce (mkDebounce, defaultDebounceSettings, debounceFreq, debounceAction)
import Codec.Serialise.Class
import Control.Arrow (returnA)
import Control.Concurrent (MVar(), withMVar, newMVar, modifyMVar_)
import Control.Concurrent (MVar(), {-withMVar,-} newMVar, modifyMVar_)
import Control.Exception (catch, throw, SomeException(..))
import Control.Lens (makeLenses, Getter, (^.))
import Control.Lens (makeLenses, Getter, (^.), (.~), traverse)
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson hiding ((.=), decode)
import Data.ByteString.Char8 (hPutStrLn)
import Data.Map.Strict (Map)
import Data.Maybe (mapMaybe)
import Data.Monoid
import Data.Pool (Pool, withResource)
import Data.Semigroup
......@@ -83,6 +85,7 @@ import Opaleye (Column, DefaultFromField(..), Insert(..), Select, SqlInt4, SqlJs
import Opaleye.Internal.Table (Table(..))
import System.IO (stderr)
import qualified Data.Map.Strict as Map
import qualified Data.Map.Strict.Patch as PM
import qualified Gargantext.Database.Query.Table.Ngrams as TableNgrams
------------------------------------------------------------------------
......@@ -120,7 +123,7 @@ class HasNodeStorySaver env where
TODO : generalize for any NodeType, let's start with NodeList which
is implemented already
-}
data NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
newtype NodeStory s p = NodeStory { _unNodeStory :: Map NodeId (Archive s p) }
deriving (Generic, Show)
instance (FromJSON s, FromJSON p) => FromJSON (NodeStory s p)
......@@ -128,10 +131,18 @@ instance (ToJSON s, ToJSON p) => ToJSON (NodeStory s p)
instance (Serialise s, Serialise p) => Serialise (NodeStory s p)
data Archive s p = Archive
{ _a_version :: !Version
, _a_state :: !s
, _a_history :: ![p]
{ _a_version :: !Version
, _a_state :: !s
, _a_history :: ![p]
-- first patch in the list is the most recent
-- We use `take` in `commitStatePatch`, that's why.
-- History is immutable, we just insert things on top of existing
-- list.
-- We don't need to store the whole history in memory, this
-- structure holds only recent history, the one that will be
-- inserted to the DB.
}
deriving (Generic, Show)
......@@ -155,12 +166,13 @@ instance DefaultFromField SqlJsonb (Archive NgramsState' NgramsStatePatch')
instance (Semigroup s, Semigroup p) => Semigroup (Archive s p) where
(<>) (Archive { _a_history = p }) (Archive { _a_version = v'
, _a_state = s'
, _a_history = p'}) =
, _a_history = p' }) =
Archive { _a_version = v'
, _a_state = s'
, _a_history = p' <> p }
instance Monoid (Archive NgramsState' NgramsStatePatch') where
-- instance Monoid (Archive NgramsState' NgramsStatePatch') where
instance (Monoid s, Semigroup p) => Monoid (Archive s p) where
mempty = Archive { _a_version = 0
, _a_state = mempty
, _a_history = [] }
......@@ -173,13 +185,11 @@ instance (ToJSON s, ToJSON p) => ToJSON (Archive s p) where
toEncoding = genericToEncoding $ unPrefix "_a_"
------------------------------------------------------------------------
initNodeStory :: Monoid s => NodeId -> NodeStory s p
initNodeStory :: (Monoid s, Semigroup p) => NodeId -> NodeStory s p
initNodeStory ni = NodeStory $ Map.singleton ni initArchive
initArchive :: Monoid s => Archive s p
initArchive = Archive { _a_version = 0
, _a_state = mempty
, _a_history = [] }
initArchive :: (Monoid s, Semigroup p) => Archive s p
initArchive = mempty
initNodeListStoryMock :: NodeListStory
initNodeListStoryMock = NodeStory $ Map.singleton nodeListId archive
......@@ -218,25 +228,30 @@ type NodeStoryRead = NodeStoryPoly (Column SqlInt4) (Column SqlJsonb)
$(makeAdaptorAndInstance "pNodeStory" ''NodeStoryPoly)
runPGSExecuteMany :: (PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> [q] -> IO Int64
runPGSExecuteMany pool qs a = withResource pool $ \c -> catch (PGS.executeMany c qs a) (printError c)
where
printError _c (SomeException e) = do
--q' <- PGS.formatQuery c qs a
--hPutStrLn stderr q'
throw (SomeException e)
runPGSQuery :: (PGS.FromRow r, PGS.ToRow q) => Pool PGS.Connection -> PGS.Query -> q -> IO [r]
runPGSQuery pool q a = withResource pool $ \c -> catch (PGS.query c q a) (printError c)
where
where
printError c (SomeException e) = do
q' <- PGS.formatQuery c q a
hPutStrLn stderr q'
throw (SomeException e)
nodeExists :: Pool PGS.Connection -> NodeId -> IO Bool
nodeExists pool nId = (== [PGS.Only True])
<$> runPGSQuery pool [sql|SELECT true FROM nodes WHERE id = ? AND ? |] (nId, True)
getNodesIdWithType :: Pool PGS.Connection -> NodeType -> IO [NodeId]
getNodesIdWithType pool nt = do
--ns <- withResource pool $ \c -> runSelect c $ selectNodesIdWithType nt
ns <- runPGSQuery pool query (nodeTypeId nt, True)
pure $ map (\(PGS.Only nId) -> NodeId nId) ns
--pure (map NodeId ns)
where
query :: PGS.Query
query = [sql|SELECT id FROM nodes WHERE typename = ? AND ? |]
......@@ -248,14 +263,40 @@ nodeStoryTable =
Table "node_stories"
( pNodeStory NodeStoryDB { node_id = tableField "node_id"
, archive = tableField "archive" } )
nodeStorySelect :: Select NodeStoryRead
nodeStorySelect = selectTable nodeStoryTable
-- TODO Check ordering, "first patch in the _a_history list is the most recent"
getNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> IO [NgramsStatePatch']
getNodeArchiveHistory pool nodeId = do
as <- runPGSQuery pool query (nodeId, True)
let asTuples = mapMaybe (\(ngrams_type_id, patch) -> (\ntId -> (ntId, patch)) <$> (TableNgrams.fromNgramsTypeId ngrams_type_id)) as
pure $ (\(ntId, patch) -> fst $ PM.singleton ntId patch) <$> asTuples
where
query :: PGS.Query
query = [sql|SELECT ngrams_type_id, patch FROM node_story_archive_history WHERE node_id = ? AND ? |]
insertNodeArchiveHistory :: Pool PGS.Connection -> NodeId -> [NgramsStatePatch'] -> IO ()
insertNodeArchiveHistory _ _ [] = pure ()
insertNodeArchiveHistory pool nodeId (h:hs) = do
_ <- runPGSExecuteMany pool query $ (\(nType, patch) -> (nodeId, TableNgrams.ngramsTypeId nType, patch)) <$> (PM.toList h)
_ <- insertNodeArchiveHistory pool nodeId hs
pure ()
where
query :: PGS.Query
query = [sql| INSERT INTO node_story_archive_history(node_id, ngrams_type_id, patch) VALUES (?, ?, ?) |]
getNodeStory :: Pool PGS.Connection -> NodeId -> IO NodeListStory
getNodeStory pool (NodeId nodeId) = do
res <- withResource pool $ \c -> runSelect c query
pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
res <- withResource pool $ \c -> runSelect c query :: IO [NodeStoryPoly NodeId ArchiveQ]
withArchive <- mapM (\(NodeStoryDB { node_id = nId, archive = Archive { .. } }) -> do
--a <- getNodeArchiveHistory pool nId
let a = [] :: [NgramsStatePatch']
-- Don't read whole history. Only state is needed and most recent changes.
pure (nId, Archive { _a_history = a, .. })) res
pure $ NodeStory $ Map.fromListWith (<>) withArchive
--pure $ NodeStory $ Map.fromListWith (<>) $ (\(NodeStoryDB nId a) -> (nId, a)) <$> res
where
query :: Select NodeStoryRead
query = proc () -> do
......@@ -264,19 +305,30 @@ getNodeStory pool (NodeId nodeId) = do
returnA -< row
insertNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
insertNodeArchive pool (NodeId nId) a = withResource pool $ \c -> runInsert c insert
insertNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
ret <- withResource pool $ \c -> runInsert c insert
insertNodeArchiveHistory pool nodeId _a_history
pure ret
where
emptyHistory = [] :: [NgramsStatePatch']
insert = Insert { iTable = nodeStoryTable
, iRows = [NodeStoryDB { node_id = sqlInt4 nId
, archive = sqlValueJSONB a }]
, archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
, .. } }]
, iReturning = rCount
, iOnConflict = Nothing }
updateNodeArchive :: Pool PGS.Connection -> NodeId -> ArchiveQ -> IO Int64
updateNodeArchive pool (NodeId nId) a = withResource pool $ \c -> runUpdate c update
updateNodeArchive pool nodeId@(NodeId nId) (Archive {..}) = do
ret <- withResource pool $ \c -> runUpdate c update
insertNodeArchiveHistory pool nodeId _a_history
pure ret
where
emptyHistory = [] :: [NgramsStatePatch']
update = Update { uTable = nodeStoryTable
, uUpdateWith = updateEasy (\(NodeStoryDB { .. }) -> NodeStoryDB { archive = sqlValueJSONB a, .. })
, uUpdateWith = updateEasy (\(NodeStoryDB { node_id }) -> NodeStoryDB { archive = sqlValueJSONB $ Archive { _a_history = emptyHistory
, ..}
, .. })
, uWhere = (\row -> node_id row .== sqlInt4 nId)
, uReturning = rCount }
......@@ -298,7 +350,7 @@ writeNodeStories :: Pool PGS.Connection -> NodeListStory -> IO ()
writeNodeStories pool (NodeStory nls) = do
_ <- mapM (\(nId, a) -> upsertNodeArchive pool nId a) $ Map.toList nls
pure ()
-- | Returns a `NodeListStory`, updating the given one for given `NodeId`
nodeStoryInc :: Pool PGS.Connection -> Maybe NodeListStory -> NodeId -> IO NodeListStory
nodeStoryInc pool Nothing nId = getNodeStory pool nId
......@@ -331,10 +383,13 @@ nodeStoryIncs pool Nothing (ni:ns) = do
readNodeStoryEnv :: Pool PGS.Connection -> IO NodeStoryEnv
readNodeStoryEnv pool = do
mvar <- nodeStoryVar pool Nothing []
saver <- mkNodeStorySaver pool mvar
-- let saver = modifyMVar_ mvar $ \mv' -> do
-- writeNodeStories mv'
-- return mv'
-- saver <- mkNodeStorySaver pool mvar
let saver = modifyMVar_ mvar $ \mv -> do
writeNodeStories pool mv
printDebug "[readNodeStoryEnv] saver" mv
let mv' = clearHistory mv
printDebug "[readNodeStoryEnv] saver, cleared" mv'
return mv'
pure $ NodeStoryEnv { _nse_var = mvar
, _nse_saver = saver
, _nse_getter = nodeStoryVar pool (Just mvar) }
......@@ -350,16 +405,28 @@ nodeStoryVar pool (Just mv) nIds = do
-- TODO No debounce since this is IO stuff.
-- debounce is useful since it could delay the saving to some later
-- time, asynchronously and we keep operating on memory only.
{-
mkNodeStorySaver :: Pool PGS.Connection -> MVar NodeListStory -> IO (IO ())
mkNodeStorySaver pool mvns = mkDebounce settings
where
settings = defaultDebounceSettings
{ debounceAction = withMVar mvns (\ns -> writeNodeStories pool ns)
{ debounceAction = do
withMVar mvns (\ns -> writeNodeStories pool ns)
withMVar mvns (\ns -> printDebug "[mkNodeStorySaver] debounce nodestory" ns)
modifyMVar_ mvns $ \ns -> pure $ clearAHistoryToInsert ns
, debounceFreq = 1*minute
}
minute = 60*second
second = 10^(6 :: Int)
--mkNodeStorySaver pool mvns = withMVar mvns $ writeNodeStories pool
-}
clearHistory :: NodeListStory -> NodeListStory
-- clearHistory (NodeStory ns) =
-- NodeStory $ Map.map (\(Archive { .. }) -> Archive { _a_history_to_insert = emptyHistory, .. }) ns
clearHistory (NodeStory ns) = NodeStory $ ns & (traverse . a_history) .~ emptyHistory
where
emptyHistory = [] :: [NgramsStatePatch']
-- mkNodeStorySaver :: MVar NodeListStory -> Cmd err (Cmd err ())
-- mkNodeStorySaver mvns = mkDebounce settings
......
......@@ -68,5 +68,3 @@ history' types lists = (Map.map (Map.unionsWith (<>)))
-> Map NgramsType [HashMap NgramsTerm NgramsPatch]
toMap m = Map.map (cons . unNgramsTablePatch)
$ unPatchMapToMap m
......@@ -210,4 +210,3 @@ putListNgrams nodeId ngramsType nes = putListNgrams' nodeId ngramsType m
& unNodeStory . at listId . _Just . a_history %~ (p :)
& unNodeStory . at listId . _Just . a_state . at ngramsType' .~ Just ns
saveNodeStory
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment