rogulski.it

My name is Piotr, a passionate pythonista and this is my blog!

SqlAlchemy 1.4 async ORM with FastAPI

Posted at — Oct 9, 2021

Previously on my blog I set up a backend service with async SQLAlchemy Core using a database library. The reason was simple: SQLAlchemy did not support asyncio in the ORM yet. Now that SQLAlchemy 1.4 is here, we can do the proper setup using only this package!

This tutorial shows how to set up a production-ready application running on FastAPI, PostgreSQL, SQLAlchemy 1.4, and alembic, with everything using asyncio.

Requirements

Project setup

First, I want to share a structure. The project will look like this:

├── Dockerfile
├── README.md
├── alembic.ini
├── app
│   ├── __init__.py
│   ├── api
│   ├── core
│   ├── db
│   ├── main.py
│   └── models
├── docker-compose.yml
├── poetry.lock
├── pyproject.toml
└── tests
    ├── __init__.py
    ├── app
    └── conftest.py

Starting from the top, at the root of the project we will store config files like docker, migrations, poetry, etc.

Next, our Python application module is placed in the app directory.

Lastly, the tests module sits at the same directory level. The reason to keep it outside the app module is similar to why poetry separates development dependencies: builds created from the code do not need tests on the final image or server.

Build and run the system in the background

docker-compose up -d

The tutorial code can be found on my GitHub:

git clone git@github.com:rglsk/fastapi-sqlalchemy-1.4-async.git

Database setup

According to the structure, the db directory looks as follows:

│   ├── db
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── base_class.py
│   │   ├── errors.py
│   │   ├── migrations
│   │   │   ├── README
│   │   │   ├── __pycache__
│   │   │   ├── env.py
│   │   │   ├── script.py.mako
│   │   │   └── versions
│   │   ├── repositories
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   └── coupons.py
│   │   ├── session.py
│   │   └── tables
│   │       ├── __init__.py
│   │       └── coupons.py

Local database

version: "3.7"

services:
  postgres:
    image: postgres:12.5
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: postgres
    volumes:
      - postgresql_data:/var/lib/postgresql/data/
    expose:
      - 5432
    ports:
      - 5432:5432

volumes:
  postgresql_data:

Declarative base

To define a mapped class and start creating tables, we need a Base. This example uses the as_declarative decorator, which simply turns the decorated class into a declarative base, just like declarative_base() would. Additionally, every table is forced to include a UUID primary key (my personal preference).

# base_class.py

import uuid

from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import as_declarative, declared_attr


@as_declarative()
class Base:
    id: uuid.UUID = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    __name__: str

    # Generate __tablename__ automatically
    @declared_attr
    def __tablename__(cls) -> str:
        return cls.__name__.lower()

Note

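The declarative_base() function is now a specialization of the more generic registry class. The function also moves to the sqlalchemy.orm package from the sqlalchemy.ext.declarative package. The same applies to as_declarative and declared_attr, so the imports above could come from sqlalchemy.orm as well.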
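
As a minimal sketch of what this note means in practice, here is the same base written with imports from sqlalchemy.orm. It assumes SQLAlchemy 1.4 or newer is installed; GiftCard is a hypothetical table used only to show the generated table name, and no database connection is needed to run it.

```python
import uuid

from sqlalchemy import Column, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import as_declarative, declared_attr


@as_declarative()
class Base:
    # Every table inherits a UUID primary key from the base.
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    # Generate __tablename__ automatically from the class name.
    @declared_attr
    def __tablename__(cls) -> str:
        return cls.__name__.lower()


class GiftCard(Base):  # hypothetical table, not part of the tutorial project
    code = Column(String, nullable=False)


print(GiftCard.__tablename__)  # giftcard
```

A table can still set __tablename__ explicitly when the generated name is not the one you want.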

Tables

Every table created in the project needs to inherit from our declarative base.

# tables/coupons.py

from sqlalchemy import Column, Integer, String

from app.db.base_class import Base


class Coupon(Base):
    __tablename__ = "coupon"

    code = Column(String, nullable=False, unique=True)
    init_count = Column(Integer, nullable=False)
    remaining_count = Column(Integer, nullable=False)

Database session

To connect to the database we need to define a session. The project has to work asynchronously, so create_async_engine is used. It works like the traditional Engine API but returns an AsyncEngine, an async version of it.

# session.py

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from app.core.config import settings


engine = create_async_engine(
    settings.async_database_url,
    echo=settings.DB_ECHO_LOG,
)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
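
The settings object comes from app/core/config.py, which is not shown in this post. As a rough sketch of what it might provide, the example below derives the async URL from the sync one. The Settings class here is a hypothetical stand-in, and asyncpg as the async driver is my assumption; create_async_engine needs a URL that names an async driver.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    # Hypothetical stand-in for the settings class in app/core/config.py.
    DATABASE_URL: str = "postgresql://postgres:postgres@postgres:5432/postgres"
    DB_ECHO_LOG: bool = False

    @property
    def async_database_url(self) -> str:
        # Swap the default sync driver for asyncpg (assumed driver).
        return self.DATABASE_URL.replace(
            "postgresql://", "postgresql+asyncpg://", 1
        )


settings = Settings()
print(settings.async_database_url)
# postgresql+asyncpg://postgres:postgres@postgres:5432/postgres
```

Keeping both URLs on one object is handy because alembic uses the sync URL while the application uses the async one.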

Migrations

Long story short, in production we cannot create and drop tables every time we make a request or build. To solve this we will use alembic. It is my migration tool of choice, especially when dealing with SQLAlchemy.

Init migrations

alembic init app/db/migrations

Alembic ini config

Everything can stay as autogenerated; the most important part is to keep the script_location path correct.

# alembic.ini

[alembic]
script_location = app/db/migrations

Alembic env config

The most important part of this file is to set the correct database URL. Using docker it will be: postgresql://postgres:postgres@postgres:5432/postgres.

Note that the Base class is imported from base.py and not from base_class.py. This is because alembic needs to see all mapped tables, and base.py imports every one of them.

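# app/db/migrations/env.py

from logging.config import fileConfig

from alembic import context

from app.core.config import settings  # noqa
from app.db.base import Base  # noqa

config = context.config
# Point alembic at the (sync) database URL from the application settings
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
# Metadata of all mapped tables, used by --autogenerate
target_metadata = Base.metadata

fileConfig(config.config_file_name)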

Import all the table models so alembic can find them

# app/db/base.py

from app.db.base_class import Base  # noqa: F401
from app.db.tables.coupons import Coupon  # noqa: F401

Generate migrations

docker-compose run app alembic revision --autogenerate

Run migrations

docker-compose run app alembic upgrade head

Repositories

As a bit of Domain-Driven Design, I love to use the repository pattern in a Python codebase. It is a code layer between the service and the database that helps with handling database operations. Unfortunately, it takes extra time to code the repositories and add unit tests for everything. I can tell you it is worth it! It keeps the code well structured, grouped by layers, and it avoids binding the project to a specific ORM library or even a specific database.

For this tutorial, the BaseRepository includes only two basic methods. Both are async: create awaits the commit, and get_by_id awaits the lookup.

# app/db/repositories/base.py

import abc
from typing import Generic, TypeVar, Type
from uuid import uuid4, UUID

from sqlalchemy.ext.asyncio import AsyncSession

from app.db.errors import DoesNotExist
from app.models.schema.base import BaseSchema

IN_SCHEMA = TypeVar("IN_SCHEMA", bound=BaseSchema)
SCHEMA = TypeVar("SCHEMA", bound=BaseSchema)
TABLE = TypeVar("TABLE")


class BaseRepository(Generic[IN_SCHEMA, SCHEMA, TABLE], metaclass=abc.ABCMeta):
    def __init__(self, db_session: AsyncSession, *args, **kwargs) -> None:
        self._db_session: AsyncSession = db_session

    @property
    @abc.abstractmethod
    def _table(self) -> Type[TABLE]:
        ...

    @property
    @abc.abstractmethod
    def _schema(self) -> Type[SCHEMA]:
        ...

    async def create(self, in_schema: IN_SCHEMA) -> SCHEMA:
        entry = self._table(id=uuid4(), **in_schema.dict())
        self._db_session.add(entry)
        await self._db_session.commit()
        return self._schema.from_orm(entry)

    async def get_by_id(self, entry_id: UUID) -> SCHEMA:
        entry = await self._db_session.get(self._table, entry_id)
        if not entry:
            raise DoesNotExist(
                f"{self._table.__name__}<id:{entry_id}> does not exist"
            )
        return self._schema.from_orm(entry)

Usage of BaseRepository:

# app/db/repositories/coupons.py

from typing import Type

from app.db.repositories.base import BaseRepository
from app.db.tables.coupons import Coupon
from app.models.schema.coupons import InCouponSchema, CouponSchema


class CouponsRepository(BaseRepository[InCouponSchema, CouponSchema, Coupon]):
    @property
    def _in_schema(self) -> Type[InCouponSchema]:
        return InCouponSchema

    @property
    def _schema(self) -> Type[CouponSchema]:
        return CouponSchema

    @property
    def _table(self) -> Type[Coupon]:
        return Coupon
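
To illustrate the point about not being bound to a specific ORM or database, here is a rough sketch of an in-memory repository exposing the same two operations. Everything in it is hypothetical (CouponRecord, InMemoryCouponsRepository, and the simplified create signature are not part of the project); it only shows that callers depend on create and get_by_id, not on SQLAlchemy.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict
from uuid import UUID, uuid4


class DoesNotExist(Exception):
    """Stand-in for app.db.errors.DoesNotExist."""


@dataclass
class CouponRecord:
    # Hypothetical plain record replacing the table and schema classes.
    id: UUID
    code: str
    init_count: int
    remaining_count: int


class InMemoryCouponsRepository:
    """Keeps rows in a dict instead of a database."""

    def __init__(self) -> None:
        self._rows: Dict[UUID, CouponRecord] = {}

    async def create(self, code: str, init_count: int) -> CouponRecord:
        entry = CouponRecord(uuid4(), code, init_count, init_count)
        self._rows[entry.id] = entry
        return entry

    async def get_by_id(self, entry_id: UUID) -> CouponRecord:
        entry = self._rows.get(entry_id)
        if entry is None:
            raise DoesNotExist(f"Coupon<id:{entry_id}> does not exist")
        return entry


async def demo() -> bool:
    repo = InMemoryCouponsRepository()
    created = await repo.create("PIOTR", 100)
    fetched = await repo.get_by_id(created.id)
    return fetched == created


same = asyncio.run(demo())
print(same)  # True
```

A repository like this can also serve as a fast fake in unit tests of the service layer.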

API handlers

│   ├── api
│   │   ├── __init__.py
│   │   ├── dependencies
│   │   │   ├── __init__.py
│   │   │   └── db.py
│   │   └── routes
│   │       ├── __init__.py
│   │       ├── api.py
│   │       └── coupons.py

Using FastAPI we need to talk about dependency injection! Long story short it means we are using a mechanism in our code to declare things that it requires to work and use (called dependencies) and then FastAPI will do the magic and inject them when needed.

Our only dependency in this project, but an important one, is the database session. The dependency creates an async session, yields it, and commits the session at the end.

# api/dependencies/db.py

from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import async_session


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency function that yields db sessions
    """
    async with async_session() as session:
        yield session
        await session.commit()
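
To see the order in which things happen around the yield, here is a small simulation that uses only the standard library. FakeSession is a hypothetical stand-in that records calls, and handle_request only approximates what the framework does with a yielding dependency; it is not FastAPI's actual code.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List

events: List[str] = []


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records calls."""

    async def __aenter__(self) -> "FakeSession":
        events.append("open")
        return self

    async def __aexit__(self, *exc_info) -> None:
        events.append("close")

    async def commit(self) -> None:
        events.append("commit")


async def get_db() -> AsyncIterator[FakeSession]:
    async with FakeSession() as session:
        yield session
        await session.commit()


async def handle_request() -> None:
    # Enter the dependency, run the handler body, then resume after the yield.
    async with asynccontextmanager(get_db)():
        events.append("handler")


asyncio.run(handle_request())
print(events)  # ['open', 'handler', 'commit', 'close']
```

In this simulation, an exception raised in the handler body is thrown into the generator at the yield, so the commit is skipped while the session is still closed.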

We use it through FastAPI's built-in Depends, which injects our dependency into the handler.

# api/routes/coupons.py

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status

from app.api.dependencies.db import get_db
from app.db.repositories.coupons import CouponsRepository
from app.models.schema.coupons import OutCouponSchema, InCouponSchema

router = APIRouter()


@router.post("/", status_code=status.HTTP_201_CREATED, response_model=OutCouponSchema)
async def create_coupon(
    payload: InCouponSchema, db: AsyncSession = Depends(get_db)
) -> OutCouponSchema:
    coupons_repository = CouponsRepository(db)
    coupon = await coupons_repository.create(payload)
    return OutCouponSchema(**coupon.dict())

Tests

Now we are finally good to start the real fun!

Since our codebase has async methods, running the tests properly requires setting up the event loop fixture with scope="session".

# tests/conftest.py

import asyncio
from typing import Generator

import pytest


@pytest.fixture(scope="session")
def event_loop(request) -> Generator:
    """Create one event loop shared by the whole test session."""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

Next, we need a fixture with a new db_session that runs only for our tests, creates and drops the tables, and lets us execute database operations.

@pytest.fixture()
async def db_session() -> AsyncSession:
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.drop_all)
        await connection.run_sync(Base.metadata.create_all)
        async with async_session(bind=connection) as session:
            yield session
            await session.flush()
            await session.rollback()

With only the above session we could already start creating entries in the tests. Unfortunately, when the handler connects to the DB, it still gets its own session from the real dependency instead of the test one.

To solve it we need to override the get_db dependency:

@pytest.fixture()
def override_get_db(db_session: AsyncSession) -> Callable:
    async def _override_get_db():
        yield db_session

    return _override_get_db


@pytest.fixture()
def app(override_get_db: Callable) -> FastAPI:
    from app.api.dependencies.db import get_db
    from app.main import app

    app.dependency_overrides[get_db] = override_get_db
    return app
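
The idea behind dependency_overrides can be pictured as a dictionary lookup: when a replacement is registered for a dependency, it is called instead of the original. The sketch below imitates that lookup with the standard library only. The resolve helper and the string "sessions" are hypothetical and stand in for FastAPI's internals and real AsyncSession objects.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List


async def get_db() -> AsyncIterator[str]:
    yield "real session"


async def override_get_db() -> AsyncIterator[str]:
    yield "test session"


# Same shape as app.dependency_overrides: original callable -> replacement.
dependency_overrides: Dict[Callable[..., Any], Callable[..., Any]] = {}


async def resolve(dependency: Callable[..., Any]) -> str:
    # Use the override when one is registered, otherwise the original.
    call = dependency_overrides.get(dependency, dependency)
    generator = call()
    try:
        return await generator.__anext__()
    finally:
        await generator.aclose()


async def demo() -> List[str]:
    before = await resolve(get_db)
    dependency_overrides[get_db] = override_get_db
    after = await resolve(get_db)
    return [before, after]


resolved = asyncio.run(demo())
print(resolved)  # ['real session', 'test session']
```

Because the handler keeps referring to get_db, no application code has to change for the tests.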

Finally, we need an async test client, and the httpx library will help us with that.

from typing import AsyncGenerator

import pytest
from fastapi import FastAPI

from httpx import AsyncClient

@pytest.fixture()
async def async_client(app: FastAPI) -> AsyncGenerator:
    async with AsyncClient(app=app, base_url="http://test") as ac:
        yield ac

With our fixtures configured, nothing is left but to write our first test. The important part is to remember pytest.mark.asyncio. It can be set for the whole module using pytestmark, as in the example, or attached as a decorator to every test function.

import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status

from app.db.repositories.coupons import CouponsRepository
from app.models.schema.coupons import InCouponSchema

pytestmark = pytest.mark.asyncio


async def test_coupon_create(
    async_client: AsyncClient, db_session: AsyncSession
) -> None:
    coupons_repository = CouponsRepository(db_session)
    payload = {
        "code": "PIOTR",
        "init_count": 100,
    }

    response = await async_client.post("/v1/coupons/", json=payload)
    coupon = await coupons_repository.get_by_id(response.json()["id"])

    assert response.status_code == status.HTTP_201_CREATED
    assert response.json() == {
        "code": payload["code"],
        "init_count": payload["init_count"],
        "remaining_count": payload["init_count"],
        "id": str(coupon.id),
    }

Summary

This setup works like a charm. It was battle-tested on high-load services. Of course, there were some issues, like connection pooling and connections closed in the middle of a request (but this is a topic for the next blog post!).

My overall feelings are quite good. It takes some time to set everything up and test it well, but from my perspective it is worth it. Does it give you much to have your DB connections async? It depends on the use case…
