Getting Started
Part 1, Chapter 3
Objectives
- Set up Celery with Flask
- Execute Celery tasks in the Flask shell
- Monitor a Celery app with Flower
Setting up Redis
You can set up and run Redis directly from your operating system or from a Docker container. While you don't have to go the Docker route in this chapter, we'll use Docker a lot in later chapters, so it's highly recommend to stick with Docker in this chapter as well.
With Docker
Start by installing Docker if you haven't already done so. Then, open your terminal and run the following command:
$ docker run -p 6379:6379 --name some-redis -d redis
This downloads the official Redis Docker image from Docker Hub and runs it on port 6379 in the background.
To test if Redis is up and running, run:
$ docker exec -it some-redis redis-cli ping
You should see:
PONG
Without Docker
Either download Redis from source or via a package manager (like APT, YUM, Homebrew, or Chocolatey) and then start the Redis server via:
$ redis-server
To test if Redis is up and running, run:
$ redis-cli ping
You should see:
PONG
Next, we'll look at how to set up Celery in a Flask project.
Setting up Celery
Create a Flask project
Create a new project directory:
$ mkdir flask-celery-project && cd flask-celery-project
Then, create and activate a new Python virtual environment:
$ python3.11 -m venv venv
$ source venv/bin/activate
(venv)$
Feel free to swap out virtualenv and Pip for Poetry or Pipenv. For more, review Modern Python Environments.
Add Flask
to a requirements.txt file:
Flask==3.0.3
Install:
(venv)$ pip install -r requirements.txt
Create a new file called app.py:
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello():
return "Hello, World!"
Run the app:
(venv)$ FLASK_APP=app.py flask run
* Serving Flask app 'app.py'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on http://127.0.0.1:5000
Press CTRL+C to quit
Visit http://localhost:5000 in your browser. You should see Hello, World!
.
Press Ctrl+C
to terminate the development server.
Project structure thus far:
├── app.py
└── requirements.txt
Add Celery
Next, let's install and configure Celery.
Update requirements.txt, adding redis-py and Celery:
celery==5.4.0
redis==5.0.8
Install:
(venv)$ pip install -r requirements.txt
Update app.py:
from celery import Celery
from flask import Flask
app = Flask(__name__)
celery = Celery(
__name__,
broker="redis://127.0.0.1:6379/0",
backend="redis://127.0.0.1:6379/0"
)
@app.route("/")
def hello():
return "Hello, World!"
@celery.task
def divide(x, y):
import time
time.sleep(5)
return x / y
Notes:
- After creating a Flask instance, we created a new instance of Celery.
- The
broker
andbackend
tells Celery to use the Redis service we just launched. Rather than hard-coding these values, you can define them in a Flask config or pull them from environment variables. - We defined a Celery task called
divide
, which simulates a long-running task.
Sending a Task to Celery
With the config done, let's try sending a task to Celery to see how it works.
In a new terminal window, navigate to your project directory, activate the virtual environment, and then run:
(venv)$ celery -A app.celery worker --loglevel=info
You should see something similar to:
[config]
.> app: app:0x10a2a9a20
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. app.divide
Back in the first terminal window, run:
(venv)$ FLASK_APP=app.py flask shell
Let's send some tasks to the Celery worker:
>>> from app import divide
>>> task = divide.delay(1, 2)
What's happening?
- We used the
delay
method to send a new message to the message broker. The worker process then picked up and executed the task from the queue. - After releasing from the Enter key, the code finished executing while the
divide
task ran in the background.
Turn to the Celery worker terminal. You should see something similar to:
[2024-08-20 10:53:55,716: INFO/MainProcess] Task app.divide[a2766690-a2a3-4f5f-b90b-405f34c3b746] received
[2024-08-20 10:54:00,733: INFO/ForkPoolWorker-16] Task app.divide[a2766690-a2a3-4f5f-b90b-405f34c3b746] succeeded in 5.014849075996608s: 0.5
The worker process received the task at 10:53:55,716
. It took about five seconds for the task to start and finish.
Add another task or two. As you do this, picture the workflow in your head:
- The Celery client (the producer) adds a new task to the queue via the message broker.
- The Celery worker (the consumer) grabs the tasks from the queue, again, via the message broker.
- Once processed, results are stored in the result backend.
Add another new task:
>>> task = divide.delay(1, 2)
>>> type(task)
<class 'celery.result.AsyncResult'>
After we called the delay method, we get an AsyncResult
instance, which can be used to check the task state along with the return value or exception details.
Add a new task then print task.state
and task.result
:
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
SUCCESS 0.5
>>> print(task.state, task.result)
SUCCESS 0.5
What happens if there's an error?
>>> task = divide.delay(1, 0)
# wait a few seconds before checking the state and result
>>> task.state
'FAILURE'
>>> task.result
ZeroDivisionError('division by zero')
Monitoring Celery with Flower
Flower is a real-time web application monitoring and administration tool for Celery.
Add the dependency to the requirements.txt file:
flower==2.0.1
Open a third terminal window, navigate to the project directory. Activate your virtual environment and then install Flower:
(venv)$ pip install -r requirements.txt
Once installed, spin up the server:
(venv)$ celery -A app.celery flower --port=5555
Navigate to http://localhost:5555 in your browser of choice to view the dashboard. Click "Tasks" in the nav bar at the top to view the finished tasks.
In the first terminal window, run a few more tasks, making sure you have at least one that will fail:
>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 0)
>>> task = divide.delay(1, 2)
>>> task = divide.delay(1, 3)
Back in Flower you should see:
Take note of the UUID
column. This is the id
of AsyncResult
. Copy the UUID for the failed task and open the terminal window where the Flask shell is running to view the details:
>>> from celery.result import AsyncResult
>>> task = AsyncResult('b09f126b-5c68-4695-a0f8-da77c88e9c0d') # replace with your UUID
>>>
>>> task.state
'FAILURE'
>>>
>>> task.result
ZeroDivisionError('division by zero')
Familiarize yourself a bit with the Flower dashboard. It's a powerful tool that can help make it easier to learn Celery since you can get feedback much quicker than from the terminal.
✓ Mark as Completed