Integrate the orchestrator with a route to query scrapyd

parent d6d1cb30
......@@ -44,6 +44,10 @@ sudo apt-get install libbz2-dev libpq-dev
- https://docs.haskellstack.org/en/stable/README/
- curl -sSL https://get.haskellstack.org/ | sh
### Get the orchestrator library
git clone https://github.com/np/servant-job.git
## Building and installing
stack install
......
......@@ -78,6 +78,8 @@ library:
- fclabels
- fast-logger
# - haskell-gi-base
- http-client
- http-client-tls
- http-conduit
- http-api-data
- http-types
......@@ -107,6 +109,7 @@ library:
- servant
- servant-auth
- servant-client
- servant-job
- servant-mock
- servant-multipart
- servant-server
......
......@@ -38,25 +38,25 @@ module Gargantext.API
---------------------------------------------------------------------
import Gargantext.Prelude
import System.IO (FilePath, print)
import System.IO (FilePath)
import GHC.Generics (D1, Meta (..), Rep, Generic)
import GHC.Generics (D1, Meta (..), Rep)
import GHC.TypeLits (AppendSymbol, Symbol)
import Control.Lens
import Data.Aeson.Encode.Pretty (encodePretty)
import qualified Data.ByteString.Lazy.Char8 as BL8
import Data.Swagger
import Data.Text (Text, pack)
import Data.Text (Text)
import qualified Data.Text.IO as T
--import qualified Data.Set as Set
import Database.PostgreSQL.Simple (Connection, connect)
import Network.Wai
import Network.Wai.Handler.Warp
import Network.Wai.Handler.Warp hiding (defaultSettings)
import Servant
import Servant.Mock (mock)
import Servant.Job.Server (WithCallbacks)
import Servant.Swagger
import Servant.Swagger.UI
-- import Servant.API.Stream
......@@ -69,7 +69,8 @@ import Gargantext.API.Node ( Roots , roots
, NodesAPI , nodesAPI
)
import Gargantext.API.Count ( CountAPI, count, Query)
import Gargantext.Database.Utils (databaseParameters)
import Gargantext.API.Orchestrator
import Gargantext.API.Orchestrator.Types
---------------------------------------------------------------------
......@@ -90,20 +91,7 @@ import Network.Wai.Middleware.RequestLogger
import Network.HTTP.Types hiding (Query)
-- import Gargantext.API.Settings
data FireWall = FireWall { unFireWall :: Bool }
data GEnv conn = Env
{ _env_conn :: !conn
, _env_firewall :: !FireWall
}
deriving (Generic)
makeLenses ''GEnv
type ProdEnv = GEnv Connection
type MockEnv = GEnv ()
import Gargantext.API.Settings
fireWall :: Applicative f => Request -> FireWall -> f Bool
fireWall req fw = do
......@@ -121,15 +109,15 @@ fireWall req fw = do
else pure False
-- makeApp :: Env -> IO (Warp.Settings, Application)
makeApp :: MockEnv -> IO Application
makeApp env = do
-- makeMockApp :: Env -> IO (Warp.Settings, Application)
makeMockApp :: MockEnv -> IO Application
makeMockApp env = do
let serverApp = appMock
-- logWare <- mkRequestLogger def { destination = RequestLogger.Logger $ env^.logger }
--logWare <- mkRequestLogger def { destination = RequestLogger.Logger "/tmp/logs.txt" }
let checkOriginAndHost app req resp = do
blocking <- fireWall req (env ^. env_firewall)
blocking <- fireWall req (env ^. menv_firewall)
case blocking of
True -> app req resp
False -> resp ( responseLBS status401 []
......@@ -155,8 +143,6 @@ makeApp env = do
pure $ logStdoutDev $ checkOriginAndHost $ corsMiddleware $ serverApp
---------------------------------------------------------------------
type PortNumber = Int
---------------------------------------------------------------------
-- | API Global
......@@ -180,6 +166,8 @@ type GargAPI = "user" :> Summary "First user endpoint"
:<|> "count" :> Summary "Count endpoint"
:> ReqBody '[JSON] Query :> CountAPI
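-- Scraper jobs are run asynchronously through servant-job (see Gargantext.API.Orchestrator).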
:<|> "scraper" :> WithCallbacks ScraperAPI
-- /mv/<id>/<id>
-- /merge/<id>/<id>
-- /rename/<id>
......@@ -194,14 +182,16 @@ type API = SwaggerFrontAPI :<|> GargAPI
---------------------------------------------------------------------
-- | Server declaration
server :: ProdEnv -> Server API
server env
= swaggerFront
:<|> roots conn
:<|> nodeAPI conn
:<|> nodeAPI conn
:<|> nodesAPI conn
:<|> count
server :: Env -> IO (Server API)
server env = do
orchestrator <- scrapyOrchestrator env
pure $ swaggerFront
:<|> roots conn
:<|> nodeAPI conn
:<|> nodeAPI conn
:<|> nodesAPI conn
:<|> count
:<|> orchestrator
where
conn = env ^. env_conn
......@@ -214,9 +204,8 @@ gargMock :: Server GargAPI
gargMock = mock apiGarg Proxy
---------------------------------------------------------------------
app :: ProdEnv -> Application
app = serve api . server
-- TODO firewall
makeApp :: Env -> IO Application
makeApp = fmap (serve api) . server
appMock :: Application
appMock = serve api (swaggerFront :<|> gargMock)
......@@ -234,7 +223,7 @@ schemaUiServer :: (Server api ~ Handler Swagger)
schemaUiServer = swaggerSchemaUIServer
-- Type Familiy for the Documentation
-- Type Family for the Documentation
type family TypeName (x :: *) :: Symbol where
TypeName Int = "Int"
TypeName Text = "Text"
......@@ -266,27 +255,23 @@ swaggerWriteJSON = BL8.writeFile "swagger.json" (encodePretty swaggerDoc)
portRouteInfo :: PortNumber -> IO ()
portRouteInfo port = do
print (pack " ----Main Routes----- ")
print ("http://localhost:" <> show port <> "/index.html")
print ("http://localhost:" <> show port <> "/swagger-ui")
T.putStrLn " ----Main Routes----- "
T.putStrLn $ "http://localhost:" <> toUrlPiece port <> "/index.html"
T.putStrLn $ "http://localhost:" <> toUrlPiece port <> "/swagger-ui"
-- | startGargantext takes as parameters port number and Ini file.
startGargantext :: PortNumber -> FilePath -> IO ()
startGargantext port file = do
param <- databaseParameters file
conn <- connect param
let env = Env conn (FireWall False)
env <- newEnv port file
portRouteInfo port
run port (app env)
app <- makeApp env
run port app
startGargantextMock :: PortNumber -> IO ()
startGargantextMock port = do
portRouteInfo port
let env = Env () (FireWall False)
application <- makeApp env
application <- makeMockApp . MockEnv $ FireWall False
run port application
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Orchestrator where
import Gargantext.Prelude
import Gargantext.API.Settings
import Gargantext.API.Orchestrator.Types
import Gargantext.API.Orchestrator.Scrapy.Schedule
import Control.Lens hiding (elements)
import Data.Aeson
import qualified Data.ByteString.Lazy.Char8 as LBS
import Servant
import Servant.Job.Async
import Servant.Job.Client
import Servant.Job.Server
import Servant.Job.Utils (extendBaseUrl)
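-- | Run one scraper job against the remote job server: emit a 'NewTask'
-- progress event, schedule the spider (the @URL -> Schedule@ argument
-- receives the callback URL allocated by the job server), retry on
-- transient failures, then emit 'Finished' and return the job output.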
callJobScrapy :: (ToJSON e, FromJSON e, FromJSON o, MonadClientJob m)
=> JobServerURL e Schedule o
-> (URL -> Schedule)
-> m o
callJobScrapy jurl schedule = do
progress $ NewTask jurl
out <- view job_output <$>
retryOnTransientFailure (clientCallbackJob' jurl
(fmap (const ()) . scrapySchedule . schedule))
progress $ Finished jurl Nothing
pure out
logConsole :: ToJSON a => a -> IO ()
logConsole = LBS.putStrLn . encode
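-- | Build a scrapyd 'Schedule' for the "gargantext" project from a
-- 'ScraperInput' and submit it, passing the servant-job callback URL
-- through the "callback" extra argument.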
callScraper :: MonadClientJob m => URL -> ScraperInput -> m ScraperStatus
callScraper url input =
callJobScrapy jurl $ \cb ->
Schedule
{ s_project = "gargantext"
, s_spider = input ^. scin_spider
, s_setting = []
, s_jobid = Nothing
, s_version = Nothing
, s_extra =
[("query", input ^.. scin_query . _Just)
,("user", [input ^. scin_user])
,("corpus", [input ^. scin_corpus . to toUrlPiece])
,("report_every", input ^.. scin_report_every . _Just . to toUrlPiece)
,("limit", input ^.. scin_limit . _Just . to toUrlPiece)
,("url", input ^.. scin_local_file . _Just)
,("count_only", input ^.. scin_count_only . _Just . to toUrlPiece)
,("callback", [toUrlPiece cb])]
}
where
jurl :: JobServerURL ScraperStatus Schedule ScraperStatus
jurl = JobServerURL url Callback
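-- | Run 'callScraper' against the given scrapyd URL inside the servant-job
-- client monad, forwarding progress events to @log_status@; a client error
-- aborts with 'panic'.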
pipeline :: FromJSON e => URL -> ClientEnv -> ScraperInput
-> (e -> IO ()) -> IO ScraperStatus
pipeline scrapyurl client_env input log_status = do
e <- runJobMLog client_env log_status $ callScraper scrapyurl input
either (panic . cs . show) pure e
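-- | Serve the scraper API together with its callback endpoints: jobs are
-- queued in the environment's scrapers 'JobEnv' and executed by 'pipeline'
-- against the scrapyd URL configured in the settings.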
scrapyOrchestrator :: Env -> IO (Server (WithCallbacks ScraperAPI))
scrapyOrchestrator env = do
apiWithCallbacksServer (Proxy :: Proxy ScraperAPI)
defaultSettings (extendBaseUrl ("scraper" :: String) $ env ^. env_self_url)
(env ^. env_manager) (LogEvent logConsole) $
serveJobsAPI (env ^. env_scrapers) .
JobFunction . pipeline (URL $ env ^. env_settings . scrapydUrl)
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Orchestrator.Scrapy.Schedule where
import Control.Lens
import Data.Aeson
import qualified Data.HashMap.Strict as H
import Data.Text (Text)
import GHC.Generics
import Servant
import Servant.Job.Utils (jsonOptions)
import Servant.Client
import Web.FormUrlEncoded hiding (parseMaybe)
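-- | Payload of scrapyd's @schedule.json@ endpoint; 's_extra' carries
-- spider-specific arguments and is appended verbatim to the form.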
data Schedule = Schedule
{ s_project :: !Text
, s_spider :: !Text
, s_setting :: ![Text]
, s_jobid :: !(Maybe Text)
, s_version :: !(Maybe Text)
, s_extra :: ![(Text,[Text])]
}
deriving (Generic)
data ScheduleResponse = ScheduleResponse
{ r_status :: !Text
, r_jobid :: !Text
}
deriving (Generic)
instance FromJSON ScheduleResponse where
parseJSON = genericParseJSON (jsonOptions "r_")
instance ToForm Schedule where
toForm s =
Form . H.fromList $
[("project", [s_project s])
,("spider", [s_spider s])
,("setting", s_setting s)
,("jobid", s_jobid s ^.. _Just)
,("_version", s_version s ^.. _Just)
] ++ s_extra s
type Scrapy =
"schedule.json" :> ReqBody '[FormUrlEncoded] Schedule
:> Post '[JSON] ScheduleResponse
scrapyAPI :: Proxy Scrapy
scrapyAPI = Proxy
scrapySchedule :: Schedule -> ClientM ScheduleResponse
scrapySchedule = client scrapyAPI
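As a rough illustration (not part of the patch; the spider name and query below are invented), the form body produced by the 'ToForm' instance can be previewed with 'urlEncodeAsForm' from http-api-data, which is the same encoding scrapyd receives from 'scrapySchedule':
-- Minimal sketch; also needs: import qualified Data.ByteString.Lazy.Char8 as LBS
encodeScheduleExample :: IO ()
encodeScheduleExample = LBS.putStrLn $ urlEncodeAsForm Schedule
  { s_project = "gargantext"
  , s_spider  = "exampleSpider"          -- invented spider name
  , s_setting = []
  , s_jobid   = Nothing
  , s_version = Nothing
  , s_extra   = [("query", ["example query"])]
  }
-- prints something like (parameter order may vary):
--   project=gargantext&spider=exampleSpider&query=example%20query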
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeOperators #-}
module Gargantext.API.Orchestrator.Types where
import Gargantext.Prelude
import Control.Lens hiding (elements)
import Data.Aeson
import Data.Text (Text)
import Data.Swagger hiding (URL, url, port)
import GHC.Generics hiding (to)
import Servant.Job.Async
import Servant.Job.Client
import Servant.Job.Types
import Servant.Job.Utils (jsonOptions)
import Test.QuickCheck (elements)
import Test.QuickCheck.Arbitrary
instance Arbitrary a => Arbitrary (JobStatus 'Safe a) where
arbitrary = panic "TODO"
instance Arbitrary a => Arbitrary (JobOutput a) where
arbitrary = JobOutput <$> arbitrary
instance ToSchema URL where
declareNamedSchema = panic "TODO"
instance ToSchema AnyOutput where
declareNamedSchema = panic "TODO"
instance ToSchema AnyInput where
declareNamedSchema = panic "TODO"
instance ToSchema AnyEvent where
declareNamedSchema = panic "TODO"
instance ToSchema a => ToSchema (JobInput a)
instance ToSchema a => ToSchema (JobOutput a)
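-- | Parameters of a scraper job; JSON field names drop the @_scin_@
-- prefix (see the FromJSON instance below).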
data ScraperInput = ScraperInput
{ _scin_spider :: !Text
, _scin_query :: !(Maybe Text)
, _scin_user :: !Text
, _scin_corpus :: !Int
, _scin_report_every :: !(Maybe Int)
, _scin_limit :: !(Maybe Int)
, _scin_local_file :: !(Maybe Text)
, _scin_count_only :: !(Maybe Bool)
}
deriving Generic
makeLenses ''ScraperInput
instance FromJSON ScraperInput where
parseJSON = genericParseJSON $ jsonOptions "_scin_"
data ScraperEvent = ScraperEvent
{ _scev_message :: !(Maybe Text)
, _scev_level :: !(Maybe Text)
, _scev_date :: !(Maybe Text)
}
deriving Generic
instance Arbitrary ScraperEvent where
arbitrary = ScraperEvent <$> elements [Nothing, Just "test message"]
<*> elements [Nothing, Just "INFO", Just "WARN"]
<*> elements [Nothing, Just "2018-04-18"]
instance ToJSON ScraperEvent where
toJSON = genericToJSON $ jsonOptions "_scev_"
instance FromJSON ScraperEvent where
parseJSON = genericParseJSON $ jsonOptions "_scev_"
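-- | Progress report of a scraper job; it is used both as the job's event
-- type and as its final output (see 'ScrapersEnv' and 'ScraperAPI' below).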
data ScraperStatus = ScraperStatus
{ _scst_succeeded :: !(Maybe Int)
, _scst_failed :: !(Maybe Int)
, _scst_remaining :: !(Maybe Int)
, _scst_events :: !(Maybe [ScraperEvent])
}
deriving Generic
instance Arbitrary ScraperStatus where
arbitrary = ScraperStatus <$> arbitrary <*> arbitrary <*> arbitrary <*> arbitrary
instance ToJSON ScraperStatus where
toJSON = genericToJSON $ jsonOptions "_scst_"
instance FromJSON ScraperStatus where
parseJSON = genericParseJSON $ jsonOptions "_scst_"
instance ToSchema ScraperStatus -- TODO _scst_ prefix
instance ToSchema ScraperInput -- TODO _scin_ prefix
instance ToSchema ScraperEvent -- TODO _scev_ prefix
instance ToParamSchema Offset where
toParamSchema = panic "TODO"
instance ToParamSchema Limit where
toParamSchema = panic "TODO"
type ScrapersEnv = JobEnv ScraperStatus ScraperStatus
type ScraperAPI = AsyncJobsAPI ScraperStatus ScraperInput ScraperStatus
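For orientation only, and assuming 'jsonOptions "_scst_"' does nothing more than strip the field prefix (the exact Options live in Servant.Job.Utils), a status value encodes roughly as:
-- Illustrative only; key order and option details may differ:
--   encode (ScraperStatus (Just 10) (Just 2) (Just 88) (Just []))
--     ~ "{\"succeeded\":10,\"failed\":2,\"remaining\":88,\"events\":[]}"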
......@@ -36,17 +36,20 @@ import GHC.Enum
import GHC.Generics (Generic)
import Prelude (Bounded())
import System.Environment (lookupEnv)
-- import Control.Applicative ((<*>))
import System.IO (FilePath)
import Database.PostgreSQL.Simple (Connection, connect)
import Network.HTTP.Client (Manager)
import Network.HTTP.Client.TLS (newTlsManager)
import Data.Maybe (fromMaybe)
import Data.Either (either)
-- import Data.Map
import Data.Text
import Data.Text.Encoding (encodeUtf8)
import Data.ByteString.Lazy.Internal
import Servant
import Servant.Client (BaseUrl, parseBaseUrl)
import Servant.Job.Async (newJobEnv, defaultSettings)
import Web.HttpApiData (parseUrlPiece)
import qualified Jose.Jwk as Jose
import qualified Jose.Jwa as Jose
......@@ -54,7 +57,10 @@ import qualified Jose.Jwa as Jose
import Control.Monad.Logger
import Control.Lens
import Gargantext.Prelude
import Gargantext.Database.Utils (databaseParameters)
import Gargantext.API.Orchestrator.Types
type PortNumber = Int
data SendEmailType = SendEmailViaAws
| LogEmailToConsole
......@@ -65,11 +71,13 @@ data SendEmailType = SendEmailViaAws
data Settings = Settings
{ _allowedOrigin :: ByteString -- ^ allowed origin for CORS
, _allowedHost :: ByteString -- ^ allowed host for CORS
, _appPort :: Int
, _appPort :: PortNumber
, _logLevelLimit :: LogLevel -- ^ log level from the monad-logger package
, _dbServer :: Text
-- , _dbServer :: Text
-- ^ this is not used yet
, _jwtSecret :: Jose.Jwk -- ^ key from the jose-jwt package
, _sendLoginEmails :: SendEmailType
, _scrapydUrl :: BaseUrl
}
makeLenses ''Settings
......@@ -90,12 +98,13 @@ devSettings = Settings
, _allowedHost = "localhost:3000"
, _appPort = 3000
, _logLevelLimit = LevelDebug
, _dbServer = "localhost"
-- , _dbServer = "localhost"
-- generate with dd if=/dev/urandom bs=1 count=32 | base64
-- make sure jwtSecret differs between development and production, because you do not want
-- your production key inside source control.
, _jwtSecret = parseJwk "MVg0YAPVSPiYQc/qIs/rV/X32EFR0zOJWfHFgMbszMw="
, _sendLoginEmails = LogEmailToConsole
, _scrapydUrl = fromMaybe (panic "Invalid scrapy URL") $ parseBaseUrl "http://localhost:6800"
}
......@@ -122,14 +131,43 @@ optSetting name d = do
-- <*> (parseJwk <$> reqSetting "JWT_SECRET")
-- <*> optSetting "SEND_EMAIL" SendEmailViaAws
data FireWall = FireWall { unFireWall :: Bool }
data Env = Env
{ _settings :: Settings
, _logger :: LoggerSet
-- , _dbConfig :: ConnectionPool -- from Database.Persist.Postgresql
}
{ _env_settings :: !Settings
, _env_logger :: !LoggerSet
, _env_conn :: !Connection
, _env_manager :: !Manager
, _env_self_url :: !BaseUrl
, _env_scrapers :: !ScrapersEnv
}
deriving (Generic)
makeLenses ''Env
createEnv :: Settings -> IO Env
createEnv _ = undefined {- implementation here: connect to db, init logger, etc -}
data MockEnv = MockEnv
{ _menv_firewall :: !FireWall
}
deriving (Generic)
makeLenses ''MockEnv
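-- | Build the production environment: a TLS manager, the dev settings on
-- the requested port, the self base URL used for servant-job callbacks,
-- a database connection read from the ini file, a fresh job environment
-- for the scrapers, and a stderr logger.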
newEnv :: PortNumber -> FilePath -> IO Env
newEnv port file = do
manager <- newTlsManager
settings <- pure (devSettings & appPort .~ port) -- TODO read from 'file'
when (port /= settings ^. appPort) $
panic "TODO: conflicting settings of port"
self_url <- parseBaseUrl $ "http://0.0.0.0:" <> show port
param <- databaseParameters file
conn <- connect param
scrapers_env <- newJobEnv defaultSettings manager
logger <- newStderrLoggerSet defaultBufSize
pure $ Env
{ _env_settings = settings
, _env_logger = logger
, _env_conn = conn
, _env_manager = manager
, _env_scrapers = scrapers_env
, _env_self_url = self_url
}
......@@ -153,7 +153,8 @@ isStop c = c `elem` ['.','?','!']
-- | Tests
-- TODO http://hackage.haskell.org/package/tokenize-0.3.0/docs/NLP-Tokenize-Text.html
ngramsTest = ws
ngramsTest :: (IO [Text], IO [Text], IO (Map Text Occ))
ngramsTest = (ws, ls, ocs)
where
txt = concat <$> lines <$> clean <$> readFile "Giono-arbres.txt"
-- | Number of sentences
......
......@@ -31,7 +31,7 @@ import Data.Maybe (isJust, fromJust, maybe)
import Protolude ( Bool(True, False), Int, Double, Integer
, Fractional, Num, Maybe(Just,Nothing)
, Floating, Char, IO
, pure, (<$>), panic
, pure, (<*>), (<$>), panic
, head, flip
, Ord, Integral, Foldable, RealFrac, Monad, filter
, reverse, map, zip, drop, take, zipWith
......@@ -42,9 +42,9 @@ import Protolude ( Bool(True, False), Int, Double, Integer
, Eq, (==), (>=), (<=), (<>), (/=)
, (&&), (||), not
, fst, snd, toS
, elem, die, mod, div, const
, elem, die, mod, div, const, either
, curry, uncurry
, otherwise
, otherwise, when
)
-- TODO import functions optimized in Utils.Count
......
......@@ -310,5 +310,3 @@ instance ToSchema (NodePoly NodeId NodeTypeId
)
instance ToSchema Status
......@@ -2,6 +2,7 @@ flags: {}
extra-package-dbs: []
packages:
- .
- servant-job
allow-newer: true
extra-deps:
......@@ -18,15 +19,16 @@ extra-deps:
- haskell-src-exts-1.18.2
- http-types-0.12.1
- protolude-0.2
- servant-0.12.1
- servant-0.13
- servant-auth-0.3.0.1
- servant-client-0.12.0.1
- servant-client-core-0.12
- servant-client-0.13
- servant-client-core-0.13
- servant-docs-0.11.1
- servant-multipart-0.11.1
- servant-server-0.12
- servant-server-0.13
- servant-swagger-ui-0.2.3.2.2.8
- stemmer-0.5.2
- text-1.2.3.0
- text-show-3.6.2
- servant-flatten-0.2
resolver: lts-10.6