Commit 375722ae authored by Alp Mestanogullari's avatar Alp Mestanogullari

introduce and use a flexible job queue system

parent 71b8eeb2
Pipeline #3285 passed with stage
in 93 minutes and 26 seconds
module Auth where
import Prelude
import Core
import Options
import Control.Monad.IO.Class
import Data.Text.Encoding (encodeUtf8)
import Options.Generic
import Servant.Client
import qualified Servant.Auth.Client as SA
import Gargantext.API.Client
import qualified Gargantext.API.Admin.Auth.Types as Auth
import qualified Gargantext.Core.Types.Individu as Auth
import qualified Gargantext.Database.Admin.Types.Node as Node
-- | Authenticate and use the resulting Token to perform
-- auth-restricted actions
withAuthToken
:: ClientOpts -- ^ source of user/pass data
-> (SA.Token -> Node.NodeId -> ClientM a) -- ^ do something once authenticated
-> ClientM a
withAuthToken opts act
-- both user and password CLI arguments passed
| Helpful (Just usr) <- user opts
, Helpful (Just pw) <- pass opts = do
authRes <- postAuth (Auth.AuthRequest usr (Auth.GargPassword pw))
case Auth._authRes_valid authRes of
-- authentication failed, this function critically needs it to
-- be able to run the action, so we abort
Nothing -> problem $
"invalid auth response: " ++
maybe "" (show . Auth._authInv_message)
(Auth._authRes_inval authRes)
-- authentication went through, we can run the action
Just (Auth.AuthValid tok tree_id _uid) -> do
let tok' = SA.Token (encodeUtf8 tok)
whenVerbose opts $ do
liftIO . putStrLn $ "[Debug] Authenticated: token=" ++ show tok ++
", tree_id=" ++ show tree_id
act tok' tree_id
-- user and/or pass CLI arguments not passed
| otherwise =
problem "auth-protected actions require --user and --pass"
module Core (problem, whenVerbose) where
import Prelude
import Options
import Options.Generic
import Control.Exception
import Control.Monad
import Control.Monad.Catch
import Servant.Client
newtype GargClientException = GCE String
instance Show GargClientException where
show (GCE s) = "Garg client exception: " ++ s
instance Exception GargClientException
-- | Abort with a message
problem :: String -> ClientM a
problem = throwM . GCE
-- | Only run the given computation when the @--verbose@ flag is
-- passed.
whenVerbose :: Monad m => ClientOpts -> m () -> m ()
whenVerbose opts act = when (unHelpful $ verbose opts) act
module Main where
import Control.Monad
import Network.HTTP.Client
import Options
import Options.Generic
import Prelude
import Script (script)
import Servant.Client
main :: IO ()
main = do
-- we parse CLI options
opts@(ClientOpts (Helpful uri) _ _ (Helpful verb)) <- getRecord "Gargantext client"
mgr <- newManager defaultManagerSettings
burl <- parseBaseUrl uri
when verb $ do
putStrLn $ "[Debug] user: " ++ maybe "<none>" show (unHelpful $ user opts)
putStrLn $ "[Debug] backend: " ++ show burl
-- we run 'script' from the Script module, reporting potential errors
res <- runClientM (script opts) (mkClientEnv mgr burl)
case res of
Left err -> putStrLn $ "[Client error] " ++ show err
Right a -> print a
{-# LANGUAGE TypeOperators #-}
module Options where
import Prelude
import Options.Generic
-- | Some general options to be specified on the command line.
data ClientOpts = ClientOpts
{ url :: String <?> "URL to gargantext backend"
, user :: Maybe Text <?> "(optional) username for auth-restricted actions"
, pass :: Maybe Text <?> "(optional) password for auth-restricted actions"
, verbose :: Bool <?> "Enable verbose output"
} deriving (Generic, Show)
instance ParseRecord ClientOpts
module Script (script) where
import Auth
import Control.Monad.IO.Class
import Core
import Gargantext.API.Client
import Options
import Prelude
import Servant.Client
import Tracking
-- | An example script. Tweak, rebuild and re-run the executable to see the
-- effect of your changes. You can hit any gargantext endpoint in the body
-- of 'script' using the many (many!) client functions exposed by the
-- 'Gargantext.API.Client' module.
--
-- Don't forget to pass @--user@ and @--pass@ if you're using 'withAuthToken'.
script :: ClientOpts -> ClientM ()
script opts = do
-- we start by asking the backend for its version
ver <- getBackendVersion
liftIO . putStrLn $ "Backend version: " ++ show ver
-- next we authenticate using the credentials given on the command line
-- (through --user and --pass), erroring out loudly if the auth creds don't
-- go through, running the continuation otherwise.
withAuthToken opts $ \tok userNode -> do
liftIO . putStrLn $ "user node: " ++ show userNode
steps <-
-- we run a few client computations while tracking some EKG metrics
-- (any RTS stats or routing-related data), which means that we sample the
-- metrics at the beginning, the end, and in between each pair of steps.
tracking opts ["rts.gc.bytes_allocated"]
[ ("get roots", do
roots <- getRoots tok
liftIO . putStrLn $ "roots: " ++ show roots
)
, ("get user node detail", do
userNodeDetail <- getNode tok userNode
liftIO . putStrLn $ "user node details: " ++ show userNodeDetail
)
]
-- we pretty print the values we sampled for all metrics and the
-- results of all the steps
whenVerbose opts (ppTracked steps)
{-# LANGUAGE TupleSections #-}
module Tracking
( tracking
, ppTracked
, EkgMetric
, Step
) where
import Core
import Options
import Prelude
import Control.Monad.IO.Class
import Data.List (intersperse)
import Data.Text (Text)
import Servant.Client
import System.Metrics.Json (Value)
import Gargantext.API.Client
import qualified Data.Text as T
-- | e.g @["rts", "gc", "bytes_allocated"]@
type EkgMetric = [Text]
-- | Any textual description of a step
type Step = Text
-- | Track EKG metrics before/after running a bunch of computations
-- that can talk to the backend.
tracking
:: ClientOpts
-> [Text] -- ^ e.g @["rts.gc.bytes_allocated"]@
-> [(Step, ClientM a)]
-> ClientM [Either [(EkgMetric, Value)] (Step, a)]
-- no steps, nothing to do
tracking _ _ [] = return []
-- no metrics to track, we just run the steps
tracking _ [] steps = traverse runStep steps
-- metrics to track: we intersperse metric fetching and steps,
-- starting and ending with metric fetching
tracking opts ms' steps = mix (Left <$> fetchMetrics) (map runStep steps)
where fetchMetrics :: ClientM [(EkgMetric, Value)]
fetchMetrics = flip traverse ms $ \metric -> do
whenVerbose opts $
liftIO . putStrLn $ "[Debug] metric to track: " ++ T.unpack (T.intercalate "." metric)
dat <- (metric,) <$> getMetricSample metric
whenVerbose opts $
liftIO . putStrLn $ "[Debug] metric pulled: " ++ show dat
return dat
mix :: ClientM a -> [ClientM a] -> ClientM [a]
mix x xs = sequence $ [x] ++ intersperse x xs ++ [x]
ms = map (T.splitOn ".") ms'
-- ^ A trivial function to print results of steps and sampled metrics
ppTracked :: Show a => [Either [(EkgMetric, Value)] (Step, a)] -> ClientM ()
ppTracked [] = return ()
ppTracked (Right (step, a) : rest) = do
liftIO . putStrLn $ "[step: " ++ T.unpack step ++ "] returned: " ++ show a
ppTracked rest
ppTracked (Left ms : rest) = do
liftIO . putStrLn $ unlines
[ T.unpack (T.intercalate "." metric) ++ " = " ++ show val
| (metric, val) <- ms
]
ppTracked rest
runStep :: (Step, ClientM a) -> ClientM (Either e (Step, a))
runStep (step, act) = Right . (step,) <$> act
This diff is collapsed.
......@@ -68,7 +68,6 @@ library:
- Gargantext.API.Admin.Auth.Types
- Gargantext.API.Admin.Types
- Gargantext.API.Prelude
- Gargantext.API.Client
- Gargantext.Core
- Gargantext.Core.NodeStory
- Gargantext.Core.Methods.Distances
......@@ -76,6 +75,13 @@ library:
- Gargantext.Core.Types.Individu
- Gargantext.Core.Types.Main
- Gargantext.Core.Utils.Prefix
- Gargantext.Utils.Jobs
- Gargantext.Utils.Jobs.API
- Gargantext.Utils.Jobs.Map
- Gargantext.Utils.Jobs.Monad
- Gargantext.Utils.Jobs.Queue
- Gargantext.Utils.Jobs.Settings
- Gargantext.Utils.Jobs.State
- Gargantext.Utils.SpacyNLP
- Gargantext.Database.Action.Flow
- Gargantext.Database.Action.Flow.Types
......@@ -262,6 +268,7 @@ library:
- singletons # (IGraph)
- split
- stemmer
- stm
- swagger2
- taggy-lens
- tagsoup
......@@ -347,42 +354,6 @@ executables:
- unordered-containers
- full-text-search
gargantext-client:
main: Main.hs
source-dirs: bin/gargantext-client
ghc-options:
- -Wall
- -threaded
- -rtsopts
- -with-rtsopts=-N
- -O2
- -Wmissing-signatures
default-extensions:
- DataKinds
- DeriveGeneric
- FlexibleContexts
- FlexibleInstances
- GeneralizedNewtypeDeriving
- MultiParamTypeClasses
- NamedFieldPuns
- NoImplicitPrelude
- OverloadedStrings
- RankNTypes
- RecordWildCards
dependencies:
- base
- extra
- servant
- text
- optparse-generic
- exceptions
- servant-client
- servant-auth-client
- gargantext
- ekg-json
- http-client
gargantext-phylo:
main: Main.hs
source-dirs: bin/gargantext-phylo
......@@ -533,6 +504,19 @@ tests:
- duckling
- text
- unordered-containers
jobqueue-test:
main: Main.hs
source-dirs: tests/queue
ghc-options:
- -threaded
- -rtsopts
- -with-rtsopts=-N
dependencies:
- base
- gargantext
- hspec
- async
- stm
# garg-doctest:
# main: Main.hs
# source-dirs: src-doctest
......
......@@ -44,11 +44,11 @@ import Data.Validity
import GHC.Base (Applicative)
import GHC.Generics (Generic)
import Gargantext.API.Admin.Auth.Types (AuthContext)
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Admin.Settings (newEnv)
import Gargantext.API.Admin.Types (FireWall(..), PortNumber, cookieSettings, jwtSettings, settings)
import Gargantext.API.EKG
import Gargantext.API.Ngrams (saveNodeStoryImmediate)
import Gargantext.API.Prelude
import Gargantext.API.Routes
import Gargantext.API.Server (server)
import Gargantext.Core.NodeStory
......@@ -207,7 +207,7 @@ serverGargAdminAPI = roots
--gargMock = mock apiGarg Proxy
---------------------------------------------------------------------
makeApp :: (Typeable env, EnvC env) => env -> IO Application
makeApp :: Env -> IO Application
makeApp env = do
serv <- server env
(ekgStore, ekgMid) <- newEkgStore api
......
......@@ -47,16 +47,16 @@ import Data.UUID.V4 (nextRandom)
import GHC.Generics (Generic)
import Servant
import Servant.Auth.Server
import Servant.Job.Async (JobFunction(..), serveJobsAPI)
--import qualified Text.Blaze.Html5.Attributes as HA
import qualified Gargantext.Prelude.Crypto.Auth as Auth
import Gargantext.API.Admin.Auth.Types
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types
import Gargantext.API.Job (jobLogSuccess)
import Gargantext.API.Prelude (HasJoseError(..), joseError, HasServerError, GargServerC, GargServer, _ServerError)
import Gargantext.API.Prelude (HasJoseError(..), joseError, HasServerError, GargServerC, GargServer, _ServerError, GargM, GargError)
import Gargantext.Core.Mail (MailModel(..), mail)
import Gargantext.Core.Mail.Types (HasMail, mailSettings)
import Gargantext.Core.Types.Individu (User(..), Username, GargPassword(..))
......@@ -69,6 +69,7 @@ import Gargantext.Database.Query.Tree.Root (getRoot)
import Gargantext.Database.Schema.Node (NodePoly(_node_id))
import Gargantext.Prelude hiding (reverse)
import Gargantext.Prelude.Crypto.Pass.User (gargPass)
import Gargantext.Utils.Jobs (serveJobsAPI)
---------------------------------------------------
......@@ -266,12 +267,10 @@ generateForgotPasswordUUID = do
type ForgotPasswordAsyncAPI = Summary "Forgot password asnc"
:> AsyncJobs JobLog '[JSON] ForgotPasswordAsyncParams JobLog
forgotPasswordAsync :: GargServer ForgotPasswordAsyncAPI
forgotPasswordAsync :: ServerT ForgotPasswordAsyncAPI (GargM Env GargError)
forgotPasswordAsync =
serveJobsAPI $
JobFunction (\p log' ->
forgotPasswordAsync' p (liftBase . log')
)
serveJobsAPI ForgotPasswordJob $ \p log' ->
forgotPasswordAsync' p (liftBase . log')
forgotPasswordAsync' :: (FlowCmdM env err m)
=> ForgotPasswordAsyncParams
......
......@@ -5,6 +5,9 @@
module Gargantext.API.Admin.EnvTypes where
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader
import Data.Monoid
import Data.Pool (Pool)
import Database.PostgreSQL.Simple (Connection)
import GHC.Generics (Generic)
......@@ -16,6 +19,7 @@ import qualified Servant.Job.Core
import Gargantext.API.Admin.Types
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Prelude (GargError)
import Gargantext.Core.NodeStory
import Gargantext.Core.Mail.Types (HasMail, mailSettings)
import Gargantext.Database.Prelude (HasConnectionPool(..), HasConfig(..))
......@@ -23,6 +27,27 @@ import Gargantext.Prelude
import Gargantext.Prelude.Config (GargConfig(..))
import Gargantext.Prelude.Mail.Types (MailConfig)
import qualified Gargantext.Utils.Jobs.Monad as Jobs
data GargJob
= TableNgramsJob
| ForgotPasswordJob
| UpdateNgramsListJobJSON
| UpdateNgramsListJobCSV
| AddContactJob
| AddFileJob
| DocumentFromWriteNodeJob
| UpdateNodeJob
| UploadFrameCalcJob
| UploadDocumentJob
| NewNodeJob
| AddCorpusQueryJob
| AddCorpusFormJob
| AddCorpusFileJob
| AddAnnuaireFormJob
| RecomputeGraphJob
deriving (Show, Eq, Ord, Enum, Bounded)
data Env = Env
{ _env_settings :: !Settings
, _env_logger :: !LoggerSet
......@@ -31,6 +56,7 @@ data Env = Env
, _env_manager :: !Manager
, _env_self_url :: !BaseUrl
, _env_scrapers :: !ScrapersEnv
, _env_jobs :: !(Jobs.JobEnv GargJob (Dual [JobLog]) JobLog)
, _env_config :: !GargConfig
, _env_mail :: !MailConfig
}
......@@ -62,13 +88,15 @@ instance HasSettings Env where
instance HasMail Env where
mailSettings = env_mail
instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where
_env = env_scrapers . Servant.Job.Core._env
instance HasJobEnv Env JobLog JobLog where
job_env = env_scrapers
instance Jobs.MonadJob (ReaderT Env (ExceptT GargError IO)) GargJob (Dual [JobLog]) JobLog where
getJobEnv = asks (view env_jobs)
data MockEnv = MockEnv
{ _menv_firewall :: !FireWall
}
......
......@@ -47,6 +47,9 @@ import Gargantext.Database.Prelude (databaseParameters)
import Gargantext.Prelude
-- import Gargantext.Prelude.Config (gc_repofilepath)
import qualified Gargantext.Prelude.Mail as Mail
import qualified Gargantext.Utils.Jobs as Jobs
import qualified Gargantext.Utils.Jobs.Monad as Jobs
import qualified Gargantext.Utils.Jobs.Queue as Jobs
devSettings :: FilePath -> IO Settings
devSettings jwkFile = do
......@@ -177,12 +180,19 @@ newEnv port file = do
panic "TODO: conflicting settings of port"
config_env <- readConfig file
prios <- Jobs.readPrios (file <> ".jobs")
let prios' = Jobs.applyPrios prios Jobs.defaultPrios
putStrLn $ "Overrides: " <> show prios
putStrLn $ "New priorities: " <> show prios'
self_url_env <- parseBaseUrl $ "http://0.0.0.0:" <> show port
dbParam <- databaseParameters file
pool <- newPool dbParam
--nodeStory_env <- readNodeStoryEnv (_gc_repofilepath config_env)
nodeStory_env <- readNodeStoryEnv pool
scrapers_env <- newJobEnv defaultSettings manager_env
secret <- Jobs.genSecret
jobs_env <- Jobs.newJobEnv (Jobs.defaultJobSettings secret) prios' manager_env
logger <- newStderrLoggerSet defaultBufSize
config_mail <- Mail.readConfig file
......@@ -193,6 +203,7 @@ newEnv port file = do
, _env_nodeStory = nodeStory_env
, _env_manager = manager_env
, _env_scrapers = scrapers_env
, _env_jobs = jobs_env
, _env_self_url = self_url_env
, _env_config = config_env
, _env_mail = config_mail
......
This diff is collapsed.
......@@ -95,6 +95,7 @@ import Data.Text (Text, isInfixOf, unpack, pack)
import Data.Text.Lazy.IO as DTL
import Formatting (hprint, int, (%))
import GHC.Generics (Generic)
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Job
......@@ -118,7 +119,7 @@ import Gargantext.Prelude hiding (log)
import Gargantext.Prelude.Clock (hasTime, getTime)
import Prelude (error)
import Servant hiding (Patch)
import Servant.Job.Async (JobFunction(..), serveJobsAPI)
import Gargantext.Utils.Jobs (serveJobsAPI)
import System.IO (stderr)
import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary (Arbitrary, arbitrary)
......@@ -774,28 +775,23 @@ getTableNgramsDoc dId tabType listId limit_ offset listType minSize maxSize orde
apiNgramsTableCorpus :: ( GargServerC env err m
)
=> NodeId -> ServerT TableNgramsApi m
apiNgramsTableCorpus :: NodeId -> ServerT TableNgramsApi (GargM Env GargError)
apiNgramsTableCorpus cId = getTableNgramsCorpus cId
:<|> tableNgramsPut
:<|> scoresRecomputeTableNgrams cId
:<|> getTableNgramsVersion cId
:<|> apiNgramsAsync cId
apiNgramsTableDoc :: ( GargServerC env err m
)
=> DocId -> ServerT TableNgramsApi m
apiNgramsTableDoc :: DocId -> ServerT TableNgramsApi (GargM Env GargError)
apiNgramsTableDoc dId = getTableNgramsDoc dId
:<|> tableNgramsPut
:<|> scoresRecomputeTableNgrams dId
:<|> getTableNgramsVersion dId
:<|> apiNgramsAsync dId
apiNgramsAsync :: NodeId -> GargServer TableNgramsAsyncApi
apiNgramsAsync :: NodeId -> ServerT TableNgramsAsyncApi (GargM Env GargError)
apiNgramsAsync _dId =
serveJobsAPI $
JobFunction $ \i log ->
serveJobsAPI TableNgramsJob $ \i log ->
let
log' x = do
printDebug "tableNgramsPostChartsAsync" x
......
......@@ -23,13 +23,14 @@ import Data.Maybe (catMaybes, fromMaybe)
import Data.Set (Set)
import Data.Text (Text, concat, pack, splitOn)
import Data.Vector (Vector)
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Ngrams (setListNgrams)
import Gargantext.API.Ngrams.List.Types
import Gargantext.API.Ngrams.Prelude (getNgramsList)
import Gargantext.API.Ngrams.Tools (getTermsWith)
import Gargantext.API.Ngrams.Types
import Gargantext.API.Prelude (GargServer)
import Gargantext.API.Prelude (GargServer, GargM, GargError)
import Gargantext.API.Types
import Gargantext.Core.NodeStory
import Gargantext.Core.Text.Terms (ExtractedNgrams(..))
......@@ -46,8 +47,9 @@ import Gargantext.Database.Schema.Ngrams
import Gargantext.Database.Schema.Node (_node_parent_id)
import Gargantext.Database.Types (Indexed(..))
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Servant
import Servant.Job.Async
-- import Servant.Job.Async
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Csv as Csv
import qualified Data.HashMap.Strict as HashMap
......@@ -75,7 +77,7 @@ type JSONAPI = Summary "Update List"
:> "async"
:> AsyncJobs JobLog '[FormUrlEncoded] WithFile JobLog
jsonApi :: GargServer JSONAPI
jsonApi :: ServerT JSONAPI (GargM Env GargError)
jsonApi = postAsync
----------------------
......@@ -88,7 +90,7 @@ type CSVAPI = Summary "Update List (legacy v3 CSV)"
:> "async"
:> AsyncJobs JobLog '[FormUrlEncoded] WithTextFile JobLog
csvApi :: GargServer CSVAPI
csvApi :: ServerT CSVAPI (GargM Env GargError)
csvApi = csvPostAsync
------------------------------------------------------------------------
......@@ -188,15 +190,14 @@ type PostAPI = Summary "Update List"
:> "async"
:> AsyncJobs JobLog '[FormUrlEncoded] WithFile JobLog
postAsync :: GargServer JSONAPI
postAsync :: ListId -> ServerT PostAPI (GargM Env GargError)
postAsync lId =
serveJobsAPI $
JobFunction (\f log' ->
serveJobsAPI UpdateNgramsListJobJSON $ \f log' ->
let
log'' x = do
-- printDebug "postAsync ListId" x
liftBase $ log' x
in postAsync' lId f log'')
in postAsync' lId f log''
postAsync' :: FlowCmdM env err m
=> ListId
......@@ -291,10 +292,9 @@ csvPost l m = do
pure True
------------------------------------------------------------------------
csvPostAsync :: GargServer CSVAPI
csvPostAsync :: ServerT CSVAPI (GargM Env GargError)
csvPostAsync lId =
serveJobsAPI $
JobFunction $ \f@(WithTextFile ft _ n) log' -> do
serveJobsAPI UpdateNgramsListJobCSV $ \f@(WithTextFile ft _ n) log' -> do
let log'' x = do
printDebug "[csvPostAsync] filetype" ft
printDebug "[csvPostAsync] name" n
......
......@@ -36,6 +36,7 @@ import Data.Text (Text())
import GHC.Generics (Generic)
import Gargantext.API.Admin.Auth (withAccess)
import Gargantext.API.Admin.Auth.Types (PathId(..))
import Gargantext.API.Admin.EnvTypes
import Gargantext.API.Metrics
import Gargantext.API.Ngrams (TableNgramsApi, apiNgramsTableCorpus)
import Gargantext.API.Ngrams.Types (TabType(..))
......@@ -196,10 +197,10 @@ nodeAPI :: forall proxy a.
) => proxy a
-> UserId
-> NodeId
-> GargServer (NodeAPI a)
-> ServerT (NodeAPI a) (GargM Env GargError)
nodeAPI p uId id' = withAccess (Proxy :: Proxy (NodeAPI a)) Proxy uId (PathNode id') nodeAPI'
where
nodeAPI' :: GargServer (NodeAPI a)
nodeAPI' :: ServerT (NodeAPI a) (GargM Env GargError)
nodeAPI' = getNodeWith id' p
:<|> rename id'
:<|> postNode uId id'
......
......@@ -30,14 +30,14 @@ import Data.Swagger
import Data.Text (Text)
import GHC.Generics (Generic)
import Servant
import Servant.Job.Async (JobFunction(..), serveJobsAPI)
import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Node
import Gargantext.API.Prelude (GargServer, simuLogs)
import Gargantext.API.Prelude (GargError, GargM, simuLogs)
import Gargantext.Core (Lang(..))
import Gargantext.Core.Text.Terms (TermType(..))
import Gargantext.Core.Types.Individu (User(..))
......@@ -48,6 +48,7 @@ import Gargantext.Database.Admin.Types.Hyperdata.Contact (hyperdataContact)
import Gargantext.Database.Admin.Types.Node
import Gargantext.Prelude (($), liftBase, (.), printDebug, pure)
import qualified Gargantext.Utils.Aeson as GUA
import Gargantext.Utils.Jobs (serveJobsAPI)
------------------------------------------------------------------------
type API = "contact" :> Summary "Contact endpoint"
......@@ -56,7 +57,7 @@ type API = "contact" :> Summary "Contact endpoint"
:> NodeNodeAPI HyperdataContact
api :: UserId -> CorpusId -> GargServer API
api :: UserId -> CorpusId -> ServerT API (GargM Env GargError)
api uid cid = (api_async (RootId (NodeId uid)) cid)
:<|> (nodeNodeAPI (Proxy :: Proxy HyperdataContact) uid cid)
......@@ -70,16 +71,14 @@ data AddContactParams = AddContactParams { firstname :: !Text, lastname
deriving (Generic)
----------------------------------------------------------------------
api_async :: User -> NodeId -> GargServer API_Async
api_async :: User -> NodeId -> ServerT API_Async (GargM Env GargError)
api_async u nId =
serveJobsAPI $
JobFunction (\p log ->
serveJobsAPI AddContactJob $ \p log ->
let
log' x = do
printDebug "addContact" x
liftBase $ log x
in addContact u nId p (liftBase . log')
)
addContact :: (HasSettings env, FlowCmdM env err m)
=> User
......
......@@ -10,9 +10,9 @@ import Data.Aeson
import Data.Swagger (ToSchema)
import GHC.Generics (Generic)
import Servant
import Servant.Job.Async
import qualified Data.Text as T
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Job (jobLogSuccess)
import Gargantext.API.Prelude
......@@ -28,14 +28,15 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType')
import Gargantext.Prelude
import Gargantext.Database.Admin.Types.Hyperdata.Corpus (HyperdataCorpus(..))
import Gargantext.Utils.Jobs (serveJobsAPI)
data DocumentUpload = DocumentUpload
{ _du_abstract :: T.Text
, _du_authors :: T.Text
, _du_sources :: T.Text
, _du_title :: T.Text
, _du_date :: T.Text
, _du_title :: T.Text
, _du_date :: T.Text
}
deriving (Generic)
......@@ -65,12 +66,10 @@ type API = Summary " Document upload"
:> "async"
:> AsyncJobs JobLog '[JSON] DocumentUpload JobLog
api :: UserId -> NodeId -> GargServer API
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI $
JobFunction (\q log' -> do
serveJobsAPI UploadDocumentJob $ \q log' -> do
documentUploadAsync uId nId q (liftBase . log')
)
documentUploadAsync :: (FlowCmdM env err m)
=> UserId
......@@ -99,8 +98,8 @@ documentUpload nId doc = do
let cId = case mcId of
Just c -> c
Nothing -> panic $ T.pack $ "[G.A.N.DU] Node has no corpus parent: " <> show nId
(theFullDate, (year, month, day)) <- liftBase $ dateSplit EN
(theFullDate, (year, month, day)) <- liftBase $ dateSplit EN
$ Just
$ view du_date doc <> "T:0:0:0"
......@@ -123,9 +122,7 @@ documentUpload nId doc = do
, _hd_publication_minute = Nothing
, _hd_publication_second = Nothing
, _hd_language_iso2 = Just $ T.pack $ show EN }
docIds <- insertMasterDocs (Nothing :: Maybe HyperdataCorpus) (Multi EN) [hd]
_ <- Doc.add cId docIds
pure docIds
......@@ -22,10 +22,11 @@ import Data.Aeson
import Data.Either (Either(..), rights)
import Data.Swagger
import qualified Data.Text as T
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Job (jobLogSuccess, jobLogFailTotalWithMessage)
import Gargantext.API.Prelude (GargServer)
import Gargantext.API.Prelude (GargM, GargError)
import Gargantext.Core (Lang(..))
import Gargantext.Core.Text.Corpus.Parsers.FrameWrite
import Gargantext.Core.Text.Terms (TermType(..))
......@@ -39,9 +40,9 @@ import Gargantext.Database.Query.Table.Node (getChildrenByType, getClosestParent
import Gargantext.Database.Schema.Node (node_hyperdata)
import qualified Gargantext.Defaults as Defaults
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import GHC.Generics (Generic)
import Servant
import Servant.Job.Async (JobFunction(..), serveJobsAPI)
------------------------------------------------------------------------
type API = Summary " Documents from Write nodes."
......@@ -55,15 +56,13 @@ instance ToJSON Params where
toJSON = genericToJSON defaultOptions
instance ToSchema Params
------------------------------------------------------------------------
api :: UserId -> NodeId -> GargServer API
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI $
JobFunction (\p log'' ->
serveJobsAPI DocumentFromWriteNodeJob $ \p log'' ->
let
log' x = do
liftBase $ log'' x
in documentsFromWriteNodes uId nId p (liftBase . log')
)
documentsFromWriteNodes :: (HasSettings env, FlowCmdM env err m)
=> UserId
......
......@@ -11,7 +11,6 @@ import Data.Swagger
import Data.Text
import GHC.Generics (Generic)
import Servant
import Servant.Job.Async (JobFunction(..), serveJobsAPI)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.MIME.Types as DMT
......@@ -19,6 +18,7 @@ import qualified Gargantext.Database.GargDB as GargDB
import qualified Network.HTTP.Media as M
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Node.Types
import Gargantext.API.Prelude
......@@ -31,6 +31,7 @@ import Gargantext.Database.Query.Table.Node (getNodeWith)
import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Data.Either
data RESPONSE deriving Typeable
......@@ -99,15 +100,14 @@ type FileAsyncApi = Summary "File Async Api"
:> "add"
:> AsyncJobs JobLog '[FormUrlEncoded] NewWithFile JobLog
fileAsyncApi :: UserId -> NodeId -> GargServer FileAsyncApi
fileAsyncApi :: UserId -> NodeId -> ServerT FileAsyncApi (GargM Env GargError)
fileAsyncApi uId nId =
serveJobsAPI $
JobFunction (\i l ->
serveJobsAPI AddFileJob $ \i l ->
let
log' x = do
printDebug "addWithFile" x
liftBase $ l x
in addWithFile uId nId i log')
in addWithFile uId nId i log'
addWithFile :: (HasSettings env, FlowCmdM env err m)
......
......@@ -14,9 +14,9 @@ import GHC.Generics (Generic)
import Network.HTTP.Client (newManager, httpLbs, parseRequest, responseBody)
import Network.HTTP.Client.TLS (tlsManagerSettings)
import Servant
import Servant.Job.Async
import Web.FormUrlEncoded (FromForm)
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Job (jobLogInit, jobLogSuccess, jobLogFail)
import Gargantext.API.Node.Corpus.New (addToCorpusWithForm)
......@@ -31,6 +31,7 @@ import Gargantext.Database.Prelude (HasConfig)
import Gargantext.Database.Query.Table.Node (getClosestParentIdByType, getNodeWith)
import Gargantext.Database.Schema.Node (node_hyperdata)
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
data FrameCalcUpload = FrameCalcUpload ()
deriving (Generic)
......@@ -46,12 +47,11 @@ type API = Summary " FrameCalc upload"
:> "async"
:> AsyncJobs JobLog '[JSON] FrameCalcUpload JobLog
api :: UserId -> NodeId -> GargServer API
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI $
JobFunction (\p logs ->
frameCalcUploadAsync uId nId p (liftBase . logs) (jobLogInit 5)
)
serveJobsAPI UploadFrameCalcJob $ \p logs ->
frameCalcUploadAsync uId nId p (liftBase . logs) (jobLogInit 5)
frameCalcUploadAsync :: (HasConfig env, FlowCmdM env err m)
......
......@@ -18,10 +18,6 @@ Polymorphic Get Node API
module Gargantext.API.Node.Get
where
-- import Gargantext.API.Admin.Types (HasSettings)
-- import Servant.Job.Async (JobFunction(..), serveJobsAPI)
-- import Test.QuickCheck (elements)
-- import Gargantext.Database.Action.Flow.Types (FlowCmdM)
import Data.Aeson
import Data.Swagger
import GHC.Generics (Generic)
......@@ -30,7 +26,7 @@ import Test.QuickCheck.Arbitrary
import Gargantext.API.Prelude
import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Prelude (JSONB{-, getNodeWith-})
import Gargantext.Database.Prelude (JSONB)
import Gargantext.Prelude
------------------------------------------------------------------------
......
......@@ -26,11 +26,11 @@ import Data.Swagger
import Data.Text (Text)
import GHC.Generics (Generic)
import Servant
import Servant.Job.Async
import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary
import Web.FormUrlEncoded (FromForm, ToForm)
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Prelude
import Gargantext.Database.Action.Flow.Types
......@@ -41,6 +41,7 @@ import Gargantext.Database.Query.Table.Node.Error (HasNodeError(..))
import Gargantext.Database.Query.Table.Node.User
import Gargantext.Database.Schema.Node
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
------------------------------------------------------------------------
data PostNode = PostNode { pn_name :: Text
......@@ -73,10 +74,11 @@ type PostNodeAsync = Summary "Post Node"
:> AsyncJobs JobLog '[FormUrlEncoded] PostNode JobLog
postNodeAsyncAPI :: UserId -> NodeId -> GargServer PostNodeAsync
postNodeAsyncAPI
:: UserId -> NodeId -> ServerT PostNodeAsync (GargM Env GargError)
postNodeAsyncAPI uId nId =
serveJobsAPI $
JobFunction (\p logs -> postNodeAsync uId nId p (liftBase . logs))
serveJobsAPI NewNodeJob $ \p logs ->
postNodeAsync uId nId p (liftBase . logs)
------------------------------------------------------------------------
postNodeAsync :: FlowCmdM env err m
......
......@@ -21,11 +21,12 @@ import Data.Aeson
import Data.Maybe (Maybe(..), fromMaybe)
import Data.Swagger
import GHC.Generics (Generic)
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types (JobLog(..), AsyncJobs)
import Gargantext.API.Admin.Types (HasSettings)
import Gargantext.API.Ngrams.List (reIndexWith)
--import Gargantext.API.Ngrams.Types (TabType(..))
import Gargantext.API.Prelude (GargServer, simuLogs)
import Gargantext.API.Prelude (GargM, GargError, simuLogs)
import Gargantext.Core.Methods.Distances (GraphMetric(..))
import Gargantext.Core.Types.Main (ListType(..))
import Gargantext.Core.Viz.Graph (Strength)
......@@ -43,9 +44,9 @@ import Gargantext.Database.Query.Table.Node.UpdateOpaleye (updateHyperdata)
import Gargantext.Database.Schema.Ngrams (NgramsType(NgramsTerms))
import Gargantext.Database.Schema.Node (node_parent_id)
import Gargantext.Prelude (Bool(..), Ord, Eq, (<$>), ($), liftBase, (.), printDebug, pure, show, cs, (<>), panic, (<*>))
import Gargantext.Utils.Jobs (serveJobsAPI)
import Prelude (Enum, Bounded, minBound, maxBound)
import Servant
import Servant.Job.Async (JobFunction(..), serveJobsAPI)
import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary
import qualified Data.Set as Set
......@@ -88,16 +89,14 @@ data Charts = Sources | Authors | Institutes | Ngrams | All
deriving (Generic, Eq, Ord, Enum, Bounded)
------------------------------------------------------------------------
api :: UserId -> NodeId -> GargServer API
api :: UserId -> NodeId -> ServerT API (GargM Env GargError)
api uId nId =
serveJobsAPI $
JobFunction (\p log'' ->
serveJobsAPI UpdateNodeJob $ \p log'' ->
let
log' x = do
printDebug "updateNode" x
liftBase $ log'' x
in updateNode uId nId p (liftBase . log')
)
updateNode :: (HasSettings env, FlowCmdM env err m)
=> UserId
......
......@@ -40,6 +40,7 @@ import Gargantext.Database.Prelude
import Gargantext.Database.Query.Table.Node.Error (NodeError(..), HasNodeError(..))
import Gargantext.Database.Query.Tree
import Gargantext.Prelude
import qualified Gargantext.Utils.Jobs.Monad as Jobs
import Servant
import Servant.Job.Async
import Servant.Job.Core (HasServerError(..), serverError)
......@@ -108,6 +109,7 @@ data GargError
| GargInvalidError Validation
| GargJoseError Jose.Error
| GargServerError ServerError
| GargJobError Jobs.JobError
deriving (Show, Typeable)
makePrisms ''GargError
......
......@@ -25,11 +25,11 @@ import Data.Validity
import Servant
import Servant.Auth as SA
import Servant.Auth.Swagger ()
import Servant.Job.Async
import Servant.Swagger.UI
import Gargantext.API.Admin.Auth (ForgotPasswordAPI, ForgotPasswordAsyncAPI, withAccess)
import Gargantext.API.Admin.Auth.Types (AuthRequest, AuthResponse, AuthenticatedUser(..), PathId(..))
import Gargantext.API.Admin.EnvTypes (Env, GargJob(..))
import Gargantext.API.Admin.FrontEnd (FrontEndAPI)
import Gargantext.API.Context
import Gargantext.API.Count (CountAPI, count, Query)
......@@ -44,6 +44,7 @@ import Gargantext.Database.Admin.Types.Node
import Gargantext.Database.Prelude (HasConfig(..))
import Gargantext.Prelude
import Gargantext.Prelude.Config (gc_max_docs_scrapers)
import Gargantext.Utils.Jobs (serveJobsAPI)
import qualified Gargantext.API.GraphQL as GraphQL
import qualified Gargantext.API.Ngrams.List as List
import qualified Gargantext.API.Node.Contact as Contact
......@@ -219,7 +220,8 @@ serverGargAdminAPI = roots
:<|> nodesAPI
serverPrivateGargAPI' :: AuthenticatedUser -> GargServer GargPrivateAPI'
serverPrivateGargAPI'
:: AuthenticatedUser -> ServerT GargPrivateAPI' (GargM Env GargError)
serverPrivateGargAPI' (AuthenticatedUser (NodeId uid))
= serverGargAdminAPI
:<|> nodeAPI (Proxy :: Proxy HyperdataAny) uid
......@@ -272,47 +274,35 @@ waitAPI n = do
pure $ "Waited: " <> (cs $ show n)
----------------------------------------
addCorpusWithQuery :: User -> GargServer New.AddWithQuery
addCorpusWithQuery :: User -> ServerT New.AddWithQuery (GargM Env GargError)
addCorpusWithQuery user cid =
serveJobsAPI $
JobFunction (\q log' -> do
limit <- view $ hasConfig . gc_max_docs_scrapers
New.addToCorpusWithQuery user cid q (Just limit) (liftBase . log')
serveJobsAPI AddCorpusQueryJob $ \q log' -> do
limit <- view $ hasConfig . gc_max_docs_scrapers
New.addToCorpusWithQuery user cid q (Just limit) (liftBase . log')
{- let log' x = do
printDebug "addToCorpusWithQuery" x
liftBase $ log x
-}
)
{-
addWithFile :: GargServer New.AddWithFile
addWithFile cid i f =
serveJobsAPI $
JobFunction (\_i log -> New.addToCorpusWithFile cid i f (liftBase . log))
-}
addCorpusWithForm :: User -> GargServer New.AddWithForm
addCorpusWithForm :: User -> ServerT New.AddWithForm (GargM Env GargError)
addCorpusWithForm user cid =
serveJobsAPI $
JobFunction (\i log' ->
serveJobsAPI AddCorpusFormJob $ \i log' ->
let
log'' x = do
printDebug "[addToCorpusWithForm] " x
liftBase $ log' x
in New.addToCorpusWithForm user cid i log'' (jobLogInit 3))
in New.addToCorpusWithForm user cid i log'' (jobLogInit 3)
addCorpusWithFile :: User -> GargServer New.AddWithFile
addCorpusWithFile :: User -> ServerT New.AddWithFile (GargM Env GargError)
addCorpusWithFile user cid =
serveJobsAPI $
JobFunction (\i log' ->
serveJobsAPI AddCorpusFileJob $ \i log' ->
let
log'' x = do
printDebug "[addToCorpusWithFile]" x
liftBase $ log' x
in New.addToCorpusWithFile user cid i log'')
in New.addToCorpusWithFile user cid i log''
addAnnuaireWithForm :: GargServer Annuaire.AddWithForm
addAnnuaireWithForm :: ServerT Annuaire.AddWithForm (GargM Env GargError)
addAnnuaireWithForm cid =
serveJobsAPI $
JobFunction (\i log' -> Annuaire.addToAnnuaireWithForm cid i (liftBase . log'))
serveJobsAPI AddAnnuaireFormJob $ \i log' ->
Annuaire.addToAnnuaireWithForm cid i (liftBase . log')
......@@ -17,7 +17,6 @@ module Gargantext.API.Server where
import Control.Lens ((^.))
import Control.Monad.Except (withExceptT)
import Control.Monad.Reader (runReaderT)
import Data.Aeson
import Data.Text (Text)
import Data.Version (showVersion)
import Servant
......@@ -29,6 +28,7 @@ import qualified Gargantext.API.Public as Public
import Gargantext.API.Admin.Auth.Types (AuthContext)
import Gargantext.API.Admin.Auth (auth, forgotPassword, forgotPasswordAsync)
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Admin.FrontEnd (frontEndServer)
import qualified Gargantext.API.GraphQL as GraphQL
import Gargantext.API.Prelude
......@@ -41,7 +41,7 @@ import Gargantext.Prelude
import Gargantext.Prelude.Config (gc_url_backend_api)
serverGargAPI :: ToJSON err => Text -> GargServerM env err GargAPI
serverGargAPI :: Text -> ServerT GargAPI (GargM Env GargError)
serverGargAPI baseUrl -- orchestrator
= auth
:<|> forgotPassword
......@@ -56,7 +56,7 @@ serverGargAPI baseUrl -- orchestrator
gargVersion = pure (cs $ showVersion PG.version)
-- | Server declarations
server :: forall env. (Typeable env, EnvC env) => env -> IO (Server API)
server :: Env -> IO (Server API)
server env = do
-- orchestrator <- scrapyOrchestrator env
pure $ swaggerSchemaUIServer swaggerDoc
......@@ -72,7 +72,7 @@ server env = do
GraphQL.api
:<|> frontEndServer
where
transform :: forall a. GargM env GargError a -> Handler a
transform :: forall a. GargM Env GargError a -> Handler a
transform = Handler . withExceptT showAsServantErr . (`runReaderT` env)
......
......@@ -18,12 +18,12 @@ module Gargantext.API.ThrowAll where
import Control.Monad.Except (MonadError(..))
import Control.Lens ((#))
import Data.Aeson
import Servant
import Servant.Auth.Server (AuthResult(..))
import Gargantext.Prelude
import Gargantext.API.Prelude (GargServerM, _ServerError)
import Gargantext.API.Admin.EnvTypes (Env)
import Gargantext.API.Prelude
import Gargantext.API.Routes (GargPrivateAPI, serverPrivateGargAPI')
class ThrowAll' e a | a -> e where
......@@ -46,7 +46,8 @@ instance {-# OVERLAPPING #-} ThrowAll' e b => ThrowAll' e (a -> b) where
instance {-# OVERLAPPABLE #-} (MonadError e m) => ThrowAll' e (m a) where
throwAll' = throwError
serverPrivateGargAPI :: ToJSON err => GargServerM env err GargPrivateAPI
serverPrivateGargAPI
:: ServerT GargPrivateAPI (GargM Env GargError)
serverPrivateGargAPI (Authenticated auser) = serverPrivateGargAPI' auser
serverPrivateGargAPI _ = throwAll' (_ServerError # err401)
-- Here throwAll' requires a concrete type for the monad.
......@@ -23,6 +23,7 @@ import Data.Swagger
import Data.Text hiding (head)
import Debug.Trace (trace)
import GHC.Generics (Generic)
import Gargantext.API.Admin.EnvTypes (GargJob(..), Env)
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Ngrams.Tools
import Gargantext.API.Prelude
......@@ -46,8 +47,9 @@ import Gargantext.Database.Query.Table.Node.User (getNodeUser)
import Gargantext.Database.Schema.Node
import Gargantext.Database.Schema.Ngrams
import Gargantext.Prelude
import Gargantext.Utils.Jobs (serveJobsAPI)
import Servant
import Servant.Job.Async
import Servant.Job.Async (AsyncJobsAPI)
import Servant.XML
import qualified Data.HashMap.Strict as HashMap
......@@ -72,7 +74,7 @@ instance FromJSON GraphVersions
instance ToJSON GraphVersions
instance ToSchema GraphVersions
graphAPI :: UserId -> NodeId -> GargServer GraphAPI
graphAPI :: UserId -> NodeId -> ServerT GraphAPI (GargM Env GargError)
graphAPI u n = getGraph u n
:<|> graphAsync u n
:<|> graphClone u n
......@@ -231,10 +233,10 @@ type GraphAsyncAPI = Summary "Recompute graph"
:> AsyncJobsAPI JobLog () JobLog
graphAsync :: UserId -> NodeId -> GargServer GraphAsyncAPI
graphAsync :: UserId -> NodeId -> ServerT GraphAsyncAPI (GargM Env GargError)
graphAsync u n =
serveJobsAPI $
JobFunction (\_ log' -> graphRecompute u n (liftBase . log'))
serveJobsAPI RecomputeGraphJob $ \_ log' ->
graphRecompute u n (liftBase . log')
--graphRecompute :: UserId
......
module Gargantext.Utils.Jobs where
import Control.Monad.Except
import Control.Monad.Reader
import Prelude
import System.Directory (doesFileExist)
import Text.Read (readMaybe)
import Gargantext.API.Admin.EnvTypes
import Gargantext.API.Admin.Orchestrator.Types (JobLog)
import Gargantext.API.Prelude
import qualified Gargantext.Utils.Jobs.API as API
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad
import qualified Servant.Job.Async as SJ
jobErrorToGargError
:: JobError -> GargError
jobErrorToGargError = GargJobError
serveJobsAPI
:: Foldable callbacks
=> GargJob
-> (input -> Logger JobLog -> GargM Env GargError JobLog)
-> JobsServerAPI ctI ctO callbacks input
serveJobsAPI t f = API.serveJobsAPI ask t jobErrorToGargError $ \env i l -> do
putStrLn ("Running job of type: " ++ show t)
runExceptT $ runReaderT (f i l) env
type JobsServerAPI ctI ctO callbacks input =
SJ.AsyncJobsServerT' ctI ctO callbacks JobLog input JobLog
(GargM Env GargError)
parseGargJob :: String -> Maybe GargJob
parseGargJob s = case s of
"tablengrams" -> Just TableNgramsJob
"forgotpassword" -> Just ForgotPasswordJob
"updatengramslistjson" -> Just UpdateNgramsListJobJSON
"updatengramslistcsv" -> Just UpdateNgramsListJobCSV
"addcontact" -> Just AddContactJob
"addfile" -> Just AddFileJob
"documentfromwritenode" -> Just DocumentFromWriteNodeJob
"updatenode" -> Just UpdateNodeJob
"updateframecalc" -> Just UploadFrameCalcJob
"updatedocument" -> Just UploadDocumentJob
"newnode" -> Just NewNodeJob
"addcorpusquery" -> Just AddCorpusQueryJob
"addcorpusform" -> Just AddCorpusFormJob
"addcorpusfile" -> Just AddCorpusFileJob
"addannuaireform" -> Just AddAnnuaireFormJob
"recomputegraph" -> Just RecomputeGraphJob
_ -> Nothing
parsePrios :: [String] -> IO [(GargJob, Int)]
parsePrios [] = return []
parsePrios (x : xs) = (:) <$> go x <*> parsePrios xs
where go s = case break (=='=') s of
([], _) -> error "parsePrios: empty jobname?"
(prop, valS)
| Just val <- readMaybe (tail valS)
, Just j <- parseGargJob prop -> return (j, val)
| otherwise -> error $
"parsePrios: invalid input. " ++ show (prop, valS)
readPrios :: FilePath -> IO [(GargJob, Int)]
readPrios fp = do
exists <- doesFileExist fp
case exists of
False -> do
putStrLn $
"Warning: " ++ fp ++ " doesn't exist, using default job priorities."
return []
True -> parsePrios . lines =<< readFile fp
{-# LANGUAGE TypeFamilies, ScopedTypeVariables #-}
module Gargantext.Utils.Jobs.API where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Lens
import Control.Monad
import Control.Monad.Except
import Data.Aeson (ToJSON)
import Data.Monoid
import Prelude
import Servant.API
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad
import qualified Data.Text as T
import qualified Servant.Client as C
import qualified Servant.Job.Async as SJ
import qualified Servant.Job.Client as SJ
import qualified Servant.Job.Types as SJ
serveJobsAPI
:: ( Ord t, Exception e, MonadError e m
, MonadJob m t (Dual [event]) output
, ToJSON e, ToJSON event, ToJSON output
, Foldable callback
)
=> m env
-> t
-> (JobError -> e)
-> (env -> input -> Logger event -> IO (Either e output))
-> SJ.AsyncJobsServerT' ctI ctO callback event input output m
serveJobsAPI getenv t joberr f
= newJob getenv t f (SJ.JobInput undefined Nothing)
:<|> newJob getenv t f
:<|> serveJobAPI t joberr
serveJobAPI
:: forall (m :: * -> *) e t event output.
(Ord t, MonadError e m, MonadJob m t (Dual [event]) output)
=> t
-> (JobError -> e)
-> SJ.JobID 'SJ.Unsafe
-> SJ.AsyncJobServerT event output m
serveJobAPI t joberr jid' = wrap' (killJob t)
:<|> wrap' pollJob
:<|> wrap (waitJob joberr)
where wrap
:: forall a.
(SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output -> m a)
-> m a
wrap g = do
jid <- handleIDError joberr (checkJID jid')
job <- maybe (throwError $ joberr UnknownJob) pure =<< findJob jid
g jid job
wrap' g limit offset = wrap (g limit offset)
newJob
:: ( Ord t, Exception e, MonadJob m t (Dual [event]) output
, ToJSON e, ToJSON event, ToJSON output
, Foldable callbacks
)
=> m env
-> t
-> (env -> input -> Logger event -> IO (Either e output))
-> SJ.JobInput callbacks input
-> m (SJ.JobStatus 'SJ.Safe event)
newJob getenv jobkind f input = do
je <- getJobEnv
env <- getenv
let postCallback m = forM_ (input ^. SJ.job_callback) $ \url ->
C.runClientM (SJ.clientMCallback m)
(C.mkClientEnv (jeManager je) (url ^. SJ.base_url))
pushLog logF e = do
postCallback (SJ.mkChanEvent e)
logF e
f' inp logF = do
r <- f env inp (pushLog logF . Dual . (:[]))
case r of
Left e -> postCallback (SJ.mkChanError e) >> throwIO e
Right a -> postCallback (SJ.mkChanResult a) >> return a
jid <- queueJob jobkind (input ^. SJ.job_input) f'
return (SJ.JobStatus jid [] SJ.IsPending Nothing)
pollJob
:: MonadJob m t (Dual [event]) output
=> Maybe SJ.Limit
-> Maybe SJ.Offset
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
-> m (SJ.JobStatus 'SJ.Safe event)
pollJob limit offset jid je = do
(Dual logs, status, merr) <- case jTask je of
QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
<*> pure SJ.IsRunning
<*> pure Nothing
DoneJ ls r ->
let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
me = either (Just . T.pack . show) (const Nothing) r
in pure (ls, st, me)
pure $ SJ.jobStatus jid limit offset logs status merr
waitJob
:: (MonadError e m, MonadJob m t (Dual [event]) output)
=> (JobError -> e)
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
-> m (SJ.JobOutput output)
waitJob joberr jid je = do
r <- case jTask je of
QueuedJ _qj -> do
m <- getJobsMap
erj <- waitTilRunning
case erj of
Left res -> return res
Right rj -> do
(res, _logs) <- liftIO (waitJobDone jid rj m)
return res
RunningJ rj -> do
m <- getJobsMap
(res, _logs) <- liftIO (waitJobDone jid rj m)
return res
DoneJ _ls res -> return res
either (throwError . joberr . JobException) (pure . SJ.JobOutput) r
where waitTilRunning =
findJob jid >>= \mjob -> case mjob of
Nothing -> error "impossible"
Just je' -> case jTask je' of
QueuedJ _qj -> do
liftIO $ threadDelay 50000 -- wait 50ms
waitTilRunning
RunningJ rj -> return (Right rj)
DoneJ _ls res -> return (Left res)
killJob
:: (Ord t, MonadJob m t (Dual [event]) output)
=> t
-> Maybe SJ.Limit
-> Maybe SJ.Offset
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
-> m (SJ.JobStatus 'SJ.Safe event)
killJob t limit offset jid je = do
(Dual logs, status, merr) <- case jTask je of
QueuedJ _ -> do
removeJob True t jid
return (mempty, SJ.IsKilled, Nothing)
RunningJ rj -> do
liftIO $ cancel (rjAsync rj)
lgs <- liftIO (rjGetLog rj)
removeJob False t jid
return (lgs, SJ.IsKilled, Nothing)
DoneJ lgs r -> do
let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
me = either (Just . T.pack . show) (const Nothing) r
removeJob False t jid
pure (lgs, st, me)
pure $ SJ.jobStatus jid limit offset logs status merr
{-# LANGUAGE GADTs #-}
module Gargantext.Utils.Jobs.Map where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Map (Map)
import Data.Time.Clock
import Prelude
import qualified Data.Map as Map
import Gargantext.Utils.Jobs.Settings
-- | (Mutable) 'Map' containing job id -> job info mapping.
newtype JobMap jid w a = JobMap
{ jobMap :: TVar (Map jid (JobEntry jid w a))
}
-- | Information associated to a job ID
data JobEntry jid w a = JobEntry
{ jID :: jid
, jTask :: J w a
, jTimeoutAfter :: Maybe UTCTime
, jRegistered :: UTCTime
, jStarted :: Maybe UTCTime
, jEnded :: Maybe UTCTime
}
-- | A job computation, which has a different representation depending on the
-- status of the job.
--
-- A queued job consists of the input to the computation and the computation.
-- A running job consists of an 'Async' as well as an action to get the current logs.
-- A done job consists of the result of the computation and the final logs.
data J w a
= QueuedJ (QueuedJob w a)
| RunningJ (RunningJob w a)
| DoneJ w (Either SomeException a)
-- | An unexecuted job is an input paired with a computation
-- to run with it. Input type is "hidden" to
-- be able to store different job types together.
data QueuedJob w r where
QueuedJob :: a -> (a -> Logger w -> IO r) -> QueuedJob w r
-- | A running job points to the async computation for the job and provides a
-- function to peek at the current logs.
data RunningJob w a = RunningJob
{ rjAsync :: Async a
, rjGetLog :: IO w
}
-- | A @'Logger' w@ is a function that can do something with "messages" of type
-- @w@ in IO.
type Logger w = w -> IO ()
newJobMap :: IO (JobMap jid w a)
newJobMap = JobMap <$> newTVarIO Map.empty
-- | Lookup a job by ID
lookupJob
:: Ord jid
=> jid
-> JobMap jid w a
-> IO (Maybe (JobEntry jid w a))
lookupJob jid (JobMap mvar) = Map.lookup jid <$> readTVarIO mvar
-- | Ready to use GC thread
gcThread :: Ord jid => JobSettings -> JobMap jid w a -> IO ()
gcThread js (JobMap mvar) = go
where go = do
now <- getCurrentTime
candidateEntries <- Map.filter (expired now) <$> readTVarIO mvar
forM_ candidateEntries $ \je -> do
mrunningjob <- atomically $ do
case jTask je of
RunningJ rj -> modifyTVar' mvar (Map.delete (jID je))
>> return (Just rj)
_ -> return Nothing
case mrunningjob of
Nothing -> return ()
Just a -> killJ a
threadDelay (jsGcPeriod js * 1000000)
go
expired now jobentry = case jTimeoutAfter jobentry of
Just t -> now >= t
_ -> False
-- | Make a 'Logger' that 'mappend's monoidal values in a 'TVar'.
jobLog :: Semigroup w => TVar w -> Logger w -- w -> IO ()
jobLog logvar = \w -> atomically $ modifyTVar' logvar (\old_w -> old_w <> w)
-- | Generating new 'JobEntry's.
addJobEntry
:: Ord jid
=> jid
-> a
-> (a -> Logger w -> IO r)
-> JobMap jid w r
-> IO (JobEntry jid w r)
addJobEntry jid input f (JobMap mvar) = do
now <- getCurrentTime
let je = JobEntry
{ jID = jid
, jTask = QueuedJ (QueuedJob input f)
, jRegistered = now
, jTimeoutAfter = Nothing
, jStarted = Nothing
, jEnded = Nothing
}
atomically $ modifyTVar' mvar (Map.insert jid je)
return je
deleteJob :: Ord jid => jid -> JobMap jid w a -> STM ()
deleteJob jid (JobMap mvar) = modifyTVar' mvar (Map.delete jid)
runJob
:: (Ord jid, Monoid w)
=> jid
-> QueuedJob w a
-> JobMap jid w a
-> JobSettings
-> IO (RunningJob w a)
runJob jid qj (JobMap mvar) js = do
rj <- runJ qj
now <- getCurrentTime
atomically $ modifyTVar' mvar $
flip Map.adjust jid $ \je ->
je { jTask = RunningJ rj
, jStarted = Just now
, jTimeoutAfter = Just $ addUTCTime (fromIntegral (jsJobTimeout js)) now
}
return rj
waitJobDone
:: Ord jid
=> jid
-> RunningJob w a
-> JobMap jid w a
-> IO (Either SomeException a, w)
waitJobDone jid rj (JobMap mvar) = do
r <- waitJ rj
now <- getCurrentTime
logs <- rjGetLog rj
atomically $ modifyTVar' mvar $
flip Map.adjust jid $ \je ->
je { jEnded = Just now, jTask = DoneJ logs r }
return (r, logs)
-- | Turn a queued job into a running job by setting up the logging of @w@s and
-- firing up the async action.
runJ :: Monoid w => QueuedJob w a -> IO (RunningJob w a)
runJ (QueuedJob a f) = do
logs <- newTVarIO mempty
act <- async $ f a (jobLog logs)
let readLogs = readTVarIO logs
return (RunningJob act readLogs)
-- | Wait for a running job to return (blocking).
waitJ :: RunningJob w a -> IO (Either SomeException a)
waitJ (RunningJob act _) = waitCatch act
-- | Poll a running job to see if it's done.
pollJ :: RunningJob w a -> IO (Maybe (Either SomeException a))
pollJ (RunningJob act _) = poll act
-- | Kill a running job by cancelling the action.
killJ :: RunningJob w a -> IO ()
killJ (RunningJob act _) = cancel act
{-# LANGUAGE MultiWayIf, FunctionalDependencies, MultiParamTypeClasses #-}
module Gargantext.Utils.Jobs.Monad where
import Gargantext.Utils.Jobs.Settings
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Queue
import Gargantext.Utils.Jobs.State
import Control.Concurrent.STM
import Control.Exception
import Control.Monad.Except
import Data.Map (Map)
import Data.Time.Clock
import Network.HTTP.Client (Manager)
import Prelude
import qualified Servant.Job.Core as SJ
import qualified Servant.Job.Types as SJ
data JobEnv t w a = JobEnv
{ jeSettings :: JobSettings
, jeState :: JobsState t w a
, jeManager :: Manager
}
newJobEnv
:: (EnumBounded t, Monoid w)
=> JobSettings
-> Map t Prio
-> Manager
-> IO (JobEnv t w a)
newJobEnv js prios mgr = JobEnv js <$> newJobsState js prios <*> pure mgr
defaultJobSettings :: SJ.SecretKey -> JobSettings
defaultJobSettings k = JobSettings
{ jsNumRunners = 2
, jsJobTimeout = 30 * 60 -- 30 minutes
, jsIDTimeout = 30 * 60 -- 30 minutes
, jsGcPeriod = 1 * 60 -- 1 minute
, jsSecretKey = k
}
genSecret :: IO SJ.SecretKey
genSecret = SJ.generateSecretKey
class MonadIO m => MonadJob m t w a | m -> t w a where
getJobEnv :: m (JobEnv t w a)
getJobsSettings :: MonadJob m t w a => m JobSettings
getJobsSettings = jeSettings <$> getJobEnv
getJobsState :: MonadJob m t w a => m (JobsState t w a)
getJobsState = jeState <$> getJobEnv
getJobsMap :: MonadJob m t w a => m (JobMap (SJ.JobID 'SJ.Safe) w a)
getJobsMap = jobsData <$> getJobsState
getJobsQueue :: MonadJob m t w a => m (Queue t (SJ.JobID 'SJ.Safe))
getJobsQueue = jobsQ <$> getJobsState
queueJob
:: (MonadJob m t w a, Ord t)
=> t
-> i
-> (i -> Logger w -> IO a)
-> m (SJ.JobID 'SJ.Safe)
queueJob jobkind input f = do
js <- getJobsSettings
st <- getJobsState
liftIO (pushJob jobkind input f js st)
findJob
:: MonadJob m t w a
=> SJ.JobID 'SJ.Safe
-> m (Maybe (JobEntry (SJ.JobID 'SJ.Safe) w a))
findJob jid = do
jmap <- getJobsMap
liftIO $ lookupJob jid jmap
data JobError
= InvalidIDType
| IDExpired
| InvalidMacID
| UnknownJob
| JobException SomeException
deriving Show
checkJID
:: MonadJob m t w a
=> SJ.JobID 'SJ.Unsafe
-> m (Either JobError (SJ.JobID 'SJ.Safe))
checkJID (SJ.PrivateID tn n t d) = do
now <- liftIO getCurrentTime
js <- getJobsSettings
if | tn /= "job" -> return (Left InvalidIDType)
| now > addUTCTime (fromIntegral $ jsIDTimeout js) t -> return (Left IDExpired)
| d /= SJ.macID tn (jsSecretKey js) t n -> return (Left InvalidMacID)
| otherwise -> return $ Right (SJ.PrivateID tn n t d)
withJob
:: MonadJob m t w a
=> SJ.JobID 'SJ.Unsafe
-> (SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) w a -> m r)
-> m (Either JobError (Maybe r))
withJob jid f = do
r <- checkJID jid
case r of
Left e -> return (Left e)
Right jid' -> do
mj <- findJob jid'
case mj of
Nothing -> return (Right Nothing)
Just j -> Right . Just <$> f jid' j
handleIDError
:: MonadError e m
=> (JobError -> e)
-> m (Either JobError a)
-> m a
handleIDError toE act = act >>= \r -> case r of
Left err -> throwError (toE err)
Right a -> return a
removeJob
:: (Ord t, MonadJob m t w a)
=> Bool -- is it queued (and we have to remove jid from queue)
-> t
-> SJ.JobID 'SJ.Safe
-> m ()
removeJob queued t jid = do
q <- getJobsQueue
m <- getJobsMap
liftIO . atomically $ do
when queued $
deleteQueue t jid q
deleteJob jid m
{-# LANGUAGE ConstraintKinds #-}
module Gargantext.Utils.Jobs.Queue where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Data.List
import Data.Ord
import Data.Maybe
import Prelude
import System.IO
import qualified Data.Map as Map
import qualified Data.Vector as Vector
type EnumBounded t = (Ord t, Enum t, Bounded t)
data Q a = Q [a] [a] !Int
emptyQ :: Q a
emptyQ = Q [] [] 0
singletonQ :: a -> Q a
singletonQ a = Q [a] [] 1
snocQ :: a -> Q a -> Q a
snocQ a (Q xs ys sz) = Q xs (a:ys) (sz+1)
normalizeQ :: Q a -> Q a
normalizeQ (Q [] ys sz) = Q (reverse ys) [] sz
normalizeQ q = q
deleteQ :: Eq a => a -> Q a -> Q a
deleteQ x (Q xs ys sz) = Q xs' ys' sz'
where (xs_num_x, xs') = go xs (0, [])
(ys_num_x, ys') = go ys (0, [])
sz' = sz - xs_num_x - ys_num_x
go [] (n, bs) = (n, reverse bs)
go (a:as) (n, bs)
| a == x = go as (n+1, bs)
| otherwise = go as (n, a:bs)
popQ :: Q a -> Maybe (a, Q a)
popQ q@(Q as bs sz) = case as of
x:xs -> Just (x, Q xs bs (sz-1))
_ -> case normalizeQ q of
Q (x:xs) ys sz' -> Just (x, Q xs ys (sz'-1))
_ -> Nothing
sizeQ :: Q a -> Int
sizeQ (Q _ _ sz) = sz
-- | A priority is just a number. The greater, the earlier the job will get picked.
type Prio = Int
applyPrios
:: Ord t
=> [(t, Prio)] -> Map.Map t Prio -> Map.Map t Prio
applyPrios changes prios = foldl' (\m (t, p) -> Map.insert t p m) prios changes
-- | A queue with different kinds of values, described by @t@, where each
-- kind can have a higher or lower priority than other kinds, as described
-- by the 'queuePrios' field.
data Queue t a = Queue
{ queueData :: Vector.Vector (TVar (Q a))
, queueIndices :: Map.Map t Int -- indices into queueData
, queuePrios :: Map.Map t Prio
}
-- | Default priorities for the enumeration of job types @t@: everyone at 0.
defaultPrios :: EnumBounded t => Map.Map t Prio
defaultPrios = Map.fromList [ (t, 0) | t <- [minBound..maxBound] ]
-- | Create a new queue that'll apply the given priorities
newQueue :: EnumBounded t => Map.Map t Prio -> IO (Queue t a)
newQueue prios = do
let allTs = [ minBound .. maxBound ]
indices = Map.fromList (zip allTs [0..])
n = Map.size indices
vars <- Vector.replicateM n (newTVarIO emptyQ)
return $ Queue vars indices prios
-- | Add a new element to the queue, with the given kind.
addQueue :: Ord t => t -> a -> Queue t a -> IO ()
addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just i -> atomically $ modifyTVar (queueData q Vector.! i) (snocQ a)
Nothing -> error "addQueue: couldn't find queue for given job kind"
deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM ()
deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a)
Nothing -> error "deleteQueue: queue type not found?!"
-- | Try to pop the highest priority item off of the queue, per the priorities
-- defined by the @'Map.Map' t 'Prio'@ argument to 'newQueue'.
popQueue :: Ord t => Queue t a -> IO (Maybe a)
popQueue q = go queues
where prios = sortOn (Down . snd) $ Map.toList (queuePrios q)
indices = flip map prios $ \(t, _prio) ->
case Map.lookup t (queueIndices q) of
Just i -> i
Nothing -> error "popQueue: couldn't find queue index for given job kind"
queues = [ queueData q Vector.! i | i <- indices ]
go [] = return Nothing
go (q1:qs) = do
mitem <- atomically $ do
qa <- readTVar q1
case popQ qa of
Just (a, qa') -> writeTVar q1 qa' >> return (Just a)
Nothing -> return Nothing
case mitem of
Nothing -> go qs
a -> return a
-- | A ready-to-use runner that pops the highest priority item off the queue
-- and processes it using the given function.
queueRunner :: Ord t => (a -> IO ()) -> Queue t a -> IO ()
queueRunner f q = go
where go = do
mres <- popQueue q
case mres of
Just a -> f a `catch` exc
Nothing -> return ()
threadDelay 5000 -- 5ms
go
exc :: SomeException -> IO ()
exc e = hPutStrLn stderr ("Queue runner exception: " ++ show e)
-- | Create a queue and @n@ runner actions for it, with the given priorities
-- for the runners to apply when picking a new item.
newQueueWithRunners
:: EnumBounded t
=> Int -- ^ number of runners
-> Map.Map t Prio -- ^ priorities
-> (a -> IO ()) -- ^ what to do with each item
-> IO (Queue t a, [IO ()])
newQueueWithRunners n prios f = do
q <- newQueue prios
let runners = replicate n (queueRunner f q)
return (q, runners)
module Gargantext.Utils.Jobs.Settings where
import Prelude
import qualified Servant.Job.Core as SJ
-- | A few control knobs for the job system.
data JobSettings = JobSettings
{ jsNumRunners :: Int
, jsJobTimeout :: Int -- in seconds. TODO: timeout per job type? Map t Int
, jsIDTimeout :: Int -- in seconds, how long a job ID is valid
, jsGcPeriod :: Int -- in seconds, how long between each GC
, jsSecretKey :: SJ.SecretKey
}
module Gargantext.Utils.Jobs.State where
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Queue
import Gargantext.Utils.Jobs.Settings
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.Map (Map)
import Data.Proxy
import Data.Time.Clock
import Prelude
-- import qualified Data.Map as Map
import qualified Servant.Job.Core as SJ
import qualified Servant.Job.Types as SJ
type IDGenerator = TVar Int
data JobsState t w a = JobsState
{ jobsData :: JobMap (SJ.JobID 'SJ.Safe) w a
, jobsQ :: Queue t (SJ.JobID 'SJ.Safe)
, jobsIdGen :: IDGenerator
, jsGC :: Async ()
, jsRunners :: [Async ()]
}
nextID :: JobSettings -> JobsState t w a -> IO (SJ.JobID 'SJ.Safe)
nextID js st = do
now <- getCurrentTime
n <- atomically $ stateTVar (jobsIdGen st) $ \i -> (i, i+1)
return $ SJ.newID (Proxy :: Proxy "job") (jsSecretKey js) now n
newJobsState
:: (EnumBounded t, Monoid w)
=> JobSettings
-> Map t Prio
-> IO (JobsState t w a)
newJobsState js prios = do
jmap <- newJobMap
idgen <- newTVarIO 0
(q, runners) <- newQueueWithRunners (jsNumRunners js) prios $ \jid -> do
mje <- lookupJob jid jmap
case mje of
Nothing -> return ()
Just je -> case jTask je of
QueuedJ qj -> do
rj <- runJob jid qj jmap js
(_res, _logs) <- waitJobDone jid rj jmap
return ()
_ -> return ()
putStrLn $ "Starting " ++ show (jsNumRunners js) ++ " job runners."
gcAsync <- async $ gcThread js jmap
runnersAsyncs <- traverse async runners
return (JobsState jmap q idgen gcAsync runnersAsyncs)
pushJob
:: Ord t
=> t
-> a
-> (a -> Logger w -> IO r)
-> JobSettings
-> JobsState t w r
-> IO (SJ.JobID 'SJ.Safe)
pushJob jobkind input f js st@(JobsState jmap jqueue _idgen _ _) = do
jid <- nextID js st
_je <- addJobEntry jid input f jmap
addQueue jobkind jid jqueue
return jid
......@@ -85,9 +85,8 @@ extra-deps:
#- arxiv-0.0.3@sha256:02de1114091d11f1f3ab401d104d125ad4301260806feb7f63b3dcefc7db88cf,1588
# NP libs
#- git: https://github.com/np/servant-job.git # waiting for PR
- git: https://github.com/alpmestan/servant-job.git
commit: ceb251b91e8ec1804198422a3cdbdab08d843b79
commit: b4182487cfe479777c11ca19f3c0d47840b376f6
#- git: https://github.com/np/patches-map
- git: https://github.com/delanoe/patches-map
commit: 76cae88f367976ff091e661ee69a5c3126b94694
......
{-# LANGUAGE ScopedTypeVariables #-}
module Main where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad
import Data.Either
import Data.List
import Prelude
import Test.Hspec
import Gargantext.Utils.Jobs
import Gargantext.Utils.Jobs.Map
import Gargantext.Utils.Jobs.Monad
import Gargantext.Utils.Jobs.Queue (applyPrios, defaultPrios)
import Gargantext.Utils.Jobs.Settings
import Gargantext.Utils.Jobs.State
data JobT = A | B deriving (Eq, Ord, Show, Enum, Bounded)
data Counts = Counts { countAs :: Int, countBs :: Int }
deriving (Eq, Show)
inc, dec :: JobT -> Counts -> Counts
inc A cs = cs { countAs = countAs cs + 1 }
inc B cs = cs { countBs = countBs cs + 1 }
dec A cs = cs { countAs = countAs cs - 1 }
dec B cs = cs { countBs = countBs cs - 1 }
testMaxRunners = do
-- max runners = 2 with default settings
k <- genSecret
let settings = defaultJobSettings k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
runningJs <- newTVarIO []
let j num _inp l = do
atomically $ modifyTVar runningJs (\xs -> ("Job #" ++ show num) : xs)
-- putStrLn $ "Job #" ++ show num ++ " started"
threadDelay (5 * 1000000) -- 5s
-- putStrLn $ "Job #" ++ show num ++ " done"
atomically $ modifyTVar runningJs (\xs -> filter (/=("Job #" ++ show num)) xs)
jobs = [ (n, j n) | n <- [1..4] ]
jids <- forM jobs $ \(i, f) -> do
-- putStrLn ("Submitting job #" ++ show i)
pushJob A () f settings st
threadDelay 10000 -- 10ms
r1 <- readTVarIO runningJs
-- putStrLn ("Jobs running: " ++ show r1)
sort r1 `shouldBe` ["Job #1", "Job #2"]
threadDelay (6 * 1000000) -- 6s
r2 <- readTVarIO runningJs
sort r2 `shouldBe` ["Job #3", "Job #4"]
threadDelay (5 * 1000000) -- 5s
r3 <- readTVarIO runningJs
r3 `shouldBe` []
testPrios = do
k <- genSecret
let settings = defaultJobSettings k
st :: JobsState JobT [String] () <- newJobsState settings $
applyPrios [(B, 10)] defaultPrios -- B has higher priority
runningJs <- newTVarIO (Counts 0 0)
let j num jobt _inp l = do
atomically $ modifyTVar runningJs (inc jobt)
-- putStrLn $ "Job #" ++ show num ++ " started"
threadDelay (5 * 1000000) -- 5s
-- putStrLn $ "Job #" ++ show num ++ " done"
atomically $ modifyTVar runningJs (dec jobt)
jobs = [ (0, A, j 0 A)
, (1, A, j 1 A)
, (2, B, j 2 B)
, (3, B, j 3 B)
]
jids <- forM jobs $ \(i, t, f) -> do
-- putStrLn ("Submitting job #" ++ show i)
pushJob t () f settings st
threadDelay 10000 -- 10ms
r1 <- readTVarIO runningJs
r1 `shouldBe` (Counts 0 2)
threadDelay (6 * 1000000) -- 6s
r2 <- readTVarIO runningJs
r2 `shouldBe` (Counts 2 0)
threadDelay (5 * 1000000) -- 5s
r3 <- readTVarIO runningJs
r3 `shouldBe` (Counts 0 0)
testExceptions = do
-- max runners = 2 with default settings
k <- genSecret
let settings = defaultJobSettings k
st :: JobsState JobT [String] () <- newJobsState settings defaultPrios
jid <- pushJob A ()
(\_inp _log -> readFile "/doesntexist.txt" >>= putStrLn)
settings st
threadDelay 50000
mjob <- lookupJob jid (jobsData st)
case mjob of
Nothing -> error "boo"
Just je -> case jTask je of
DoneJ _ r -> isLeft r `shouldBe` True
_ -> error "boo2"
return ()
main :: IO ()
main = hspec $ do
describe "job queue" $ do
it "respects max runners limit" $
testMaxRunners
it "respects priorities" $
testPrios
it "can handle exceptions" $
testExceptions
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment