In this article, we'll look at how to automatically retry failed Celery tasks.
Objectives
After reading, you should be able to:
- Retry a failed Celery task with both the retry method and a decorator argument
- Use exponential backoff when retrying a failed task
- Use a class-based task to reuse retry arguments
Celery Task
You can find the source code for this article on GitHub.
Let's assume we have a Celery task like this:
import random

import requests
from celery import shared_task

@shared_task
def task_process_notification():
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()
    requests.post('https://httpbin.org/delay/5')
In the real world this may call an internal or external third-party service. Regardless of the service, assume it's very unreliable, especially at peak periods. How can we handle failures?
It's worth noting that many Celery beginners get confused as to why some articles use app.task while others use shared_task. Well, shared_task lets you define Celery tasks without having to import the Celery instance, so it can make your task code more reusable.
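To make the difference concrete, here's a minimal sketch (the proj.celery module path is just an assumption for illustration; use wherever your Celery instance actually lives):

from celery import shared_task
from proj.celery import app  # hypothetical location of your Celery app instance

@app.task
def task_tied_to_app():
    # requires importing the concrete Celery app above
    return 'registered against this specific app instance'

@shared_task
def task_reusable():
    # no app import needed; works with whichever app is configured
    return 'registered lazily with the current app'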
Solution 1: Use a Try/Except Block
We can use a try/except block to catch the exception and raise retry:
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True)
def task_process_notification(self):
    try:
        if not random.choice([0, 1]):
            # mimic random error
            raise Exception()
        requests.post('https://httpbin.org/delay/5')
    except Exception as e:
        logger.error('exception raised, the task will be retried after 5 seconds')
        raise self.retry(exc=e, countdown=5)
Notes:
- Since we set bind to True, this is a bound task, so the first argument to the task will always be the current task instance (self). Because of this, we can call self.retry to retry the failed task.
- Please remember to raise the exception returned by the self.retry method to make it work.
- By setting the countdown argument to 5, the task will retry after a 5 second delay (a sketch that also caps the number of retries follows these notes).
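By default, self.retry allows at most three retries (max_retries defaults to 3) before giving up. Here's a minimal sketch of setting the cap explicitly and handling the case where the limit is reached; the max_retries value and the final log message are illustrative assumptions, not part of the original task:

from celery.exceptions import MaxRetriesExceededError

@shared_task(bind=True, max_retries=3)  # illustrative cap; the Celery default is also 3
def task_process_notification(self):
    try:
        if not random.choice([0, 1]):
            # mimic random error
            raise Exception()
        requests.post('https://httpbin.org/delay/5')
    except Exception:
        try:
            # no exc= here, so once the limit is hit retry raises MaxRetriesExceededError
            raise self.retry(countdown=5)
        except MaxRetriesExceededError:
            logger.error('giving up after %s retries', self.max_retries)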
Let's run the task in the Python shell:
>>> from polls.tasks import task_process_notification
>>> task_process_notification.delay()
You should see output like this in your Celery worker terminal:
Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] retry: Retry in 5s: Exception()
Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] retry: Retry in 5s: Exception()
Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] succeeded in 3.3638455480104312s: None
As you can see, the Celery task failed twice and succeeded the third time.
Solution 2: Task Retry Decorator
Celery 4.0 added built-in support for retrying, so you can let the exception bubble up and specify in the decorator how to handle it:
@shared_task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 7, 'countdown': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()
    requests.post('https://httpbin.org/delay/5')
Notes:
- autoretry_for takes a list/tuple of exception types that you'd like to retry for.
- retry_kwargs takes a dictionary of additional options for specifying how autoretries are executed. In the above example, the task will retry after a 5 second delay (via countdown) and it allows for a maximum of 7 retry attempts (via max_retries). Celery will stop retrying after 7 failed attempts and raise an exception.
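Since autoretry_for accepts any exception types, in practice you'd usually retry only the errors you actually expect from the unreliable service rather than a bare Exception. Here's a minimal sketch; the choice of requests.exceptions.RequestException and the timeout value are illustrative assumptions:

@shared_task(
    bind=True,
    autoretry_for=(requests.exceptions.RequestException,),  # retry only network/HTTP errors
    retry_kwargs={'max_retries': 7, 'countdown': 5},
)
def task_process_notification(self):
    # raise_for_status() turns 4xx/5xx responses into exceptions, which also trigger a retry
    response = requests.post('https://httpbin.org/delay/5', timeout=10)
    response.raise_for_status()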
Exponential Backoff
If your Celery task needs to send a request to a third-party service, it's a good idea to use exponential backoff to avoid overwhelming the service.
Celery supports this out of the box:
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()
    requests.post('https://httpbin.org/delay/5')
In this example, the first retry should run after 1s, the second after 2s, the third after 4s, the fourth after 8s, and so forth:
[02:09:59,014: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[fbe041b6-e6c1-453d-9cc9-cb99236df6ff] retry: Retry in 1s: Exception()
[02:10:00,210: INFO/ForkPoolWorker-2] Task polls.tasks.task_process_notification[fbe041b6-e6c1-453d-9cc9-cb99236df6ff] retry: Retry in 2s: Exception()
[02:10:02,291: INFO/ForkPoolWorker-4] Task polls.tasks.task_process_notification[fbe041b6-e6c1-453d-9cc9-cb99236df6ff] retry: Retry in 4s: Exception()
You can also set retry_backoff to a number for use as a delay factor:
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()
    requests.post('https://httpbin.org/delay/5')
Example:
[02:21:45,887: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Retry in 5s: Exception()
[02:21:55,170: INFO/ForkPoolWorker-2] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Retry in 10s: Exception()
[02:22:15,706: INFO/ForkPoolWorker-4] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Retry in 20s: Exception()
[02:22:55,450: INFO/ForkPoolWorker-6] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Retry in 40s: Exception()
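If you're worried about the delay growing unbounded, Celery also has a retry_backoff_max option that caps the computed backoff delay (it defaults to 600 seconds). A minimal sketch; the 60 second cap here is just an illustrative value:

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_backoff_max=60, retry_kwargs={'max_retries': 10})
def task_process_notification(self):
    # backoff grows 5s, 10s, 20s, 40s, then is capped at 60s (jitter may reduce each value)
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()
    requests.post('https://httpbin.org/delay/5')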
By default, the exponential backoff will also introduce random jitter to avoid having all the tasks run at the same moment.
Randomness
When you build a custom retry strategy for a Celery task that sends a request to another service, you should add some randomness to the delay calculation to prevent all tasks from being executed at the same time, resulting in a thundering herd.
Celery has you covered here as well with retry_jitter:
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_jitter=True, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()
    requests.post('https://httpbin.org/delay/5')
This option is set to True by default, which helps prevent the thundering herd problem when you use Celery's built-in retry_backoff.
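When retry_jitter is enabled, the computed backoff is treated as a maximum and the actual delay is a random value between zero and that maximum. If you want the exact exponential schedule instead, for example while debugging retry timing, you can switch it off; a minimal sketch:

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_jitter=False, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    # with jitter disabled, the delays are exactly 5s, 10s, 20s, 40s, ...
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()
    requests.post('https://httpbin.org/delay/5')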
Task Base Class
If you find yourself writing the same retry arguments in your Celery task decorators, you can (as of Celery 4.4) define them once in a base Task class and then use that class as the base for your Celery tasks:
import celery

class BaseTaskWithRetry(celery.Task):
    autoretry_for = (Exception, KeyError)
    retry_kwargs = {'max_retries': 5}
    retry_backoff = True

@shared_task(bind=True, base=BaseTaskWithRetry)
def task_process_notification(self):
    raise Exception()
So if you run the task in the Python shell, you would see the following:
[03:12:29,002: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[3231ef9b-00c7-4ab1-bf0b-2fdea6fa8348] retry: Retry in 1s: Exception()
[03:12:30,445: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[3231ef9b-00c7-4ab1-bf0b-2fdea6fa8348] retry: Retry in 2s: Exception()
[03:12:33,080: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[3231ef9b-00c7-4ab1-bf0b-2fdea6fa8348] retry: Retry in 3s: Exception()
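The payoff is that any number of tasks can now share the same retry behavior without repeating the arguments. A minimal sketch; the task names and helper functions below are made up for illustration:

# Both tasks inherit autoretry_for, retry_kwargs, and retry_backoff from BaseTaskWithRetry.
@shared_task(bind=True, base=BaseTaskWithRetry)
def task_send_welcome_email(self, user_id):
    send_welcome_email(user_id)  # hypothetical helper

@shared_task(bind=True, base=BaseTaskWithRetry)
def task_sync_profile(self, user_id):
    sync_profile_with_crm(user_id)  # hypothetical helper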
Conclusion
In this article, we looked at how to automatically retry failed Celery tasks.
Again, the source code for this article can be found on GitHub.
Thanks for reading. If you have any questions, please feel free to contact me.