My name is Piotr, a passionate pythonista and this is my blog!

    SqlAlchemy 1.4 async ORM with FastAPI

    Posted at — Oct 9, 2021

    Previously on my blog I set up a backend service with async SQLAlchemy Core using a database library. The reason was simple: SQLAlchemy did not support asyncio in the ORM yet. Now that SQLAlchemy 1.4 is here, we can do the proper setup using only this package!

    This tutorial shows how to set up a production-ready application running on FastAPI, PostgreSQL, SQLAlchemy 1.4, and alembic, everything using asyncio.


    Project setup

    First, I want to share the project structure. It will look like this:

    ├── Dockerfile
    ├── alembic.ini
    ├── app
    │   ├──
    │   ├── api
    │   ├── core
    │   ├── db
    │   ├──
    │   └── models
    ├── docker-compose.yml
    ├── poetry.lock
    ├── pyproject.toml
    └── tests
        ├── app

    Starting from the top, at the root of the project we will store config files like docker, migrations, poetry, etc.

    Next, our Python application module is placed in the app directory.

    Lastly, the tests module is located at the same directory level. The reason to keep it outside the app module is similar to why poetry separates development dependencies: builds created from the code do not need tests on the final image or server.

    Build and run the system in the background

    docker-compose up -d

    The tutorial code can be found on my github

    git clone

    Database setup

    According to the structure, the db directory looks as follows:

    │   ├── db
    │   │   ├──
    │   │   ├──
    │   │   ├──
    │   │   ├──
    │   │   ├── migrations
    │   │   │   ├── README
    │   │   │   ├── __pycache__
    │   │   │   ├──
    │   │   │   ├──
    │   │   │   └── versions
    │   │   ├── repositories
    │   │   │   ├──
    │   │   │   ├──
    │   │   │   └──
    │   │   ├──
    │   │   └── tables
    │   │       ├──
    │   │       └──

    Local database

    version: "3.7"
    services:
      postgres:
        image: postgres:12.5
        environment:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: postgres
        volumes:
          - postgresql_data:/var/lib/postgresql/data/
        expose:
          - 5432
        ports:
          - 5432:5432
    volumes:
      postgresql_data:

    Declarative base

    To define a mapped class and start creating tables, we need a Base. In the given example the as_declarative decorator is used; it simply adapts a given class into a declarative_base(). Additionally, every table is forced to include a primary key which is a UUID (my personal preference).

    import uuid

    from sqlalchemy import Column
    from sqlalchemy.dialects.postgresql import UUID
    from sqlalchemy.ext.declarative import as_declarative, declared_attr


    @as_declarative()
    class Base:
        id: uuid.UUID = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
        __name__: str

        # Generate __tablename__ automatically
        @declared_attr
        def __tablename__(cls) -> str:
            return cls.__name__.lower()
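
    To make the automatic __tablename__ less magical: declared_attr behaves roughly like a property that is evaluated on the class rather than on an instance. The stdlib-only sketch below mimics that idea with a hypothetical class_attr descriptor. It is a simplified stand-in for illustration, not SQLAlchemy's implementation, and UserAccount is an invented example class.

```python
class class_attr:
    """Hypothetical stand-in for declared_attr: computes a value from the class."""

    def __init__(self, func):
        self.func = func

    def __get__(self, instance, owner):
        # Called on attribute access; `owner` is the class being inspected.
        return self.func(owner)


class Base:
    @class_attr
    def __tablename__(cls) -> str:
        # Same rule as in the tutorial: lowercase class name.
        return cls.__name__.lower()


class Coupon(Base):
    pass


class UserAccount(Base):  # invented class, only for the demo
    pass


print(Coupon.__tablename__)       # coupon
print(UserAccount.__tablename__)  # useraccount
```

    Each subclass gets its own value because the descriptor receives the concrete class on every access.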


    Note: in SQLAlchemy 1.4 the declarative_base() function is a specialization of the more generic registry class. The function has also moved to the sqlalchemy.orm package from the sqlalchemy.ext.declarative package.


    Every table created in the project needs to inherit from our declarative base.

    # tables/
    from sqlalchemy import Column, Integer, String
    from app.db.base_class import Base
    class Coupon(Base):
        __tablename__ = "coupon"
        code = Column(String, nullable=False, unique=True)
        init_count = Column(Integer, nullable=False)
        remaining_count = Column(Integer, nullable=False)

    Database session

    To connect to the database we need to define a session. The project needs to work asynchronously, so create_async_engine is used. It works the same as the traditional Engine API, but provides an async interface.

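    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker
    from app.core.config import settings
    # Assumption: the original arguments were lost; the URL passed here must
    # use an async driver, e.g. postgresql+asyncpg://
    engine = create_async_engine(settings.DATABASE_URL)
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)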


    Long story short, in production we cannot create and drop tables every time we make a request or build. To solve this issue we will use alembic. It is my migration tool of choice, especially when dealing with SQLAlchemy.

    Init migrations

    alembic init app/db/migrations

    Alembic ini config

    Everything can stay as autogenerated; the most important part is to keep the script location path correct.

    # alembic.ini
    script_location = app/db/migrations

    Alembic env config

    The most important part of this file is to set the correct database URL. Using docker it will be: postgresql://postgres:postgres@postgres:5432/postgres.
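
    The URL above uses the plain postgresql:// scheme, while an async engine needs an async driver named in the URL. A minimal sketch of deriving one from the other, assuming the asyncpg driver (the driver choice and the to_async_url helper are my assumptions for illustration, not part of the project):

```python
from urllib.parse import urlsplit, urlunsplit


def to_async_url(url: str, driver: str = "asyncpg") -> str:
    """Return the same URL with an explicit async driver in the scheme."""
    parts = urlsplit(url)
    # Drop any driver already present, e.g. "postgresql+psycopg2" -> "postgresql".
    dialect = parts.scheme.split("+", 1)[0]
    return urlunsplit(parts._replace(scheme=f"{dialect}+{driver}"))


print(to_async_url("postgresql://postgres:postgres@postgres:5432/postgres"))
# postgresql+asyncpg://postgres:postgres@postgres:5432/postgres
```

    Keeping a single base URL in the settings and deriving the variant from it avoids the two values drifting apart.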

    Please pay attention that the Base class is imported from app.db.base and not from app.db.base_class. This is because alembic needs to gather all mapped tables.

    from alembic import context
    from app.core.config import settings  # noqa
    from app.db.base import Base  # noqa
    config = context.config
    config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
    target_metadata = Base.metadata

    Import all the table models into the module used by alembic

    # app/db/
    from app.db.base_class import Base  # noqa: F401
    from import Coupon  # noqa: F401

    Generate migrations

    docker-compose run app alembic revision --autogenerate

    Run migrations

    docker-compose run app alembic upgrade head


    As a bit of Domain-Driven Design, I love to use the repository pattern in a Python codebase. It is a code layer between the service and the database that helps with handling database operations. Unfortunately, it takes additional time to code the repositories and add unit tests for everything. I can tell you it is worth it! It keeps the code well structured, grouped by layers, and avoids binding the project to a specific ORM library or even database.

    For this tutorial, the BaseRepository includes only two basic methods. Both are async; create additionally awaits the session commit.

    # app/db/repositories/
    import abc
    from typing import Generic, Type, TypeVar
    from uuid import UUID, uuid4

    from sqlalchemy.ext.asyncio import AsyncSession

    from app.db.errors import DoesNotExist
    from app.models.schema.base import BaseSchema

    IN_SCHEMA = TypeVar("IN_SCHEMA", bound=BaseSchema)
    SCHEMA = TypeVar("SCHEMA", bound=BaseSchema)
    TABLE = TypeVar("TABLE")


    class BaseRepository(Generic[IN_SCHEMA, SCHEMA, TABLE], metaclass=abc.ABCMeta):
        def __init__(self, db_session: AsyncSession, *args, **kwargs) -> None:
            self._db_session: AsyncSession = db_session

        @property
        @abc.abstractmethod
        def _table(self) -> Type[TABLE]:
            ...

        @property
        @abc.abstractmethod
        def _schema(self) -> Type[SCHEMA]:
            ...

        async def create(self, in_schema: IN_SCHEMA) -> SCHEMA:
            entry = self._table(id=uuid4(), **in_schema.dict())
            # Register the new row with the session before committing
            self._db_session.add(entry)
            await self._db_session.commit()
            return self._schema.from_orm(entry)

        async def get_by_id(self, entry_id: UUID) -> SCHEMA:
            entry = await self._db_session.get(self._table, entry_id)
            if not entry:
                raise DoesNotExist(
                    f"{self._table.__name__}<id:{entry_id}> does not exist"
                )
            return self._schema.from_orm(entry)

    Usage of BaseRepository:

    # app/repositories/
    from typing import Type
    from app.db.repositories.base import BaseRepository
    from import Coupon
    from import InCouponSchema, CouponSchema
    class CouponsRepository(BaseRepository[InCouponSchema, CouponSchema, Coupon]):
        @property
        def _in_schema(self) -> Type[InCouponSchema]:
            return InCouponSchema
        @property
        def _schema(self) -> Type[CouponSchema]:
            return CouponSchema
        @property
        def _table(self) -> Type[Coupon]:
            return Coupon
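
    The claim that a repository keeps the project from being bound to a specific ORM or database can be illustrated with a stdlib-only sketch. InMemoryCouponsRepository and CouponData below are hypothetical names invented for this example. They expose the same create/get_by_id surface, simplified to take plain arguments instead of a schema, and store rows in a dict instead of PostgreSQL.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict
from uuid import UUID, uuid4


class DoesNotExist(Exception):
    """Raised when no entry matches the requested id."""


@dataclass(frozen=True)
class CouponData:
    id: UUID
    code: str
    init_count: int
    remaining_count: int


class InMemoryCouponsRepository:
    """Hypothetical repository backed by a dict instead of a database."""

    def __init__(self) -> None:
        self._rows: Dict[UUID, CouponData] = {}

    async def create(self, code: str, init_count: int) -> CouponData:
        # A new coupon starts with all of its uses remaining.
        coupon = CouponData(uuid4(), code, init_count, init_count)
        self._rows[coupon.id] = coupon
        return coupon

    async def get_by_id(self, entry_id: UUID) -> CouponData:
        try:
            return self._rows[entry_id]
        except KeyError:
            raise DoesNotExist(f"Coupon<id:{entry_id}> does not exist") from None


async def demo() -> CouponData:
    repository = InMemoryCouponsRepository()
    created = await repository.create("PIOTR", 100)
    return await repository.get_by_id(created.id)


coupon = asyncio.run(demo())
print(coupon.code, coupon.remaining_count)  # PIOTR 100
```

    Code that only calls create and get_by_id would not notice which of the two implementations it was given, which is also what makes such a layer convenient in unit tests.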

    API handlers

    │   ├── api
    │   │   ├──
    │   │   ├── dependencies
    │   │   │   ├──
    │   │   │   └──
    │   │   └── routes
    │   │       ├──
    │   │       ├──
    │   │       └──

    Using FastAPI we need to talk about dependency injection! Long story short, it means our code declares the things it requires to work (called dependencies), and FastAPI does the magic of injecting them when needed.

    Our only dependency in this project, but an important one, is the database session. The dependency creates an async session, yields it, and commits it at the end.

    # api/dependencies/
    from typing import AsyncGenerator

    from sqlalchemy.ext.asyncio import AsyncSession

    from app.db.session import async_session


    async def get_db() -> AsyncGenerator[AsyncSession, None]:
        """Dependency function that yields db sessions"""
        async with async_session() as session:
            yield session
            await session.commit()
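
    The order of operations in a yielding dependency can be easy to misread, so here is a stdlib-only sketch of the lifecycle. FakeSession is a hypothetical stand-in that only records calls, and asynccontextmanager plays the role of the framework driving the generator. This is a simplified model of the behaviour, not FastAPI's own code.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List

events: List[str] = []


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records calls."""

    async def __aenter__(self) -> "FakeSession":
        events.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        events.append("close")

    async def commit(self) -> None:
        events.append("commit")


async def get_db() -> AsyncIterator[FakeSession]:
    async with FakeSession() as session:
        yield session
        # Reached only if the code using the session did not raise.
        await session.commit()


async def main() -> None:
    # asynccontextmanager drives the generator: setup, yield, then teardown.
    async with asynccontextmanager(get_db)():
        events.append("handler")


asyncio.run(main())
print(events)  # ['open', 'handler', 'commit', 'close']
```

    The commit runs after the handler body has finished and before the session is closed, which matches the description above.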

    The dependency is used through FastAPI's built-in Depends class, which injects it into the handler.

    # api/routes/
    from fastapi import APIRouter, Depends
    from sqlalchemy.ext.asyncio import AsyncSession
    from starlette import status
    from app.api.dependencies.db import get_db
    from import CouponsRepository
    from import OutCouponSchema, InCouponSchema
    router = APIRouter()
    @router.post("/", status_code=status.HTTP_201_CREATED, response_model=OutCouponSchema)
    async def create_coupon(
        payload: InCouponSchema, db: AsyncSession = Depends(get_db)
    ) -> OutCouponSchema:
        coupons_repository = CouponsRepository(db)
        coupon = await coupons_repository.create(payload)
        return OutCouponSchema(**coupon.dict())


    Now we are finally good to start the real fun!

    Since our codebase has async methods, to run the tests properly we need to set up the event loop fixture with scope="session".

    # tests/
    import asyncio
    from typing import Generator
    import pytest
    @pytest.fixture(scope="session")
    def event_loop(request) -> Generator:
        """Create one event loop shared by the whole test session."""
        loop = asyncio.get_event_loop_policy().new_event_loop()
        yield loop
        loop.close()
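
    What a session-scoped loop means in practice is that every coroutine in the test run executes on one and the same loop object, which is closed once at the end. A small stdlib-only sketch of that behaviour, outside of pytest (the current_loop helper is invented for the demo):

```python
import asyncio


async def current_loop() -> asyncio.AbstractEventLoop:
    # Report which loop is executing this coroutine.
    return asyncio.get_running_loop()


loop = asyncio.new_event_loop()
try:
    # Two separate "tests" executed on the same loop.
    first = loop.run_until_complete(current_loop())
    second = loop.run_until_complete(current_loop())
finally:
    loop.close()

print(first is second is loop)  # True
print(loop.is_closed())         # True
```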

    Next, we need to create a db_session fixture that runs only for our tests, creates and drops the tables, and lets us execute database operations.

    @pytest.fixture()
    async def db_session() -> AsyncSession:
        async with engine.begin() as connection:
            await connection.run_sync(Base.metadata.drop_all)
            await connection.run_sync(Base.metadata.create_all)
            async with async_session(bind=connection) as session:
                yield session
                await session.flush()
                await session.rollback()
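
    The rollback at the end of the fixture is what keeps one test's writes from leaking into the next. The same idea can be sketched with the stdlib sqlite3 module. SQLite is used here only so the example runs without a server; the project itself uses PostgreSQL through SQLAlchemy, and the single-column table is a simplification.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE coupon (code TEXT NOT NULL UNIQUE)")
connection.commit()


def count_coupons() -> int:
    return connection.execute("SELECT COUNT(*) FROM coupon").fetchone()[0]


# "Test body": the INSERT implicitly opens a transaction.
connection.execute("INSERT INTO coupon (code) VALUES (?)", ("PIOTR",))
inside = count_coupons()

# "Fixture teardown": discard everything the test wrote.
connection.rollback()
after = count_coupons()
connection.close()

print(inside, after)  # 1 0
```

    Inside the transaction the row is visible, and after the rollback the table is empty again.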

    With only the above session we could already start creating entries in the tests. Unfortunately, when we try to connect to the DB in our handler we will run into an issue, because the handler still uses the real get_db dependency.

    To solve it we need to override the get_db dependency:

    @pytest.fixture()
    def override_get_db(db_session: AsyncSession) -> Callable:
        async def _override_get_db():
            yield db_session
        return _override_get_db

    @pytest.fixture()
    def app(override_get_db: Callable) -> FastAPI:
        from app.api.dependencies.db import get_db
        from app.main import app
        app.dependency_overrides[get_db] = override_get_db
        return app
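
    Conceptually, dependency_overrides is a mapping from the original dependency callable to its replacement, consulted when a dependency is resolved. The stdlib-only sketch below models just that lookup. The resolve function is hypothetical and greatly simplified compared to what FastAPI does internally.

```python
from typing import Any, Callable, Dict

dependency_overrides: Dict[Callable[..., Any], Callable[..., Any]] = {}


def get_db() -> str:
    return "real session"


def resolve(dependency: Callable[..., Any]) -> Any:
    """Call the override registered for `dependency`, or the dependency itself."""
    return dependency_overrides.get(dependency, dependency)()


before = resolve(get_db)
dependency_overrides[get_db] = lambda: "test session"
during = resolve(get_db)
dependency_overrides.clear()
after = resolve(get_db)

print(before, during, after, sep=" | ")
# real session | test session | real session
```

    The key is the original function object, which is why the fixture imports get_db from the same module the handlers use.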

    Finally, we need to create an async test client, for which the httpx library will help us.

    from typing import AsyncGenerator
    import pytest
    from fastapi import FastAPI
    from httpx import AsyncClient
    @pytest.fixture()
    async def async_client(app: FastAPI) -> AsyncGenerator:
        async with AsyncClient(app=app, base_url="http://test") as ac:
            yield ac

    With our fixtures configured, there is nothing left but to create our first test. The important part in testing is to remember to apply pytest.mark.asyncio. It can be done as in the example below, using pytestmark, or by attaching it as a decorator to every test function.

    from unittest import mock
    import pytest
    from httpx import AsyncClient
    from sqlalchemy.ext.asyncio import AsyncSession
    from starlette import status
    from import CouponsRepository
    from import InCouponSchema
    pytestmark = pytest.mark.asyncio
    async def test_coupon_create(
        async_client: AsyncClient, db_session: AsyncSession
    ) -> None:
        coupons_repository = CouponsRepository(db_session)
        payload = {
            "code": "PIOTR",
            "init_count": 100,
        }
        response = await async_client.post("/v1/coupons/", json=payload)
        coupon = await coupons_repository.get_by_id(response.json()["id"])
        assert response.status_code == status.HTTP_201_CREATED
        assert response.json() == {
            "code": payload["code"],
            "init_count": payload["init_count"],
            "remaining_count": payload["init_count"],
            "id": str(coupon.id),
        }


    This setup works like a charm. It was battle-tested on high-load services. Of course, there were some issues, like connection pooling and connections closed in the middle of a request (but this is a topic for the next blog post!).

    My overall feelings are quite good. It takes some time to set everything up and test it well, but from my perspective, it is worth it. Does it give you much to have your DB connections async? It depends on the use-case…
