Celery taskcls: new decorator, new features

Hello, Habr! I will tell you the story of my professional burnout.







It so happened that I can not stand the routine of monotonous actions. I have several projects using Celery . Each time a task becomes more complicated than 2 + 2 = 5



, the solution template is reduced to creating a class that performs the task, and a starter function that Celery is able to work with - a boilerplate. In this article I will tell you how I struggled with a boilerplate, and what came of it.







Logo







The starting point



Consider the ordinary task of Celery. There is a class that performs the task, and a starter function that instantiates the class and starts one of its methods, in which all the logic of the task is implemented and error handling is inherited:







 class MyTask( FirstMixin, SecondMixin, ThirdMixin, ): def main(self): data = self.do_something() response = self.remote_call(data) parsed = self.parser(response) return self.process(parsed) @app.task(bind=True) def my_task(self, arg1, arg2): instance = MyTask( celery_task=self, arg1=arg1, arg2=arg2, ) return instance.full_task()
      
      





In this case, the full_task



method includes a call to main



, however, it also deals with error handling, logging and other nonsense that is not directly related to the main task.







Taskclass idea



At the root of the taskclass lies a simple idea: in the base class, you can define a method of the task



class, implement the behavior of the starter function in it, and then inherit:







 class BaseTask: def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) def full_task(self): try: return self.main() except: self.celery_task.retry(countdown=30) @classmethod def task(cls, task, **kwargs): self = cls( celery_task=celery_task, **kwargs, ) return self.full_task()
      
      





All auxiliary boredom collected in the base class. We will not return to her again. We realize the logic of the task:







 @app.taskcls(bind=True) class MyTask( BaseTask, FirstMixin, SecondMixin, ThirdMixin, ): def main(self): data = self.do_something() response = self.remote_call(data) parsed = self.parser(response) return self.process(parsed)
      
      





No more husk, much better already. However, what about the entry point?







 MyTask.task.delay(...)
      
      





MyTask.task



has all the usual task methods: delay



, apply_async



, and, generally speaking, it is.







Now the arguments of the decorator. It is especially fun to drag bind = True



into each task. Is it possible to pass default arguments through a base class?







 class BaseTask: class MetaTask: bind = True def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) def full_task(self): try: return self.main() except: self.celery_task.retry(countdown=30) @classmethod def task(cls, task, **kwargs): self = cls( celery_task=celery_task, **kwargs, ) return self.full_task()
      
      





The nested MetaTask



class contains default arguments and will be available to all child classes. Interestingly, it can also be inherited:







 class BaseHasTimeout(BaseTask): class MetaTask(BaseTask.MetaTask): timeout = 42
      
      





Arguments passed to the @app.taskcls



decorator have the highest priority:







 @app.taskcls(timeout=20) class MyTask( BaseHasTimeout, FirstMixin, SecondMixin, ThirdMixin, ): def main(self): ...
      
      





As a result, the timeout for the task will be 20.







Going beyond



In web applications, there is often a need to start a task from view. In case of high adhesion, view and task can be combined:







 class BaseViewTask: @classmethod def task(cls, **kwargs): # Somehow init View class manually self = cls(...) return self.celery() @app.taskcls class MyView( BaseViewTask, FirstMixin, SecondMixin, ThirdMixin, APIView, ): queryset = MyModel.objects.all() def get_some_data(self, *args, **kwargs): # common methed return self.queryset.filtert(...) def get(self, request): data = self.get_some_data(request.field) # used in request handling return Response(json.dumps(data)) def post(self, request): self.task.delay(...) return Response(status=201) def celery(self): data = self.get_some_data(...) # also used in background task return self.do_something(data)
      
      





By the way, to avoid collisions of names, the nested class is called MetaTask



, not Meta



, as in django.







Conclusion



This functionality is expected in Celery 4.5 . However, I also prepared a package that allows you to try out the taskcls decorator today. The idea of ​​the package is that when you upgrade Celery to version 4.5, you can remove its import without changing a single line of code.








All Articles