[worker] garg jobType serialized and goes to the worker

Now one should add arguments to it as well
parent 7520135e
...@@ -896,6 +896,7 @@ test-suite garg-test-tasty ...@@ -896,6 +896,7 @@ test-suite garg-test-tasty
Test.Database.Types Test.Database.Types
Test.Graph.Clustering Test.Graph.Clustering
Test.Graph.Distance Test.Graph.Distance
Test.Instances
Test.Ngrams.Lang Test.Ngrams.Lang
Test.Ngrams.Lang.En Test.Ngrams.Lang.En
Test.Ngrams.Lang.Fr Test.Ngrams.Lang.Fr
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
module Gargantext.API.Admin.EnvTypes ( module Gargantext.API.Admin.EnvTypes (
GargJob(..) GargJob(..)
, parseGargJob
, Env(..) , Env(..)
, Mode(..) , Mode(..)
, modeToLoggingLevels , modeToLoggingLevels
...@@ -23,9 +24,12 @@ module Gargantext.API.Admin.EnvTypes ( ...@@ -23,9 +24,12 @@ module Gargantext.API.Admin.EnvTypes (
, ConcreteJobHandle -- opaque , ConcreteJobHandle -- opaque
) where ) where
import Control.Lens hiding (Level, (:<)) import Control.Lens hiding (Level, (:<), (.=))
import Control.Monad.Except import Control.Monad.Except
import Control.Monad.Reader import Control.Monad.Reader
import Data.Aeson qualified as Aeson
import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson.Types (prependFailure, typeMismatch)
import Data.List ((\\)) import Data.List ((\\))
import Data.Pool (Pool) import Data.Pool (Pool)
import Data.Sequence (ViewL(..), viewl) import Data.Sequence (ViewL(..), viewl)
...@@ -89,24 +93,69 @@ instance HasLogger (GargM Env BackendInternalError) where ...@@ -89,24 +93,69 @@ instance HasLogger (GargM Env BackendInternalError) where
data GargJob data GargJob
= TableNgramsJob = AddAnnuaireFormJob
| ForgotPasswordJob
| UpdateNgramsListJobJSON
| UpdateNgramsListJobTSV
| AddContactJob | AddContactJob
| AddCorpusFileJob
| AddCorpusFormJob
| AddCorpusQueryJob
| AddFileJob | AddFileJob
| DocumentFromWriteNodeJob | DocumentFromWriteNodeJob
| UpdateNodeJob | ForgotPasswordJob
| UploadFrameCalcJob
| UploadDocumentJob
| NewNodeJob | NewNodeJob
| AddCorpusQueryJob
| AddCorpusFormJob
| AddCorpusFileJob
| AddAnnuaireFormJob
| RecomputeGraphJob | RecomputeGraphJob
| TableNgramsJob
| UpdateNgramsListJobJSON
| UpdateNgramsListJobTSV
| UpdateNodeJob
| UploadDocumentJob
| UploadFrameCalcJob
deriving (Show, Eq, Ord, Enum, Bounded) deriving (Show, Eq, Ord, Enum, Bounded)
parseGargJob :: Text -> Maybe GargJob
parseGargJob s = case s of
"addannuaireform" -> Just AddAnnuaireFormJob
"addcontact" -> Just AddContactJob
"addcorpusfile" -> Just AddCorpusFileJob
"addcorpusform" -> Just AddCorpusFormJob
"addcorpusquery" -> Just AddCorpusQueryJob
"addfile" -> Just AddFileJob
"documentfromwritenode" -> Just DocumentFromWriteNodeJob
"forgotpassword" -> Just ForgotPasswordJob
"newnode" -> Just NewNodeJob
"recomputegraph" -> Just RecomputeGraphJob
"tablengrams" -> Just TableNgramsJob
"updatedocument" -> Just UploadDocumentJob
"updateframecalc" -> Just UploadFrameCalcJob
"updatengramslistjson" -> Just UpdateNgramsListJobJSON
"updatengramslisttsv" -> Just UpdateNgramsListJobTSV
"updatenode" -> Just UpdateNodeJob
_ -> Nothing
instance FromJSON GargJob where
parseJSON = withObject "GargJob" $ \o -> do
type_ <- o .: "type"
case parseGargJob type_ of
Just gj -> return gj
Nothing -> prependFailure "parsing garg job type failed, " (typeMismatch "type" $ Aeson.String type_)
instance ToJSON GargJob where
toJSON AddAnnuaireFormJob = object [ ("type" .= ("addannuaireform" :: Text))]
toJSON AddContactJob = object [ ("type" .= ("addcontact" :: Text))]
toJSON AddCorpusFileJob = object [ ("type" .= ("addcorpusfile" :: Text))]
toJSON AddCorpusFormJob = object [ ("type" .= ("addcorpusform" :: Text))]
toJSON AddCorpusQueryJob = object [ ("type" .= ("addcorpusquery" :: Text))]
toJSON AddFileJob = object [ ("type" .= ("addfile" :: Text))]
toJSON DocumentFromWriteNodeJob = object [ ("type" .= ("documentfromwritenode" :: Text))]
toJSON ForgotPasswordJob = object [ ("type" .= ("forgotpassword" :: Text))]
toJSON NewNodeJob = object [ ("type" .= ("newnode" :: Text))]
toJSON RecomputeGraphJob = object [ ("type" .= ("recomputegraph" :: Text))]
toJSON TableNgramsJob = object [ ("type" .= ("tablengrams" :: Text))]
toJSON UploadDocumentJob = object [ ("type" .= ("updatedocument" :: Text))]
toJSON UploadFrameCalcJob = object [ ("type" .= ("updateframecalc" :: Text))]
toJSON UpdateNgramsListJobJSON = object [ ("type" .= ("updatengramslistjson" :: Text))]
toJSON UpdateNgramsListJobTSV = object [ ("type" .= ("updatengramslisttsv" :: Text))]
toJSON UpdateNodeJob = object [ ("type" .= ("updatenode" :: Text))]
-- Do /not/ treat the data types of this type as strict, because it's convenient -- Do /not/ treat the data types of this type as strict, because it's convenient
-- to be able to partially initialise things like an 'Env' during tests, without -- to be able to partially initialise things like an 'Env' during tests, without
-- having to specify /everything/. This means that when we /construct/ an 'Env', -- having to specify /everything/. This means that when we /construct/ an 'Env',
......
...@@ -13,10 +13,10 @@ Portability : POSIX ...@@ -13,10 +13,10 @@ Portability : POSIX
{-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DuplicateRecordFields #-} -- permit duplications for field names in multiple constructors {-# LANGUAGE DuplicateRecordFields #-} -- permit duplications for field names in multiple constructors
{-# LANGUAGE KindSignatures #-} -- for use of Endpoint (name :: Symbol) {-# LANGUAGE KindSignatures #-} -- for use of Endpoint (name :: Symbol)
{-# LANGUAGE PartialTypeSignatures #-} -- to automatically use suggested type hole signatures during compilation
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE LambdaCase #-} {-# LANGUAGE LambdaCase #-}
{-# LANGUAGE PartialTypeSignatures #-} -- to automatically use suggested type hole signatures during compilation
{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.GraphQL where module Gargantext.API.GraphQL where
......
...@@ -30,6 +30,12 @@ import Gargantext.Prelude ...@@ -30,6 +30,12 @@ import Gargantext.Prelude
-- | Spawn a worker with Redis broker
-- TODO:
-- - reduce size of DB pool
-- - progress report via notifications
-- - I think there is no point to save job result, as usually there is none (we have side-effects only)
-- - replace Servant.Job to use workers instead of garg API threads
withRedisWorker :: (HasWorkerBroker RedisBroker Job, HasSettings env, CmdCommon env) withRedisWorker :: (HasWorkerBroker RedisBroker Job, HasSettings env, CmdCommon env)
=> env => env
-> WorkerDefinition -> WorkerDefinition
...@@ -68,3 +74,4 @@ performAction env _state bm = do ...@@ -68,3 +74,4 @@ performAction env _state bm = do
case us of case us of
[u] -> forgotUserPassword u [u] -> forgotUserPassword u
_ -> pure () _ -> pure ()
GargJob { _gj_garg_job } -> putStrLn ("Garg job: " <> show _gj_garg_job :: Text)
...@@ -15,12 +15,14 @@ module Gargantext.Core.Worker.Jobs.Types where ...@@ -15,12 +15,14 @@ module Gargantext.Core.Worker.Jobs.Types where
import Data.Aeson ((.:), (.=), object, withObject) import Data.Aeson ((.:), (.=), object, withObject)
import Data.Aeson.Types (prependFailure, typeMismatch) import Data.Aeson.Types (prependFailure, typeMismatch)
import Gargantext.API.Admin.EnvTypes ( GargJob )
import Gargantext.Prelude import Gargantext.Prelude
data Job = data Job =
Ping Ping
| ForgotPassword { _fp_email :: Text } | ForgotPassword { _fp_email :: Text }
| GargJob { _gj_garg_job :: GargJob }
deriving (Show, Eq) deriving (Show, Eq)
instance FromJSON Job where instance FromJSON Job where
parseJSON = withObject "Job" $ \o -> do parseJSON = withObject "Job" $ \o -> do
...@@ -30,8 +32,13 @@ instance FromJSON Job where ...@@ -30,8 +32,13 @@ instance FromJSON Job where
"ForgotPassword" -> do "ForgotPassword" -> do
_fp_email <- o .: "email" _fp_email <- o .: "email"
return $ ForgotPassword { _fp_email } return $ ForgotPassword { _fp_email }
"GargJob" -> do
_gj_garg_job <- o .: "garg_job"
return $ GargJob { _gj_garg_job }
s -> prependFailure "parsing job type failed, " (typeMismatch "type" s) s -> prependFailure "parsing job type failed, " (typeMismatch "type" s)
instance ToJSON Job where instance ToJSON Job where
toJSON Ping = object [ ("type" .= ("Ping" :: Text)) ] toJSON Ping = object [ ("type" .= ("Ping" :: Text)) ]
toJSON (ForgotPassword { _fp_email }) = object [ ("type" .= ("ForgotPassword" :: Text)) toJSON (ForgotPassword { _fp_email }) = object [ ("type" .= ("ForgotPassword" :: Text))
, ("email" .= _fp_email) ] , ("email" .= _fp_email) ]
toJSON (GargJob { _gj_garg_job }) = object [ ("type" .= ("GargJob" :: Text))
, ("garg_job" .= _gj_garg_job) ]
...@@ -25,19 +25,20 @@ module Gargantext.Utils.Jobs ( ...@@ -25,19 +25,20 @@ module Gargantext.Utils.Jobs (
import Control.Monad.Except ( runExceptT ) import Control.Monad.Except ( runExceptT )
import Control.Monad.Reader ( MonadReader(ask), ReaderT(runReaderT) ) import Control.Monad.Reader ( MonadReader(ask), ReaderT(runReaderT) )
import Data.Aeson (ToJSON) import Data.Aeson (ToJSON)
import Prelude import Data.Text qualified as T
import System.Directory (doesFileExist) import Gargantext.API.Admin.EnvTypes ( mkJobHandle, parseGargJob, Env, GargJob(..) )
import Text.Read (readMaybe)
import qualified Data.Text as T
import Gargantext.API.Admin.EnvTypes ( mkJobHandle, Env, GargJob(..) )
import Gargantext.API.Errors.Types ( BackendInternalError(InternalJobError) ) import Gargantext.API.Errors.Types ( BackendInternalError(InternalJobError) )
import Gargantext.API.Prelude ( GargM ) import Gargantext.API.Prelude ( GargM )
import qualified Gargantext.Utils.Jobs.Internal as Internal import Gargantext.Core.Worker.Jobs qualified as Jobs
import Gargantext.Utils.Jobs.Monad ( JobError, MonadJobStatus(..), markFailureNoErr, markFailedNoErr ) import Gargantext.Core.Worker.Jobs.Types qualified as Jobs
import Gargantext.System.Logging import Gargantext.System.Logging
import Gargantext.Utils.Jobs.Internal qualified as Internal
import Gargantext.Utils.Jobs.Monad ( JobError, MonadJobStatus(..), markFailureNoErr, markFailedNoErr )
import Prelude
import Servant.Job.Async qualified as SJ
import System.Directory (doesFileExist)
import Text.Read (readMaybe)
import qualified Servant.Job.Async as SJ
jobErrorToGargError jobErrorToGargError
:: JobError -> BackendInternalError :: JobError -> BackendInternalError
...@@ -61,29 +62,10 @@ serveJobsAPI ...@@ -61,29 +62,10 @@ serveJobsAPI
serveJobsAPI jobType f = Internal.serveJobsAPI mkJobHandle ask jobType jobErrorToGargError $ \env jHandle i -> do serveJobsAPI jobType f = Internal.serveJobsAPI mkJobHandle ask jobType jobErrorToGargError $ \env jHandle i -> do
runExceptT $ flip runReaderT env $ do runExceptT $ flip runReaderT env $ do
$(logLocM) INFO (T.pack $ "Running job of type: " ++ show jobType) $(logLocM) INFO (T.pack $ "Running job of type: " ++ show jobType)
Jobs.sendJob $ Jobs.GargJob { Jobs._gj_garg_job = jobType }
f jHandle i f jHandle i
getLatestJobStatus jHandle getLatestJobStatus jHandle
parseGargJob :: String -> Maybe GargJob
parseGargJob s = case s of
"tablengrams" -> Just TableNgramsJob
"forgotpassword" -> Just ForgotPasswordJob
"updatengramslistjson" -> Just UpdateNgramsListJobJSON
"updatengramslisttsv" -> Just UpdateNgramsListJobTSV
"addcontact" -> Just AddContactJob
"addfile" -> Just AddFileJob
"documentfromwritenode" -> Just DocumentFromWriteNodeJob
"updatenode" -> Just UpdateNodeJob
"updateframecalc" -> Just UploadFrameCalcJob
"updatedocument" -> Just UploadDocumentJob
"newnode" -> Just NewNodeJob
"addcorpusquery" -> Just AddCorpusQueryJob
"addcorpusform" -> Just AddCorpusFormJob
"addcorpusfile" -> Just AddCorpusFileJob
"addannuaireform" -> Just AddAnnuaireFormJob
"recomputegraph" -> Just RecomputeGraphJob
_ -> Nothing
parsePrios :: [String] -> IO [(GargJob, Int)] parsePrios :: [String] -> IO [(GargJob, Int)]
parsePrios [] = pure [] parsePrios [] = pure []
parsePrios (x : xs) = (:) <$> go x <*> parsePrios xs parsePrios (x : xs) = (:) <$> go x <*> parsePrios xs
...@@ -91,7 +73,7 @@ parsePrios (x : xs) = (:) <$> go x <*> parsePrios xs ...@@ -91,7 +73,7 @@ parsePrios (x : xs) = (:) <$> go x <*> parsePrios xs
([], _) -> error "parsePrios: empty jobname?" ([], _) -> error "parsePrios: empty jobname?"
(prop, valS) (prop, valS)
| Just val <- readMaybe (tail valS) | Just val <- readMaybe (tail valS)
, Just j <- parseGargJob prop -> pure (j, val) , Just j <- parseGargJob (T.pack prop) -> pure (j, val)
| otherwise -> error $ | otherwise -> error $
"parsePrios: invalid input. " ++ show (prop, valS) "parsePrios: invalid input. " ++ show (prop, valS)
......
...@@ -15,18 +15,12 @@ import Data.Aeson qualified as Aeson ...@@ -15,18 +15,12 @@ import Data.Aeson qualified as Aeson
import Gargantext.Core.Methods.Similarities.Conditional import Gargantext.Core.Methods.Similarities.Conditional
import Gargantext.Core.Worker.Jobs.Types (Job(..)) import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Prelude import Gargantext.Prelude
import Test.Instances ()
import Test.Tasty import Test.Tasty
import Test.Tasty.HUnit import Test.Tasty.HUnit
import Test.Tasty.QuickCheck hiding (Positive, Negative) import Test.Tasty.QuickCheck hiding (Positive, Negative)
instance Arbitrary Job where
arbitrary = oneof [ pure Ping, forgotPasswordGen ]
where
forgotPasswordGen = do
_fp_email <- arbitrary
return $ ForgotPassword { _fp_email }
tests :: TestTree tests :: TestTree
tests = testGroup "worker unit tests" [ tests = testGroup "worker unit tests" [
......
module Test.Instances where
import Gargantext.API.Admin.EnvTypes as EnvTypes
import Gargantext.Core.Worker.Jobs.Types (Job(..))
import Gargantext.Prelude
import Text.Parsec.Error (ParseError, Message(..), newErrorMessage)
import Text.Parsec.Pos
import Test.Tasty.QuickCheck hiding (Positive, Negative)
instance Arbitrary EnvTypes.GargJob where
arbitrary = do
oneof [ pure AddAnnuaireFormJob
, pure AddContactJob
, pure AddCorpusFileJob
, pure AddCorpusFormJob
, pure AddCorpusQueryJob
, pure AddFileJob
, pure DocumentFromWriteNodeJob
, pure ForgotPasswordJob
, pure NewNodeJob
, pure RecomputeGraphJob
, pure TableNgramsJob
, pure UpdateNgramsListJobJSON
, pure UpdateNgramsListJobTSV
, pure UpdateNodeJob
, pure UploadDocumentJob
, pure UploadFrameCalcJob
]
instance Arbitrary Job where
arbitrary = oneof [ pure Ping
, forgotPasswordGen
, gargJobGen ]
where
forgotPasswordGen = do
_fp_email <- arbitrary
return $ ForgotPassword { _fp_email }
gargJobGen = do
_gj_garg_job <- arbitrary
return $ GargJob { _gj_garg_job }
instance Arbitrary Message where
arbitrary = do
msgContent <- arbitrary
oneof $ return <$> [SysUnExpect msgContent
, UnExpect msgContent
, Expect msgContent
, Message msgContent
]
instance Arbitrary SourcePos where
arbitrary = do
sn <- arbitrary
l <- arbitrary
c <- arbitrary
return $ newPos sn l c
instance Arbitrary ParseError where
arbitrary = do
sp <- arbitrary
msg <- arbitrary
return $ newErrorMessage msg sp
...@@ -19,6 +19,7 @@ module Test.Parsers.Types where ...@@ -19,6 +19,7 @@ module Test.Parsers.Types where
import Gargantext.Prelude import Gargantext.Prelude
import Test.Instances ()
import Test.QuickCheck import Test.QuickCheck
import Test.QuickCheck.Instances () import Test.QuickCheck.Instances ()
...@@ -43,25 +44,3 @@ looseZonedTimePrecision (ZonedTime lt tz) = ZonedTime (looseLocalTimePrecision l ...@@ -43,25 +44,3 @@ looseZonedTimePrecision (ZonedTime lt tz) = ZonedTime (looseLocalTimePrecision l
loosePrecisionEitherPEZT :: Either ParseError ZonedTime -> Either ParseError ZonedTime loosePrecisionEitherPEZT :: Either ParseError ZonedTime -> Either ParseError ZonedTime
loosePrecisionEitherPEZT (Right zt) = Right $ looseZonedTimePrecision zt loosePrecisionEitherPEZT (Right zt) = Right $ looseZonedTimePrecision zt
loosePrecisionEitherPEZT pe = pe loosePrecisionEitherPEZT pe = pe
instance Arbitrary Message where
arbitrary = do
msgContent <- arbitrary
oneof $ return <$> [SysUnExpect msgContent
, UnExpect msgContent
, Expect msgContent
, Message msgContent
]
instance Arbitrary SourcePos where
arbitrary = do
sn <- arbitrary
l <- arbitrary
c <- arbitrary
return $ newPos sn l c
instance Arbitrary ParseError where
arbitrary = do
sp <- arbitrary
msg <- arbitrary
return $ newErrorMessage msg sp
...@@ -14,12 +14,13 @@ Portability : POSIX ...@@ -14,12 +14,13 @@ Portability : POSIX
{-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE NumericUnderscores #-}
module Test.Utils.Jobs (test) where module Test.Utils.Jobs ( test, qcTests ) where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.Async qualified as Async import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Data.Aeson qualified as Aeson
import Data.Sequence ((|>), fromList) import Data.Sequence ((|>), fromList)
import Data.Time import Data.Time
import Debug.RecoverRTTI (anythingToString) import Debug.RecoverRTTI (anythingToString)
...@@ -42,6 +43,9 @@ import Servant.Job.Types qualified as SJ ...@@ -42,6 +43,9 @@ import Servant.Job.Types qualified as SJ
import System.IO.Unsafe import System.IO.Unsafe
import Test.Hspec import Test.Hspec
import Test.Hspec.Expectations.Contrib (annotate) import Test.Hspec.Expectations.Contrib (annotate)
import Test.Instances () -- arbitrary instances
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.QuickCheck hiding (Positive, Negative)
data JobT = A data JobT = A
...@@ -432,3 +436,10 @@ test = do ...@@ -432,3 +436,10 @@ test = do
testFetchJobStatusNoContention testFetchJobStatusNoContention
it "marking stuff behaves as expected" $ it "marking stuff behaves as expected" $
testMarkProgress testMarkProgress
qcTests :: TestTree
qcTests = testGroup "jobs qc tests" [
testProperty "GargJob to/from JSON serialization is correct" $
\job -> Aeson.decode (Aeson.encode (job :: EnvTypes.GargJob)) == Just job
]
...@@ -57,4 +57,5 @@ main = do ...@@ -57,4 +57,5 @@ main = do
, Phylo.tests , Phylo.tests
, testGroup "Stemming" [ Lancaster.tests ] , testGroup "Stemming" [ Lancaster.tests ]
, Worker.tests , Worker.tests
, Jobs.qcTests
] ]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment