Commit ce469a9d authored by sim's avatar sim

wip

parent 2def0d25
import time
from django.conf import settings
from rest_framework.decorators import api_view, renderer_classes
from rest_framework.response import Response
from rest_framework.renderers import JSONRenderer
from gargantext.core.task import shared_task, get_task_logger, schedule
from gargantext.models import UserNode
from .celery_app import app
logger = get_task_logger(__name__)
def task_view(task, *args, **kwargs):
@api_view(['GET'])
@renderer_classes((JSONRenderer,))
def view(*_, **__):
r = schedule(task, args=args, kwargs=kwargs)
return Response({
"task": task.__name__,
"id": r.id,
})
return view
@api_view(['GET'])
@renderer_classes((JSONRenderer,))
def task_state(request, task_id):
r = app.AsyncResult(task_id)
result = r.result
return Response({
"id": r.id,
"failed": r.failed(),
"finished": r.ready(),
"successful": r.successful(),
"state": r.state,
"result": result if isinstance(result, (bool, str, int, float, dict, list, tuple, type(None))) else repr(r.result),
})
@shared_task(bind=True)
def dummy(self, duration=30):
logger.info('Start %r task (DEBUG=%r): wait %s seconds...' % (
self.name, settings.DEBUG, duration))
time.sleep(duration)
me = 1234 #request.db.query(UserNode).filter_by(user_id=request.user.id).one_or_none()
logger.info('End task %r.' % self.name)
return str(me)
@shared_task(bind=True)
def fail(self):
logger.info("Run failing task")
time.sleep(2)
raise Exception("Ay Caramba! Failed again!")
@shared_task(bind=True, default_retry_delay=30, max_retries=10)
def fail_random(self):
# Failing randomly
from random import random
p = random()
logger.info("Run randomly failing task (p=%s)" % p)
try:
if p < 2/3:
raise Exception("Fail fail fail")
except Exception as e:
self.retry()
@shared_task(bind=True)
def simulate_work(self):
logger.info("Start %r", self)
for i in range(10):
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 1/(10-i)})
if i % 5 == 0:
logger.info("50%% done %r", self)
logger.info("Finished %r", self)
return "Done!"
@shared_task
def produce():
return "Bidule"
@shared_task
def process1(data):
return "{%s}" % data
@shared_task
def process2(data):
return "[%s]" % data
@shared_task
def workflow():
from celery import chord
#chain = produce.s("Bidule", 10) | process.map.s()
#r = chain.apply_async()
r = chord([ produce.s() for _ in range(10) ])(process1.s() | process2.s())
logger.info("Worflow: %r", r)
return r
......@@ -18,10 +18,19 @@ from django.contrib import admin
from django.views.generic.base import RedirectView as Redirect
from django.contrib.staticfiles.storage import staticfiles_storage as static
from rest_framework_jwt.views import obtain_jwt_token
from .tasks import task_state, task_view, dummy, fail, fail_random, simulate_work, workflow
from .views import projects_view
urlpatterns = [
url(r'^admin/', admin.site.urls),
url(r'^favicon.ico$', Redirect.as_view(url=static.url('favicon.ico'),
permanent=False), name="favicon"),
url(r'^api/auth/token$', obtain_jwt_token),
url(r'^projects$', projects_view),
url(r'^dummy$', task_view(dummy)),
url(r'^fail$', task_view(fail)),
url(r'^fail-random$', task_view(fail_random)),
url(r'^simulate-work$', task_view(simulate_work)),
url(r'^workflow$', task_view(workflow)),
url(r'^task/([-0-9A-z]+)/?$', task_state)
]
import logging
from sqlalchemy.orm.attributes import ScalarObjectAttributeImpl
from rest_framework.decorators import api_view, renderer_classes
from rest_framework.response import Response
from rest_framework.renderers import JSONRenderer
from rest_framework.views import exception_handler as _exception_handler
from ..models import ProjectNode
from .exceptions import base_exception_handler
logger = logging.getLogger(__name__)
def exception_handler(exc, context):
response = base_exception_handler(exc, context)
......@@ -11,3 +25,21 @@ def exception_handler(exc, context):
response.data['status_code'] = response.status_code
return response
# https://bitbucket.org/zzzeek/sqlalchemy/issues/3976/built-in-way-to-convert-an-orm-object-ie
from sqlalchemy import inspect
def orm_is_value(a):
return hasattr(a, 'key') and \
not isinstance(getattr(a, 'impl'), ScalarObjectAttributeImpl)
def orm_to_dict(obj):
attrs = inspect(type(obj)).all_orm_descriptors
return {a.key: getattr(obj, a.key) for a in attrs if orm_is_value(a)}
@api_view(['GET'])
@renderer_classes((JSONRenderer,))
def projects_view(request):
r = list(map(orm_to_dict, request.db.query(ProjectNode).all()))
return Response({'results': r})
from celery import shared_task
from celery.utils.log import get_task_logger
__all__ = ['shared_task', 'get_task_logger', 'schedule']
def schedule(task, when=None, args=None, kwargs=None):
return task.apply_async(args=args, kwargs=kwargs)
......@@ -8,7 +8,7 @@ from .base import Base, Column, ForeignKey, relationship, TypeDecorator, Index,
MutableList, MutableDict, validates, ValidatorMixin, text
from .users import User
__all__ = ['Node', 'NodeNode', 'CorpusNode']
__all__ = ['Node', 'NodeNode', 'UserNode', 'ProjectNode', 'CorpusNode']
class NodeType(TypeDecorator):
"""Define a new type of column to describe a Node's type.
......@@ -221,6 +221,18 @@ class Node(ValidatorMixin, Base):
return self['statuses'][-1]
class UserNode(Node):
__mapper_args__ = {
'polymorphic_identity': 'USER'
}
class ProjectNode(Node):
__mapper_args__ = {
'polymorphic_identity': 'PROJECT'
}
class CorpusNode(Node):
__mapper_args__ = {
'polymorphic_identity': 'CORPUS'
......
......@@ -211,6 +211,12 @@ CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_TIMEZONE = TIME_ZONE
CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_IMPORTS = ()
# See: https://wiredcraft.com/blog/3-gotchas-for-celery/
# And: http://docs.celeryproject.org/en/3.1/userguide/optimizing.html#optimizing-prefetch-limit
#CELERY_ACKS_LATE = True
#CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_TRACK_STARTED = True
CELERY_RESULT_BACKEND = 'rpc://'
# REST-API
......
......@@ -41,7 +41,7 @@ celery() {
( log_progress_msg "[$(tail -1 $ERROR_LOG)]" && return 2 )
fi
$DAEMON multi $1 1 -A "$CELERY_APP" -B -E \
$DAEMON multi $1 gargantext -A "$CELERY_APP" -B -E \
--pidfile="$CELERYD_PID_FILE" \
--logfile="$CELERYD_LOG_FILE" \
--loglevel="$CELERYD_LOG_LEVEL" 2> $ERROR_LOG 1> /dev/null
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment