Commit 9f6a0000 authored by sim's avatar sim

wip3

parent 7c296b9c
......@@ -13,6 +13,6 @@ os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'gargantext.settings')
from django.conf import settings #noqa
app = Celery('gargantext')
app = Celery('gargantext', task_cls='gargantext.core.task:Task')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
......@@ -3,56 +3,22 @@ import logging
from django.conf import settings
from rest_framework.decorators import api_view, renderer_classes
from rest_framework.response import Response
from rest_framework.renderers import JSONRenderer
from gargantext.core.task import shared_task, get_task_logger, schedule
from gargantext.core.task import shared_task, get_task_logger
from gargantext.models import UserNode
from .celery_app import app
logger = get_task_logger(__name__)
def task_view(task, *args, **kwargs):
@api_view(['GET'])
@renderer_classes((JSONRenderer,))
def view(request, **params):
logging.getLogger('gargantext').info('From view: %r', request.user)
r = schedule(task, args=args, kwargs=kwargs)
return Response({
"task": task.__name__,
"id": r.id,
})
return view
@api_view(['GET'])
@renderer_classes((JSONRenderer,))
def task_state(request, task_id):
r = app.AsyncResult(task_id)
result = r.result
return Response({
"id": r.id,
"failed": r.failed(),
"finished": r.ready(),
"successful": r.successful(),
"state": r.state,
"result": result if isinstance(result, (bool, str, int, float, dict, list, tuple, type(None))) else repr(r.result),
})
@shared_task(bind=True)
def dummy(self, duration=30):
def dummy(self, duration=15):
logger.info('Start %r task (DEBUG=%r): wait %s seconds...' % (
self.name, settings.DEBUG, duration))
logger.info('Dummy: %r', self.request)
me = self.db.query(UserNode).filter_by(user_id=self.user.id).one_or_none()
logger.info('User: %r', me)
time.sleep(duration)
me = 1234 #request.db.query(UserNode).filter_by(user_id=request.user.id).one_or_none()
logger.info('End task %r.' % self.name)
......@@ -88,7 +54,7 @@ def simulate_work(self):
for i in range(10):
time.sleep(1)
self.update_state(state="PROGRESS", meta={'progress': 1/(10-i)})
if i % 5 == 0:
if i > 0 and i % 5 == 0:
logger.info("50%% done %r", self)
logger.info("Finished %r", self)
......
......@@ -18,8 +18,8 @@ from django.contrib import admin
from django.views.generic.base import RedirectView as Redirect
from django.contrib.staticfiles.storage import staticfiles_storage as static
from rest_framework_jwt.views import obtain_jwt_token
from .tasks import task_state, task_view, dummy, fail, fail_random, simulate_work, workflow
from .views import projects_view
from .tasks import dummy, fail, fail_random, simulate_work, workflow
from .views import projects_view, task_state, task_view
urlpatterns = [
url(r'^admin/', admin.site.urls),
......
......@@ -7,10 +7,11 @@ from rest_framework.response import Response
from rest_framework.renderers import JSONRenderer
from rest_framework.views import exception_handler as _exception_handler
from ..models import ProjectNode
from gargantext.models import ProjectNode
from gargantext.core.task import schedule
from .exceptions import base_exception_handler
from .celery_app import app
logger = logging.getLogger(__name__)
......@@ -43,3 +44,38 @@ def projects_view(request):
r = list(map(orm_to_dict, request.db.query(ProjectNode).all()))
return Response({'results': r})
def task_view(task, *args, **kwargs):
@api_view(['GET'])
@renderer_classes((JSONRenderer,))
def view(request, **params):
logging.getLogger('gargantext').info('From view: %r', request.user)
r = schedule(task, user=request.user, args=args, kwargs=kwargs)
return Response({
"task": task.__name__,
"id": r.id,
})
return view
@api_view(['GET'])
@renderer_classes((JSONRenderer,))
def task_state(request, task_id):
r = app.AsyncResult(task_id)
result = r.result
json_value = (bool, str, int, float, dict, list, tuple, type(None))
return Response({
"id": r.id,
"failed": r.failed(),
"finished": r.ready(),
"successful": r.successful(),
"state": r.state,
"result": result if isinstance(result, json_value) else repr(r.result),
})
from celery import shared_task
from celery import shared_task, Task as TaskBase
from celery.utils.log import get_task_logger
from .db import Session
__all__ = ['shared_task', 'get_task_logger', 'schedule']
def schedule(task, when=None, args=None, kwargs=None):
return task.apply_async(args=args, kwargs=kwargs)
logger = get_task_logger(__name__)
def user_as_dict(u):
attrs = ['id', 'username', 'is_active', 'is_staff', 'is_superuser']
return {name: getattr(u, name) for name in attrs}
class objectview(object):
def __init__(self, d):
self.__dict__ = d
class Task(TaskBase):
_db = None
@property
def db(self):
if self.user is None:
raise ValueError("Database can't be used from a task unless it " \
"is associated with a user.")
if not self._db:
self._db = Session()
self._db.login(self.user)
return self._db
def after_return(self, *args, **kwargs):
if self._db:
self._db.remove()
def __call__(self, *args, **kwargs):
user_dict = self.request.get('user')
self.user = user_dict and objectview(user_dict)
return super().__call__(*args, **kwargs)
def schedule(task, when=None, user=None, args=None, kwargs=None):
headers = { 'user': user_as_dict(user) } if user is not None else {}
return task.apply_async(args=args, kwargs=kwargs, headers=headers)
......@@ -41,7 +41,7 @@ celery() {
( log_progress_msg "[$(tail -1 $ERROR_LOG)]" && return 2 )
fi
$DAEMON multi $1 gargantext -A "$CELERY_APP" -B -E \
$DAEMON multi $1 gargantext -A "$CELERY_APP" -E \
--pidfile="$CELERYD_PID_FILE" \
--logfile="$CELERYD_LOG_FILE" \
--loglevel="$CELERYD_LOG_LEVEL" 2> $ERROR_LOG 1> /dev/null
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment