Commit f63b5188 authored by Alexandre Delanoë

[RENAME] ScraperStatus -> JobLog

parent 3041d86d
Pipeline #892 canceled with stage
......@@ -16,18 +16,17 @@ module Gargantext.API.Admin.Orchestrator where
import Control.Lens hiding (elements)
import Data.Aeson
import qualified Data.ByteString.Lazy.Char8 as LBS
import Data.Text
import Gargantext.API.Admin.Orchestrator.Scrapy.Schedule
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Admin.Settings
import Gargantext.Prelude
import Servant
import Servant.Job.Async
import Servant.Job.Client
import Servant.Job.Server
import Servant.Job.Utils (extendBaseUrl)
import Gargantext.Prelude
import Gargantext.API.Admin.Settings
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Admin.Orchestrator.Scrapy.Schedule
import qualified Data.ByteString.Lazy.Char8 as LBS
callJobScrapy :: (ToJSON e, FromJSON e, FromJSON o, MonadClientJob m)
=> JobServerURL e Schedule o
......@@ -44,7 +43,7 @@ callJobScrapy jurl schedule = do
-- | Encode a value as JSON and write the resulting bytes to standard output,
-- followed by a newline.
logConsole :: ToJSON a => a -> IO ()
logConsole payload = LBS.putStrLn (encode payload)
callScraper :: MonadClientJob m => URL -> ScraperInput -> m ScraperStatus
callScraper :: MonadClientJob m => URL -> ScraperInput -> m JobLog
callScraper url input =
callJobScrapy jurl $ \cb ->
Schedule
......@@ -64,11 +63,11 @@ callScraper url input =
,("callback", [toUrlPiece cb])]
}
where
jurl :: JobServerURL ScraperStatus Schedule ScraperStatus
jurl :: JobServerURL JobLog Schedule JobLog
jurl = JobServerURL url Callback
-- | Run 'callScraper' for @scrapyurl@ and @input@ through 'runJobMLog',
-- handing it @client_env@ and the @log_status@ callback.
--
-- A 'Left' result is rendered with 'show' and passed to 'panic'; a 'Right'
-- result is returned unchanged.
pipeline :: FromJSON e => URL -> ClientEnv -> ScraperInput
-> (e -> IO ()) -> IO ScraperStatus
-> (e -> IO ()) -> IO JobLog
pipeline scrapyurl client_env input log_status = do
e <- runJobMLog client_env log_status $ callScraper scrapyurl input
either (panic . cs . show) pure e -- TODO throwError
......
......@@ -20,7 +20,6 @@ import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary
------------------------------------------------------------------------
instance Arbitrary a => Arbitrary (JobStatus 'Safe a) where
arbitrary = panic "TODO"
......@@ -90,7 +89,9 @@ instance ToJSON ScraperEvent where
instance FromJSON ScraperEvent where
parseJSON = genericParseJSON $ jsonOptions "_scev_"
data ScraperStatus = ScraperStatus
data JobLog = JobLog
{ _scst_succeeded :: !(Maybe Int)
, _scst_failed :: !(Maybe Int)
, _scst_remaining :: !(Maybe Int)
......@@ -98,20 +99,20 @@ data ScraperStatus = ScraperStatus
}
deriving (Show, Generic)
-- | Random generator for the job log record: each of the four constructor
-- fields is drawn independently from its own 'Arbitrary' instance.
instance Arbitrary ScraperStatus where
arbitrary = ScraperStatus
instance Arbitrary JobLog where
arbitrary = JobLog
<$> arbitrary
<*> arbitrary
<*> arbitrary
<*> arbitrary
-- JSON encoding and decoding are both derived generically and share the same
-- @jsonOptions "_scst_"@ argument, so the two directions use matching options.
-- NOTE(review): presumably 'jsonOptions' strips the "_scst_" field prefix —
-- confirm against its definition.
instance ToJSON ScraperStatus where
instance ToJSON JobLog where
toJSON = genericToJSON $ jsonOptions "_scst_"
instance FromJSON ScraperStatus where
instance FromJSON JobLog where
parseJSON = genericParseJSON $ jsonOptions "_scst_"
-- The schema instances below override no method, so they rely on the class
-- defaults; the TODOs record that the field prefixes are not yet handled here.
instance ToSchema ScraperStatus -- TODO _scst_ prefix
instance ToSchema JobLog -- TODO _scst_ prefix
instance ToSchema ScraperInput -- TODO _scin_ prefix
instance ToSchema ScraperEvent -- TODO _scev_ prefix
......@@ -122,6 +123,6 @@ instance ToParamSchema Offset -- where
instance ToParamSchema Limit -- where
-- toParamSchema = panic "TODO"
-- | Job environment whose two type arguments are the same job log type.
type ScrapersEnv = JobEnv ScraperStatus ScraperStatus
type ScrapersEnv = JobEnv JobLog JobLog
-- | Asynchronous jobs API parameterised by the job log type, 'ScraperInput'
-- and the job log type again.
-- NOTE(review): presumably these are the event, input and output types of
-- 'AsyncJobsAPI' — confirm against servant-job.
type ScraperAPI = AsyncJobsAPI ScraperStatus ScraperInput ScraperStatus
type ScraperAPI = AsyncJobsAPI JobLog ScraperInput JobLog
......@@ -156,10 +156,10 @@ instance HasRepo Env where
instance HasSettings Env where
settings = env_settings
-- Both instances below reach the job environment of 'Env' through the
-- @env_scrapers@ accessor.
instance Servant.Job.Core.HasEnv Env (Job ScraperStatus ScraperStatus) where
instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where
_env = env_scrapers . Servant.Job.Core._env
instance HasJobEnv Env ScraperStatus ScraperStatus where
instance HasJobEnv Env JobLog JobLog where
job_env = env_scrapers
data MockEnv = MockEnv
......
......@@ -93,7 +93,7 @@ type PostAPI = Summary "Update List"
:> "add"
:> "form"
:> "async"
:> AsyncJobs ScraperStatus '[FormUrlEncoded] WithFile ScraperStatus
:> AsyncJobs JobLog '[FormUrlEncoded] WithFile JobLog
postAsync :: ListId -> GargServer PostAPI
postAsync lId =
......@@ -103,18 +103,18 @@ postAsync lId =
postAsync' :: FlowCmdM env err m
=> ListId
-> WithFile
-> (ScraperStatus -> m ())
-> m ScraperStatus
-> (JobLog -> m ())
-> m JobLog
postAsync' l (WithFile _ m _) logStatus = do
logStatus ScraperStatus { _scst_succeeded = Just 0
logStatus JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
_r <- post l m
pure ScraperStatus { _scst_succeeded = Just 1
pure JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
......
......@@ -324,5 +324,3 @@ putNode :: forall err a. (HasNodeError err, JSONB a, ToJSON a)
-> Cmd err Int
putNode n h = fromIntegral <$> updateHyperdata n h
-------------------------------------------------------------
......@@ -62,14 +62,14 @@ type AddWithForm = Summary "Add with FormUrlEncoded to annuaire endpoint"
:> "add"
:> "form"
:> "async"
:> AsyncJobs ScraperStatus '[FormUrlEncoded] AnnuaireWithForm ScraperStatus
:> AsyncJobs JobLog '[FormUrlEncoded] AnnuaireWithForm JobLog
------------------------------------------------------------------------
addToAnnuaireWithForm :: FlowCmdM env err m
=> AnnuaireId
-> AnnuaireWithForm
-> (ScraperStatus -> m ())
-> m ScraperStatus
-> (JobLog -> m ())
-> m JobLog
addToAnnuaireWithForm _cid (AnnuaireWithForm ft _d _l) logStatus = do
printDebug "ft" ft
......@@ -86,7 +86,7 @@ addToAnnuaireWithForm _cid (AnnuaireWithForm ft _d _l) logStatus = do
-- <$> take 1000000
-- <$> parse (cs d)
logStatus ScraperStatus { _scst_succeeded = Just 1
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
......@@ -98,7 +98,7 @@ addToAnnuaireWithForm _cid (AnnuaireWithForm ft _d _l) logStatus = do
-- printDebug "cid'" cid'
pure ScraperStatus { _scst_succeeded = Just 2
pure JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
......
......@@ -27,7 +27,7 @@ import Data.Maybe (fromMaybe)
import Data.Swagger
import Data.Text (Text)
import GHC.Generics (Generic)
import Gargantext.API.Admin.Orchestrator.Types (ScraperStatus(..))
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
import qualified Gargantext.API.Admin.Orchestrator.Types as T
import Gargantext.API.Node.Corpus.New.File
import Gargantext.Core (Lang(..){-, allLangs-})
......@@ -173,7 +173,7 @@ type AddWithQuery = Summary "Add with Query to corpus endpoint"
:> "corpus"
:> Capture "corpus_id" CorpusId
:> "query"
:> AsyncJobs ScraperStatus '[JSON] WithQuery ScraperStatus
:> AsyncJobs JobLog '[JSON] WithQuery JobLog
{-
type AddWithFile = Summary "Add with MultipartData to corpus endpoint"
......@@ -184,7 +184,7 @@ type AddWithFile = Summary "Add with MultipartData to corpus endpoint"
:> MultipartForm Mem (MultipartData Mem)
:> QueryParam "fileType" FileType
:> "async"
:> AsyncJobs ScraperStatus '[JSON] () ScraperStatus
:> AsyncJobs JobLog '[JSON] () JobLog
-}
type AddWithForm = Summary "Add with FormUrlEncoded to corpus endpoint"
......@@ -193,7 +193,7 @@ type AddWithForm = Summary "Add with FormUrlEncoded to corpus endpoint"
:> "add"
:> "form"
:> "async"
:> AsyncJobs ScraperStatus '[FormUrlEncoded] NewWithForm ScraperStatus
:> AsyncJobs JobLog '[FormUrlEncoded] NewWithForm JobLog
------------------------------------------------------------------------
......@@ -202,13 +202,13 @@ addToCorpusWithQuery :: FlowCmdM env err m
=> User
-> CorpusId
-> WithQuery
-> (ScraperStatus -> m ())
-> m ScraperStatus
-> (JobLog -> m ())
-> m JobLog
addToCorpusWithQuery u cid (WithQuery q dbs l _nid) logStatus = do
-- TODO ...
logStatus ScraperStatus { _scst_succeeded = Just 10
, _scst_failed = Just 2
, _scst_remaining = Just 138
logStatus JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 5
, _scst_events = Just []
}
printDebug "addToCorpusWithQuery" cid
......@@ -217,11 +217,18 @@ addToCorpusWithQuery u cid (WithQuery q dbs l _nid) logStatus = do
-- if cid is corpus -> add to corpus
-- if cid is root -> create corpus in Private
txts <- mapM (\db -> getDataText db (Multi l) q (Just 10000)) [database2origin dbs]
logStatus JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
cids <- mapM (\txt -> flowDataText u txt (Multi l) cid) txts
printDebug "corpus id" cids
-- TODO ...
pure ScraperStatus { _scst_succeeded = Just 137
, _scst_failed = Just 13
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
......@@ -230,10 +237,16 @@ addToCorpusWithForm :: FlowCmdM env err m
=> User
-> CorpusId
-> NewWithForm
-> (ScraperStatus -> m ())
-> m ScraperStatus
-> (JobLog -> m ())
-> m JobLog
addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus = do
printDebug "Parsing corpus: " cid
logStatus JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
}
let
parse = case ft of
CSV_HAL -> Parser.parseFormat Parser.CsvHal
......@@ -241,22 +254,20 @@ addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus = do
WOS -> Parser.parseFormat Parser.WOS
PresseRIS -> Parser.parseFormat Parser.RisPresse
logStatus ScraperStatus { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
printDebug "Parsing corpus: " cid
-- TODO granularity of the logStatus
docs <- liftBase $ splitEvery 500
<$> take 1000000
<$> parse (cs d)
printDebug "Parsing corpus finished : " cid
printDebug "Starting extraction : " cid
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
printDebug "Starting extraction : " cid
-- TODO granularity of the logStatus
_cid' <- flowCorpus user
(Right [cid])
......@@ -264,8 +275,7 @@ addToCorpusWithForm user cid (NewWithForm ft d l _n) logStatus = do
(map (map toHyperdataDocument) docs)
printDebug "Extraction finished : " cid
pure ScraperStatus { _scst_succeeded = Just 2
pure JobLog { _scst_succeeded = Just 2
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
......@@ -276,10 +286,10 @@ addToCorpusWithFile :: FlowCmdM env err m
=> CorpusId
-> MultipartData Mem
-> Maybe FileType
-> (ScraperStatus -> m ())
-> m ScraperStatus
-> (JobLog -> m ())
-> m JobLog
addToCorpusWithFile cid input filetype logStatus = do
logStatus ScraperStatus { _scst_succeeded = Just 10
logStatus JobLog { _scst_succeeded = Just 10
, _scst_failed = Just 2
, _scst_remaining = Just 138
, _scst_events = Just []
......@@ -287,7 +297,7 @@ addToCorpusWithFile cid input filetype logStatus = do
printDebug "addToCorpusWithFile" cid
_h <- postUpload cid filetype input
pure ScraperStatus { _scst_succeeded = Just 137
pure JobLog { _scst_succeeded = Just 137
, _scst_failed = Just 13
, _scst_remaining = Just 0
, _scst_events = Just []
......
......@@ -24,7 +24,7 @@ import Data.Aeson
import Data.Swagger
import Data.Text (Text)
import GHC.Generics (Generic)
import Gargantext.API.Admin.Orchestrator.Types (ScraperStatus(..))
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
import Gargantext.API.Node.Corpus.New (AsyncJobs)
import Gargantext.API.Prelude
import Gargantext.Database.Action.Flow.Types
......@@ -68,7 +68,7 @@ postNode uId pId (PostNode nodeName nt) = do
------------------------------------------------------------------------
-- | Route type for posting a node asynchronously: the literal path segment
-- @"async"@ followed by an 'AsyncJobs' endpoint that takes a 'PostNode'
-- payload with the 'FormUrlEncoded' content type.
type PostNodeAsync = Summary "Post Node"
:> "async"
:> AsyncJobs ScraperStatus '[FormUrlEncoded] PostNode ScraperStatus
:> AsyncJobs JobLog '[FormUrlEncoded] PostNode JobLog
postNodeAsyncAPI :: UserId -> NodeId -> GargServer PostNodeAsync
......@@ -81,12 +81,12 @@ postNodeAsync :: FlowCmdM env err m
=> UserId
-> NodeId
-> PostNode
-> (ScraperStatus -> m ())
-> m ScraperStatus
-> (JobLog -> m ())
-> m JobLog
postNodeAsync uId nId (PostNode nodeName tn) logStatus = do
printDebug "postNodeAsync" nId
logStatus ScraperStatus { _scst_succeeded = Just 1
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
......@@ -95,7 +95,7 @@ postNodeAsync uId nId (PostNode nodeName tn) logStatus = do
nodeUser <- getNodeUser (NodeId uId)
-- _ <- threadDelay 1000
logStatus ScraperStatus { _scst_succeeded = Just 1
logStatus JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 2
, _scst_events = Just []
......@@ -104,7 +104,7 @@ postNodeAsync uId nId (PostNode nodeName tn) logStatus = do
let uId' = nodeUser ^. node_userId
_ <- mkNodeWithParent tn (Just nId) uId' nodeName
pure ScraperStatus { _scst_succeeded = Just 3
pure JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
......
......@@ -19,12 +19,12 @@ module Gargantext.API.Node.Update
import Data.Aeson
import Data.Swagger
import GHC.Generics (Generic)
import Gargantext.API.Admin.Orchestrator.Types (ScraperStatus(..))
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..))
import Gargantext.API.Node.Corpus.New (AsyncJobs)
import Gargantext.API.Prelude (GargServer{-, simuLogs-})
import Gargantext.Database.Action.Flow.Types (FlowCmdM)
import Gargantext.Database.Admin.Types.Node
import Gargantext.Prelude (Ord, Eq, (<$>), ($), liftBase, (.), Int, (-), pure, (*), (^), printDebug)
import Gargantext.Prelude (Ord, Eq, (<$>), ($), liftBase, (.), Int, pure, (*), printDebug, (^)) -- (-), (^))
import Prelude (Enum, Bounded, minBound, maxBound)
import Servant
import Servant.Job.Async (JobFunction(..), serveJobsAPI)
......@@ -97,57 +97,68 @@ instance Arbitrary Charts where
arbitrary = elements [ minBound .. maxBound ]
------------------------------------------------------------------------
-- | Serve the node-update endpoint as an asynchronous job: each job runs
-- 'updateNode' for the given user and node, with the job's log callback
-- lifted into the handler monad via 'liftBase'.
api :: UserId -> NodeId -> GargServer API
api uId nId =
serveJobsAPI $
JobFunction (\p logs -> updateNode uId nId p (liftBase . logs))
serveJobsAPI $
JobFunction (\p log ->
let
-- Print every log entry with 'printDebug' before forwarding it to the
-- job's own log callback.
log' x = do
printDebug "updateNode" x
liftBase $ log x
in updateNode uId nId p (liftBase . log')
)
-- | Node update job.
--
-- The 'UpdateNodeParams' argument is bound to a wildcard / underscore name and
-- is never inspected. The body only emits progress records through
-- @logStatus@, prints debug traces, and returns a final log record whose
-- remaining count is @Just 0@.
updateNode :: FlowCmdM env err m
=> UserId
-> NodeId
-> UpdateNodeParams
-> (ScraperStatus -> m ())
-> m ScraperStatus
updateNode _uId _nId _ logStatus = do
-> (JobLog -> m ())
-> m JobLog
updateNode uId nId _p logStatus = do
-- Why does this not work?
-- simuLogs logStatus 100
logStatus $ ScraperStatus { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 10
, _scst_events = Just []
}
let
-- One million, used as the multiplier for the 'threadDelay' arguments below.
-- NOTE(review): presumably microseconds per second — confirm which
-- 'threadDelay' is in scope.
m = (10 :: Int) ^ (6 :: Int)
printDebug "updateNode xxxxxxxxxxxxxxxxxxxx" nId
--liftBase $ threadDelay ( m * 10)
logStatus $ JobLog { _scst_succeeded = Just 3
, _scst_failed = Just 0
, _scst_remaining = Just 10
, _scst_events = Just []
}
{-
status t n = do
_ <- liftBase $ threadDelay ( m * 100)
let s = ScraperStatus { _scst_succeeded = Just n
_ <- liftBase $ threadDelay ( m * 1000)
let s = JobLog { _scst_succeeded = Just n
, _scst_failed = Just 0
, _scst_remaining = Just (t - n)
, _scst_events = Just []
}
printDebug "status " s
pure s
-}
s1 <- status 10 2
logStatus s1
s2 <- status 10 5
logStatus s2
s3 <- status 10 7
logStatus s3
status 10 10
printDebug "updateNode yyyyyyyyyyyyyyyyyyyy" uId
--liftBase $ threadDelay ( m * 10)
logStatus $ JobLog { _scst_succeeded = Just 6
, _scst_failed = Just 0
, _scst_remaining = Just 4
, _scst_events = Just []
}
printDebug "updateNode zzzzzzzzzzzzzzzzzzzz" nId
liftBase $ threadDelay ( m * 10)
-- Final record: 10 succeeded, 0 failed, 0 remaining.
pure $ JobLog { _scst_succeeded = Just 10
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
}
------------------------------------------------------------------------
-- | Route type of this endpoint: an 'AsyncJobs' endpoint that takes an
-- 'UpdateNodeParams' payload with the 'JSON' content type.
type API = Summary " Share Node with username"
:> AsyncJobs ScraperStatus '[JSON] UpdateNodeParams ScraperStatus
type API = Summary " Update node according to NodeType params"
:> AsyncJobs JobLog '[JSON] UpdateNodeParams JobLog
......@@ -82,7 +82,7 @@ type GargServerC env err m =
, Exception err
, HasRepo env
, HasSettings env
, HasJobEnv env ScraperStatus ScraperStatus
, HasJobEnv env JobLog JobLog
)
type GargServerT env err m api = GargServerC env err m => ServerT api m
......@@ -98,7 +98,7 @@ type EnvC env =
( HasConnectionPool env
, HasRepo env
, HasSettings env
, HasJobEnv env ScraperStatus ScraperStatus
, HasJobEnv env JobLog JobLog
)
-------------------------------------------------------------------
......@@ -154,12 +154,12 @@ instance HasJoseError GargError where
-- | Simulate logs
simuLogs :: MonadBase IO m
=> (ScraperStatus -> m ())
=> (JobLog -> m ())
-> Int
-> m ScraperStatus
-> m JobLog
simuLogs logStatus t = do
{-
let task = ScraperStatus { _scst_succeeded = Just 0
let task = JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
......@@ -167,7 +167,7 @@ simuLogs logStatus t = do
-}
-- f <- mapM (\status n -> simuTask logStatus status n t) task $ take t [1,2..]
_ <- mapM (\n -> simuTask' logStatus n t) $ take t [1,2..]
pure $ ScraperStatus { _scst_succeeded = Just t
pure $ JobLog { _scst_succeeded = Just t
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
......@@ -176,17 +176,17 @@ simuLogs logStatus t = do
{-
simuTask :: MonadBase IO m
=> (ScraperStatus -> m ())
-> ScraperStatus
=> (JobLog -> m ())
-> JobLog
-> Int
-> Int
-> m ScraperStatus
simuTask logStatus (ScraperStatus _s f _r e) n t = do
-> m JobLog
simuTask logStatus (JobLog _s f _r e) n t = do
let
m = (10 :: Int) ^ (6 :: Int)
_ <- liftBase $ threadDelay ( m * 10)
let status = ScraperStatus { _scst_succeeded = Just n
let status = JobLog { _scst_succeeded = Just n
, _scst_failed = f
, _scst_remaining = (-) <$> Just t <*> Just n
, _scst_events = e
......@@ -197,7 +197,7 @@ simuTask logStatus (ScraperStatus _s f _r e) n t = do
-}
simuTask' :: MonadBase IO m
=> (ScraperStatus -> m ())
=> (JobLog -> m ())
-> Int
-> Int
-> m ()
......@@ -206,7 +206,7 @@ simuTask' logStatus cur total = do
m = (10 :: Int) ^ (6 :: Int)
_ <- liftBase $ threadDelay ( m * 10)
let status = ScraperStatus { _scst_succeeded = Just cur
let status = JobLog { _scst_succeeded = Just cur
, _scst_failed = Just 0
, _scst_remaining = (-) <$> Just total <*> Just cur
, _scst_events = Just []
......
......@@ -155,7 +155,7 @@ computeGraph cId nt repo = do
------------------------------------------------------------
-- | Route type for the asynchronous graph update: the literal path segment
-- @"async"@ followed by an 'AsyncJobsAPI' whose middle type argument is @()@.
-- NOTE(review): presumably the middle argument is the job input type, i.e. the
-- job takes no input — confirm against servant-job.
type GraphAsyncAPI = Summary "Update graph"
:> "async"
:> AsyncJobsAPI ScraperStatus () ScraperStatus
:> AsyncJobsAPI JobLog () JobLog
graphAsync :: UserId -> NodeId -> GargServer GraphAsyncAPI
......@@ -166,16 +166,16 @@ graphAsync u n =
graphAsync' :: UserId
-> NodeId
-> (ScraperStatus -> GargNoServer ())
-> GargNoServer ScraperStatus
-> (JobLog -> GargNoServer ())
-> GargNoServer JobLog
graphAsync' u n logStatus = do
logStatus ScraperStatus { _scst_succeeded = Just 0
logStatus JobLog { _scst_succeeded = Just 0
, _scst_failed = Just 0
, _scst_remaining = Just 1
, _scst_events = Just []
}
_g <- trace (show u) $ recomputeGraph u n
pure ScraperStatus { _scst_succeeded = Just 1
pure JobLog { _scst_succeeded = Just 1
, _scst_failed = Just 0
, _scst_remaining = Just 0
, _scst_events = Just []
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment