[demo] app added as a simple demo

parent 47dfb8d3
Pipeline #6850 failed with stages
in 1 minute and 42 seconds
...@@ -83,3 +83,10 @@ any job metadata structure like in haskell-bee. They fall into the ...@@ -83,3 +83,10 @@ any job metadata structure like in haskell-bee. They fall into the
- [redis-job-queue](https://hackage.haskell.org/package/redis-job-queue) - [redis-job-queue](https://hackage.haskell.org/package/redis-job-queue)
(worth investigating the sorted set approach) (worth investigating the sorted set approach)
- [poolboy](https://hackage.haskell.org/package/poolboy) (in-memory only) - [poolboy](https://hackage.haskell.org/package/poolboy) (in-memory only)
## Credit
All credit goes to the [Gargantext
team](https://www.gargantext.org/). This work was done as part of my
contract there.
...@@ -95,7 +95,8 @@ main = do ...@@ -95,7 +95,8 @@ main = do
, onMessageReceived = Nothing , onMessageReceived = Nothing
, onJobFinish = Nothing , onJobFinish = Nothing
, onJobTimeout = Nothing , onJobTimeout = Nothing
, onJobError = Nothing } , onJobError = Nothing
, onWorkerKilledSafely = Nothing }
forkIO $ run state forkIO $ run state
) workersLst ) workersLst
......
This diff is collapsed.
module App.Types
( Program(..)
, GlobalArgs(..)
, Command(..)
, WorkerArgs(..)
, EchoArgs(..)
, ErrorArgs(..)
, WaitArgs(..)
, programParser )
where
import Async.Worker.Broker.Types qualified as B
import Options.Applicative
data Program =
Program GlobalArgs Command
deriving (Eq, Show)
programParser :: ParserInfo Program
programParser = info
( ( Program <$> global <*> commandParser ) <**> helper ) mempty
data GlobalArgs =
GlobalArgs { _ga_queue :: B.Queue }
deriving (Eq, Show)
global :: Parser GlobalArgs
global = GlobalArgs
<$> (B.Queue <$> strOption ( long "queue"
<> value "default"
<> help "Queue that we will use" ) )
data Command
= Worker WorkerArgs
| QueueSize
| Echo EchoArgs
| Error ErrorArgs
| Quit
| Wait WaitArgs
deriving (Eq, Show)
commandParser :: Parser Command
commandParser = subparser
( command "worker" (info (worker <**> helper) mempty )
<> command "queue-size" (info (queueSize <**> helper) mempty )
-- tasks
<> command "echo" (info (echo <**> helper) mempty )
<> command "error" (info (error' <**> helper) mempty )
<> command "quit" (info (quit <**> helper) mempty )
<> command "wait" (info (wait <**> helper) mempty )
)
data WorkerArgs =
WorkerArgs
deriving (Eq, Show)
worker :: Parser Command
worker = pure $ Worker WorkerArgs
queueSize :: Parser Command
queueSize = pure QueueSize
data EchoArgs =
EchoArgs { _ea_message :: String }
deriving (Eq, Show)
echo :: Parser Command
echo = Echo
<$> ( EchoArgs
<$> argument str ( metavar "MESSAGE"
<> help "message to send in echo" ) )
data ErrorArgs =
ErrorArgs { _ea_error :: String }
deriving (Eq, Show)
error' :: Parser Command
error' = Error
<$> ( ErrorArgs
<$> argument str ( metavar "ERROR"
<> help "Error message" ) )
quit :: Parser Command
quit = pure Quit
data WaitArgs =
WaitArgs { _wa_time :: Int }
deriving (Eq, Show)
wait :: Parser Command
wait = Wait
<$> ( WaitArgs
<$> argument auto ( metavar "TIME"
<> help "Time to wait, in seconds") )
module Main where
import App.Types
import Async.Worker qualified as W
import Async.Worker.Broker.Types qualified as B
import Control.Concurrent (throwTo)
import Control.Concurrent.Async (asyncThreadId, wait)
import Control.Monad (void)
import Demo.PGMQ (initBroker, initWorker)
import Demo.Types qualified as DT
import Options.Applicative
import System.Posix.Signals (Handler(Catch), installHandler, keyboardSignal)
main :: IO ()
main = start =<< execParser programParser
start :: Program -> IO ()
start (Program (GlobalArgs { .. }) (Worker WorkerArgs)) = do
initWorker "default" _ga_queue $ \a _state -> do
let tid = asyncThreadId a
_ <- installHandler keyboardSignal (Catch (throwTo tid W.KillWorkerSafely)) Nothing
wait a
start (Program (GlobalArgs { .. }) QueueSize) = do
b <- initBroker
s <- B.getQueueSize b _ga_queue
putStrLn $ "queue size: " <> show s
start (Program (GlobalArgs { .. }) (Echo (EchoArgs { .. }))) = do
b <- initBroker
let sj = W.mkDefaultSendJob' b _ga_queue (DT.Echo _ea_message)
void $ W.sendJob' sj
start (Program (GlobalArgs { .. }) (Error (ErrorArgs { .. }))) = do
b <- initBroker
let sj = W.mkDefaultSendJob' b _ga_queue (DT.Error _ea_error)
void $ W.sendJob' sj
start (Program (GlobalArgs { .. }) Quit) = do
b <- initBroker
let sj = W.mkDefaultSendJob' b _ga_queue DT.Quit
void $ W.sendJob' $ sj { W.resendOnKill = False }
start (Program (GlobalArgs { .. }) (Wait (WaitArgs { .. }))) = do
b <- initBroker
let sj = W.mkDefaultSendJob' b _ga_queue (DT.Wait _wa_time)
void $ W.sendJob' sj
with-compiler: ghc-9.4.8
packages:
./
../
source-repository-package
type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-pgmq
tag: 0591a643d8ba1776af4fac56c1e4ff5fc3e98bb3
cabal-version: 3.0
-- The cabal-version field refers to the version of the .cabal specification,
-- and can be different from the cabal-install (the tool) version and the
-- Cabal (the library) version you are using. As such, the Cabal (the library)
-- version used must be equal or greater than the version stated in this field.
-- Starting from the specification version 2.2, the cabal-version field must be
-- the first thing in the cabal file.
-- Initial package description 'haskell-bee-demo' generated by
-- 'cabal init'. For further documentation, see:
-- http://haskell.org/cabal/users-guide/
--
-- The name of the package.
name: haskell-bee-demo
-- The package version.
-- See the Haskell package versioning policy (PVP) for standards
-- guiding when and how versions should be incremented.
-- https://pvp.haskell.org
-- PVP summary: +-+------- breaking API changes
-- | | +----- non-breaking API additions
-- | | | +--- code changes with no API change
version: 0.1.0.0
-- A short (one-line) description of the package.
-- synopsis:
-- A longer description of the package.
-- description:
-- The license under which the package is released.
license: AGPL-3.0-or-later
-- The file containing the license text.
license-file: LICENSE
-- The package author(s).
author: Przemysław Kaminski
-- An email address to which users can send suggestions, bug reports, and patches.
maintainer: pk@intrepidus.pl
-- A copyright notice.
-- copyright:
category: Concurrency
build-type: Simple
-- Extra doc files to be distributed with the package, such as a CHANGELOG or a README.
extra-doc-files: CHANGELOG.md
-- Extra source files to be distributed with the package, such as examples, or a tutorial module.
-- extra-source-files:
common warnings
ghc-options: -Wall
library
hs-source-dirs: lib
exposed-modules: Demo.Action
, Demo.PGMQ
, Demo.Types
build-depends: base ^>=4.17.2.1
, aeson >= 2.1 && < 2.3
, async >= 2.2 && < 2.3
, haskell-bee
, postgresql-simple >= 0.7.0 && < 0.8
, safe-exceptions >= 0.1.7 && < 0.2
, text >= 2.0 && < 2.2
default-language: Haskell2010
default-extensions:
DeriveGeneric
DuplicateRecordFields
FlexibleContexts
GeneralizedNewtypeDeriving
ImportQualifiedPost
NamedFieldPuns
NumericUnderscores
OverloadedStrings
RecordWildCards
executable demo
-- Import common warning flags.
import: warnings
-- .hs or .lhs file containing the Main module.
main-is: Main.hs
-- Modules included in this executable, other than Main.
-- other-modules:
-- LANGUAGE extensions used by modules in this package.
-- other-extensions:
-- Other library packages from which modules are imported.
build-depends: base ^>=4.17.2.1
, async >= 2.2 && < 2.3
, haskell-bee
, haskell-bee-demo
, optparse-applicative >= 0.18 && < 0.19
, unix >= 2.8 && < 2.9
-- Directories containing source files.
hs-source-dirs: app
other-modules: App.Types
-- Base language which the package is written in.
default-language: Haskell2010
default-extensions:
DeriveGeneric
DuplicateRecordFields
GeneralizedNewtypeDeriving
ImportQualifiedPost
NamedFieldPuns
OverloadedStrings
RecordWildCards
module Demo.Action where
import Async.Worker.Broker qualified as B
import Async.Worker.Broker.Types qualified as B
import Async.Worker.Broker.PGMQ (PGMQBroker)
import Async.Worker qualified as W
import Async.Worker.Types qualified as W
import Control.Concurrent (threadDelay)
import Control.Exception.Safe (Exception, throwIO)
import Demo.Types (Job(..))
data MyException = MyException String
deriving (Show)
instance Exception MyException
performAction :: (W.HasWorkerBroker PGMQBroker Job)
=> W.State PGMQBroker Job
-> B.BrokerMessage PGMQBroker (W.Job Job)
-> IO ()
performAction state bm = do
case W.getJob (B.toA (B.getMessage bm)) of
Echo s -> putStrLn s
Wait t -> do
putStrLn $ "waiting " <> show t <> "s"
threadDelay (t * 1_000_000)
Error e -> throwIO $ MyException e
Quit -> throwIO $ W.KillWorkerSafely
module Demo.PGMQ where
import Async.Worker.Broker qualified as B
import Async.Worker.Broker.Types qualified as B
import Async.Worker.Broker.PGMQ (PGMQBroker, BrokerInitParams(PGMQBrokerInitConnStr))
import Async.Worker qualified as W
import Async.Worker.Types qualified as W
import Async.Worker.Types (HasWorkerBroker)
import Control.Concurrent.Async (Async, withAsync)
import Data.Maybe (fromMaybe)
import Data.Text qualified as T
import Data.Text.Encoding qualified as T
import Database.PostgreSQL.Simple qualified as PSQL
import Demo.Action (performAction)
import Demo.Types (Job)
import System.Environment (lookupEnv)
brokerInitParams :: IO (B.BrokerInitParams PGMQBroker (W.Job Job))
brokerInitParams = do
mConnInfo <- lookupEnv "POSTGRES_CONN"
let connInfo = T.pack $ fromMaybe "host=localhost port=5432 dbname=postgres user=postgres" mConnInfo
return $ PGMQBrokerInitConnStr (T.encodeUtf8 connInfo) 1
initBroker :: (HasWorkerBroker PGMQBroker Job)
=> IO (B.Broker PGMQBroker (W.Job Job))
initBroker = do
params <- brokerInitParams
B.initBroker params
initWorker :: (HasWorkerBroker PGMQBroker Job)
=> String
-> B.Queue
-> (Async () -> W.State PGMQBroker Job -> IO ())
-> IO ()
initWorker name queueName cb = do
putStrLn $ "[" <> name <> "] starting"
broker <- initBroker
let onWorkerKilledSafely _ _ = do
putStrLn $ "[" <> name <> "] killing me safely"
let state' = W.State { broker
, name
, queueName
, performAction
, onMessageReceived = Nothing
, onJobFinish = Nothing
, onJobTimeout = Nothing
, onJobError = Nothing
, onWorkerKilledSafely = Just onWorkerKilledSafely }
withAsync (W.run state') (\a -> cb a state')
module Demo.Types where
import Data.Aeson
import GHC.Generics
data Job =
Echo String
| Wait Int
| Error String
| Quit
deriving (Eq, Show, Generic)
-- | Generic to/from JSON is bad, but we don't care
instance ToJSON Job
instance FromJSON Job
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment