From dca8cf013b2c0086539697ae6e4515a1c819092c Mon Sep 17 00:00:00 2001 From: David Lord Date: Wed, 8 Feb 2023 17:30:53 -0800 Subject: [PATCH 1/2] rewrite celery background tasks docs --- docs/patterns/celery.rst | 270 +++++++++++++++++++++++++++++---------- 1 file changed, 200 insertions(+), 70 deletions(-) diff --git a/docs/patterns/celery.rst b/docs/patterns/celery.rst index 228a04a8..a236f6dd 100644 --- a/docs/patterns/celery.rst +++ b/docs/patterns/celery.rst @@ -1,105 +1,235 @@ -Celery Background Tasks -======================= +Background Tasks with Celery +============================ -If your application has a long running task, such as processing some uploaded -data or sending email, you don't want to wait for it to finish during a -request. Instead, use a task queue to send the necessary data to another -process that will run the task in the background while the request returns -immediately. +If your application has a long running task, such as processing some uploaded data or +sending email, you don't want to wait for it to finish during a request. Instead, use a +task queue to send the necessary data to another process that will run the task in the +background while the request returns immediately. + +`Celery`_ is a powerful task queue that can be used for simple background tasks as well +as complex multi-stage programs and schedules. This guide will show you how to configure +Celery using Flask. Read Celery's `First Steps with Celery`_ guide to learn how to use +Celery itself. + +.. _Celery: https://celery.readthedocs.io +.. _First Steps with Celery: https://celery.readthedocs.io/en/latest/getting-started/first-steps-with-celery.html -Celery is a powerful task queue that can be used for simple background tasks -as well as complex multi-stage programs and schedules. This guide will show you -how to configure Celery using Flask, but assumes you've already read the -`First Steps with Celery `_ -guide in the Celery documentation. Install ------- -Celery is a separate Python package. Install it from PyPI using pip:: +Install Celery from PyPI, for example using pip: + +.. code-block:: text $ pip install celery -Configure ---------- -The first thing you need is a Celery instance, this is called the celery -application. It serves the same purpose as the :class:`~flask.Flask` -object in Flask, just for Celery. Since this instance is used as the -entry-point for everything you want to do in Celery, like creating tasks -and managing workers, it must be possible for other modules to import it. +Integrate Celery with Flask +--------------------------- -For instance you can place this in a ``tasks`` module. While you can use -Celery without any reconfiguration with Flask, it becomes a bit nicer by -subclassing tasks and adding support for Flask's application contexts and -hooking it up with the Flask configuration. +You can use Celery without any integration with Flask, but it's convenient to configure +it through Flask's config, and to let tasks access the Flask application. -This is all that is necessary to integrate Celery with Flask: +Celery uses similar ideas to Flask, with a ``Celery`` app object that has configuration +and registers tasks. While creating a Flask app, use the following code to create and +configure a Celery app as well. .. code-block:: python - from celery import Celery + from celery import Celery, Task - def make_celery(app): - celery = Celery(app.import_name) - celery.conf.update(app.config["CELERY_CONFIG"]) - - class ContextTask(celery.Task): - def __call__(self, *args, **kwargs): + def celery_init_app(app: Flask) -> Celery: + class FlaskTask(Task): + def __call__(self, *args: object, **kwargs: object) -> object: with app.app_context(): return self.run(*args, **kwargs) - celery.Task = ContextTask - return celery + celery_app = Celery(app.name, task_cls=FlaskTask) + celery_app.config_from_object(app.config["CELERY"]) + celery_app.set_default() + app.extensions["celery"] = celery_app + return celery_app -The function creates a new Celery object, configures it with the broker -from the application config, updates the rest of the Celery config from -the Flask config and then creates a subclass of the task that wraps the -task execution in an application context. +This creates and returns a ``Celery`` app object. Celery `configuration`_ is taken from +the ``CELERY`` key in the Flask configuration. The Celery app is set as the default, so +that it is seen during each request. The ``Task`` subclass automatically runs task +functions with a Flask app context active, so that services like your database +connections are available. -.. note:: - Celery 5.x deprecated uppercase configuration keys, and 6.x will - remove them. See their official `migration guide`_. +.. _configuration: https://celery.readthedocs.io/en/stable/userguide/configuration.html -.. _migration guide: https://docs.celeryproject.org/en/stable/userguide/configuration.html#conf-old-settings-map. +Here's a basic ``example.py`` that configures Celery to use Redis for communication. We +enable a result backend, but ignore results by default. This allows us to store results +only for tasks where we care about the result. -An example task ---------------- - -Let's write a task that adds two numbers together and returns the result. We -configure Celery's broker and backend to use Redis, create a ``celery`` -application using the factory from above, and then use it to define the task. :: +.. code-block:: python from flask import Flask - flask_app = Flask(__name__) - flask_app.config.update(CELERY_CONFIG={ - 'broker_url': 'redis://localhost:6379', - 'result_backend': 'redis://localhost:6379', - }) - celery = make_celery(flask_app) + app = Flask(__name__) + app.config.from_mapping( + CELERY=dict( + broker_url="redis://localhost", + result_backend="redis://localhost", + task_ignore_result=True, + ), + ) + celery_app = celery_init_app(app) - @celery.task() - def add_together(a, b): +Point the ``celery worker`` command at this and it will find the ``celery_app`` object. + +.. code-block:: text + + $ celery -A example worker --loglevel INFO + +You can also run the ``celery beat`` command to run tasks on a schedule. See Celery's +docs for more information about defining schedules. + +.. code-block:: text + + $ celery -A example beat --loglevel INFO + + +Application Factory +------------------- + +When using the Flask application factory pattern, call the ``celery_init_app`` function +inside the factory. It sets ``app.extensions["celery"]`` to the Celery app object, which +can be used to get the Celery app from the Flask app returned by the factory. + +.. code-block:: python + + def create_app() -> Flask: + app = Flask(__name__) + app.config.from_mapping( + CELERY=dict( + broker_url="redis://localhost", + result_backend="redis://localhost", + task_ignore_result=True, + ), + ) + app.config.from_prefixed_env() + celery_init_app(app) + return app + +To use ``celery`` commands, Celery needs an app object, but that's no longer directly +available. Create a ``make_celery.py`` file that calls the Flask app factory and gets +the Celery app from the returned Flask app. + +.. code-block:: python + + from example import create_app + + flask_app = create_app() + celery_app = flask_app.extensions["celery"] + +Point the ``celery`` command to this file. + +.. code-block:: text + + $ celery -A make_celery worker --loglevel INFO + $ celery -A make_celery beat --loglevel INFO + + +Defining Tasks +-------------- + +Using ``@celery_app.task`` to decorate task functions requires access to the +``celery_app`` object, which won't be available when using the factory pattern. It also +means that the decorated tasks are tied to the specific Flask and Celery app instances, +which could be an issue during testing if you change configuration for a test. + +Instead, use Celery's ``@shared_task`` decorator. This creates task objects that will +access whatever the "current app" is, which is a similar concept to Flask's blueprints +and app context. This is why we called ``celery_app.set_default()`` above. + +Here's an example task that adds two numbers together and returns the result. + +.. code-block:: python + + from celery import shared_task + + @shared_task(ignore_result=False) + def add_together(a: int, b: int) -> int: return a + b -This task can now be called in the background:: +Earlier, we configured Celery to ignore task results by default. Since we want to know +the return value of this task, we set ``ignore_result=False``. On the other hand, a task +that didn't need a result, such as sending an email, wouldn't set this. - result = add_together.delay(23, 42) - result.wait() # 65 -Run a worker ------------- +Calling Tasks +------------- -If you jumped in and already executed the above code you will be -disappointed to learn that ``.wait()`` will never actually return. -That's because you also need to run a Celery worker to receive and execute the -task. :: +The decorated function becomes a task object with methods to call it in the background. +The simplest way is to use the ``delay(*args, **kwargs)`` method. See Celery's docs for +more methods. - $ celery -A your_application.celery worker +A Celery worker must be running to run the task. Starting a worker is shown in the +previous sections. -The ``your_application`` string has to point to your application's package -or module that creates the ``celery`` object. +.. code-block:: python -Now that the worker is running, ``wait`` will return the result once the task -is finished. + from flask import request + + @app.post("/add") + def start_add() -> dict[str, object]: + a = request.form.get("a", type=int) + b = request.form.get("b", type=int) + result = add_together.delay(a, b) + return {"result_id": result.id} + +The route doesn't get the task's result immediately. That would defeat the purpose by +blocking the response. Instead, we return the running task's result id, which we can use +later to get the result. + + +Getting Results +--------------- + +To fetch the result of the task we started above, we'll add another route that takes the +result id we returned before. We return whether the task is finished (ready), whether it +finished successfully, and what the return value (or error) was if it is finished. + +.. code-block:: python + + from celery.result import AsyncResult + + @app.get("/result/") + def task_result(id: str) -> dict[str, object]: + result = AsyncResult(id) + return { + "ready": result.ready(), + "successful": result.successful(), + "value": result.result if result.ready() else None, + } + +Now you can start the task using the first route, then poll for the result using the +second route. This keeps the Flask request workers from being blocked waiting for tasks +to finish. + + +Passing Data to Tasks +--------------------- + +The "add" task above took two integers as arguments. To pass arguments to tasks, Celery +has to serialize them to a format that it can pass to other processes. Therefore, +passing complex objects is not recommended. For example, it would be impossible to pass +a SQLAlchemy model object, since that object is probably not serializable and is tied to +the session that queried it. + +Pass the minimal amount of data necessary to fetch or recreate any complex data within +the task. Consider a task that will run when the logged in user asks for an archive of +their data. The Flask request knows the logged in user, and has the user object queried +from the database. It got that by querying the database for a given id, so the task can +do the same thing. Pass the user's id rather than the user object. + +.. code-block:: python + + @shared_task + def generate_user_archive(user_id: str) -> None: + user = db.session.get(User, user_id) + ... + + generate_user_archive.delay(current_user.id) From 3f195248dcd59f8eb08e282b7980dc04b97d7391 Mon Sep 17 00:00:00 2001 From: David Lord Date: Thu, 9 Feb 2023 10:50:57 -0800 Subject: [PATCH 2/2] add celery example --- docs/patterns/celery.rst | 7 ++ examples/celery/README.md | 27 +++++ examples/celery/make_celery.py | 4 + examples/celery/pyproject.toml | 11 ++ examples/celery/requirements.txt | 56 +++++++++ examples/celery/src/task_app/__init__.py | 39 +++++++ examples/celery/src/task_app/tasks.py | 23 ++++ .../celery/src/task_app/templates/index.html | 108 ++++++++++++++++++ examples/celery/src/task_app/views.py | 38 ++++++ 9 files changed, 313 insertions(+) create mode 100644 examples/celery/README.md create mode 100644 examples/celery/make_celery.py create mode 100644 examples/celery/pyproject.toml create mode 100644 examples/celery/requirements.txt create mode 100644 examples/celery/src/task_app/__init__.py create mode 100644 examples/celery/src/task_app/tasks.py create mode 100644 examples/celery/src/task_app/templates/index.html create mode 100644 examples/celery/src/task_app/views.py diff --git a/docs/patterns/celery.rst b/docs/patterns/celery.rst index a236f6dd..2e9a43a7 100644 --- a/docs/patterns/celery.rst +++ b/docs/patterns/celery.rst @@ -14,6 +14,10 @@ Celery itself. .. _Celery: https://celery.readthedocs.io .. _First Steps with Celery: https://celery.readthedocs.io/en/latest/getting-started/first-steps-with-celery.html +The Flask repository contains `an example `_ +based on the information on this page, which also shows how to use JavaScript to submit +tasks and poll for progress and results. + Install ------- @@ -209,6 +213,9 @@ Now you can start the task using the first route, then poll for the result using second route. This keeps the Flask request workers from being blocked waiting for tasks to finish. +The Flask repository contains `an example `_ +using JavaScript to submit tasks and poll for progress and results. + Passing Data to Tasks --------------------- diff --git a/examples/celery/README.md b/examples/celery/README.md new file mode 100644 index 00000000..91782019 --- /dev/null +++ b/examples/celery/README.md @@ -0,0 +1,27 @@ +Background Tasks with Celery +============================ + +This example shows how to configure Celery with Flask, how to set up an API for +submitting tasks and polling results, and how to use that API with JavaScript. See +[Flask's documentation about Celery](https://flask.palletsprojects.com/patterns/celery/). + +From this directory, create a virtualenv and install the application into it. Then run a +Celery worker. + +```shell +$ python3 -m venv .venv +$ . ./.venv/bin/activate +$ pip install -r requirements.txt && pip install -e . +$ celery -A make_celery worker --loglevel INFO +``` + +In a separate terminal, activate the virtualenv and run the Flask development server. + +```shell +$ . ./.venv/bin/activate +$ flask -A task_app --debug run +``` + +Go to http://localhost:5000/ and use the forms to submit tasks. You can see the polling +requests in the browser dev tools and the Flask logs. You can see the tasks submitting +and completing in the Celery logs. diff --git a/examples/celery/make_celery.py b/examples/celery/make_celery.py new file mode 100644 index 00000000..f7d138e6 --- /dev/null +++ b/examples/celery/make_celery.py @@ -0,0 +1,4 @@ +from task_app import create_app + +flask_app = create_app() +celery_app = flask_app.extensions["celery"] diff --git a/examples/celery/pyproject.toml b/examples/celery/pyproject.toml new file mode 100644 index 00000000..88ba6b96 --- /dev/null +++ b/examples/celery/pyproject.toml @@ -0,0 +1,11 @@ +[project] +name = "flask-example-celery" +version = "1.0.0" +description = "Example Flask application with Celery background tasks." +readme = "README.md" +requires-python = ">=3.7" +dependencies = ["flask>=2.2.2", "celery[redis]>=5.2.7"] + +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" diff --git a/examples/celery/requirements.txt b/examples/celery/requirements.txt new file mode 100644 index 00000000..b2834013 --- /dev/null +++ b/examples/celery/requirements.txt @@ -0,0 +1,56 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile pyproject.toml +# +amqp==5.1.1 + # via kombu +async-timeout==4.0.2 + # via redis +billiard==3.6.4.0 + # via celery +celery[redis]==5.2.7 + # via flask-example-celery (pyproject.toml) +click==8.1.3 + # via + # celery + # click-didyoumean + # click-plugins + # click-repl + # flask +click-didyoumean==0.3.0 + # via celery +click-plugins==1.1.1 + # via celery +click-repl==0.2.0 + # via celery +flask==2.2.2 + # via flask-example-celery (pyproject.toml) +itsdangerous==2.1.2 + # via flask +jinja2==3.1.2 + # via flask +kombu==5.2.4 + # via celery +markupsafe==2.1.2 + # via + # jinja2 + # werkzeug +prompt-toolkit==3.0.36 + # via click-repl +pytz==2022.7.1 + # via celery +redis==4.5.1 + # via celery +six==1.16.0 + # via click-repl +vine==5.0.0 + # via + # amqp + # celery + # kombu +wcwidth==0.2.6 + # via prompt-toolkit +werkzeug==2.2.2 + # via flask diff --git a/examples/celery/src/task_app/__init__.py b/examples/celery/src/task_app/__init__.py new file mode 100644 index 00000000..dafff8aa --- /dev/null +++ b/examples/celery/src/task_app/__init__.py @@ -0,0 +1,39 @@ +from celery import Celery +from celery import Task +from flask import Flask +from flask import render_template + + +def create_app() -> Flask: + app = Flask(__name__) + app.config.from_mapping( + CELERY=dict( + broker_url="redis://localhost", + result_backend="redis://localhost", + task_ignore_result=True, + ), + ) + app.config.from_prefixed_env() + celery_init_app(app) + + @app.route("/") + def index() -> str: + return render_template("index.html") + + from . import views + + app.register_blueprint(views.bp) + return app + + +def celery_init_app(app: Flask) -> Celery: + class FlaskTask(Task): + def __call__(self, *args: object, **kwargs: object) -> object: + with app.app_context(): + return self.run(*args, **kwargs) + + celery_app = Celery(app.name, task_cls=FlaskTask) + celery_app.config_from_object(app.config["CELERY"]) + celery_app.set_default() + app.extensions["celery"] = celery_app + return celery_app diff --git a/examples/celery/src/task_app/tasks.py b/examples/celery/src/task_app/tasks.py new file mode 100644 index 00000000..b6b3595d --- /dev/null +++ b/examples/celery/src/task_app/tasks.py @@ -0,0 +1,23 @@ +import time + +from celery import shared_task +from celery import Task + + +@shared_task(ignore_result=False) +def add(a: int, b: int) -> int: + return a + b + + +@shared_task() +def block() -> None: + time.sleep(5) + + +@shared_task(bind=True, ignore_result=False) +def process(self: Task, total: int) -> object: + for i in range(total): + self.update_state(state="PROGRESS", meta={"current": i + 1, "total": total}) + time.sleep(1) + + return {"current": total, "total": total} diff --git a/examples/celery/src/task_app/templates/index.html b/examples/celery/src/task_app/templates/index.html new file mode 100644 index 00000000..4e1145cb --- /dev/null +++ b/examples/celery/src/task_app/templates/index.html @@ -0,0 +1,108 @@ + + + + + Celery Example + + +

Celery Example

+Execute background tasks with Celery. Submits tasks and shows results using JavaScript. + +
+

Add

+

Start a task to add two numbers, then poll for the result. +

+
+
+ +
+

Result:

+ +
+

Block

+

Start a task that takes 5 seconds. However, the response will return immediately. +

+ +
+

+ +
+

Process

+

Start a task that counts, waiting one second each time, showing progress. +

+
+ +
+

+ + + + diff --git a/examples/celery/src/task_app/views.py b/examples/celery/src/task_app/views.py new file mode 100644 index 00000000..99cf92dc --- /dev/null +++ b/examples/celery/src/task_app/views.py @@ -0,0 +1,38 @@ +from celery.result import AsyncResult +from flask import Blueprint +from flask import request + +from . import tasks + +bp = Blueprint("tasks", __name__, url_prefix="/tasks") + + +@bp.get("/result/") +def result(id: str) -> dict[str, object]: + result = AsyncResult(id) + ready = result.ready() + return { + "ready": ready, + "successful": result.successful() if ready else None, + "value": result.get() if ready else result.result, + } + + +@bp.post("/add") +def add() -> dict[str, object]: + a = request.form.get("a", type=int) + b = request.form.get("b", type=int) + result = tasks.add.delay(a, b) + return {"result_id": result.id} + + +@bp.post("/block") +def block() -> dict[str, object]: + result = tasks.block.delay() + return {"result_id": result.id} + + +@bp.post("/process") +def process() -> dict[str, object]: + result = tasks.process.delay(total=request.form.get("total", type=int)) + return {"result_id": result.id}