Commit 40ff740e authored by Alfredo Di Napoli

pollUntilWorkFinished returns a JobLog now

parent 645463e6
Pipeline #7521 canceled with stages
...@@ -12,6 +12,7 @@ import Data.Aeson.QQ ...@@ -12,6 +12,7 @@ import Data.Aeson.QQ
import Data.Text qualified as T import Data.Text qualified as T
import Data.Text.IO qualified as TIO import Data.Text.IO qualified as TIO
import Fmt import Fmt
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.HashedResponse import Gargantext.API.HashedResponse
import Gargantext.API.Ngrams.List.Types import Gargantext.API.Ngrams.List.Types
import Gargantext.API.Ngrams.Types qualified as APINgrams import Gargantext.API.Ngrams.Types qualified as APINgrams
...@@ -22,7 +23,6 @@ import Gargantext.Core.Config ...@@ -22,7 +23,6 @@ import Gargantext.Core.Config
import Gargantext.Core.Text.List.Social import Gargantext.Core.Text.List.Social
import Gargantext.Core.Types import Gargantext.Core.Types
import Gargantext.Core.Types.Individu import Gargantext.Core.Types.Individu
import Gargantext.Core.Worker.Types
import Gargantext.Prelude import Gargantext.Prelude
import Paths_gargantext import Paths_gargantext
import Prelude import Prelude
...@@ -47,7 +47,7 @@ mkNewWithForm content name = NewWithForm ...@@ -47,7 +47,7 @@ mkNewWithForm content name = NewWithForm
} }
importTermsTSV :: SpecContext () -> String -> IO (JobInfo, CorpusId, ListId) importTermsTSV :: SpecContext () -> String -> IO (JobLog, CorpusId, ListId)
importTermsTSV (SpecContext testEnv port app _) name = do importTermsTSV (SpecContext testEnv port app _) name = do
cId <- liftIO $ newCorpusForUser testEnv "alice" cId <- liftIO $ newCorpusForUser testEnv "alice"
let log_cfg = test_config testEnv ^. gc_logging let log_cfg = test_config testEnv ^. gc_logging
...@@ -60,11 +60,11 @@ importTermsTSV (SpecContext testEnv port app _) name = do ...@@ -60,11 +60,11 @@ importTermsTSV (SpecContext testEnv port app _) name = do
, _wtf_data = simpleNgrams , _wtf_data = simpleNgrams
, _wtf_name = "simple.tsv" } , _wtf_name = "simple.tsv" }
pendingJob <- checkEither $ liftIO $ runClientM (add_tsv_to_list token listId params) clientEnv pendingJob <- checkEither $ liftIO $ runClientM (add_tsv_to_list token listId params) clientEnv
jobInfo <- pollUntilWorkFinished log_cfg port pendingJob jobLog <- pollUntilWorkFinished log_cfg port pendingJob
pure (jobInfo, cId, listId) pure (jobLog, cId, listId)
importCorpusTSV :: SpecContext () -> String -> IO (JobInfo, CorpusId, ListId) importCorpusTSV :: SpecContext () -> String -> IO (JobLog, CorpusId, ListId)
importCorpusTSV (SpecContext testEnv port app _) name = do importCorpusTSV (SpecContext testEnv port app _) name = do
cId <- liftIO $ newCorpusForUser testEnv "alice" cId <- liftIO $ newCorpusForUser testEnv "alice"
let log_cfg = test_config testEnv ^. gc_logging let log_cfg = test_config testEnv ^. gc_logging
...@@ -75,9 +75,9 @@ importCorpusTSV (SpecContext testEnv port app _) name = do ...@@ -75,9 +75,9 @@ importCorpusTSV (SpecContext testEnv port app _) name = do
simpleNgrams <- liftIO (TIO.readFile =<< getDataFileName name) simpleNgrams <- liftIO (TIO.readFile =<< getDataFileName name)
let params = mkNewWithForm simpleNgrams "simple.tsv" let params = mkNewWithForm simpleNgrams "simple.tsv"
pendingJob <- checkEither $ liftIO $ runClientM (importCorpus token cId params) clientEnv pendingJob <- checkEither $ liftIO $ runClientM (importCorpus token cId params) clientEnv
jobInfo <- pollUntilWorkFinished log_cfg port pendingJob jobLog <- pollUntilWorkFinished log_cfg port pendingJob
pure (jobInfo, cId, listId) pure (jobLog, cId, listId)
tests :: Spec tests :: Spec
tests = sequential $ aroundAll withTestDBAndPort $ do tests = sequential $ aroundAll withTestDBAndPort $ do
...@@ -110,7 +110,7 @@ tests = sequential $ aroundAll withTestDBAndPort $ do ...@@ -110,7 +110,7 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
length tr_docs `shouldBe` 7 length tr_docs `shouldBe` 7
it "should skip problematic rows" $ \ctx@(SpecContext _testEnv port app _) -> do it "should skip problematic rows" $ \ctx@(SpecContext _testEnv port app _) -> do
(_, cId, _listId) <- importCorpusTSV ctx "test-data/issue-380/malformed_row.tsv" (jobLogs, cId, _listId) <- importCorpusTSV ctx "test-data/issue-380/malformed_row.tsv"
withApplication app $ do withApplication app $ do
withValidLogin port "alice" (GargPassword "alice") $ \clientEnv token -> do withValidLogin port "alice" (GargPassword "alice") $ \clientEnv token -> do
-- Now check that we can retrieve the ngrams, and the ngrams list is not empty! -- Now check that we can retrieve the ngrams, and the ngrams list is not empty!
...@@ -118,3 +118,16 @@ tests = sequential $ aroundAll withTestDBAndPort $ do ...@@ -118,3 +118,16 @@ tests = sequential $ aroundAll withTestDBAndPort $ do
eRes <- checkEither $ runClientM (get_table token cId (Just APINgrams.Docs) Nothing Nothing Nothing Nothing Nothing) clientEnv eRes <- checkEither $ runClientM (get_table token cId (Just APINgrams.Docs) Nothing Nothing Nothing Nothing Nothing) clientEnv
let (HashedResponse _ TableResult{tr_docs}) = eRes let (HashedResponse _ TableResult{tr_docs}) = eRes
length tr_docs `shouldBe` 6 -- it must have skipped the broken row length tr_docs `shouldBe` 6 -- it must have skipped the broken row
-- Check that the events include the two failures we encountered.
_scst_events jobLogs `shouldBe` Just [
ScraperEvent {
_scev_message = Just "Skipping record at row 6 as parsing failed due to: no field named \"Publication Year\""
, _scev_level = Just "WARNING"
, _scev_date = Nothing
}
,ScraperEvent {
_scev_message = Just "Skipping record at row 8 as parsing failed due to: parse error (endOfInput)"
, _scev_level = Just "WARNING"
, _scev_date = Nothing
}
]
...@@ -75,7 +75,7 @@ import Test.Hspec ...@@ -75,7 +75,7 @@ import Test.Hspec
import Test.Hspec.Wai.Internal (withApplication, WaiSession) import Test.Hspec.Wai.Internal (withApplication, WaiSession)
import Test.Hspec.Wai.JSON (json) import Test.Hspec.Wai.JSON (json)
import Test.Types (JobPollHandle(..)) import Test.Types (JobPollHandle(..))
import Test.Utils (pollUntilWorkFinished, protectedJSON, withValidLogin) import Test.Utils (pollUntilWorkFinished, protectedJSON, withValidLogin, isJobFinished)
import Text.Printf (printf) import Text.Printf (printf)
...@@ -104,7 +104,7 @@ uploadJSONList log_cfg port token cId pathToNgrams clientEnv = do ...@@ -104,7 +104,7 @@ uploadJSONList log_cfg port token cId pathToNgrams clientEnv = do
ji <- checkEither $ liftIO $ runClientM (add_form_to_list token listId params) clientEnv ji <- checkEither $ liftIO $ runClientM (add_form_to_list token listId params) clientEnv
-- liftIO (_jph_status j' `shouldBe` "IsFinished") -- liftIO (_jph_status j' `shouldBe` "IsFinished")
ji' <- pollUntilWorkFinished log_cfg port ji ji' <- pollUntilWorkFinished log_cfg port ji
liftIO $ ji' `shouldBe` ji liftIO $ ji' `shouldSatisfy` isJobFinished
pure listId pure listId
...@@ -357,7 +357,7 @@ createDocsList testDataPath testEnv port clientEnv token = do ...@@ -357,7 +357,7 @@ createDocsList testDataPath testEnv port clientEnv token = do
let newWithForm = mkNewWithForm simpleDocs (T.pack $ takeBaseName testDataPath) let newWithForm = mkNewWithForm simpleDocs (T.pack $ takeBaseName testDataPath)
ji <- checkEither $ liftIO $ runClientM (add_file_async token corpusId newWithForm) clientEnv ji <- checkEither $ liftIO $ runClientM (add_file_async token corpusId newWithForm) clientEnv
ji' <- pollUntilWorkFinished log_cfg port ji ji' <- pollUntilWorkFinished log_cfg port ji
liftIO $ ji' `shouldBe` ji liftIO $ ji' `shouldSatisfy` isJobFinished
pure corpusId pure corpusId
where where
log_cfg = (test_config testEnv) ^. gc_logging log_cfg = (test_config testEnv) ^. gc_logging
...@@ -376,7 +376,7 @@ updateNode log_cfg port clientEnv token nodeId = do ...@@ -376,7 +376,7 @@ updateNode log_cfg port clientEnv token nodeId = do
let params = UpdateNodeParamsTexts Both let params = UpdateNodeParamsTexts Both
ji <- checkEither $ liftIO $ runClientM (update_node token nodeId params) clientEnv ji <- checkEither $ liftIO $ runClientM (update_node token nodeId params) clientEnv
ji' <- pollUntilWorkFinished log_cfg port ji ji' <- pollUntilWorkFinished log_cfg port ji
liftIO $ ji' `shouldBe` ji liftIO $ ji' `shouldSatisfy` isJobFinished
mkNewWithForm :: T.Text -> T.Text -> NewWithForm mkNewWithForm :: T.Text -> T.Text -> NewWithForm
mkNewWithForm content name = NewWithForm mkNewWithForm content name = NewWithForm
......
...@@ -28,6 +28,7 @@ module Test.Utils ( ...@@ -28,6 +28,7 @@ module Test.Utils (
, waitUntil , waitUntil
, withValidLogin , withValidLogin
, withValidLoginA , withValidLoginA
, isJobFinished
) where ) where
import Control.Concurrent.STM.TChan (TChan, readTChan) import Control.Concurrent.STM.TChan (TChan, readTChan)
...@@ -256,10 +257,10 @@ pollUntilWorkFinished :: HasCallStack ...@@ -256,10 +257,10 @@ pollUntilWorkFinished :: HasCallStack
=> LogConfig => LogConfig
-> Port -> Port
-> JobInfo -> JobInfo
-> WaiSession () JobInfo -> WaiSession () JobLog
pollUntilWorkFinished log_cfg port ji = do pollUntilWorkFinished log_cfg port ji = do
let waitSecs = 60 let waitSecs = 60
isFinishedTVar <- liftIO $ newTVarIO False isFinishedTVar <- liftIO $ newTVarIO Nothing
let wsConnect = let wsConnect =
withWSConnection ("127.0.0.1", port) $ \conn -> do withWSConnection ("127.0.0.1", port) $ \conn -> do
-- We wait a bit before the server settles -- We wait a bit before the server settles
...@@ -275,11 +276,11 @@ pollUntilWorkFinished log_cfg port ji = do ...@@ -275,11 +276,11 @@ pollUntilWorkFinished log_cfg port ji = do
Just (DT.NUpdateWorkerProgress ji' jl) -> do Just (DT.NUpdateWorkerProgress ji' jl) -> do
withLogger log_cfg $ \ioL -> withLogger log_cfg $ \ioL ->
logMsg ioL DEBUG $ "[pollUntilWorkFinished] received " <> show ji' <> ", " <> show jl logMsg ioL DEBUG $ "[pollUntilWorkFinished] received " <> show ji' <> ", " <> show jl
if ji' == ji && isFinished jl if ji' == ji && isJobFinished jl
then do then do
withLogger log_cfg $ \ioL -> withLogger log_cfg $ \ioL ->
logMsg ioL DEBUG $ "[pollUntilWorkFinished] FINISHED! " <> show ji' logMsg ioL DEBUG $ "[pollUntilWorkFinished] FINISHED! " <> show ji'
atomically $ writeTVar isFinishedTVar True atomically $ writeTVar isFinishedTVar (Just jl)
else else
pure () pure ()
_ -> pure () _ -> pure ()
...@@ -287,23 +288,24 @@ pollUntilWorkFinished log_cfg port ji = do ...@@ -287,23 +288,24 @@ pollUntilWorkFinished log_cfg port ji = do
liftIO $ withAsync wsConnect $ \_ -> do liftIO $ withAsync wsConnect $ \_ -> do
mRet <- Timeout.timeout (waitSecs * 1000 * millisecond) $ do mRet <- Timeout.timeout (waitSecs * 1000 * millisecond) $ do
let go = do let go = do
finished <- readTVarIO isFinishedTVar finished_mb <- readTVarIO isFinishedTVar
if finished case finished_mb of
then do Just job_log -> do
withLogger log_cfg $ \ioL -> withLogger log_cfg $ \ioL ->
logMsg ioL DEBUG $ "[pollUntilWorkFinished] JOB FINISHED: " <> show ji logMsg ioL DEBUG $ "[pollUntilWorkFinished] JOB FINISHED: " <> show ji
return True pure job_log
else do Nothing -> do
threadDelay (50 * millisecond) threadDelay (50 * millisecond)
go go
go go
case mRet of case mRet of
Nothing -> panicTrace $ "[pollUntilWorkFinished] timed out while waiting to finish job " <> show ji Nothing -> panicTrace $ "[pollUntilWorkFinished] timed out while waiting to finish job " <> show ji
Just _ -> return ji Just jl -> pure jl
where
isFinished (JobLog { .. }) = _scst_remaining == Just 0 isJobFinished :: JobLog -> Bool
isJobFinished (JobLog { .. }) = _scst_remaining == Just 0
-- | Like HUnit's '@?=', but With a nicer error message in case the two entities are not equal. -- | Like HUnit's '@?=', but With a nicer error message in case the two entities are not equal.
(@??=) :: (HasCallStack, ToExpr a, Eq a) => a -> a -> Assertion (@??=) :: (HasCallStack, ToExpr a, Eq a) => a -> a -> Assertion
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment