In this article, we'll look at how to prevent a Celery task dependent on a Django database transaction from executing before the database commits the transaction. This is a fairly common issue.
Django + Celery Series:
Objectives
After reading, you should be able to:
- Describe what a database transaction is and how to use it in Django
- Explain why you might get a DoesNotExist error in a Celery worker and how to solve it
- Prevent a task from executing before the database commits a transaction
What is a database transaction?
A database transaction is a unit of work that is either committed (applied to the database) or rolled back (undone from the database) as a unit.
Most databases use the following pattern:
- Begin the transaction.
- Execute a set of data manipulations and/or queries.
- If no error occurs, then commit the transaction.
- If an error occurs, then roll back the transaction.
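The pattern above can be sketched with Python's built-in sqlite3 module rather than Django. This is only an illustration: the accounts table and the transfer and balances helpers are hypothetical names that do not appear in the article's source code.

```python
import sqlite3

# isolation_level=None turns off sqlite3's implicit transaction handling,
# so the BEGIN / COMMIT / ROLLBACK statements below are issued explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE accounts ("
    "name TEXT PRIMARY KEY, "
    "balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.execute("INSERT INTO accounts VALUES ('alice', 100), ('bob', 0)")


def transfer(conn, sender, receiver, amount):
    """Move money between two accounts as a single unit of work."""
    conn.execute("BEGIN")  # 1. begin the transaction
    try:
        # 2. execute a set of data manipulations
        conn.execute(
            "UPDATE accounts SET balance = balance + ? WHERE name = ?",
            (amount, receiver),
        )
        conn.execute(
            "UPDATE accounts SET balance = balance - ? WHERE name = ?",
            (amount, sender),
        )
    except sqlite3.Error:
        conn.execute("ROLLBACK")  # 4. an error occurred: undo everything
        return False
    conn.execute("COMMIT")  # 3. no error occurred: apply everything
    return True


def balances(conn):
    """Return the current balances as a {name: balance} dict."""
    return dict(conn.execute("SELECT name, balance FROM accounts"))
```

Calling transfer(conn, "alice", "bob", 500) violates the CHECK constraint on the second update, so the credit already applied to bob is rolled back along with it and both balances stay unchanged.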
As you can see, a transaction is a useful way to keep your data consistent: related changes are applied together or not at all.
How to use database transactions in Django
You can find the source code for this article on GitHub. If you decide to use this code as you work through this article, keep in mind that the username must be unique. For testing purposes, you could use a random username generator like Faker.
Let's first take a look at this Django view:
def test_view(request):
    user = User.objects.create_user('john', '[email protected]', 'johnpassword')
    logger.info(f'create user {user.pk}')
    raise Exception('test')
What happens when you visit this view?
Default behavior
Django's default behavior is to autocommit: Each query is directly committed to the database unless a transaction is active. In other words, with autocommit, each query starts a transaction and either commits or rolls back the transaction as well. If you have a view with three queries, then each will run one-by-one. If one fails, the other two will be committed.
So, in the above view, the exception is raised after the transaction is committed, creating the user john.
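To make the autocommit behavior concrete, here is a minimal sketch that uses the standard library's sqlite3 module in autocommit mode as a stand-in for Django's default. The users table and the create_users helper are assumptions made for this example only.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode: every statement is
# committed as soon as it finishes, much like Django's default behavior.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (username TEXT PRIMARY KEY)")


def create_users(conn, usernames):
    """Insert usernames one by one and stop at the first failure."""
    try:
        for username in usernames:
            conn.execute("INSERT INTO users VALUES (?)", (username,))
    except sqlite3.IntegrityError:
        return False
    return True


# The third insert fails because 'john' already exists.
ok = create_users(conn, ["john", "paul", "john"])

# The first two inserts were committed on their own, so they survive the error.
saved = [
    row[0]
    for row in conn.execute("SELECT username FROM users ORDER BY username")
]
```

Here ok is False, yet saved still contains both john and paul: nothing ties the three statements together, so the failure of one does not undo the others.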
Explicit control
If you'd prefer to have more control over database transactions, you can override the default behavior with transaction.atomic. Django starts a transaction when the atomic block (or the decorated view function) is entered. If the block completes without problems, Django commits the transaction. On the other hand, if the block raises an exception, Django rolls back the transaction. If you have three queries and one fails, then none of the queries will be committed.
So, let's rewrite the view using transaction.atomic:
def transaction_test(request):
    with transaction.atomic():
        user = User.objects.create_user('john1', '[email protected]', 'johnpassword')
        logger.info(f'create user {user.pk}')
        raise Exception('force transaction to rollback')
Now the user create operation will roll back when the exception is raised, so the user will not be created in the end.
transaction.atomic is a very useful tool which can keep your data organized, especially when you need to manipulate data in models.
It can also be used as a decorator like so:
@transaction.atomic
def transaction_test2(request):
    user = User.objects.create_user('john1', '[email protected]', 'johnpassword')
    logger.info(f'create user {user.pk}')
    raise Exception('force transaction to rollback')
So, if an error is raised in the view and we do not catch it, the transaction will roll back.
If you want to use transaction.atomic for all view functions, you can set ATOMIC_REQUESTS to True in your Django settings file:
# ATOMIC_REQUESTS is a per-database option, so it is set inside DATABASES
DATABASES["default"]["ATOMIC_REQUESTS"] = True
You can then override the behavior for an individual view so that it runs in autocommit mode:
@transaction.non_atomic_requests
def my_view(request): ...  # hypothetical view; runs in autocommit mode
DoesNotExist exception
If you don't have a solid understanding of how Django manages database transactions, it can be quite confusing when you come across random database-related errors in a Celery worker.
Let's look at an example:
@transaction.atomic
def transaction_celery(request):
    username = random_username()
    user = User.objects.create_user(username, '[email protected]', 'johnpassword')
    logger.info(f'create user {user.pk}')
    task_send_welcome_email.delay(user.pk)
    time.sleep(1)
    return HttpResponse('test')
The task code looks like:
@shared_task()
def task_send_welcome_email(user_pk):
    user = User.objects.get(pk=user_pk)
    logger.info(f'send email to {user.email} {user.pk}')
- Since the view uses the transaction.atomic decorator, its database operations are committed only after the view returns without raising an error. Enqueueing the Celery task is not part of the transaction, so the task is sent to the broker right away.
- The task is fairly simple: We create a user and then pass the primary key to the task to send a welcome email.
- time.sleep(1) is used to introduce a race condition.
When run, you will see the following error:
django.contrib.auth.models.User.DoesNotExist: User matching query does not exist.
Why?
- We pause for 1 second after enqueueing the task.
- Since the task executes immediately, user = User.objects.get(pk=user_pk) fails as the user is not in the database because the transaction in Django has not yet been committed.
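The visibility problem can be reproduced without Django or Celery. The sketch below uses two sqlite3 connections to the same file: web plays the role of the Django view and worker plays the role of the Celery worker. Both names, the users table, and the worker_lookup helper are assumptions for illustration; a real setup would involve separate processes and most likely a different database.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "demo.sqlite3")

# Two independent connections, as two separate processes would have.
web = sqlite3.connect(db_path, isolation_level=None)
worker = sqlite3.connect(db_path, isolation_level=None)
web.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")


def worker_lookup(user_id):
    """What the task does: fetch the user by primary key."""
    return worker.execute(
        "SELECT username FROM users WHERE id = ?", (user_id,)
    ).fetchone()


web.execute("BEGIN")  # the view enters its atomic block
user_id = web.execute(
    "INSERT INTO users (username) VALUES ('john')"
).lastrowid

# The task runs while the view is still inside its transaction.
seen_before_commit = worker_lookup(user_id)

web.execute("COMMIT")  # the view returns and the transaction commits

# Only now can the other connection see the row.
seen_after_commit = worker_lookup(user_id)
```

The first lookup returns None, which is the situation Django reports as DoesNotExist; the second lookup finds the row because it runs after the commit.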
Solution
There are three ways to solve this:
- Disable the database transaction, so Django uses the autocommit feature. To do so, you can simply remove the transaction.atomic decorator. However, this isn't recommended since the atomic database transaction is a powerful tool.
- Force the Celery task to run after a period of time. For example, to pause for 10 seconds:

    task_send_welcome_email.apply_async(args=[user.pk], countdown=10)

- Django provides transaction.on_commit, which registers a callback that executes after a transaction successfully commits. To use this, update the view like so:

    @transaction.atomic
    def transaction_celery2(request):
        username = random_username()
        user = User.objects.create_user(username, '[email protected]', 'johnpassword')
        logger.info(f'create user {user.pk}')
        # the task does not get called until after the transaction is committed
        transaction.on_commit(lambda: task_send_welcome_email.delay(user.pk))
        time.sleep(1)
        return HttpResponse('test')
Now, the task doesn't get enqueued until after the database transaction commits. So, when the Celery worker looks up the user, the user is always there, because the code in the worker only runs after the Django database transaction has committed successfully.
This is the recommended solution.
It's worth noting that on_commit does not force the transaction to commit; it only delays the callback until the commit happens. If either the database or instance is at high utilization and commits are slow, the task is simply enqueued later. You could instead use the second solution and wait for a fixed amount of time (20 seconds, perhaps), but keep in mind that a fixed delay cannot guarantee that the changes are in the database before the task executes.
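The ordering that transaction.on_commit provides can be illustrated with a toy model built on sqlite3. This is not Django's implementation: atomic_with_hooks is a hypothetical helper written for this sketch, and the queued list merely stands in for the Celery broker.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def atomic_with_hooks(conn):
    """Toy stand-in for an atomic block that supports on-commit callbacks."""
    callbacks = []
    conn.execute("BEGIN")
    try:
        yield callbacks.append  # callers register callables through this
    except BaseException:
        conn.execute("ROLLBACK")  # callbacks are dropped with the changes
        raise
    conn.execute("COMMIT")
    for callback in callbacks:  # runs only once the data is committed
        callback()


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
queued = []  # stands in for the Celery broker

# Successful block: the callback fires after COMMIT.
with atomic_with_hooks(conn) as on_commit:
    user_id = conn.execute(
        "INSERT INTO users (username) VALUES ('john')"
    ).lastrowid
    on_commit(lambda: queued.append(user_id))
    queued_inside_block = list(queued)  # still empty: nothing is sent yet

# Failing block: the insert is rolled back and the callback never fires.
try:
    with atomic_with_hooks(conn) as on_commit:
        conn.execute("INSERT INTO users (username) VALUES ('paul')")
        on_commit(lambda: queued.append("never sent"))
        raise RuntimeError("force a rollback")
except RuntimeError:
    pass
```

After both blocks, queued holds only the primary key of john. The task for the rolled-back user was never enqueued, so a worker could never be asked to load a row that does not exist.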
Testing
Django's TestCase wraps each test in a transaction which is then rolled back after each test. Since no transactions are ever committed, on_commit() never runs either. So, if you need to test code fired in an on_commit callback, you can use either TransactionTestCase or TestCase.captureOnCommitCallbacks() in your test code.
Database transaction in a Celery task
If your Celery task needs to update a database record, it makes sense to use a database transaction in the Celery task.
One simple way is with transaction.atomic():
@shared_task()
def task_transaction_test():
    with transaction.atomic():
        from .views import random_username
        username = random_username()
        user = User.objects.create_user(username, '[email protected]', 'johnpassword')
        user.save()
        logger.info(f'send email to {user.pk}')
        raise Exception('test')
A better approach is to write a custom decorator which has transaction support:
import functools

from celery import shared_task
from django.db import transaction


class custom_celery_task:
    """
    This is a decorator we can use to add custom logic to our Celery task
    such as retry or database transaction
    """
    def __init__(self, *args, **kwargs):
        self.task_args = args
        self.task_kwargs = kwargs

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper_func(*args, **kwargs):
            try:
                # roll back all database changes if the task body fails
                with transaction.atomic():
                    return func(*args, **kwargs)
            except Exception as e:
                # task_func.request.retries holds the current retry count
                raise task_func.retry(exc=e, countdown=5)

        task_func = shared_task(*self.task_args, **self.task_kwargs)(wrapper_func)
        return task_func
...
@custom_celery_task(max_retries=5)
def task_transaction_test():
    # do something
    pass
Conclusion
This article looked at how to make Celery work nicely with Django database transactions.
The source code for this article can be found on GitHub.
Thanks for reading. If you have any questions, please feel free to contact me.