From dca8cf013b2c0086539697ae6e4515a1c819092c Mon Sep 17 00:00:00 2001 From: David Lord Date: Wed, 8 Feb 2023 17:30:53 -0800 Subject: [PATCH] rewrite celery background tasks docs --- docs/patterns/celery.rst | 270 +++++++++++++++++++++++++++++---------- 1 file changed, 200 insertions(+), 70 deletions(-) diff --git a/docs/patterns/celery.rst b/docs/patterns/celery.rst index 228a04a8..a236f6dd 100644 --- a/docs/patterns/celery.rst +++ b/docs/patterns/celery.rst @@ -1,105 +1,235 @@ -Celery Background Tasks -======================= +Background Tasks with Celery +============================ -If your application has a long running task, such as processing some uploaded -data or sending email, you don't want to wait for it to finish during a -request. Instead, use a task queue to send the necessary data to another -process that will run the task in the background while the request returns -immediately. +If your application has a long running task, such as processing some uploaded data or +sending email, you don't want to wait for it to finish during a request. Instead, use a +task queue to send the necessary data to another process that will run the task in the +background while the request returns immediately. + +`Celery`_ is a powerful task queue that can be used for simple background tasks as well +as complex multi-stage programs and schedules. This guide will show you how to configure +Celery using Flask. Read Celery's `First Steps with Celery`_ guide to learn how to use +Celery itself. + +.. _Celery: https://celery.readthedocs.io +.. _First Steps with Celery: https://celery.readthedocs.io/en/latest/getting-started/first-steps-with-celery.html -Celery is a powerful task queue that can be used for simple background tasks -as well as complex multi-stage programs and schedules. This guide will show you -how to configure Celery using Flask, but assumes you've already read the -`First Steps with Celery `_ -guide in the Celery documentation. Install ------- -Celery is a separate Python package. Install it from PyPI using pip:: +Install Celery from PyPI, for example using pip: + +.. code-block:: text $ pip install celery -Configure ---------- -The first thing you need is a Celery instance, this is called the celery -application. It serves the same purpose as the :class:`~flask.Flask` -object in Flask, just for Celery. Since this instance is used as the -entry-point for everything you want to do in Celery, like creating tasks -and managing workers, it must be possible for other modules to import it. +Integrate Celery with Flask +--------------------------- -For instance you can place this in a ``tasks`` module. While you can use -Celery without any reconfiguration with Flask, it becomes a bit nicer by -subclassing tasks and adding support for Flask's application contexts and -hooking it up with the Flask configuration. +You can use Celery without any integration with Flask, but it's convenient to configure +it through Flask's config, and to let tasks access the Flask application. -This is all that is necessary to integrate Celery with Flask: +Celery uses similar ideas to Flask, with a ``Celery`` app object that has configuration +and registers tasks. While creating a Flask app, use the following code to create and +configure a Celery app as well. .. code-block:: python - from celery import Celery + from celery import Celery, Task - def make_celery(app): - celery = Celery(app.import_name) - celery.conf.update(app.config["CELERY_CONFIG"]) - - class ContextTask(celery.Task): - def __call__(self, *args, **kwargs): + def celery_init_app(app: Flask) -> Celery: + class FlaskTask(Task): + def __call__(self, *args: object, **kwargs: object) -> object: with app.app_context(): return self.run(*args, **kwargs) - celery.Task = ContextTask - return celery + celery_app = Celery(app.name, task_cls=FlaskTask) + celery_app.config_from_object(app.config["CELERY"]) + celery_app.set_default() + app.extensions["celery"] = celery_app + return celery_app -The function creates a new Celery object, configures it with the broker -from the application config, updates the rest of the Celery config from -the Flask config and then creates a subclass of the task that wraps the -task execution in an application context. +This creates and returns a ``Celery`` app object. Celery `configuration`_ is taken from +the ``CELERY`` key in the Flask configuration. The Celery app is set as the default, so +that it is seen during each request. The ``Task`` subclass automatically runs task +functions with a Flask app context active, so that services like your database +connections are available. -.. note:: - Celery 5.x deprecated uppercase configuration keys, and 6.x will - remove them. See their official `migration guide`_. +.. _configuration: https://celery.readthedocs.io/en/stable/userguide/configuration.html -.. _migration guide: https://docs.celeryproject.org/en/stable/userguide/configuration.html#conf-old-settings-map. +Here's a basic ``example.py`` that configures Celery to use Redis for communication. We +enable a result backend, but ignore results by default. This allows us to store results +only for tasks where we care about the result. -An example task ---------------- - -Let's write a task that adds two numbers together and returns the result. We -configure Celery's broker and backend to use Redis, create a ``celery`` -application using the factory from above, and then use it to define the task. :: +.. code-block:: python from flask import Flask - flask_app = Flask(__name__) - flask_app.config.update(CELERY_CONFIG={ - 'broker_url': 'redis://localhost:6379', - 'result_backend': 'redis://localhost:6379', - }) - celery = make_celery(flask_app) + app = Flask(__name__) + app.config.from_mapping( + CELERY=dict( + broker_url="redis://localhost", + result_backend="redis://localhost", + task_ignore_result=True, + ), + ) + celery_app = celery_init_app(app) - @celery.task() - def add_together(a, b): +Point the ``celery worker`` command at this and it will find the ``celery_app`` object. + +.. code-block:: text + + $ celery -A example worker --loglevel INFO + +You can also run the ``celery beat`` command to run tasks on a schedule. See Celery's +docs for more information about defining schedules. + +.. code-block:: text + + $ celery -A example beat --loglevel INFO + + +Application Factory +------------------- + +When using the Flask application factory pattern, call the ``celery_init_app`` function +inside the factory. It sets ``app.extensions["celery"]`` to the Celery app object, which +can be used to get the Celery app from the Flask app returned by the factory. + +.. code-block:: python + + def create_app() -> Flask: + app = Flask(__name__) + app.config.from_mapping( + CELERY=dict( + broker_url="redis://localhost", + result_backend="redis://localhost", + task_ignore_result=True, + ), + ) + app.config.from_prefixed_env() + celery_init_app(app) + return app + +To use ``celery`` commands, Celery needs an app object, but that's no longer directly +available. Create a ``make_celery.py`` file that calls the Flask app factory and gets +the Celery app from the returned Flask app. + +.. code-block:: python + + from example import create_app + + flask_app = create_app() + celery_app = flask_app.extensions["celery"] + +Point the ``celery`` command to this file. + +.. code-block:: text + + $ celery -A make_celery worker --loglevel INFO + $ celery -A make_celery beat --loglevel INFO + + +Defining Tasks +-------------- + +Using ``@celery_app.task`` to decorate task functions requires access to the +``celery_app`` object, which won't be available when using the factory pattern. It also +means that the decorated tasks are tied to the specific Flask and Celery app instances, +which could be an issue during testing if you change configuration for a test. + +Instead, use Celery's ``@shared_task`` decorator. This creates task objects that will +access whatever the "current app" is, which is a similar concept to Flask's blueprints +and app context. This is why we called ``celery_app.set_default()`` above. + +Here's an example task that adds two numbers together and returns the result. + +.. code-block:: python + + from celery import shared_task + + @shared_task(ignore_result=False) + def add_together(a: int, b: int) -> int: return a + b -This task can now be called in the background:: +Earlier, we configured Celery to ignore task results by default. Since we want to know +the return value of this task, we set ``ignore_result=False``. On the other hand, a task +that didn't need a result, such as sending an email, wouldn't set this. - result = add_together.delay(23, 42) - result.wait() # 65 -Run a worker ------------- +Calling Tasks +------------- -If you jumped in and already executed the above code you will be -disappointed to learn that ``.wait()`` will never actually return. -That's because you also need to run a Celery worker to receive and execute the -task. :: +The decorated function becomes a task object with methods to call it in the background. +The simplest way is to use the ``delay(*args, **kwargs)`` method. See Celery's docs for +more methods. - $ celery -A your_application.celery worker +A Celery worker must be running to run the task. Starting a worker is shown in the +previous sections. -The ``your_application`` string has to point to your application's package -or module that creates the ``celery`` object. +.. code-block:: python -Now that the worker is running, ``wait`` will return the result once the task -is finished. + from flask import request + + @app.post("/add") + def start_add() -> dict[str, object]: + a = request.form.get("a", type=int) + b = request.form.get("b", type=int) + result = add_together.delay(a, b) + return {"result_id": result.id} + +The route doesn't get the task's result immediately. That would defeat the purpose by +blocking the response. Instead, we return the running task's result id, which we can use +later to get the result. + + +Getting Results +--------------- + +To fetch the result of the task we started above, we'll add another route that takes the +result id we returned before. We return whether the task is finished (ready), whether it +finished successfully, and what the return value (or error) was if it is finished. + +.. code-block:: python + + from celery.result import AsyncResult + + @app.get("/result/") + def task_result(id: str) -> dict[str, object]: + result = AsyncResult(id) + return { + "ready": result.ready(), + "successful": result.successful(), + "value": result.result if result.ready() else None, + } + +Now you can start the task using the first route, then poll for the result using the +second route. This keeps the Flask request workers from being blocked waiting for tasks +to finish. + + +Passing Data to Tasks +--------------------- + +The "add" task above took two integers as arguments. To pass arguments to tasks, Celery +has to serialize them to a format that it can pass to other processes. Therefore, +passing complex objects is not recommended. For example, it would be impossible to pass +a SQLAlchemy model object, since that object is probably not serializable and is tied to +the session that queried it. + +Pass the minimal amount of data necessary to fetch or recreate any complex data within +the task. Consider a task that will run when the logged in user asks for an archive of +their data. The Flask request knows the logged in user, and has the user object queried +from the database. It got that by querying the database for a given id, so the task can +do the same thing. Pass the user's id rather than the user object. + +.. code-block:: python + + @shared_task + def generate_user_archive(user_id: str) -> None: + user = db.session.get(User, user_id) + ... + + generate_user_archive.delay(current_user.id)