Thursday, October 9, 2025

Using Async Functions in Celery with Django Connection Pooling

We have a Django application and wanted to start writing async code. Recent Django versions make that possible, but if you also use Celery, and especially Django 5.1's new connection pooling, things aren't so simple. Here is how we figured it out.

Celery doesn't support async

To start, Celery doesn't support async code. However, you can wrap each async function in a regular sync task that runs it via:

asyncio.run(async_function())
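
In a Celery app, that naive bridge might look like this (a sketch; the app, task, and model names here are hypothetical):

# tasks.py
import asyncio

from myapp.celeryapp import app  # your Celery application instance
from myapp.models import MyModel

async def process_data_async(org_id: int):
    return await MyModel.objects.aget(pk=org_id)

@app.task(name="my_app.process_data")
def process_data(org_id: int):
    # Celery calls the sync wrapper; asyncio.run() drives the coroutine
    return asyncio.run(process_data_async(org_id))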

However, it isn't that simple. If you also use a connection pool, you will quickly exhaust it: connections are never released back to the pool, and you'll get a ton of timeouts. The fix lies in how Django handles async requests, but first we need to understand how async support in the ORM works.

How Django's async ORM actually works

Django doesn't have true async database support (yet?). Here's what happens under the hood:

  1. You call await MyModel.objects.aget()
  2. Django detects this is an async call to a sync ORM
  3. Django internally uses sync_to_async to run the query in a separate thread
  4. By default, sync_to_async uses a global process-wide single_thread_executor
  5. That thread never shuts down during the process lifetime
  6. Database connections opened in that thread never get closed or returned to the pool
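
You can observe the global executor outside Django with plain asgiref — a minimal sketch:

import asyncio
import threading

from asgiref.sync import sync_to_async

def current_thread_name() -> str:
    return threading.current_thread().name

async def main() -> None:
    # With no ThreadSensitiveContext active, thread-sensitive calls all
    # land on the same global executor thread, which never shuts down
    # for the life of the process
    print(await sync_to_async(current_thread_name)())
    print(await sync_to_async(current_thread_name)())  # same thread name

asyncio.run(main())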

Therefore, if you set this (required for async calls with Django):

# settings.py
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        ...
        "CONN_MAX_AGE": 0
    } 
}

Django will open a new connection for every use and close it afterwards. If you use the pool option, however, CONN_MAX_AGE is ignored, and you'll hit the aforementioned problem: connections checked out by the global executor thread are never returned.
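
For reference, the Django 5.1 pooling configuration in question looks like this (requires psycopg 3; the exact pool options are illustrative):

# settings.py
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        ...
        "CONN_MAX_AGE": 0,
        "OPTIONS": {
            # True uses psycopg_pool defaults; a dict of pool
            # options (e.g. {"min_size": 2}) also works
            "pool": True,
        },
    }
}

The answer to making this work is found in how Django handles async view functions.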

Django async request flow

This is the flow of a request and how it interacts with your async code and the database:

  Request Arrives
     │
     └─ ASGIHandler.__call__()
            │
            └─ async with ThreadSensitiveContext():  ← Creates per-request executor
                   │
                   ├─ await signals.request_started.asend()
                   │      └─ Triggers close_old_connections() in main thread
                   │
                   └─ async def my_view(request):
                          │
                          └─ await MyModel.objects.aget()
                                 │
                                 └─ Django ORM wraps with sync_to_async()
                                        │
                                        ├─ Checks: Is there a ThreadSensitiveContext?
                                        │     YES → Use context's dedicated executor
                                        │     NO  → Use global single_thread_executor
                                        │
                                        └─ Per-Request Executor Thread:
                                               └─ Runs sync query
                                               └─ Opens DB connection (thread-local)
                                               └─ Returns result
                   │
                   ├─ Response sent to client
                   │
                   ├─ await signals.request_finished.asend()
                   │      └─ Triggers close_old_connections() in main thread
                   │
                   └─ ThreadSensitiveContext.__aexit__:
                          └─ executor.shutdown()  ← Kills the executor thread
                                 └─ Thread dies
                                        └─ Thread-local connections cleaned up ✅
                                               └─ Connections returned to pool ✅
                                               

The key is the use of ThreadSensitiveContext. It creates a context for whose lifetime the Django ORM serializes DB calls onto a single dedicated thread; in the case of an HTTP request, that means the lifetime of the request. Note that close_old_connections() only closes connections belonging to the current thread, not the one spawned to serialize DB calls when the ORM is called from an async context.
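
For context, Django itself wires close_old_connections() to the request lifecycle signals; abridged from django/db/__init__.py, the wiring amounts to:

from django.core import signals
from django.db import close_old_connections

# Stale connections are closed at the start and end of every request
signals.request_started.connect(close_old_connections)
signals.request_finished.connect(close_old_connections)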

However, for a Celery task, we don't have any of that. Even if we called close_old_connections() manually, it would only clean up connections associated with the current thread and miss those held by the global executor thread.

What ThreadSensitiveContext does is ensure that ORM calls made from within an async function all run on a dedicated thread tied to that context. When the ThreadSensitiveContext exits, that thread is shut down and cleaned up, and with it any database connections it had checked out from the pool.

Key Concepts

What is ThreadSensitiveContext?

ThreadSensitiveContext is an async context manager from the asgiref library that:

  1. Creates an isolated thread pool executor for the context scope
  2. Ensures all sync_to_async() calls within the context use this executor (not the global one)
  3. On context exit (__aexit__), shuts down the executor
  4. Executor shutdown kills the thread
  5. Thread death triggers cleanup of thread-local data (including database connections)
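
A minimal sketch of this behavior, outside Django:

import asyncio
import threading

from asgiref.sync import ThreadSensitiveContext, sync_to_async

def current_thread_name() -> str:
    return threading.current_thread().name

async def main() -> None:
    async with ThreadSensitiveContext():
        # Both calls run on a dedicated executor thread created for
        # this context, not the global one
        name1 = await sync_to_async(current_thread_name)()
        name2 = await sync_to_async(current_thread_name)()
        assert name1 == name2
    # On __aexit__ that executor is shut down, so the thread dies and
    # its thread-local DB connections are released

asyncio.run(main())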

Why Thread Shutdown Cleans Up Connections

  1. Django's ConnectionHandler uses Local(thread_critical=True) (thread-local storage)
  2. Each thread gets its own set of database connection objects
  3. When a thread terminates, Python's garbage collector cleans up thread-local data
  4. The psycopg3 pool detects connections are closed and returns them to the pool

Why You Can't Access Another Thread's Connections

Thread-local storage is isolated per thread by design. From Python's perspective, each thread has completely separate storage. This is why cross-thread cleanup is impossible and why per-task executors are necessary.
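
A quick illustration of that isolation with plain threading.local:

import threading

storage = threading.local()
storage.conn = "main-thread-connection"

def worker() -> None:
    # The worker thread sees its own, empty storage; the main thread's
    # "conn" attribute is invisible here and can't be cleaned up
    print(hasattr(storage, "conn"))  # prints False

t = threading.Thread(target=worker)
t.start()
t.join()

print(storage.conn)  # the main thread still sees its own value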

The fix for Celery

Celery doesn't support async functions natively. Detect and wrap them:

# celeryapp.py
import asyncio
import functools
import inspect
from typing import Any, Callable

from asgiref.sync import ThreadSensitiveContext
from django.db import close_old_connections

def _async_to_sync_wrapper(async_func: Callable[..., Any]) -> Callable[..., Any]:
    """Wraps async functions for Celery tasks with proper connection cleanup."""

    @functools.wraps(async_func)
    def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
        async def wrapped_with_context() -> Any:
            async with ThreadSensitiveContext():
                return await async_func(*args, **kwargs)

        try:
            # Close stale connections BEFORE task execution
            # (Matches Django's request_started signal behavior)
            close_old_connections()

            # Run the async task with per-task executor isolation
            return asyncio.run(wrapped_with_context())
        finally:
            # Close connections AFTER task execution
            # (Matches Django's request_finished signal behavior)
            close_old_connections()

    # Preserve function signature for inspection
    sync_wrapper.__signature__ = inspect.signature(async_func)
    sync_wrapper.__annotations__ = async_func.__annotations__
    return sync_wrapper

def async_task(**kwargs):
    """Custom task decorator that supports async functions."""
    def inner(func):
        # Detect async functions and wrap them; sync functions pass through
        if inspect.iscoroutinefunction(func):
            func = _async_to_sync_wrapper(func)

        # "app" is the Celery application instance defined in this module
        return app.task(**kwargs)(func)

    return inner

Usage:

@async_task(name="my_app.process_data")
async def process_data_async(org_id: int):
    obj = await MyModel.objects.aget(id=1)
    # ... more async work

What we did:

  1. We close any connections associated with the main thread before and after the task. In an async context, this really just catches accidental sync ORM calls, but it doesn't hurt to add it.
  2. We wrap the task execution in ThreadSensitiveContext so that the ORM's executor thread is created per "request" (a Celery task invocation in this case).
  3. We created an @async_task decorator to handle everything automatically; it supports plain sync tasks too (see the example below).
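
Because _async_to_sync_wrapper is only applied to coroutine functions, a plain sync task passes straight through to app.task():

@async_task(name="my_app.cleanup")
def cleanup():
    # Not a coroutine function, so it is registered directly
    ...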

One more thing: Handling Celery's prefork

If you do all of the above and use Celery's default prefork pool, your app won't work at all. This is because Django creates its connection pool in the main worker process before the child workers are forked. Any connections created at that point are invalid in the children, as they are tied to the process that created them.

Therefore, when each worker process initializes, you need to close any inherited connections:

from typing import Any

from celery.signals import worker_process_init, worker_process_shutdown
from django.core.cache import caches
from django.db import connections

@worker_process_init.connect
def init_worker_process(**kwargs: Any) -> None:
    # Close all database connections inherited from parent
    for conn in connections.all():
        conn.close()

    # Also close cache connections
    for cache in caches.all():
        if hasattr(cache, "close"):
            cache.close()

@worker_process_shutdown.connect
def shutdown_worker_process(**kwargs: Any) -> None:
    for conn in connections.all():
        conn.close()

Note: if you use gunicorn and its --preload flag to load the app in memory before forking, you'll have the same issue. In that case, however, you can just remove the flag.
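
If you do want to keep --preload, a post_fork server hook can close the inherited connections instead (a sketch; adapt to your setup):

# gunicorn.conf.py
def post_fork(server, worker):
    # Drop any database connections inherited from the preloaded
    # parent process; each worker will open its own
    from django.db import connections

    for conn in connections.all():
        conn.close()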

References

  • Django ASGI Handler: django/core/handlers/asgi.py (lines 161-162)
  • ThreadSensitiveContext: asgiref/sync.py (lines 111-149)
  • Django Ticket #32889: Introduction of ThreadSensitiveContext for ASGI
  • ConnectionHandler Implementation: django/db/utils.py (line 145: thread_critical = True)
  • psycopg3 Connection Pooling: Documentation

Conclusion

I'm writing this because I couldn't find any other documentation on using ThreadSensitiveContext to bring the async handling Django applies to web requests into other traditionally sync-only contexts like Celery. It sucked, as you can't replicate a lot of these issues in dev, but all's well that ends well :)