Celery doesn't support async
To start, Celery doesn't support async code. However, you can wrap an async function in a regular sync task that runs it via:
asyncio.run(async_function())
However, it isn't that simple. If you also use a connection pool, you will quickly exhaust it, because connections aren't released back to the pool, and you'll get a ton of timeouts. The fix comes from how Django handles async requests, but first we need to understand how async support in Django's ORM works.
How Django's async ORM actually works
Django doesn't have true async database support (yet?). Here's what happens under the hood:
- You call await MyModel.objects.aget()
- Django detects this is an async call to a sync ORM
- Django internally uses sync_to_async to run the query in a separate thread
- By default, sync_to_async uses a global process-wide single_thread_executor
- That thread never shuts down during the process lifetime
- Database connections opened in that thread never get closed or returned to the pool
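The steps above can be modelled with the standard library alone. This is a simplified sketch, not Django's or asgiref's actual code: GLOBAL_EXECUTOR, fake_query, and fake_task are hypothetical names, and a plain string stands in for a database connection. It shows that two separate event loops still land on the same long-lived worker thread and reuse whatever that thread stored.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# Assumption: a stand-in for the process-wide executor (one worker, never shut down).
GLOBAL_EXECUTOR = ThreadPoolExecutor(max_workers=1)

# Assumption: a stand-in for per-thread connection storage.
_local = threading.local()
opened_connections = []


def fake_query(label):
    """Open a 'connection' on first use in this thread, then keep reusing it."""
    if not hasattr(_local, "conn"):
        _local.conn = f"conn-opened-for-{label}"
        opened_connections.append(_local.conn)
    return threading.current_thread().name, _local.conn


async def fake_task(label):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(GLOBAL_EXECUTOR, fake_query, label)


# Two independent asyncio.run() calls, like two Celery task invocations.
first = asyncio.run(fake_task("task-1"))
second = asyncio.run(fake_task("task-2"))
```

Both calls return the same thread name and the same "connection", which was opened by the first task and is still held after both event loops have finished.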
Therefore, if you set this (required for async calls with Django):
# settings.py
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        ...
        "CONN_MAX_AGE": 0
    }
}
Django will open a new connection for every use. However, if you use the pool option, CONN_MAX_AGE is ignored, and you'll have the aforementioned problem. The way out is found in how Django handles async view functions.
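For reference, a pooled configuration might look like the sketch below. It assumes Django 5.1 or later with psycopg 3 and its pool extra installed; the database name is a placeholder, and the rest of the connection settings are omitted.

```python
# settings.py (sketch; "mydb" is a placeholder, other connection keys omitted)
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "mydb",
        # Persistent connections stay disabled; the pool manages reuse instead.
        "CONN_MAX_AGE": 0,
        "OPTIONS": {
            # True enables the psycopg connection pool with its default settings.
            "pool": True,
        },
    }
}
```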
Django async request flow
This is the flow of a request and how it interacts with your async code and the database:
Request Arrives
│
└─ ASGIHandler.__call__()
   │
   └─ async with ThreadSensitiveContext():   ← Creates per-request executor
      │
      ├─ await signals.request_started.asend()
      │  └─ Triggers close_old_connections() in main thread
      │
      ├─ async def my_view(request):
      │  └─ await MyModel.objects.aget()
      │     └─ Django ORM wraps with sync_to_async()
      │        ├─ Checks: Is there a ThreadSensitiveContext?
      │        │    YES → Use context's dedicated executor
      │        │    NO  → Use global single_thread_executor
      │        └─ Per-Request Executor Thread:
      │           └─ Runs sync query
      │           └─ Opens DB connection (thread-local)
      │           └─ Returns result
      │
      ├─ Response sent to client
      │
      ├─ await signals.request_finished.asend()
      │  └─ Triggers close_old_connections() in main thread
      │
      └─ ThreadSensitiveContext.__aexit__:
         └─ executor.shutdown()   ← Kills the executor thread
            └─ Thread dies
               └─ Thread-local connections cleaned up ✅
               └─ Connections returned to pool ✅
The key is the use of ThreadSensitiveContext. This creates a context that causes the Django ORM to create a thread to serialize DB calls for the lifetime of that context. In the case of an HTTP request, that means for the lifetime of the HTTP request. Note that close_old_connections() only closes connections belonging to the current thread, not those in the thread spawned to serialize DB calls when the ORM is called from an async context.
However, for a Celery task, we don't have any of that. Even if we called close_old_connections() manually, it would only clean up connections associated with the current thread and miss those from the global thread.
What ThreadSensitiveContext does is ensure that when an ORM call is made from within an async function, a new thread tied to the async function's thread is created for it. When the ThreadSensitiveContext exits, that thread is cleaned up and, with it, any database connections it had requested from the pool.
Key Concepts
What is ThreadSensitiveContext?
ThreadSensitiveContext is an async context manager from the asgiref library that:
- Creates an isolated thread pool executor for the context scope
- Ensures all sync_to_async() calls within the context use this executor (not the global one)
- On context exit (__aexit__), shuts down the executor
- Executor shutdown kills the thread
- Thread death triggers cleanup of thread-local data (including database connections)
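The lifecycle in this list can be imitated with the standard library. The sketch below is a simplified model, not asgiref's implementation: scoped_executor and run_one_task are hypothetical names, and a single-worker ThreadPoolExecutor stands in for the executor that ThreadSensitiveContext manages.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager


@asynccontextmanager
async def scoped_executor():
    """Create a one-thread executor for this scope and shut it down on exit."""
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        yield executor
    finally:
        # Joins the worker thread, so it is gone once the scope ends.
        executor.shutdown(wait=True)


async def run_one_task():
    loop = asyncio.get_running_loop()
    async with scoped_executor() as executor:
        # Ask the worker thread to identify itself.
        worker = await loop.run_in_executor(executor, threading.current_thread)
        alive_inside = worker.is_alive()
    return worker, alive_inside


worker_a, alive_a = asyncio.run(run_one_task())
worker_b, alive_b = asyncio.run(run_one_task())
```

Each run gets its own worker thread, alive only while the scope is open. That is the property the request cycle relies on, and the one a Celery task lacks by default.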
Why Thread Shutdown Cleans Up Connections
- Django's ConnectionHandler uses Local(thread_critical=True) (thread-local storage)
- Each thread gets its own set of database connection objects
- When a thread terminates, Python's garbage collector cleans up thread-local data
- The psycopg3 pool detects connections are closed and returns them to the pool
Why You Can't Access Another Thread's Connections
Thread-local storage is isolated per thread by design. From Python's perspective, each thread has completely separate storage. This is why cross-thread cleanup is impossible and why per-task executors are necessary.
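A small illustration of that isolation, using the standard library's threading.local as a stand-in (Django itself uses asgiref's Local, so treat this as an analogy rather than the exact mechanism):

```python
import threading

storage = threading.local()
storage.conn = "main-thread-connection"

seen_by_worker = {}


def worker():
    # The worker starts with empty thread-local storage: it cannot see
    # (or close) the value that the main thread stored.
    seen_by_worker["has_conn_on_entry"] = hasattr(storage, "conn")
    storage.conn = "worker-connection"
    seen_by_worker["own_conn"] = storage.conn


t = threading.Thread(target=worker)
t.start()
t.join()

# The main thread's value is untouched by the worker's assignment.
main_conn_after = storage.conn
```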
The fix for Celery
Celery doesn't support async functions natively. Detect and wrap them:
# celeryapp.py
import asyncio
import functools
import inspect
from typing import Any, Callable

from asgiref.sync import ThreadSensitiveContext
from django.db import close_old_connections

# `app` is the Celery application instance, assumed to be defined in this module.


def _async_to_sync_wrapper(async_func: Callable[..., Any]) -> Callable[..., Any]:
    """Wraps async functions for Celery tasks with proper connection cleanup."""

    @functools.wraps(async_func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        async def wrapped_with_context() -> Any:
            async with ThreadSensitiveContext():
                return await async_func(*args, **kwargs)

        try:
            # Close stale connections BEFORE task execution
            # (Matches Django's request_started signal behavior)
            close_old_connections()
            # Run the async task with per-task executor isolation
            return asyncio.run(wrapped_with_context())
        finally:
            # Close connections AFTER task execution
            # (Matches Django's request_finished signal behavior)
            close_old_connections()

    # Preserve function signature for inspection
    sync_wrapper.__signature__ = inspect.signature(async_func)
    sync_wrapper.__annotations__ = async_func.__annotations__
    return sync_wrapper


def async_task(**kwargs):
    """Custom task decorator that supports async functions."""

    def inner(func):
        # Detect async functions and wrap them
        if inspect.iscoroutinefunction(func):
            func = _async_to_sync_wrapper(func)
        return app.task(**kwargs)(func)

    return inner
Usage:
@async_task(name="my_app.process_data")
async def process_data_async(org_id: int):
    obj = await MyModel.objects.aget(id=1)
    # ... more async work
What we did:
- We close any connections associated with the main thread before and after the task. In an async context, that really just catches any accidental sync ORM calls you made, but it doesn't hurt to add it.
- We wrap the task execution in ThreadSensitiveContext to ensure the ORM calling thread is created per "request" (a Celery task call in this case).
- We created an @async_task decorator to handle everything automatically, and it can even support sync tasks.
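The detect-and-wrap part of the decorator can be tried without Celery or Django installed. This is a stripped-down sketch of the same idea, with the connection handling and task registration left out; maybe_async, double, and triple are hypothetical names.

```python
import asyncio
import functools
import inspect


def maybe_async(func):
    """Return sync functions unchanged; wrap coroutine functions in a sync caller."""
    if not inspect.iscoroutinefunction(func):
        return func

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        # Each call gets a fresh event loop, as in the Celery wrapper.
        return asyncio.run(func(*args, **kwargs))

    return sync_wrapper


@maybe_async
async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


@maybe_async
def triple(x: int) -> int:
    return x * 3


double_result = double(21)
triple_result = triple(2)
```

After decoration, double is an ordinary function from the caller's point of view, which is what lets a sync-only task runner call it, while inspect.signature still reports the original parameters.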
One more thing: Handling Celery's prefork
If you do all the above and use the default prefork mode, your app won't work at all. This is because the connection pool, created by Django, is created in the main worker process before it forks into the other worker processes. Any connections created at that point will be invalid in the forked processes, as they are tied to the process that created them.
Therefore, when the worker process initializes, you need to clean any existing connections:
from typing import Any

from celery.signals import worker_process_init, worker_process_shutdown
from django.core.cache import caches
from django.db import connections


@worker_process_init.connect
def init_worker_process(**kwargs: Any) -> None:
    # Close all database connections inherited from parent
    for conn in connections.all():
        conn.close()
    # Also close cache connections
    for cache in caches.all():
        if hasattr(cache, "close"):
            cache.close()


@worker_process_shutdown.connect
def shutdown_worker_process(**kwargs: Any) -> None:
    # Close this worker's connections before the process exits
    for conn in connections.all():
        conn.close()
Note: if you use gunicorn and its --preload flag to load the app in memory before forking, you'll have the same issue. In that case, however, you can just remove the flag.
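To see why anything created before the fork is suspect, here is a toy model using os.fork (POSIX only). It is an assumption-laden simplification: PidBoundPool is a hypothetical class that merely remembers which process created it, whereas a real pool breaks after a fork because its sockets and helper threads do not carry over cleanly.

```python
import os


class PidBoundPool:
    """Hypothetical stand-in for a pool that only works in its creating process."""

    def __init__(self) -> None:
        self.owner_pid = os.getpid()

    def usable_here(self) -> bool:
        return self.owner_pid == os.getpid()


def usable_in_forked_child(pool: PidBoundPool) -> bool:
    """Fork, ask the child whether the inherited pool is usable, and report back."""
    read_fd, write_fd = os.pipe()
    child_pid = os.fork()
    if child_pid == 0:
        # Child process: report the answer and exit immediately.
        os.close(read_fd)
        os.write(write_fd, b"1" if pool.usable_here() else b"0")
        os._exit(0)
    # Parent process: read the child's answer and reap the child.
    os.close(write_fd)
    answer = os.read(read_fd, 1)
    os.close(read_fd)
    os.waitpid(child_pid, 0)
    return answer == b"1"


pool = PidBoundPool()  # created before the fork, like a preloaded pool
usable_in_parent = pool.usable_here()
usable_in_child = usable_in_forked_child(pool)
```

The child inherits the object but not the right to use it. Closing inherited connections in worker_process_init addresses the same situation by making each worker open its own.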
References
- Django ASGI Handler: django/core/handlers/asgi.py (lines 161-162)
- ThreadSensitiveContext: asgiref/sync.py (lines 111-149)
- Django Ticket #32889: Introduction of ThreadSensitiveContext for ASGI
- ConnectionHandler Implementation: django/db/utils.py (line 145: thread_critical = True)
- psycopg3 Connection Pooling: Documentation
Conclusion
I'm writing this because I couldn't find any other documentation on using ThreadSensitiveContext to apply the same async handling Django uses for web requests to other traditionally sync-only contexts like Celery. It sucked, as you can't replicate a lot of these issues in dev, but all's well that ends well :)