Automatically Retrying Failed Celery Tasks

Last updated December 11th, 2021

In this article, we'll look at how to automatically retry failed Celery tasks.

Django + Celery Series:

  1. Asynchronous Tasks with Django and Celery
  2. Handling Periodic Tasks in Django with Celery and Docker
  3. Automatically Retrying Failed Celery Tasks (this article!)
  4. Working with Celery and Database Transactions


Objectives

After reading, you should be able to:

  1. Retry a failed Celery task with both the retry method and a decorator argument
  2. Use exponential backoff when retrying a failed task
  3. Use a class-based task to reuse retry arguments

Celery Task

You can find the source code for this article on GitHub.

Let's assume we have a Celery task like this:

import random

import requests
from celery import shared_task


@shared_task
def task_process_notification():
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

In the real world, this task might call an internal service or an external third-party API. Whatever the service is, assume it's unreliable, especially during peak periods. How can we handle failures?

It's worth noting that many Celery beginners get confused as to why some articles use app.task while others use shared_task. Well, shared_task lets you define Celery tasks without having to import the Celery instance, so it can make your task code more reusable.
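For example, assuming a project-level Celery instance named app (the module path below is illustrative), the two styles look like this:

# with app.task, the task module must import a concrete Celery instance
from myproject.celery import app  # hypothetical module path

@app.task
def task_bound_to_app():
    ...


# with shared_task, the task is registered with whichever app is current,
# so the module doesn't depend on any particular Celery instance
from celery import shared_task

@shared_task
def task_reusable():
    ...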

Solution 1: Use a Try/Except Block

We can use a try/except block to catch the exception and raise self.retry:

import logging
import random

import requests
from celery import shared_task

logger = logging.getLogger(__name__)


@shared_task(bind=True)
def task_process_notification(self):
    try:
        if not random.choice([0, 1]):
            # mimic random error
            raise Exception()

        requests.post('https://httpbin.org/delay/5')
    except Exception as e:
        logger.error('exception raised, retrying in 5 seconds')
        raise self.retry(exc=e, countdown=5)

Notes:

  1. Since we set bind to True, this is a bound task, so the first argument to the task will always be the current task instance (self). Because of this, we can call self.retry to retry the failed task.
  2. Remember to raise the exception returned by the self.retry method to make the retry work.
  3. By setting the countdown argument to 5, the task will retry after a 5 second delay.
  4. By default, self.retry allows a maximum of three retries (max_retries=3); once that limit is exceeded, Celery raises a MaxRetriesExceededError.

Let's run the code below in the Python shell:

>>> from polls.tasks import task_process_notification
>>> task_process_notification.delay()

You should see output like this in your Celery worker terminal:

Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] retry: Retry in 5s: Exception()
Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] retry: Retry in 5s: Exception()
Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] succeeded in 3.3638455480104312s: None

As you can see, the Celery task failed twice and succeeded the third time.

Solution 2: Task Retry Decorator

Celery 4.0 added built-in support for retrying, so you can let the exception bubble up and specify in the decorator how to handle it:

@shared_task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 7, 'countdown': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

Notes:

  1. autoretry_for takes a list/tuple of exception types that you'd like to retry for. In practice, you'll usually want to list specific exceptions rather than a bare Exception (see the sketch after these notes).
  2. retry_kwargs takes a dictionary of additional options for specifying how autoretries are executed. In the above example, the task will retry after a 5 second delay (via countdown) and allows for a maximum of 7 retry attempts (via max_retries). Celery will stop retrying after the seventh failed attempt and raise the exception.
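Retrying on a bare Exception works, but it also retries failures that will never succeed, like a bug in your own code. Here's a minimal sketch, assuming the failures you care about come from requests, that auto-retries only for network-level errors (requests.exceptions.RequestException is the base class for those):

import requests
from celery import shared_task


@shared_task(
    bind=True,
    autoretry_for=(requests.exceptions.RequestException,),
    retry_kwargs={'max_retries': 7, 'countdown': 5},
)
def task_process_notification(self):
    # raise_for_status() turns HTTP 4xx/5xx responses into HTTPError,
    # a subclass of RequestException, so they trigger autoretries too
    response = requests.post('https://httpbin.org/delay/5')
    response.raise_for_status()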

Exponential Backoff

If your Celery task needs to send a request to a third-party service, it's a good idea to use exponential backoff to avoid overwhelming the service.

Celery supports this out of the box:

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

In this example, the first retry should run after 1s, the second after 2s, the third after 4s, the fourth after 8s, and so forth:

[02:09:59,014: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[fbe041b6-e6c1-453d-9cc9-cb99236df6ff] retry: Retry in 1s: Exception()
[02:10:00,210: INFO/ForkPoolWorker-2] Task polls.tasks.task_process_notification[fbe041b6-e6c1-453d-9cc9-cb99236df6ff] retry: Retry in 2s: Exception()
[02:10:02,291: INFO/ForkPoolWorker-4] Task polls.tasks.task_process_notification[fbe041b6-e6c1-453d-9cc9-cb99236df6ff] retry: Retry in 4s: Exception()

You can also set retry_backoff to a number for use as a delay factor:

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

Example:

[02:21:45,887: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Retry in 5s: Exception()
[02:21:55,170: INFO/ForkPoolWorker-2] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Retry in 10s: Exception()
[02:22:15,706: INFO/ForkPoolWorker-4] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Retry in 20s: Exception()
[02:22:55,450: INFO/ForkPoolWorker-6] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Retry in 40s: Exception()

By default, the exponential backoff will also introduce random jitter to avoid having all the tasks run at the same moment.
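Note that the backoff delay doesn't grow forever: Celery caps it with retry_backoff_max, which defaults to 600 seconds. Here's a minimal sketch combining the backoff options (the 120 second cap is just an illustrative choice):

import requests
from celery import shared_task


@shared_task(
    bind=True,
    autoretry_for=(Exception,),
    retry_backoff=5,         # base delay: ~5s, ~10s, ~20s, ...
    retry_backoff_max=120,   # never wait longer than 2 minutes between retries
    retry_jitter=True,       # randomize each delay to spread out retries
    retry_kwargs={'max_retries': 10},
)
def task_process_notification(self):
    requests.post('https://httpbin.org/delay/5')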

Randomness

When you build a custom retry strategy for your Celery task (one that needs to send a request to another service), you should add some randomness to the delay calculation to prevent all the tasks from being executed at the same time, which would result in a thundering herd.

Celery has you covered here as well with retry_jitter:

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_jitter=True, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

This option is set to True by default, which helps prevent the thundering herd problem when you use Celery's built-in retry_backoff.
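If you instead roll your own retry logic with self.retry, you can add the jitter yourself. Here's a minimal sketch; the delay formula below is just one reasonable choice, not what Celery uses internally:

import logging
import random

import requests
from celery import shared_task

logger = logging.getLogger(__name__)


@shared_task(bind=True, max_retries=5)
def task_process_notification(self):
    try:
        requests.post('https://httpbin.org/delay/5')
    except requests.exceptions.RequestException as e:
        # exponential backoff (1s, 2s, 4s, ...) based on the current retry
        # count, plus up to one second of random jitter
        delay = 2 ** self.request.retries + random.random()
        logger.error('request failed, retrying in %.1f seconds', delay)
        raise self.retry(exc=e, countdown=delay)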

Task Base Class

If you find yourself writing the same retry arguments in all of your Celery task decorators, you can (as of Celery 4.4) define them on a custom base class and have your tasks inherit from it:

import celery
from celery import shared_task


class BaseTaskWithRetry(celery.Task):
    autoretry_for = (Exception, KeyError)
    retry_kwargs = {'max_retries': 5}
    retry_backoff = True


@shared_task(bind=True, base=BaseTaskWithRetry)
def task_process_notification(self):
    raise Exception()

If you run the task in the Python shell, you should see the following:

[03:12:29,002: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[3231ef9b-00c7-4ab1-bf0b-2fdea6fa8348] retry: Retry in 1s: Exception()
[03:12:30,445: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[3231ef9b-00c7-4ab1-bf0b-2fdea6fa8348] retry: Retry in 2s: Exception()
[03:12:33,080: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[3231ef9b-00c7-4ab1-bf0b-2fdea6fa8348] retry: Retry in 3s: Exception()
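One thing to keep in mind: options passed directly to the task decorator take precedence over the attributes defined on the base class, so individual tasks can still override the shared defaults. For example (the task name here is hypothetical):

@shared_task(bind=True, base=BaseTaskWithRetry, retry_kwargs={'max_retries': 10})
def task_with_more_retries(self):
    # inherits autoretry_for and retry_backoff from BaseTaskWithRetry,
    # but bumps max_retries to 10 for this task only
    raise Exception()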

Conclusion

In this article, we looked at how to automatically retry failed Celery tasks.

Again, the source code for this article can be found on GitHub.

Thanks for reading. If you have any questions, please feel free to contact me.

Django + Celery Series:

  1. Asynchronous Tasks with Django and Celery
  2. Handling Periodic Tasks in Django with Celery and Docker
  3. Automatically Retrying Failed Celery Tasks (this article!)
  4. Working with Celery and Database Transactions
