Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
haskell-gargantext
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
157
Issues
157
List
Board
Labels
Milestones
Merge Requests
9
Merge Requests
9
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
gargantext
haskell-gargantext
Commits
a81bb4ef
Verified
Commit
a81bb4ef
authored
Oct 10, 2024
by
Przemyslaw Kaminski
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
[worker] support for --run-single
parent
1959214c
Pipeline
#6815
canceled with stages
Changes
6
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
56 additions
and
20 deletions
+56
-20
Types.hs
bin/gargantext-cli/CLI/Types.hs
+3
-2
Worker.hs
bin/gargantext-cli/CLI/Worker.hs
+10
-4
cabal.project
cabal.project
+1
-1
Worker.hs
src/Gargantext/Core/Worker.hs
+22
-1
Jobs.hs
src/Gargantext/Core/Worker/Jobs.hs
+9
-0
Jobs.hs
src/Gargantext/Utils/Jobs.hs
+11
-12
No files found.
bin/gargantext-cli/CLI/Types.hs
View file @
a81bb4ef
...
...
@@ -81,8 +81,9 @@ data CLIRoutes
deriving
(
Show
,
Eq
)
data
WorkerArgs
=
WorkerArgs
{
worker_toml
::
!
SettingsFile
,
worker_name
::
!
Text
{
worker_toml
::
!
SettingsFile
,
worker_name
::
!
Text
,
worker_run_single
::
!
Bool
}
deriving
(
Show
,
Eq
)
data
CLICmd
...
...
bin/gargantext-cli/CLI/Worker.hs
View file @
a81bb4ef
...
...
@@ -19,7 +19,7 @@ import Data.Text qualified as T
import
Gargantext.Core.Config
(
hasConfig
,
_gc_worker
)
import
Gargantext.Core.Config.Types
(
SettingsFile
(
..
))
import
Gargantext.Core.Config.Worker
(
WorkerDefinition
(
..
),
WorkerSettings
(
..
),
findDefinitionByName
)
import
Gargantext.Core.Worker
(
withPGMQWorker
)
import
Gargantext.Core.Worker
(
withPGMQWorker
,
withPGMQWorkerSingle
)
import
Gargantext.Core.Worker.Env
(
withWorkerEnv
)
import
Gargantext.Core.Worker.Jobs
(
sendJob
)
import
Gargantext.Core.Worker.Jobs.Types
(
Job
(
Ping
))
...
...
@@ -56,9 +56,13 @@ workerCLI (WorkerArgs { .. }) = do
Just
wd
->
do
putStrLn
(
"Starting worker '"
<>
worker_name
<>
"'"
)
putStrLn
(
"Worker settings: "
<>
show
ws
::
Text
)
withPGMQWorker
env
wd
$
\
a
_state
->
do
runReaderT
(
sendJob
Ping
)
env
wait
a
if
worker_run_single
then
withPGMQWorkerSingle
env
wd
$
\
a
_state
->
do
wait
a
else
withPGMQWorker
env
wd
$
\
a
_state
->
do
runReaderT
(
sendJob
Ping
)
env
wait
a
workerCmd
::
HasCallStack
=>
Mod
CommandFields
CLI
...
...
@@ -70,4 +74,6 @@ worker_p = fmap CCMD_worker $ WorkerArgs
<*>
strOption
(
long
"name"
<>
metavar
"STRING"
<>
help
"Worker name, as defined in the .toml file"
)
<*>
flag
False
True
(
long
"run-single"
<>
help
"Whether to loop or run a single job from queue"
)
cabal.project
View file @
a81bb4ef
...
...
@@ -196,7 +196,7 @@ source-repository-package
source
-
repository
-
package
type
:
git
location
:
https
://
gitlab
.
iscpif
.
fr
/
gargantext
/
haskell
-
bee
tag
:
58
ab07e0110281f94ecc8840b8cd0c0a9081b672
tag
:
d783198e1b7ca8a61e5332965db468da3faef796
source
-
repository
-
package
type
:
git
...
...
src/Gargantext/Core/Worker.hs
View file @
a81bb4ef
...
...
@@ -64,7 +64,28 @@ withPGMQWorker env (WorkerDefinition { .. }) cb = do
withAsync
(
Worker
.
run
state'
)
(
\
a
->
cb
a
state'
)
withPGMQWorkerSingle
::
(
HasWorkerBroker
PGMQBroker
Job
)
=>
WorkerEnv
->
WorkerDefinition
->
(
Async
()
->
Worker
.
State
PGMQBroker
Job
->
IO
()
)
->
IO
()
withPGMQWorkerSingle
env
(
WorkerDefinition
{
..
})
cb
=
do
let
gargConfig
=
env
^.
hasConfig
broker
<-
initBrokerWithDBCreate
gargConfig
let
state'
=
Worker
.
State
{
broker
,
queueName
=
_wdQueue
,
name
=
T
.
unpack
_wdName
,
performAction
=
performAction
env
,
onMessageReceived
=
Nothing
,
onJobFinish
=
Nothing
,
onJobTimeout
=
Nothing
,
onJobError
=
Nothing
}
withAsync
(
Worker
.
runSingle
state'
)
(
\
a
->
cb
a
state'
)
-- | How the worker should process jobs
performAction
::
(
HasWorkerBroker
b
Job
)
=>
WorkerEnv
->
Worker
.
State
b
Job
...
...
@@ -91,4 +112,4 @@ performAction env _state bm = do
liftBase
$
putStrLn
(
"new node async "
::
Text
)
void
$
postNode'
_nna_authenticatedUser
_nna_node_id
_nna_postNode
return
()
GargJob
{
_gj_garg_job
}
->
putStrLn
(
"Garg job: "
<>
show
_gj_garg_job
::
Text
)
GargJob
{
_gj_garg_job
}
->
putStrLn
(
"Garg job: "
<>
show
_gj_garg_job
<>
" (handling of this job is still not implemented!)"
::
Text
)
src/Gargantext/Core/Worker/Jobs.hs
View file @
a81bb4ef
...
...
@@ -17,6 +17,7 @@ import Async.Worker.Broker.PGMQ (PGMQBroker)
import
Async.Worker
qualified
as
Worker
import
Async.Worker.Types
(
HasWorkerBroker
)
import
Control.Lens
(
view
)
import
Gargantext.API.Admin.EnvTypes
qualified
as
EnvTypes
import
Gargantext.Core.Config
(
gc_worker
,
HasConfig
(
..
))
import
Gargantext.Core.Config.Worker
(
WorkerSettings
(
..
),
WorkerDefinition
(
..
))
import
Gargantext.Core.Worker.Broker
(
initBrokerWithDBCreate
)
...
...
@@ -40,3 +41,11 @@ sendJob job = do
b
<-
initBrokerWithDBCreate
gcConfig
let
queueName
=
_wdQueue
wd
void
$
Worker
.
sendJob'
$
Worker
.
mkDefaultSendJob'
b
queueName
job
-- | This is just a list of what's implemented and what not.
-- After we migrate to async workers, this should be removed
-- (see G.C.Worker -> performAction on what's implemented already)
handledJobs
::
[
EnvTypes
.
GargJob
]
handledJobs
=
[
EnvTypes
.
AddCorpusQueryJob
,
EnvTypes
.
ForgotPasswordJob
]
src/Gargantext/Utils/Jobs.hs
View file @
a81bb4ef
...
...
@@ -22,22 +22,19 @@ module Gargantext.Utils.Jobs (
,
markFailedNoErr
)
where
import
Control.Monad.Except
(
runExceptT
)
import
Control.Monad.Reader
(
MonadReader
(
ask
),
ReaderT
(
runReaderT
)
)
import
Data.Aeson
(
ToJSON
)
import
Data.Text
qualified
as
T
import
Gargantext.API.Admin.EnvTypes
(
mkJobHandle
,
parseGargJob
,
Env
,
GargJob
(
..
)
)
import
Gargantext.API.Errors.Types
(
BackendInternalError
(
InternalJobError
)
)
import
Gargantext.API.Prelude
(
GargM
)
import
Gargantext.Core.Worker.Jobs
qualified
as
Jobs
import
Gargantext.Core.Worker.Jobs.Types
qualified
as
Jobs
import
Gargantext.Prelude
import
Gargantext.System.Logging
import
Gargantext.Utils.Jobs.Internal
qualified
as
Internal
import
Gargantext.Utils.Jobs.Monad
(
JobError
,
MonadJobStatus
(
..
),
markFailureNoErr
,
markFailedNoErr
)
import
Prelude
--
import Prelude
import
Servant.Job.Async
qualified
as
SJ
import
System.Directory
(
doesFileExist
)
import
Text.Read
(
readMaybe
)
jobErrorToGargError
...
...
@@ -62,19 +59,21 @@ serveJobsAPI
serveJobsAPI
jobType
f
=
Internal
.
serveJobsAPI
mkJobHandle
ask
jobType
jobErrorToGargError
$
\
env
jHandle
i
->
do
runExceptT
$
flip
runReaderT
env
$
do
$
(
logLocM
)
INFO
(
T
.
pack
$
"Running job of type: "
++
show
jobType
)
Jobs
.
sendJob
$
Jobs
.
GargJob
{
Jobs
.
_gj_garg_job
=
jobType
}
unless
(
jobType
`
elem
`
Jobs
.
handledJobs
)
$
Jobs
.
sendJob
$
Jobs
.
GargJob
{
Jobs
.
_gj_garg_job
=
jobType
}
f
jHandle
i
getLatestJobStatus
jHandle
parsePrios
::
[
String
]
->
IO
[(
GargJob
,
Int
)]
parsePrios
::
[
Text
]
->
IO
[(
GargJob
,
Int
)]
parsePrios
[]
=
pure
[]
parsePrios
(
x
:
xs
)
=
(
:
)
<$>
go
x
<*>
parsePrios
xs
where
go
s
=
case
break
(
==
'='
)
s
of
(
[]
,
_
)
->
error
"parsePrios: empty jobname?"
parsePrios
(
x
:
xs
)
=
(
:
)
<$>
go
(
T
.
unpack
x
)
<*>
parsePrios
xs
where
go
s
=
case
break
(
==
'='
)
s
of
(
[]
,
_
)
->
errorTrace
"parsePrios: empty jobname?"
(
prop
,
valS
)
|
Just
val
<-
readMaybe
(
tail
valS
)
|
Just
val
<-
readMaybe
(
T
.
tail
$
T
.
pack
valS
)
,
Just
j
<-
parseGargJob
(
T
.
pack
prop
)
->
pure
(
j
,
val
)
|
otherwise
->
error
$
|
otherwise
->
error
Trace
$
"parsePrios: invalid input. "
++
show
(
prop
,
valS
)
readPrios
::
Logger
IO
->
FilePath
->
IO
[(
GargJob
,
Int
)]
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment