Commit 8667dfeb authored by Przemyslaw Kaminski

[frameUpload] implement CSV v3 upload for corpus

parent 6549732e
Pipeline #1695 passed with stage
in 35 minutes and 14 seconds
...@@ -224,6 +224,7 @@ library: ...@@ -224,6 +224,7 @@ library:
- transformers - transformers
- transformers-base - transformers-base
- unordered-containers - unordered-containers
- utf8-string
- uuid - uuid
- validity - validity
- vector - vector
......
...@@ -101,6 +101,8 @@ data JobLog = JobLog ...@@ -101,6 +101,8 @@ data JobLog = JobLog
} }
deriving (Show, Generic) deriving (Show, Generic)
makeLenses ''JobLog
instance Arbitrary JobLog where instance Arbitrary JobLog where
arbitrary = JobLog arbitrary = JobLog
<$> arbitrary <$> arbitrary
...@@ -129,6 +131,18 @@ type ScrapersEnv = JobEnv JobLog JobLog ...@@ -129,6 +131,18 @@ type ScrapersEnv = JobEnv JobLog JobLog
type ScraperAPI = AsyncJobsAPI JobLog ScraperInput JobLog type ScraperAPI = AsyncJobsAPI JobLog ScraperInput JobLog
-- | Build the initial 'JobLog' for a job made of @n@ steps.
--
-- Nothing has succeeded or failed yet, so all @n@ steps are reported as
-- remaining.  'jobLogSucc' and 'jobLogErr' each move one step out of the
-- remaining counter, so starting it at @n@ (rather than 0) keeps it from
-- going negative and keeps the succeeded counter from overshooting @n@.
-- The event list starts out empty.
jobLogInit :: Int -> JobLog
jobLogInit n =
  JobLog { _scst_succeeded = Just 0
         , _scst_failed    = Just 0
         , _scst_remaining = Just n
         , _scst_events    = Just []
         }
-- | Record one more successful step: the succeeded counter goes up by one
-- and the remaining counter goes down by one.  A counter that is 'Nothing'
-- is left untouched, because both updates go through the '_Just' prism.
jobLogSucc :: JobLog -> JobLog
jobLogSucc = bumpSucceeded . dropRemaining
  where
    -- Applied second: increment the succeeded counter.
    bumpSucceeded = over (scst_succeeded . _Just) (+ 1)
    -- Applied first: one step fewer left to do.
    dropRemaining = over (scst_remaining . _Just) (\k -> k - 1)
-- | Record one more failed step: the failed counter goes up by one and the
-- remaining counter goes down by one.  A counter that is 'Nothing' is left
-- untouched, because both updates go through the '_Just' prism.
jobLogErr :: JobLog -> JobLog
jobLogErr = bumpFailed . dropRemaining
  where
    -- Applied second: increment the failed counter.
    bumpFailed    = over (scst_failed . _Just) (+ 1)
    -- Applied first: one step fewer left to do.
    dropRemaining = over (scst_remaining . _Just) (\k -> k - 1)
------------------------------------------------------------------------ ------------------------------------------------------------------------
type AsyncJobs event ctI input output = type AsyncJobs event ctI input output =
AsyncJobsAPI' 'Unsafe 'Safe ctI '[JSON] Maybe event input output AsyncJobsAPI' 'Unsafe 'Safe ctI '[JSON] Maybe event input output
...@@ -19,6 +19,11 @@ module Gargantext.API.Node.Corpus.Export ...@@ -19,6 +19,11 @@ module Gargantext.API.Node.Corpus.Export
import Data.Map (Map) import Data.Map (Map)
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe)
import Data.Set (Set) import Data.Set (Set)
import qualified Data.List as List
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Data.HashMap.Strict as HashMap
import Gargantext.API.Node.Corpus.Export.Types import Gargantext.API.Node.Corpus.Export.Types
import Gargantext.API.Ngrams.Types import Gargantext.API.Ngrams.Types
import Gargantext.API.Ngrams.Tools (filterListWithRoot, mapTermListRoot, getRepo) import Gargantext.API.Ngrams.Tools (filterListWithRoot, mapTermListRoot, getRepo)
...@@ -36,10 +41,6 @@ import Gargantext.Database.Query.Table.NodeNode (selectDocNodes) ...@@ -36,10 +41,6 @@ import Gargantext.Database.Query.Table.NodeNode (selectDocNodes)
import Gargantext.Database.Schema.Ngrams (NgramsType(..)) import Gargantext.Database.Schema.Ngrams (NgramsType(..))
import Gargantext.Database.Schema.Node (_node_id, _node_hyperdata) import Gargantext.Database.Schema.Node (_node_id, _node_hyperdata)
import Gargantext.Prelude import Gargantext.Prelude
import qualified Data.List as List
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Data.HashMap.Strict as HashMap
-------------------------------------------------- --------------------------------------------------
-- | Hashes are ordered by Set -- | Hashes are ordered by Set
......
...@@ -35,7 +35,7 @@ import Test.QuickCheck.Arbitrary ...@@ -35,7 +35,7 @@ import Test.QuickCheck.Arbitrary
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs) import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs, jobLogSucc)
import Gargantext.API.Admin.Types (HasSettings) import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Node.Corpus.New.File import Gargantext.API.Node.Corpus.New.File
import Gargantext.API.Node.Corpus.Searx import Gargantext.API.Node.Corpus.Searx
...@@ -234,15 +234,12 @@ addToCorpusWithForm :: FlowCmdM env err m ...@@ -234,15 +234,12 @@ addToCorpusWithForm :: FlowCmdM env err m
-> CorpusId -> CorpusId
-> NewWithForm -> NewWithForm
-> (JobLog -> m ()) -> (JobLog -> m ())
-> JobLog
-> m JobLog -> m JobLog
addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus = do addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus jobLog = do
printDebug "[addToCorpusWithForm] Parsing corpus: " cid printDebug "[addToCorpusWithForm] Parsing corpus: " cid
printDebug "[addToCorpusWithForm] fileType" ft printDebug "[addToCorpusWithForm] fileType" ft
logStatus JobLog { _scst_succeeded = Just 0 logStatus jobLog
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
}
let let
parse = case ft of parse = case ft of
CSV_HAL -> Parser.parseFormat Parser.CsvHal CSV_HAL -> Parser.parseFormat Parser.CsvHal
...@@ -256,12 +253,7 @@ addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus = do ...@@ -256,12 +253,7 @@ addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus = do
<$> parse (cs d) <$> parse (cs d)
printDebug "Parsing corpus finished : " cid printDebug "Parsing corpus finished : " cid
logStatus JobLog { _scst_succeeded = Just 1 logStatus jobLog2
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
printDebug "Starting extraction : " cid printDebug "Starting extraction : " cid
-- TODO granularity of the logStatus -- TODO granularity of the logStatus
...@@ -274,11 +266,10 @@ addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus = do ...@@ -274,11 +266,10 @@ addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus = do
printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text) printDebug "sending email" ("xxxxxxxxxxxxxxxxxxxxx" :: Text)
sendMail user sendMail user
pure JobLog { _scst_succeeded = Just 2 pure jobLog3
, _scst_failed = Just 0 where
, _scst_remaining = Just 0 jobLog2 = jobLogSucc jobLog
, _scst_events = Just [] jobLog3 = jobLogSucc jobLog2
}
{- {-
addToCorpusWithFile :: FlowCmdM env err m addToCorpusWithFile :: FlowCmdM env err m
......
...@@ -6,18 +6,27 @@ module Gargantext.API.Node.FrameCalcUpload where ...@@ -6,18 +6,27 @@ module Gargantext.API.Node.FrameCalcUpload where
import Control.Lens ((^.)) import Control.Lens ((^.))
import Data.Aeson import Data.Aeson
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.UTF8 as BSU8
import Data.Swagger import Data.Swagger
import qualified Data.Text as T
import GHC.Generics (Generic) import GHC.Generics (Generic)
import Network.HTTP.Client (newManager, httpLbs, parseRequest, responseBody)
import Network.HTTP.Client.TLS (tlsManagerSettings)
import Servant import Servant
import Servant.Job.Async import Servant.Job.Async
import Web.FormUrlEncoded (FromForm) import Web.FormUrlEncoded (FromForm)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs) import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs, jobLogInit, jobLogSucc, jobLogErr)
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm)
import Gargantext.API.Node.Corpus.New.File (FileType(..))
import Gargantext.API.Node.Types (NewWithForm(..))
import Gargantext.API.Prelude import Gargantext.API.Prelude
import Gargantext.Core.Types.Individu (User(..))
import Gargantext.Database.Action.Flow.Types import Gargantext.Database.Action.Flow.Types
import Gargantext.Database.Admin.Types.Hyperdata.Frame import Gargantext.Database.Admin.Types.Hyperdata.Frame
import Gargantext.Database.Admin.Types.Node import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Query.Table.Node (getNodeWith) import Gargantext.Database.Query.Table.Node (getClosestParentIdByType, getNodeWith)
import Gargantext.Database.Schema.Node (node_hyperdata) import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude import Gargantext.Prelude
...@@ -38,7 +47,9 @@ type FrameCalcUploadAPI = Summary " FrameCalc upload" ...@@ -38,7 +47,9 @@ type FrameCalcUploadAPI = Summary " FrameCalc upload"
frameCalcUploadAPI :: UserId -> NodeId -> GargServer FrameCalcUploadAPI frameCalcUploadAPI :: UserId -> NodeId -> GargServer FrameCalcUploadAPI
frameCalcUploadAPI uId nId = frameCalcUploadAPI uId nId =
serveJobsAPI $ serveJobsAPI $
JobFunction (\p logs -> frameCalcUploadAsync uId nId p (liftBase . logs)) JobFunction (\p logs ->
frameCalcUploadAsync uId nId p (liftBase . logs) (jobLogInit 5)
)
frameCalcUploadAsync :: FlowCmdM env err m frameCalcUploadAsync :: FlowCmdM env err m
...@@ -46,26 +57,33 @@ frameCalcUploadAsync :: FlowCmdM env err m ...@@ -46,26 +57,33 @@ frameCalcUploadAsync :: FlowCmdM env err m
-> NodeId -> NodeId
-> FrameCalcUpload -> FrameCalcUpload
-> (JobLog -> m ()) -> (JobLog -> m ())
-> JobLog
-> m JobLog -> m JobLog
frameCalcUploadAsync uId nId _f logStatus = do frameCalcUploadAsync uId nId _f logStatus jobLog = do
logStatus JobLog { _scst_succeeded = Just 0 logStatus jobLog
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
printDebug "[frameCalcUploadAsync] uId" uId -- printDebug "[frameCalcUploadAsync] uId" uId
printDebug "[frameCalcUploadAsync] nId" nId -- printDebug "[frameCalcUploadAsync] nId" nId
node <- getNodeWith nId (Proxy :: Proxy HyperdataFrame) node <- getNodeWith nId (Proxy :: Proxy HyperdataFrame)
let (HyperdataFrame { _hf_base = base let (HyperdataFrame { _hf_base = base
, _hf_frame_id = frame_id }) = node ^. node_hyperdata , _hf_frame_id = frame_id }) = node ^. node_hyperdata
let csvUrl = base <> "/" <> frame_id <> ".csv" let csvUrl = base <> "/" <> frame_id <> ".csv"
printDebug "[frameCalcUploadAsync] csvUrl" csvUrl -- printDebug "[frameCalcUploadAsync] csvUrl" csvUrl
pure JobLog { _scst_succeeded = Just 1 res <- liftBase $ do
, _scst_failed = Just 0 manager <- newManager tlsManagerSettings
, _scst_remaining = Just 0 req <- parseRequest $ T.unpack csvUrl
, _scst_events = Just [] httpLbs req manager
} let body = T.pack $ BSU8.toString $ BSL.toStrict $ responseBody res
mCId <- getClosestParentIdByType nId NodeCorpus
-- printDebug "[frameCalcUploadAsync] mCId" mCId
jobLog2 <- case mCId of
Nothing -> pure $ jobLogErr jobLog
Just cId ->
addToCorpusWithForm (RootId (NodeId uId)) cId (NewWithForm CSV body Nothing "calc-upload.csv") logStatus jobLog
pure $ jobLogSucc jobLog2
...@@ -27,9 +27,23 @@ import Control.Concurrent (threadDelay) ...@@ -27,9 +27,23 @@ import Control.Concurrent (threadDelay)
import Control.Lens (view) import Control.Lens (view)
import Data.Text (Text) import Data.Text (Text)
import Data.Validity import Data.Validity
import Servant
import Servant.Auth as SA
import Servant.Auth.Swagger ()
import Servant.Job.Async
import Servant.Swagger.UI
import qualified Gargantext.API.Ngrams.List as List
import qualified Gargantext.API.Node.Contact as Contact
import qualified Gargantext.API.Node.Corpus.Annuaire as Annuaire
import qualified Gargantext.API.Node.Corpus.Export as Export
import qualified Gargantext.API.Node.Corpus.Export.Types as Export
import qualified Gargantext.API.Node.Corpus.New as New
import qualified Gargantext.API.Public as Public
import Gargantext.API.Admin.Auth.Types (AuthRequest, AuthResponse, AuthenticatedUser(..), PathId(..)) import Gargantext.API.Admin.Auth.Types (AuthRequest, AuthResponse, AuthenticatedUser(..), PathId(..))
import Gargantext.API.Admin.Auth (withAccess) import Gargantext.API.Admin.Auth (withAccess)
import Gargantext.API.Admin.FrontEnd (FrontEndAPI) import Gargantext.API.Admin.FrontEnd (FrontEndAPI)
import Gargantext.API.Admin.Orchestrator.Types (jobLogInit)
import Gargantext.API.Count (CountAPI, count, Query) import Gargantext.API.Count (CountAPI, count, Query)
import Gargantext.API.Ngrams (TableNgramsApi, apiNgramsTableDoc) import Gargantext.API.Ngrams (TableNgramsApi, apiNgramsTableDoc)
import Gargantext.API.Node import Gargantext.API.Node
...@@ -41,18 +55,7 @@ import Gargantext.Database.Admin.Types.Hyperdata ...@@ -41,18 +55,7 @@ import Gargantext.Database.Admin.Types.Hyperdata
import Gargantext.Database.Admin.Types.Node import Gargantext.Database.Admin.Types.Node
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.Prelude.Config (gc_max_docs_scrapers) import Gargantext.Prelude.Config (gc_max_docs_scrapers)
import Servant
import Servant.Auth as SA
import Servant.Auth.Swagger ()
import Servant.Job.Async
import Servant.Swagger.UI
import qualified Gargantext.API.Ngrams.List as List
import qualified Gargantext.API.Node.Contact as Contact
import qualified Gargantext.API.Node.Corpus.Annuaire as Annuaire
import qualified Gargantext.API.Node.Corpus.Export as Export
import qualified Gargantext.API.Node.Corpus.Export.Types as Export
import qualified Gargantext.API.Node.Corpus.New as New
import qualified Gargantext.API.Public as Public
type GargAPI = "api" :> Summary "API " :> GargAPIVersion type GargAPI = "api" :> Summary "API " :> GargAPIVersion
-- | TODO :<|> Summary "Latest API" :> GargAPI' -- | TODO :<|> Summary "Latest API" :> GargAPI'
...@@ -284,7 +287,7 @@ addCorpusWithForm user cid = ...@@ -284,7 +287,7 @@ addCorpusWithForm user cid =
log'' x = do log'' x = do
printDebug "addToCorpusWithForm" x printDebug "addToCorpusWithForm" x
liftBase $ log' x liftBase $ log' x
in New.addToCorpusWithForm user cid i log'') in New.addToCorpusWithForm user cid i log'' (jobLogInit 3))
addCorpusWithFile :: User -> GargServer New.AddWithFile addCorpusWithFile :: User -> GargServer New.AddWithFile
addCorpusWithFile user cid = addCorpusWithFile user cid =
......
...@@ -31,7 +31,6 @@ import Database.PostgreSQL.Simple (Connection, connect) ...@@ -31,7 +31,6 @@ import Database.PostgreSQL.Simple (Connection, connect)
import Database.PostgreSQL.Simple.FromField ( Conversion, ResultError(ConversionFailed), fromField, returnError) import Database.PostgreSQL.Simple.FromField ( Conversion, ResultError(ConversionFailed), fromField, returnError)
import Database.PostgreSQL.Simple.Internal (Field) import Database.PostgreSQL.Simple.Internal (Field)
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.Prelude.Config (GargConfig())
import Opaleye (Query, Unpackspec, showSqlForPostgres, FromFields, Select, runQuery, PGJsonb, QueryRunnerColumnDefault) import Opaleye (Query, Unpackspec, showSqlForPostgres, FromFields, Select, runQuery, PGJsonb, QueryRunnerColumnDefault)
import Opaleye.Aggregate (countRows) import Opaleye.Aggregate (countRows)
import System.IO (FilePath) import System.IO (FilePath)
...@@ -41,6 +40,8 @@ import qualified Data.ByteString as DB ...@@ -41,6 +40,8 @@ import qualified Data.ByteString as DB
import qualified Data.List as DL import qualified Data.List as DL
import qualified Database.PostgreSQL.Simple as PGS import qualified Database.PostgreSQL.Simple as PGS
import Gargantext.Prelude.Config (GargConfig())
------------------------------------------------------- -------------------------------------------------------
class HasConnectionPool env where class HasConnectionPool env where
connPool :: Getter env (Pool Connection) connPool :: Getter env (Pool Connection)
......
...@@ -121,7 +121,7 @@ getClosestParentIdByType :: HasDBid NodeType ...@@ -121,7 +121,7 @@ getClosestParentIdByType :: HasDBid NodeType
getClosestParentIdByType nId nType = do getClosestParentIdByType nId nType = do
result <- runPGSQuery query (nId, 0 :: Int) result <- runPGSQuery query (nId, 0 :: Int)
case result of case result of
[DPS.Only parentId, DPS.Only pTypename] -> do [(NodeId parentId, pTypename)] -> do
if toDBid nType == pTypename then if toDBid nType == pTypename then
pure $ Just $ NodeId parentId pure $ Just $ NodeId parentId
else else
...@@ -131,7 +131,7 @@ getClosestParentIdByType nId nType = do ...@@ -131,7 +131,7 @@ getClosestParentIdByType nId nType = do
query :: DPS.Query query :: DPS.Query
query = [sql| query = [sql|
SELECT n2.id, n2.typename SELECT n2.id, n2.typename
FROM nodes n1 FROM nodes n1
JOIN nodes n2 ON n1.parent_id = n2.id JOIN nodes n2 ON n1.parent_id = n2.id
WHERE n1.id = ? AND 0 = ?; WHERE n1.id = ? AND 0 = ?;
|] |]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment