Commit b16a5b54 authored by Alp Mestanogullari's avatar Alp Mestanogullari

fix some Conduit wiring, lifting IO conduit to a more generic setting

parent b5aec299
...@@ -101,7 +101,7 @@ documentsFromWriteNodes uId nId _p logStatus = do ...@@ -101,7 +101,7 @@ documentsFromWriteNodes uId nId _p logStatus = do
let parsedE = (\(node, contents) -> hyperdataDocumentFromFrameWrite (node ^. node_hyperdata, contents)) <$> frameWritesWithContents let parsedE = (\(node, contents) -> hyperdataDocumentFromFrameWrite (node ^. node_hyperdata, contents)) <$> frameWritesWithContents
let parsed = rights parsedE let parsed = rights parsedE
_ <- flowDataText (RootId (NodeId uId)) (DataNew $ yield parsed) (Multi EN) cId Nothing logStatus _ <- flowDataText (RootId (NodeId uId)) (DataNew $ yieldMany parsed) (Multi EN) cId Nothing logStatus
pure $ jobLogSuccess jobLog pure $ jobLogSuccess jobLog
------------------------------------------------------------------------ ------------------------------------------------------------------------
......
...@@ -153,7 +153,8 @@ getDataText (InternalOrigin _) _la q _li = do ...@@ -153,7 +153,8 @@ getDataText (InternalOrigin _) _la q _li = do
pure $ DataOld ids pure $ DataOld ids
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
flowDataText :: ( FlowCmdM env err m flowDataText :: forall env err m.
( FlowCmdM env err m
) )
=> User => User
-> DataText -> DataText
...@@ -165,7 +166,7 @@ flowDataText :: ( FlowCmdM env err m ...@@ -165,7 +166,7 @@ flowDataText :: ( FlowCmdM env err m
flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw flowDataText u (DataOld ids) tt cid mfslw _ = flowCorpusUser (_tt_lang tt) u (Right [cid]) corpusType ids mfslw
where where
corpusType = (Nothing :: Maybe HyperdataCorpus) corpusType = (Nothing :: Maybe HyperdataCorpus)
flowDataText u (DataNew txtC) tt cid mfslw logStatus = flowCorpus u (Right [cid]) tt mfslw txtC logStatus flowDataText u (DataNew txtC) tt cid mfslw logStatus = flowCorpus u (Right [cid]) tt mfslw (transPipe liftBase txtC) logStatus
------------------------------------------------------------------------ ------------------------------------------------------------------------
-- TODO use proxy -- TODO use proxy
...@@ -190,7 +191,7 @@ flowCorpusFile :: (FlowCmdM env err m) ...@@ -190,7 +191,7 @@ flowCorpusFile :: (FlowCmdM env err m)
-> Maybe FlowSocialListWith -> Maybe FlowSocialListWith
-> (JobLog -> m ()) -> (JobLog -> m ())
-> m CorpusId -> m CorpusId
flowCorpusFile u n l la ff fp mfslw logStatus = do flowCorpusFile u n _l la ff fp mfslw logStatus = do
eParsed <- liftBase $ parseFile ff fp eParsed <- liftBase $ parseFile ff fp
case eParsed of case eParsed of
Right parsed -> do Right parsed -> do
...@@ -207,13 +208,14 @@ flowCorpus :: (FlowCmdM env err m, FlowCorpus a) ...@@ -207,13 +208,14 @@ flowCorpus :: (FlowCmdM env err m, FlowCorpus a)
-> Either CorpusName [CorpusId] -> Either CorpusName [CorpusId]
-> TermType Lang -> TermType Lang
-> Maybe FlowSocialListWith -> Maybe FlowSocialListWith
-> ConduitT () a IO () -> ConduitT () a m ()
-> (JobLog -> m ()) -> (JobLog -> m ())
-> m CorpusId -> m CorpusId
flowCorpus = flow (Nothing :: Maybe HyperdataCorpus) flowCorpus = flow (Nothing :: Maybe HyperdataCorpus)
flow :: ( FlowCmdM env err m flow :: forall env err m a c.
( FlowCmdM env err m
, FlowCorpus a , FlowCorpus a
, MkCorpus c , MkCorpus c
) )
...@@ -222,12 +224,12 @@ flow :: ( FlowCmdM env err m ...@@ -222,12 +224,12 @@ flow :: ( FlowCmdM env err m
-> Either CorpusName [CorpusId] -> Either CorpusName [CorpusId]
-> TermType Lang -> TermType Lang
-> Maybe FlowSocialListWith -> Maybe FlowSocialListWith
-> ConduitT () a IO () -> ConduitT () a m ()
-> (JobLog -> m ()) -> (JobLog -> m ())
-> m CorpusId -> m CorpusId
flow c u cn la mfslw docsC logStatus = do flow c u cn la mfslw docsC _logStatus = do
-- TODO if public insertMasterDocs else insertUserDocs -- TODO if public insertMasterDocs else insertUserDocs
ids <- liftBase $ runConduit $ ids <- runConduit $
zipSources (yieldMany [1..]) docsC zipSources (yieldMany [1..]) docsC
.| mapMC insertDoc .| mapMC insertDoc
.| sinkList .| sinkList
...@@ -243,7 +245,8 @@ flow c u cn la mfslw docsC logStatus = do ...@@ -243,7 +245,8 @@ flow c u cn la mfslw docsC logStatus = do
flowCorpusUser (la ^. tt_lang) u cn c ids mfslw flowCorpusUser (la ^. tt_lang) u cn c ids mfslw
where where
insertDoc (idx, doc) = do insertDoc :: (Int, a) -> m NodeId
insertDoc (_idx, doc) = do
id <- insertMasterDocs c la [doc] id <- insertMasterDocs c la [doc]
-- logStatus JobLog { _scst_succeeded = Just $ 1 + idx -- logStatus JobLog { _scst_succeeded = Just $ 1 + idx
-- , _scst_failed = Just 0 -- , _scst_failed = Just 0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment