Commit 7c296b9c authored by sim

wip2

parent ce469a9d
...@@ -19,7 +19,7 @@ class UserAuthMixin(object): ...@@ -19,7 +19,7 @@ class UserAuthMixin(object):
user, token = result user, token = result
logger.debug("%s: authenticate as %s", logger.debug("%s: authenticate as %s",
self.__class__.__name__, user.username or '<anon>') self.__class__.__name__, user.username or '<anon>', token)
# Authenticate user in database session # Authenticate user in database session
request.db.login(user) request.db.login(user)
......
import time import time
import logging
from django.conf import settings from django.conf import settings
...@@ -18,7 +19,8 @@ logger = get_task_logger(__name__) ...@@ -18,7 +19,8 @@ logger = get_task_logger(__name__)
def task_view(task, *args, **kwargs): def task_view(task, *args, **kwargs):
@api_view(['GET']) @api_view(['GET'])
@renderer_classes((JSONRenderer,)) @renderer_classes((JSONRenderer,))
def view(*_, **__): def view(request, **params):
logging.getLogger('gargantext').info('From view: %r', request.user)
r = schedule(task, args=args, kwargs=kwargs) r = schedule(task, args=args, kwargs=kwargs)
return Response({ return Response({
"task": task.__name__, "task": task.__name__,
...@@ -48,6 +50,7 @@ def dummy(self, duration=30): ...@@ -48,6 +50,7 @@ def dummy(self, duration=30):
logger.info('Start %r task (DEBUG=%r): wait %s seconds...' % ( logger.info('Start %r task (DEBUG=%r): wait %s seconds...' % (
self.name, settings.DEBUG, duration)) self.name, settings.DEBUG, duration))
logger.info('Dummy: %r', self.request)
time.sleep(duration) time.sleep(duration)
me = 1234 #request.db.query(UserNode).filter_by(user_id=request.user.id).one_or_none() me = 1234 #request.db.query(UserNode).filter_by(user_id=request.user.id).one_or_none()
...@@ -63,17 +66,20 @@ def fail(self): ...@@ -63,17 +66,20 @@ def fail(self):
raise Exception("Ay Caramba! Failed again!") raise Exception("Ay Caramba! Failed again!")
@shared_task(bind=True, default_retry_delay=30, max_retries=10) @shared_task(bind=True, autoretry_for=(Exception,), default_retry_delay=10,
retry_kwargs=dict(max_retries=3))
def fail_random(self): def fail_random(self):
# Failing randomly # Failing randomly
from random import random from random import random
p = random() p = random()
logger.info("Run randomly failing task (p=%s)" % p) logger.info("Run randomly failing task (p=%s)" % p)
try: if p < 2/3:
if p < 2/3: raise Exception("Fail fail fail")
raise Exception("Fail fail fail") #try:
except Exception as e: # if p < 2/3:
self.retry() # raise Exception("Fail fail fail")
#except Exception as e:
# self.retry()
@shared_task(bind=True) @shared_task(bind=True)
...@@ -91,6 +97,7 @@ def simulate_work(self): ...@@ -91,6 +97,7 @@ def simulate_work(self):
@shared_task @shared_task
def produce(): def produce():
time.sleep(0.1)
return "Bidule" return "Bidule"
@shared_task @shared_task
...@@ -98,8 +105,8 @@ def process1(data): ...@@ -98,8 +105,8 @@ def process1(data):
return "{%s}" % data return "{%s}" % data
@shared_task @shared_task
def process2(data): def reduce(many):
return "[%s]" % data return ':'.join(many)
@shared_task @shared_task
def workflow(): def workflow():
...@@ -108,7 +115,9 @@ def workflow(): ...@@ -108,7 +115,9 @@ def workflow():
#chain = produce.s("Bidule", 10) | process.map.s() #chain = produce.s("Bidule", 10) | process.map.s()
#r = chain.apply_async() #r = chain.apply_async()
r = chord([ produce.s() for _ in range(10) ])(process1.s() | process2.s()) #r = chord([ produce.s() for _ in range(10) ])(process1.s() | process2.s())
maps = (produce.s() | process1.s() for _ in range(20))
r = chord(maps)(reduce.s())
logger.info("Worflow: %r", r) logger.info("Worflow: %r", r)
......
...@@ -213,8 +213,8 @@ CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' ...@@ -213,8 +213,8 @@ CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_IMPORTS = () CELERY_IMPORTS = ()
# See: https://wiredcraft.com/blog/3-gotchas-for-celery/ # See: https://wiredcraft.com/blog/3-gotchas-for-celery/
# And: http://docs.celeryproject.org/en/3.1/userguide/optimizing.html#optimizing-prefetch-limit # And: http://docs.celeryproject.org/en/3.1/userguide/optimizing.html#optimizing-prefetch-limit
#CELERY_ACKS_LATE = True CELERY_ACKS_LATE = True
#CELERYD_PREFETCH_MULTIPLIER = 1 CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_TRACK_STARTED = True CELERY_TRACK_STARTED = True
CELERY_RESULT_BACKEND = 'rpc://' CELERY_RESULT_BACKEND = 'rpc://'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment