Asynchronous tasks are used to move intensive, time-consuming, and failure-prone processes to the background so that a response can be returned to the client immediately.
This tutorial looks at how to integrate Celery, an asynchronous task queue, into the Python-based Falcon web framework. We'll also use Docker and Docker Compose to tie everything together. Finally, we'll look at how to test the Celery tasks with unit and integration tests.
Learning Objectives
By the end of this tutorial, you should be able to:
- Integrate Celery into a Falcon web app.
- Containerize Falcon, Celery, and Redis with Docker.
- Execute tasks in the background with a separate worker process.
- Save Celery logs to a file.
- Set up Flower to monitor and administer Celery jobs and workers.
- Test a Celery task with both unit and integration tests.
Background Tasks
Again, to improve user experience, long-running processes should be run outside the normal HTTP request/response flow, in a background process.
Examples:
- Sending confirmation emails
- Web scraping and crawling
- Analyzing data
- Image processing
- Producing daily reports
- Running machine learning models
As you're building out an app, try to distinguish tasks that should run during the request/response lifecycle, like CRUD operations, from those that should run in the background.
Falcon Framework
Falcon is a micro Python web framework that's great for creating back-end, RESTful APIs. Falcon feels much like Flask, but it's a lot faster in terms of both development and performance.
Falcon is a minimalist WSGI library for building speedy web APIs and app backends. We like to think of Falcon as the Dieter Rams of web frameworks.
When it comes to building HTTP APIs, other frameworks weigh you down with tons of dependencies and unnecessary abstractions. Falcon cuts to the chase with a clean design that embraces HTTP and the REST architectural style.
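To give a sense of just how lean it is, a complete Falcon app fits in a handful of lines. The following standalone sketch (not part of the project we're about to clone) exposes a single JSON endpoint:
import falcon


class Ping:
    def on_get(self, req, resp):
        # Falcon serializes resp.media to JSON for us
        resp.media = {'message': 'pong'}


app = falcon.App()
app.add_route('/ping', Ping())

# serve with any WSGI server, e.g.: gunicorn -b 0.0.0.0:8000 module_name:app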
Be sure to review the official docs for more information.
Project Setup
Clone down the base project:
$ git clone https://github.com/testdrivenio/falcon-celery --branch base --single-branch
$ cd falcon-celery
Take a quick glance at the code as well as the project structure, and then spin up the app using Docker:
$ docker-compose up -d --build
This should only take a moment to build and run the images. Once done, the app should be live on http://localhost:8000/ping.
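You can give the endpoint a quick poke with curl (the exact payload comes from the base project's ping handler):
$ curl http://localhost:8000/ping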
Ensure the tests pass:
$ docker-compose run web python test.py
.
----------------------------------------------------------------------
Ran 1 test in 0.001s
OK
Celery
Now comes the fun part -- adding Celery! Start by adding both Celery and Redis to the requirements.txt file:
celery==5.2.7
falcon==3.1.0
gunicorn==20.1.0
redis==4.3.4
Create a Task
Add a new file to the "project/app" directory called tasks.py:
# project/app/tasks.py

import os
from time import sleep

import celery


CELERY_BROKER = os.environ.get('CELERY_BROKER')
CELERY_BACKEND = os.environ.get('CELERY_BACKEND')

app = celery.Celery('tasks', broker=CELERY_BROKER, backend=CELERY_BACKEND)


@app.task
def fib(n):
    sleep(2)  # simulate slow computation
    if n < 0:
        return []
    elif n == 0:
        return [0]
    elif n == 1:
        return [0, 1]
    else:
        results = fib(n - 1)
        results.append(results[-1] + results[-2])
        return results
Here, we created a new instance of Celery and defined a new Celery task called fib that calculates the Fibonacci sequence up to a given number.
Celery uses a message broker to facilitate communication between the Celery worker and the web application. Messages are added to the broker, which are then processed by the worker(s). Once done, the results are added to the backend.
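To make that flow concrete, here's roughly what the producer side looks like once a worker and Redis are up (we'll wire both up next); fib.delay(4) and the expected result are just an example:
from app.tasks import fib

result = fib.delay(4)           # serialize the call and push a message onto the broker
print(result.id)                # the task ID the web app can hand back to the client
print(result.ready())           # False until a worker has picked up and finished the task
print(result.get(timeout=30))   # block until the result lands in the backend: [0, 1, 1, 2, 3]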
Redis will be used as both the broker and backend. Add both Redis and a Celery worker to the docker-compose.yml file:
version: '3.8'

services:

  web:
    build: ./project
    image: web
    container_name: web
    ports:
      - 8000:8000
    volumes:
      - ./project:/usr/src/app
    command: gunicorn -b 0.0.0.0:8000 app:app
    environment:
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    image: web
    volumes:
      - ./project:/usr/src/app
    command: celery -A app.tasks worker --loglevel=info
    environment:
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:7-alpine
Add a new route handler to __init__.py to kick off the fib task:
class CreateTask(object):

    def on_post(self, req, resp):
        raw_json = req.stream.read()
        result = json.loads(raw_json)
        task = fib.delay(int(result['number']))
        resp.status = falcon.HTTP_200
        result = {
            'status': 'success',
            'data': {
                'task_id': task.id
            }
        }
        resp.text = json.dumps(result)
Register the route:
app.add_route('/create', CreateTask())
Import the task:
from app.tasks import fib
Build the image and spin up the containers:
$ docker-compose up -d --build
Test:
$ curl -X POST http://localhost:8000/create \
-d '{"number":"4"}' \
-H "Content-Type: application/json"
You should see something like:
{
  "status": "success",
  "data": {
    "task_id": "d935fa51-44ad-488f-b63d-6b0e178700a8"
  }
}
Check Task Status
Next, add a new route handler to check the status of the task:
class CheckStatus(object):

    def on_get(self, req, resp, task_id):
        task_result = AsyncResult(task_id)
        result = {'status': task_result.status, 'result': task_result.result}
        resp.status = falcon.HTTP_200
        resp.text = json.dumps(result)
Register the route:
app.add_route('/status/{task_id}', CheckStatus())
Import AsyncResult:
from celery.result import AsyncResult
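Putting the pieces together, project/app/__init__.py should now look roughly like this (a sketch; the base project's existing ping route is omitted, and the app object is shown being created with falcon.App() for completeness):
# project/app/__init__.py (sketch)

import json

import falcon
from celery.result import AsyncResult

from app.tasks import fib


class CreateTask(object):

    def on_post(self, req, resp):
        raw_json = req.stream.read()
        result = json.loads(raw_json)
        task = fib.delay(int(result['number']))
        resp.status = falcon.HTTP_200
        resp.text = json.dumps({'status': 'success', 'data': {'task_id': task.id}})


class CheckStatus(object):

    def on_get(self, req, resp, task_id):
        task_result = AsyncResult(task_id)
        resp.status = falcon.HTTP_200
        resp.text = json.dumps({'status': task_result.status, 'result': task_result.result})


app = falcon.App()
app.add_route('/create', CreateTask())
app.add_route('/status/{task_id}', CheckStatus())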
Update the containers:
$ docker-compose up -d --build
Trigger a new task:
$ curl -X POST http://localhost:8000/create \
-d '{"number":"3"}' \
-H "Content-Type: application/json"
{
  "status": "success",
  "data": {
    "task_id": "65a1c427-ee08-4fb1-9842-d0f90d081c54"
  }
}
Then, use the returned task_id to check the status:
$ curl http://localhost:8000/status/65a1c427-ee08-4fb1-9842-d0f90d081c54
{
  "status": "SUCCESS", "result": [0, 1, 1, 2]
}
Logs
Update the celery service so that Celery logs are dumped to a log file:
celery:
  image: web
  volumes:
    - ./project:/usr/src/app
    - ./project/logs:/usr/src/app/logs  # add this line
  command: celery -A app.tasks worker --loglevel=info --logfile=logs/celery.log  # update this line
  environment:
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
Update:
$ docker-compose up -d --build
You should see the log file at project/logs/celery.log locally since we set up a volume:
[2022-11-15 17:44:31,471: INFO/MainProcess] Connected to redis://redis:6379/0
[2022-11-15 17:44:31,476: INFO/MainProcess] mingle: searching for neighbors
[2022-11-15 17:44:32,488: INFO/MainProcess] mingle: all alone
[2022-11-15 17:44:32,503: INFO/MainProcess] celery@80a00f0c917e ready.
[2022-11-15 17:44:32,569: INFO/MainProcess] Received task: app.tasks.fib[0b161c4d-5e1c-424a-ae9f-5c3e84de5043]
[2022-11-15 17:44:32,593: INFO/ForkPoolWorker-1] Task app.tasks.fib[0b161c4d-5e1c-424a-ae9f-5c3e84de5043] succeeded in 6.018030700040981s: [0, 1, 1, 2]
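Since the volume maps the log directory back to the host, you can follow the file as tasks come in:
$ tail -f project/logs/celery.log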
Flower
Flower is a real-time, web-based monitoring tool for Celery. You can use it to monitor currently running tasks, grow or shrink the worker pool, and view graphs and a number of statistics, to name a few features.
Add it to requirements.txt:
celery==5.2.7
falcon==3.1.0
flower==1.2.0
gunicorn==20.1.0
redis==4.3.4
And then add the service to docker-compose.yml:
monitor:
  image: web
  ports:
    - 5555:5555
  command: celery flower -A app.tasks --port=5555 --broker=redis://redis:6379/0
  environment:
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
Test it out:
$ docker-compose up -d --build
Navigate to http://localhost:5555 to view the dashboard. You should see one worker ready to go. Trigger a few more tasks and watch them show up in the dashboard in real time.
Flow
Before writing any tests, let's take a step back and look at the overall workflow.
In essence, an HTTP POST request hits /create. Within the route handler, a message is added to the broker, and the Celery worker process grabs it from the queue and processes the task. Meanwhile, the web application continues to execute and function properly, sending a response back to the client with a task ID. The client can then hit the /status/<TASK_ID> endpoint with an HTTP GET request to check the status of the task.
Tests
Let's start with a unit test:
class TestCeleryTasks(unittest.TestCase):

    def test_fib_task(self):
        self.assertEqual(tasks.fib.run(-1), [])
        self.assertEqual(tasks.fib.run(1), [0, 1])
        self.assertEqual(tasks.fib.run(3), [0, 1, 1, 2])
        self.assertEqual(tasks.fib.run(5), [0, 1, 1, 2, 3, 5])
Add the above test case to project/test.py, and then update the imports:
import unittest
from falcon import testing
from app import app, tasks
Run:
$ docker-compose run web python test.py
It should take about 20 seconds to run:
..
----------------------------------------------------------------------
Ran 2 tests in 20.038s
OK
It's worth noting that in the above asserts, we used the .run method (rather than .delay) to run the task directly without a Celery worker.
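If you'd rather exercise .delay() itself without spinning up a worker, Celery's task_always_eager setting runs tasks inline in the calling process. A minimal sketch, assuming you only flip this on in test code:
from app.tasks import app as celery_app, fib

celery_app.conf.task_always_eager = True       # .delay()/.apply_async() now run in-process
celery_app.conf.task_eager_propagates = True   # exceptions raised by the task propagate to the caller

result = fib.delay(3)                          # no broker or worker round trip
assert result.get() == [0, 1, 1, 2]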
Want to mock out the Celery task?
class TestCeleryTasks(unittest.TestCase):

    # def test_fib_task(self):
    #     self.assertEqual(tasks.fib.run(-1), [])
    #     self.assertEqual(tasks.fib.run(1), [0, 1])
    #     self.assertEqual(tasks.fib.run(3), [0, 1, 1, 2])
    #     self.assertEqual(tasks.fib.run(5), [0, 1, 1, 2, 3, 5])

    @patch('app.tasks.fib')
    def test_mock_fib_task(self, mock_fib):
        mock_fib.run.return_value = []
        self.assertEqual(tasks.fib.run(-1), [])
        mock_fib.run.return_value = [0, 1]
        self.assertEqual(tasks.fib.run(1), [0, 1])
        mock_fib.run.return_value = [0, 1, 1, 2]
        self.assertEqual(tasks.fib.run(3), [0, 1, 1, 2])
        mock_fib.run.return_value = [0, 1, 1, 2, 3, 5]
        self.assertEqual(tasks.fib.run(5), [0, 1, 1, 2, 3, 5])
Add the import:
from unittest.mock import patch
$ docker-compose run web python test.py
..
----------------------------------------------------------------------
Ran 2 tests in 0.002s
OK
Much better!
You can also run a full integration test from outside the container by running the following script:
#!/bin/bash

# trigger jobs
test=`curl -X POST http://localhost:8000/create \
    -d '{"number":"2"}' \
    -H "Content-Type: application/json" \
    -s \
    | jq -r '.data.task_id'`

# get status
check=`curl http://localhost:8000/status/${test} -s | jq -r '.status'`

while [ "$check" != "SUCCESS" ]
do
  check=`curl http://localhost:8000/status/${test} -s | jq -r '.status'`
  echo $(curl http://localhost:8000/status/${test} -s)
done
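Assuming you save it as something like test.sh (the filename is up to you) and have jq installed, run it with the containers up:
$ chmod +x test.sh
$ ./test.sh
It prints the /status payload on each poll until the task reports SUCCESS.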
Keep in mind that this is hitting the same broker and backend used in development. You may want to instantiate a new Celery app for testing:
app = celery.Celery('tests', broker=CELERY_BROKER, backend=CELERY_BACKEND)
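For instance, the test app could read its own environment variables so it points at a separate Redis database; the CELERY_TEST_BROKER and CELERY_TEST_BACKEND names below are purely illustrative:
import os

import celery

# fall back to Redis database 1 so test results don't mix with development data
CELERY_BROKER = os.environ.get('CELERY_TEST_BROKER', 'redis://redis:6379/1')
CELERY_BACKEND = os.environ.get('CELERY_TEST_BACKEND', 'redis://redis:6379/1')

app = celery.Celery('tests', broker=CELERY_BROKER, backend=CELERY_BACKEND)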
Next Steps
Looking for some challenges?
- Spin up DigitalOcean and deploy this application across a number of droplets using Docker Swarm or Kubernetes.
- Add a basic client side with React, Angular, Vue, or just vanilla JavaScript. Allow an end user to kick off a new task. Set up a polling mechanism to check the status of a task as well.
Grab the code from the repo.