gargantext / haskell-gargantext · Commits

Commit 2d61da1c (Verified)
authored Nov 19, 2024 by Przemyslaw Kaminski

[tests] fix tests, README moved things to dev docs

parent f84d7734
Pipeline #6992 canceled with stages in 26 minutes and 22 seconds
Showing 3 changed files with 2 additions and 210 deletions:

- README.md (+0, -207)
- test/Test/API/GraphQL.hs (+0, -1)
- test/Test/Instances.hs (+2, -2)
README.md — View file @ 2d61da1c

...

@@ -396,21 +396,6 @@ cabal v2-test garg-test-tasty --test-show-details=streaming --flags 'test-crypto

# Async workers <a name="async-workers"></a>
Async workers allow us to accept long-running jobs and execute them
asynchronously. Workers can be spawned on multiple machines, which
allows for scaling.

To run the worker, follow these steps:

- start a PostgreSQL DB; usually the one with the Gargantext DB is enough
- the `"default"` worker definition is in `gargantext-settings.toml_toModify`
- run the worker: `cabal v2-run gargantext-cli -- worker --name default`

The project that we base our worker on is
[haskell-bee](https://gitlab.iscpif.fr/gargantext/haskell-bee/). It's
a more generic framework for managing asynchronous workers, supporting
different brokers. Here, we decided to use `pgmq` because we already
have PostgreSQL deployed.
## Configuration

Edit your `gargantext-settings.toml` file and add this section:
...

@@ -450,142 +435,11 @@ server, it is just assumed that the `worker.database` from above

config exists. See `G.C.W.Broker` -> `initBrokerWithDBCreate` for more
details.
## Design

Thanks to the fact that we already use `Servant.Jobs` (which executes
the jobs in the gargantext-API process), we can migrate our jobs more
easily to a different backend.

All job definitions are in `G.C.W.J.Types` under the `Job` datatype.
If you want to add a new job, just add a new constructor to `Job`
(with all the arguments this job needs), implement to/from JSON
serialization (so we can send and read that job via the broker) and
implement the appropriate handler in `G.C.Worker` -> `performAction`.
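As a rough sketch of that pattern (the `Job` constructors and the body of `performAction` below are hypothetical stand-ins, not the real definitions from `G.C.W.J.Types` and `G.C.Worker`; JSON serialization is omitted):

```haskell
-- Illustrative sketch only: 'Job' and 'performAction' here are simplified
-- stand-ins for the real definitions. The constructors 'Ping' and
-- 'AddCorpusWithQuery' are hypothetical example jobs.
data Job
  = Ping                        -- hypothetical liveness job
  | AddCorpusWithQuery String   -- hypothetical: the query to run
  deriving (Show, Eq)

-- In the real code the handler lives in G.C.Worker and runs effects;
-- here it just returns a description, so the sketch stays runnable.
performAction :: Job -> IO String
performAction Ping                   = pure "pong"
performAction (AddCorpusWithQuery q) = pure ("adding corpus for query: " <> q)
```

Adding a job is then: extend `Job`, extend the serialization, add one more equation to `performAction`.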
## No storage backend

When you compare `haskell-bee` with the
[celery project](https://docs.celeryq.dev/en/stable/), you can notice 2
things:

- both have configurable brokers
- `haskell-bee` doesn't have a storage backend

This is because, when analyzing our tasks, we actually don't store any
**task** results anywhere. The progress can be reported immediately via
notifications, same for errors. And when a task is executed (e.g. add
to corpus with query), the garg db state is mutated.

One could discuss whether such a **task** storage backend is needed: if
you want to debug some task, just run the worker and log its results.
You can inspect the errors in the log later. On the other hand, if we
had a production worker which processes lots of tasks, it could be a
pain to store the results of all of them, as we mostly don't care about
the finished ones.
Also, the `haskell-bee` framework allows adding custom hooks to the
worker. In particular, search for `onJobError`/`onJobTimeout` in
`Worker.State`. We could trigger some `IO` action on these hooks
(logging, sending mail, firing rockets).
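A minimal sketch of that hook idea, assuming hooks are plain `IO` callbacks (the real hook fields in haskell-bee's `Worker.State` have a different shape):

```haskell
-- Hypothetical sketch: worker hooks modeled as plain IO callbacks.
data Hooks = Hooks
  { onJobError   :: String -> IO ()  -- called with an error description
  , onJobTimeout :: String -> IO ()  -- called with the timed-out job's name
  }

-- Pure helpers used to format the messages, so the behaviour is testable.
formatError, formatTimeout :: String -> String
formatError   e = "job error: " <> e
formatTimeout j = "job timed out: " <> j

-- A hook set that just logs; sending mail or alerting could go here too.
loggingHooks :: Hooks
loggingHooks = Hooks
  { onJobError   = putStrLn . formatError
  , onJobTimeout = putStrLn . formatTimeout
  }
```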
# Notifications <a name="notifications"></a>

The notifications mechanism has 3 basic components: the `central
exchange`, the `dispatcher` and the `websocket server`.
## Central exchange

This is the first entrypoint for notifications. The code lies in
`G.C.N.CentralExchange`. To create a central exchange, just call
`gServer` from that module. Notice that currently there can be only 1
central exchange, but in principle there could be many.

The purpose of the central exchange is to be **quick**: accept as many
notifications as possible (they can be sent via
`G.C.N.CentralExchange` -> `notify`), then forward them to the
`dispatcher`.

You can send only predefined messages, see `G.C.N.CE.Types` ->
`CEMessage`.
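The "predefined messages only" design is the closed-ADT pattern; a hypothetical sketch (the constructors below are illustrative, not the real `CEMessage` ones, and the real `notify` pushes serialized bytes over a nanomsg socket rather than returning a `String`):

```haskell
-- Hypothetical sketch of a closed message type: only these constructors
-- can cross the exchange, so every consumer can pattern-match totally.
newtype NodeId = NodeId Int deriving (Show, Eq)

data CEMessage
  = UpdateTreeMsg NodeId  -- hypothetical: a node's subtree changed
  | CEPing                -- hypothetical liveness message
  deriving (Show, Eq)

-- Stand-in for 'notify': here we just render the message instead of
-- serializing it and sending it over the wire.
notify :: CEMessage -> String
notify = show
```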
Please note that the central exchange acts as a proxy and, as such, it
can run as a separate linux process. Currently, though, it is just a
GHC thread, part of `gargantext-server`.
## Dispatcher
The code for the dispatcher lies in
`G.C.N.Dispatcher`
. To spawn the
server, use
`withDispatcher`
.
The dispatcher is responsible for subscribing clients to messages and
dispatching incoming messages to appropriate clients. Also, it
implements a throttling mechanism so if it is bombarded with lots of
equal messages, it will send only some of them to the client
(c.f.
`sendDataMessageThrottled`
).
The basic data structure associated with the dispatcher is a set of
`Subscription`
's. This is a
`TVar`
, i.e. a global mutable variable. An
incoming subscription is registered in this set, along with topics
(see
`G.C.N.D.Types`
->
`Topic`
datatype).
For example, a client can say: "i want to listen to update tree events
under node X", which translates to subscribing to a
`UpdateTree X`
topic.
Or, the client might want to listen to progress of an async task with
id
`X`
, then it's topic
`UpdateWorkerProgress JobInfo`
.
When a message is forwarded to the dispatcher (preferably from the
central exchange), the dispatcher then associates that message with
all subscriptions (see `G.C.N.Dispatcher` -> `ceMessageSubPred` and
the filtering associated with that predicate function) and sends an
appropriate message to them.
## WebSockets

In theory, clients could subscribe to the dispatcher by different
means, but currently the only implemented way is by connecting to the
websocket server (which lies under the `/ws` API endpoint).

The implementation of the websocket server lies under
`G.C.N.Dispatcher.WebSocket`. It is closely related to the
`Subscription` datatype. Each incoming connection is registered as a
separate `Subscription`. A user can optionally authenticate; their
subscription will then be "updated" to allow listening for changes to
the node ids that they have permissions to (this is still a bit on
TODO). A disconnection results in a removed subscription.

There are also `subscribe` and `unsubscribe` requests; see the
`WSRequest` datatype in `G.C.N.Dispatcher.Types` for the possible
requests that you can make to the websocket endpoint.
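The shape of that request protocol can be sketched like this (the constructors below are hypothetical; the real `WSRequest` in `G.C.N.Dispatcher.Types` differs, in particular around authentication):

```haskell
-- Hypothetical sketch of the request side of the websocket protocol.
data Topic = UpdateTree Int deriving (Show, Eq)

data WSRequest
  = WSSubscribe Topic
  | WSUnsubscribe Topic
  | WSAuthorize String   -- hypothetical: an auth token
  deriving (Show, Eq)

-- Apply a request to a client's current topic list; authorization
-- leaves the topics untouched in this simplified model.
applyRequest :: WSRequest -> [Topic] -> [Topic]
applyRequest (WSSubscribe t)   ts = t : filter (/= t) ts
applyRequest (WSUnsubscribe t) ts = filter (/= t) ts
applyRequest (WSAuthorize _)   ts = ts
```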
When the dispatcher sends a message, it reuses that websocket
connection (it's stored under the `Subscription` datatype).

**Important** Because the dispatcher is so closely related to the
websockets (i.e. it reuses the same connection), it has to run in the
same process as the websockets (in a different thread). Our API is
monolithic, i.e. all services are bundled together into one
`gargantext-server` process. However, if needed, we could split the
API from the websocket/dispatcher and have 2 different linux services
running.
## Overall architecture remarks

All connections to the central exchange and the dispatcher are made
via [nanomsg](https://nanomsg.org/). You can think of nanomsg as
"upgraded" TCP sockets. nanomsg helps us, in particular, with
reconnection and buffering. This is an implementation detail and, if
we're not satisfied with how it works, we could change to something
different, e.g. the `LISTEN`/`NOTIFY` mechanism in PostgreSQL (we use
PostgreSQL anyway).

My main idea in choosing nanomsg was having something reliable which,
at the same time, didn't require maintenance (e.g. setting up another
service just for notifications).
## Configuration

See `gargantext-settings.toml_toModify`. Overall, the config for
...

@@ -601,64 +455,3 @@ or `inproc` (this limits us to inter-process communication).

The `bind` part is for the server, the `connect` part is for the
clients connecting to that server.
## Async workers

A particular use-case of notifications is the async worker mechanism
described above.

In particular, async workers don't use a storage backend to store task
progress, and this has consequences for notifications.

You can't just ask the API "give me all tasks for this node" anymore
(this was possible with servant-jobs). Instead, what the frontend does
is send an "update tree" subscription for nodes in the tree.

Take a look at `G.C.W.J.Types` -> `getWorkerMNodeId`: it is a function
that, given a job, returns a possible `NodeId` associated with that
task (not all tasks need a `NodeId`, though).
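That partial job-to-node mapping can be sketched as a `Maybe`-returning function (the `Job` constructors here are hypothetical stand-ins for the real ones in `G.C.W.J.Types`):

```haskell
-- Illustrative stand-in for getWorkerMNodeId: the real function
-- pattern-matches on the actual 'Job' constructors.
newtype NodeId = NodeId Int deriving (Show, Eq)

data Job
  = AddCorpusWithQuery NodeId String  -- hypothetical: target node, query
  | GarbageCollect                    -- hypothetical: no node attached
  deriving (Show, Eq)

getWorkerMNodeId :: Job -> Maybe NodeId
getWorkerMNodeId (AddCorpusWithQuery n _) = Just n
getWorkerMNodeId GarbageCollect           = Nothing
```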
If the dispatcher receives an `UpdateWorkerProgress` message, it
checks the associated `NodeId` and finds the `UpdateTree`
subscriptions associated with that node id. It then sends 2 messages:
one for the "normal" subscription to the `UpdateWorkerProgress` topic
(the frontend needs to know the job id already) and another
`UpdateWorkerProgress` to all those subscribed to `UpdateTree` for
that node id.

This way, the frontend is able to be informed of new jobs that appear
under a node, and render the progress bar accordingly.
This "push" approach has many benefits:

- if the server is idle, we don't keep polling for new jobs
- when the user refreshes the page, even while their jobs are running,
  the frontend is able to quickly discover new jobs and there is no
  need to store the job ids in local storage as we did previously
  (it's important, though, that jobs send progress notifications quite
  often, otherwise a job looks stale and isn't that quickly
  discoverable)
- we can propagate job progress not only to the user who created it,
  but to all users having access to that node (so to all users of a
  shared node, for example) without any effort
- if the API server is restarted (and the workers do keep running in
  the background), the frontend keeps reconnecting to WS. Upon
  connection, all node subscriptions are resent and the user can pick
  up worker progress quickly

Of course, everything has its bad sides:

- we have to watch progress updates and decide, based on the
  `remaining` count, that the job is finished (however, async workers
  have the `onJobFinish` hook - c.f. `G.C.Worker` - and we could send
  a special message about a finished job, if needed)
- as noted above, the progress messages are sent twice (via
  `UpdateTree` and via `UpdateWorkerProgress` subscriptions)
- you need to wait a while to discover running jobs, and if they're
  stale or don't report very often, you won't know what's being
  processed currently

Still, I prefer the "push" approach, because it more closely mimics
the "broker" design. The thing is, a broker is not a database: you
should just expect it to deliver your message and forget about it.

If the need arises, we could create a PostgreSQL table which stores
task progress/task results, but it's just another burden to keep up
with (consider failing tasks, repeated ones, etc.).
test/Test/API/GraphQL.hs — View file @ 2d61da1c

...

@@ -7,7 +7,6 @@ module Test.API.GraphQL (
     tests
   ) where
-import Control.Monad ( void )
 import Gargantext.Core.Types.Individu
 import Prelude
 import Servant.Auth.Client ()

...
test/Test/Instances.hs — View file @ 2d61da1c

...

@@ -157,7 +157,7 @@ instance Arbitrary Phylo.ComputeTimeHistory where
 instance Arbitrary Phylo.CorpusParser where arbitrary = genericArbitrary
 instance Arbitrary Phylo.Filter where arbitrary = genericArbitrary
 instance Arbitrary Phylo.ListParser where arbitrary = genericArbitrary
-instance Arbitrary Phylo.MaxClqueFilter where arbitrary = genericArbitrary
+instance Arbitrary Phylo.MaxCliqueFilter where arbitrary = genericArbitrary
 instance Arbitrary Phylo.Order where arbitrary = genericArbitrary
 -- The 'resize' ensure our tests won't take too long as
 -- we won't be generating very long lists.

...

@@ -176,7 +176,7 @@ instance Arbitrary Phylo.PhyloCounts where arbitrary = genericArbitrary
 instance Arbitrary Phylo.PhyloGroup where arbitrary = genericArbitrary
 instance Arbitrary Phylo.PhyloLabel where arbitrary = genericArbitrary
 instance Arbitrary Phylo.PhyloParam where
-  arbitrary = Phylo.defaultPhyloParam
+  arbitrary = pure Phylo.defaultPhyloParam
 instance Arbitrary Phylo.PhyloPeriod where arbitrary = genericArbitrary
 instance Arbitrary Phylo.PhyloScale where arbitrary = genericArbitrary
 instance Arbitrary Phylo.PhyloSimilarity where arbitrary = genericArbitrary

...