Commit be496999 authored by Alfredo Di Napoli's avatar Alfredo Di Napoli

Replace `Dual` Monoid with `Seq` in Job API

parent 29418bb5
......@@ -8,8 +8,8 @@ module Gargantext.API.Admin.EnvTypes where
import Control.Lens
import Control.Monad.Except
import Control.Monad.Reader
import Data.Monoid
import Data.Pool (Pool)
import Data.Sequence (Seq)
import Database.PostgreSQL.Simple (Connection)
import GHC.Generics (Generic)
import Network.HTTP.Client (Manager)
......@@ -58,7 +58,7 @@ data Env = Env
, _env_manager :: !Manager
, _env_self_url :: !BaseUrl
, _env_scrapers :: !ScrapersEnv
, _env_jobs :: !(Jobs.JobEnv GargJob (Dual [JobLog]) JobLog)
, _env_jobs :: !(Jobs.JobEnv GargJob (Seq JobLog) JobLog)
, _env_config :: !GargConfig
, _env_mail :: !MailConfig
, _env_nlp :: !NLPServerMap
......@@ -103,10 +103,10 @@ instance Servant.Job.Core.HasEnv Env (Job JobLog JobLog) where
instance HasJobEnv Env JobLog JobLog where
job_env = env_scrapers
instance Jobs.MonadJob (ReaderT Env (ExceptT GargError IO)) GargJob (Dual [JobLog]) JobLog where
instance Jobs.MonadJob (ReaderT Env (ExceptT GargError IO)) GargJob (Seq JobLog) JobLog where
getJobEnv = asks (view env_jobs)
instance Jobs.MonadJobStatus (ReaderT Env (ExceptT GargError IO)) Dual where
instance Jobs.MonadJobStatus (ReaderT Env (ExceptT GargError IO)) where
type JobType (ReaderT Env (ExceptT GargError IO)) = GargJob
type JobOutputType (ReaderT Env (ExceptT GargError IO)) = JobLog
type JobEventType (ReaderT Env (ExceptT GargError IO)) = JobLog
......
......@@ -9,7 +9,6 @@ module Gargantext.Utils.Jobs (
import Control.Monad.Except
import Control.Monad.Reader
import Data.Aeson (ToJSON)
import Data.Monoid (Dual)
import Prelude
import System.Directory (doesFileExist)
import Text.Read (readMaybe)
......@@ -33,7 +32,7 @@ serveJobsAPI
, Show (JobType m)
, ToJSON (JobEventType m)
, ToJSON (JobOutputType m)
, MonadJobStatus m Dual
, MonadJobStatus m
, m ~ (GargM env GargError)
)
=> JobType m
......
......@@ -11,8 +11,11 @@ import Control.Lens
import Control.Monad
import Control.Monad.Except
import Data.Aeson (ToJSON)
import Data.Foldable (toList)
import Data.Monoid
import Data.Kind (Type)
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Prelude
import Servant.API
......@@ -33,7 +36,7 @@ newtype JobHandle =
serveJobsAPI
:: ( Ord t, Exception e, MonadError e m
, MonadJob m t (Dual [event]) output
, MonadJob m t (Seq event) output
, ToJSON e, ToJSON event, ToJSON output
, Foldable callback
)
......@@ -49,7 +52,7 @@ serveJobsAPI getenv t joberr f
serveJobAPI
:: forall (m :: Type -> Type) e t event output.
(Ord t, MonadError e m, MonadJob m t (Dual [event]) output)
(Ord t, MonadError e m, MonadJob m t (Seq event) output)
=> t
-> (JobError -> e)
-> SJ.JobID 'SJ.Unsafe
......@@ -60,7 +63,7 @@ serveJobAPI t joberr jid' = wrap' (killJob t)
where wrap
:: forall a.
(SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output -> m a)
(SJ.JobID 'SJ.Safe -> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output -> m a)
-> m a
wrap g = do
jid <- handleIDError joberr (checkJID jid')
......@@ -70,7 +73,7 @@ serveJobAPI t joberr jid' = wrap' (killJob t)
wrap' g limit offset = wrap (g limit offset)
newJob
:: ( Ord t, Exception e, MonadJob m t (Dual [event]) output
:: ( Ord t, Exception e, MonadJob m t (Seq event) output
, ToJSON e, ToJSON event, ToJSON output
, Foldable callbacks
)
......@@ -91,7 +94,7 @@ newJob getenv jobkind f input = do
logF e
f' jId inp logF = do
r <- f env (JobHandle jId) inp (pushLog logF . Dual . (:[]))
r <- f env (JobHandle jId) inp (pushLog logF . Seq.singleton)
case r of
Left e -> postCallback (SJ.mkChanError e) >> throwIO e
Right a -> postCallback (SJ.mkChanResult a) >> return a
......@@ -100,14 +103,14 @@ newJob getenv jobkind f input = do
return (SJ.JobStatus jid [] SJ.IsPending Nothing)
pollJob
:: MonadJob m t (Dual [event]) output
:: MonadJob m t (Seq event) output
=> Maybe SJ.Limit
-> Maybe SJ.Offset
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
-> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
-> m (SJ.JobStatus 'SJ.Safe event)
pollJob limit offset jid je = do
(Dual logs, status, merr) <- case jTask je of
(logs, status, merr) <- case jTask je of
QueuedJ _ -> pure (mempty, SJ.IsPending, Nothing)
RunningJ rj -> (,,) <$> liftIO (rjGetLog rj)
<*> pure SJ.IsRunning
......@@ -116,13 +119,13 @@ pollJob limit offset jid je = do
let st = either (const SJ.IsFailure) (const SJ.IsFinished) r
me = either (Just . T.pack . show) (const Nothing) r
in pure (ls, st, me)
pure $ SJ.jobStatus jid limit offset logs status merr
pure $ SJ.jobStatus jid limit offset (toList logs) status merr
waitJob
:: (MonadError e m, MonadJob m t (Dual [event]) output)
:: (MonadError e m, MonadJob m t (Seq event) output)
=> (JobError -> e)
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
-> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
-> m (SJ.JobOutput output)
waitJob joberr jid je = do
r <- case jTask je of
......@@ -152,15 +155,15 @@ waitJob joberr jid je = do
DoneJ _ls res -> return (Left res)
killJob
:: (Ord t, MonadJob m t (Dual [event]) output)
:: (Ord t, MonadJob m t (Seq event) output)
=> t
-> Maybe SJ.Limit
-> Maybe SJ.Offset
-> SJ.JobID 'SJ.Safe
-> JobEntry (SJ.JobID 'SJ.Safe) (Dual [event]) output
-> JobEntry (SJ.JobID 'SJ.Safe) (Seq event) output
-> m (SJ.JobStatus 'SJ.Safe event)
killJob t limit offset jid je = do
(Dual logs, status, merr) <- case jTask je of
(logs, status, merr) <- case jTask je of
QueuedJ _ -> do
removeJob True t jid
return (mempty, SJ.IsKilled, Nothing)
......@@ -174,4 +177,4 @@ killJob t limit offset jid je = do
me = either (Just . T.pack . show) (const Nothing) r
removeJob False t jid
pure (lgs, st, me)
pure $ SJ.jobStatus jid limit offset logs status merr
pure $ SJ.jobStatus jid limit offset (toList logs) status merr
......@@ -34,6 +34,7 @@ import Control.Exception
import Control.Monad.Except
import Data.Kind (Type)
import Data.Map.Strict (Map)
import Data.Sequence (Seq)
import Data.Time.Clock
import Network.HTTP.Client (Manager)
import Prelude
......@@ -166,7 +167,7 @@ removeJob queued t jid = do
--
-- | A monad to query for the status of a particular job /and/ submit updates for in-progress jobs.
class MonadJob m (JobType m) (t [JobEventType m]) (JobOutputType m) => MonadJobStatus m t where
class MonadJob m (JobType m) (Seq (JobEventType m)) (JobOutputType m) => MonadJobStatus m where
type JobType m :: Type
type JobOutputType m :: Type
type JobEventType m :: Type
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment