Working with Celery and Django Database Transactions

Last updated June 20th, 2020

In this tutorial, we'll look at how to prevent a Celery task dependent on a Django database transaction from executing before the database commits the transaction. This is a fairly common issue.

Django + Celery Series:

  1. Asynchronous Tasks with Django and Celery
  2. Handling Periodic Tasks in Django with Celery and Docker
  3. Automatically Retrying Failed Celery Tasks
  4. Working with Celery and Database Transactions (this article!)

Objectives

After reading, you should be able to:

  1. Describe what a database transaction is and how to use it in Django
  2. Explain why you might get a DoesNotExist error in a Celery worker and how to solve it
  3. Prevent a task from executing before the database commits a transaction

What is a database transaction?

A database transaction is a unit of work that is either committed (applied to the database) or rolled back (undone from the database) as a unit.

Most databases use the following pattern:

  1. Begin the transaction.
  2. Execute a set of data manipulations and/or queries.
  3. If no error occurs, then commit the transaction.
  4. If an error occurs, then roll back the transaction.

As you can see, transactions are a very useful way to keep your data consistent: related changes either all apply or none of them do.
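The begin/commit/rollback cycle above can be sketched outside of Django with Python's built-in sqlite3 module (the table and helper names below are made up for illustration):

```python
import sqlite3

# Minimal sketch of the commit/rollback pattern using sqlite3 (stdlib),
# not Django; the table and function names are made up for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT UNIQUE)")

def create_users(names):
    try:
        # the connection context manager opens a transaction:
        # it commits on success and rolls back if an error occurs
        with conn:
            for name in names:
                conn.execute("INSERT INTO users (name) VALUES (?)", (name,))
    except sqlite3.IntegrityError:
        pass  # the whole transaction was rolled back

create_users(["john", "john"])  # second insert violates the UNIQUE constraint
print(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])  # 0: nothing was committed
```

Even though the first insert succeeded on its own, the failure of the second undoes both, because they belong to the same transaction.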

How to use database transactions in Django

You can find the source code for this tutorial on GitHub. If you decide to use this code as you work through this post, keep in mind that the username must be unique. For testing purposes, you could use a random username generator like Faker.

Let's first take a look at this Django view:

def test_view(request):
    user = User.objects.create_user('john', '[email protected]', 'johnpassword')
    logger.info(f'create user {user.pk}')
    raise Exception('test')

What happens when you visit this view?

Default behavior

Django's default behavior is autocommit: each query is immediately committed to the database unless a transaction is active. In other words, with autocommit, each query starts a transaction and either commits or rolls it back on its own. If a view makes three queries, they run one by one; if the third fails, the first two are still committed.

So, in the above view, the exception is raised after the query has been committed, so the user john is still created.
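To make the autocommit behavior concrete, here's a small sketch using Python's sqlite3 module rather than Django (the table and values are invented for illustration): each statement commits on its own, so a failure in one doesn't undo the others:

```python
import sqlite3

# Sketch of autocommit: isolation_level=None puts sqlite3 in autocommit mode,
# so each statement is committed as soon as it runs (names are illustrative).
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (name TEXT UNIQUE)")

for name in ["john", "jane", "john"]:  # the third insert violates UNIQUE
    try:
        conn.execute("INSERT INTO users (name) VALUES (?)", (name,))
    except sqlite3.IntegrityError:
        pass  # only this statement fails; the earlier commits stand

print(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])  # 2
```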

Explicit control

If you'd prefer to have more control over database transactions, you can override the default behavior with transaction.atomic. In this mode, before calling a view function, Django starts a transaction. If the response is produced without problems, Django commits the transaction. On the other hand, if the view produces an exception, Django rolls back the transaction. If you have three queries and one fails, then none of the queries will be committed.

So, let's re-write the view using transaction.atomic:

def transaction_test(request):
    with transaction.atomic():
        user = User.objects.create_user('john1', '[email protected]', 'johnpassword')
        logger.info(f'create user {user.pk}')
        raise Exception('force transaction to rollback')

Now the user create operation will be rolled back when the exception is raised, so the user will not ultimately be created.

transaction.atomic is a very useful tool for keeping your data consistent, especially when a single operation needs to modify data across several models.

It can also be used as a decorator, like so:

@transaction.atomic
def transaction_test2(request):
    user = User.objects.create_user('john1', '[email protected]', 'johnpassword')
    logger.info(f'create user {user.pk}')
    raise Exception('force transaction to rollback')

So if an error is raised in the view and we don't catch it, the transaction will roll back.

If you want to use transaction.atomic for all view functions, you can set ATOMIC_REQUESTS to True in your Django settings file:

DATABASES = {
    "default": {
        # ...
        "ATOMIC_REQUESTS": True,
    }
}

# or, after DATABASES is defined:

DATABASES["default"]["ATOMIC_REQUESTS"] = True

You can then override this behavior for individual views so that they run in autocommit mode:

@transaction.non_atomic_requests
def some_view(request):
    ...

DoesNotExist exception

If you don't have a solid understanding of how Django manages database transactions, it can be quite confusing when you come across random database-related errors in a Celery worker.

Let's look at an example:

@transaction.atomic
def transaction_celery(request):
    username = random_username()
    user = User.objects.create_user(username, '[email protected]', 'johnpassword')
    logger.info(f'create user {user.pk}')
    task_send_welcome_email.delay(user.pk)

    time.sleep(1)
    return HttpResponse('test')

The task code looks like:

@shared_task()
def task_send_welcome_email(user_pk):
    user = User.objects.get(pk=user_pk)
    logger.info(f'send email to {user.email} {user.pk}')

  1. Since the view uses the transaction.atomic decorator, its database operations are committed only if no error is raised in the view. The call to .delay, however, enqueues the task immediately, before the transaction commits.
  2. The flow is fairly simple: we create a user in the view and then pass the primary key to the task, which sends the welcome email.
  3. time.sleep(1) is used to introduce a race condition.

When run, you will see the following error:

django.contrib.auth.models.User.DoesNotExist: User matching query does not exist.

Why?

  1. We pause for one second after enqueueing the task, so the worker picks it up and runs it before the view returns.
  2. The task executes almost immediately, so user = User.objects.get(pk=user_pk) fails: the user isn't in the database yet, because Django hasn't committed the transaction.
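The race can be reproduced without Django or Celery at all, using two sqlite3 connections to stand in for the web process and the worker (file path and table names are invented for illustration):

```python
import os
import sqlite3
import tempfile

# Two connections to the same database stand in for the Django process
# (writer) and the Celery worker (reader); names are illustrative.
path = os.path.join(tempfile.mkdtemp(), "demo.db")
writer = sqlite3.connect(path)
writer.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
writer.commit()

# the "view" inserts a row inside a still-open transaction...
writer.execute("INSERT INTO users (name) VALUES ('john')")

# ...and the "worker" looks for it before the transaction commits
worker = sqlite3.connect(path)
row = worker.execute("SELECT id FROM users WHERE name = 'john'").fetchone()
print(row)  # None: the uncommitted insert is invisible to other connections

writer.commit()
row = worker.execute("SELECT id FROM users WHERE name = 'john'").fetchone()
print(row)  # (1,): visible once the transaction has committed
```

This is exactly what happens to the Celery worker: it queries through its own database connection, so it cannot see rows from a transaction the web process hasn't committed yet.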

Solution

There are three ways to solve this:

  1. Disable the database transaction so that Django uses autocommit. To do so, you can simply remove the transaction.atomic decorator. However, this isn't recommended, since atomic database transactions are a powerful tool.

  2. Force the Celery task to run after a period of time.

    For example, to pause for 10 seconds:

    task_send_welcome_email.apply_async(args=[user.pk], countdown=10)
    
  3. Django provides transaction.on_commit, which registers a callback function to run after the current transaction successfully commits. To use it, update the view like so:

    @transaction.atomic
    def transaction_celery2(request):
        username = random_username()
        user = User.objects.create_user(username, '[email protected]', 'johnpassword')
        logger.info(f'create user {user.pk}')
        # the task does not get called until after the transaction is committed
        transaction.on_commit(lambda: task_send_welcome_email.delay(user.pk))
    
        time.sleep(1)
        return HttpResponse('test')
    

    Now, the task isn't called until after the database transaction commits. So, the Celery worker will always find the user, because the code in the worker only runs after the Django transaction has committed successfully.

    This is the recommended solution.
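The semantics of on_commit can be modeled in a few lines of plain Python (a toy sketch, not Django's actual implementation): callbacks are queued while the transaction is open, run on commit, and discarded on rollback:

```python
# Toy model of on_commit semantics -- not Django's real implementation.
class ToyTransaction:
    def __init__(self):
        self.callbacks = []

    def on_commit(self, func):
        # defer the work; nothing runs yet
        self.callbacks.append(func)

    def commit(self):
        callbacks, self.callbacks = self.callbacks, []
        for func in callbacks:
            func()  # deferred work runs only now

    def rollback(self):
        self.callbacks = []  # deferred work never runs

sent = []
txn = ToyTransaction()
txn.on_commit(lambda: sent.append("welcome email"))
print(sent)  # []: registering the callback doesn't run it
txn.commit()
print(sent)  # ['welcome email']
```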

It's worth noting that you may not want the task to fire the moment the transaction commits, especially if you're running in a high-scale environment. If either the database or the instance is at high utilization, triggering the work immediately will only add to the existing load. In that case, you may want to use the second solution and wait a sufficient amount of time (20 seconds, perhaps) to ensure that the changes have been committed to the database before the task executes.

Testing

Django's TestCase wraps each test in a transaction which is then rolled back after each test. Since no transactions are ever committed, on_commit() never runs either. So, if you need to test code fired in an on_commit callback, use TransactionTestCase in your test code.

Database transaction in a Celery task

If your Celery task needs to update a database record, it makes sense to use a database transaction in the Celery task.

One simple way is with transaction.atomic():

@shared_task()
def task_transaction_test():
    from .views import random_username

    with transaction.atomic():
        username = random_username()
        # create_user saves the user, so no extra save() call is needed
        user = User.objects.create_user(username, '[email protected]', 'johnpassword')
        logger.info(f'send email to {user.pk}')
        raise Exception('force the transaction to roll back')

A better approach is to write a custom decorator which has transaction support:

import functools

from celery import shared_task
from django.db import transaction


class custom_celery_task:
    """
    Decorator that adds custom logic to a Celery task,
    such as database transactions and retries.
    """
    def __init__(self, *args, **kwargs):
        self.task_args = args
        self.task_kwargs = kwargs

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper_func(*args, **kwargs):
            try:
                with transaction.atomic():
                    return func(*args, **kwargs)
            except Exception as e:
                # retry with a 5 second countdown; the current retry count
                # is available via task_func.request.retries
                raise task_func.retry(exc=e, countdown=5)

        task_func = shared_task(*self.task_args, **self.task_kwargs)(wrapper_func)
        return task_func

...

@custom_celery_task(max_retries=5)
def task_transaction_test():
    # do something
    ...
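The custom_celery_task decorator above depends on Celery's shared_task and retry, but the class-based decorator pattern itself can be sketched in plain Python (hypothetical retry_on_error name, no Celery involved):

```python
import functools

# Plain-Python sketch of the same class-based decorator pattern
# (hypothetical names, no Celery): retry a function on failure.
class retry_on_error:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(self.max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == self.max_retries:
                        raise  # retries exhausted, re-raise the last error
        return wrapper

calls = []

@retry_on_error(max_retries=2)
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("transient failure")
    return "ok"

print(flaky())  # ok: succeeds on the third attempt
```

The constructor captures the decorator's arguments, and `__call__` receives the function to wrap, which is the same structure custom_celery_task uses to forward its arguments to shared_task.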

Conclusion

This tutorial looked at how to make Celery work nicely with Django database transactions.

The source code for this tutorial can be found on GitHub.

Thanks for reading. If you have any questions, please feel free to contact me.

Django + Celery Series:

  1. Asynchronous Tasks with Django and Celery
  2. Handling Periodic Tasks in Django with Celery and Docker
  3. Automatically Retrying Failed Celery Tasks
  4. Working with Celery and Database Transactions (this article!)
Featured Course

Test-Driven Development with Django, Django REST Framework, and Docker

In this course, you'll learn how to set up a development environment with Docker in order to build and deploy a RESTful API powered by Python, Django, and Django REST Framework.
