{-|
Module      : Gargantext.API.Admin.Orchestrator
Description : Orchestrator for remote scraping jobs (scrapyd)
Copyright   : (c) CNRS, 2017-Present
License     : AGPL + CECILL v3
Maintainer  : team@gargantext.org
Stability   : experimental
Portability : POSIX

-}

{-# LANGUAGE TemplateHaskell   #-}
{-# LANGUAGE TypeOperators     #-}

module Gargantext.API.Admin.Orchestrator where

import Control.Lens hiding (elements)
import Data.Aeson
import Data.Text
import Gargantext.API.Admin.Orchestrator.Scrapy.Schedule
import Gargantext.API.Admin.Orchestrator.Types
import Gargantext.API.Admin.Settings
import Gargantext.Prelude
import Servant
import Servant.Job.Async
import Servant.Job.Client
import Servant.Job.Server
import Servant.Job.Utils (extendBaseUrl)
import qualified Data.ByteString.Lazy.Char8 as LBS

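-- | Schedule a job on a remote scrapy server and return its output.
--   A callback 'URL' is passed to @schedule@ so the remote side can
--   report back; progress is signalled when the task starts and when
--   it finishes.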
callJobScrapy :: (ToJSON e, FromJSON e, FromJSON o, MonadClientJob m)
              => JobServerURL e Schedule o
              -> (URL -> Schedule)
              -> m o
callJobScrapy jurl schedule = do
  progress $ NewTask jurl
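  -- Schedule on the remote server, retrying on transient failures,
  -- and keep only the job output.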
  out <- view job_output <$>
          retryOnTransientFailure (clientCallbackJob' jurl
            (fmap (const ()) . scrapySchedule . schedule))
  progress $ Finished jurl Nothing
  pure out

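-- | Log any JSON-encodable value to stdout, one JSON document per line.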
logConsole :: ToJSON a => a -> IO ()
logConsole = LBS.putStrLn . encode

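-- | Build a scrapyd 'Schedule' from a 'ScraperInput' and run it through
--   'callJobScrapy'. Optional input fields are passed as extra scrapyd
--   arguments; the callback URL provided by the job server is forwarded
--   under the @callback@ key.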
callScraper :: MonadClientJob m => URL -> ScraperInput -> m JobLog
callScraper url input =
  callJobScrapy jurl $ \cb ->
    Schedule
      { s_project = "gargantext"
      , s_spider  = input ^. scin_spider
      , s_setting = []
      , s_jobid   = Nothing
      , s_version = Nothing
      , s_extra   =
          [("query",        input ^.. scin_query . _Just)
          ,("user",         [input ^. scin_user])
          ,("corpus",       [input ^. scin_corpus . to toUrlPiece])
          ,("report_every", input ^.. scin_report_every . _Just . to toUrlPiece)
          ,("limit",        input ^.. scin_limit . _Just . to toUrlPiece)
          ,("url",          input ^.. scin_local_file . _Just)
          ,("count_only",   input ^.. scin_count_only . _Just . to toUrlPiece)
          ,("callback",     [toUrlPiece cb])]
      }
  where
    jurl :: JobServerURL JobLog Schedule JobLog
    jurl = JobServerURL url Callback

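-- | Run a complete scraping job against the scrapyd server at
--   @scrapyurl@, forwarding intermediate log events to @log_status@
--   and returning the final 'JobLog'.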
pipeline :: FromJSON e => URL -> ClientEnv -> ScraperInput
                       -> (e -> IO ()) -> IO JobLog
pipeline scrapyurl client_env input log_status = do
  e <- runJobMLog client_env log_status $ callScraper scrapyurl input
  either (panic . cs . show) pure e -- TODO: use throwError instead of panic

-- TODO integrate into ServerT
--  use:
--  * serveJobsAPI instead of simpleServeJobsAPI
--  * JobFunction  instead of simpleJobFunction
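-- | Serve the 'ScraperAPI' with callback support, scheduling each job
--   on the scrapyd instance configured in the environment settings.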
scrapyOrchestrator :: Env -> IO (Server (WithCallbacks ScraperAPI))
scrapyOrchestrator env = do
  apiWithCallbacksServer (Proxy :: Proxy ScraperAPI)
    defaultSettings (extendBaseUrl ("scraper" :: Text) $ env ^. env_self_url)
    (env ^. env_manager) (LogEvent logConsole) $
    simpleServeJobsAPI (env ^. env_scrapers) .
      simpleJobFunction . pipeline (URL $ env ^. env_settings . scrapydUrl)