rogulski.it

My name is Piotr, a passionate pythonista and this is my blog!

    SqlAlchemy 1.4 async ORM with FastAPI

    Posted at — Oct 9, 2021

    Previously on my blog I was setting up a backend service with async sqlalchemy core using a database library. The reason for that was simple, SQLAlchemy did not support asyncio in ORM yet. Now, since SQLAlchemy 1.4 is here, we can do the proper setup using only this package!

    This tutorial will present how to set up a production-ready application running on FastAPI, PostgreSQL, SQLAlchemy 1.4, and alembic. Everything using asyncio.

    Requirements

    Project setup

    First, I want to share a structure. The project will look like this:

    ├── Dockerfile
    ├── README.md
    ├── alembic.ini
    ├── app
    │   ├── __init__.py
    │   ├── api
    │   ├── core
    │   ├── db
    │   ├── main.py
    │   └── models
    ├── docker-compose.yml
    ├── poetry.lock
    ├── pyproject.toml
    └── tests
        ├── __init__.py
        ├── app
        └── conftest.py
    

    Starting from the top, at the root of the project we will store config files like docker, migrations, poetry, etc.

    Next, our Python application module is placed in app directory.

    Lastly, tests module is located at the same directory level. The reason to keep it outside the app module is similar to why poetry separates the dependency. On various builds created from code, tests are not needed on the final image or server.

    Build and run a system in background

    docker-compose up -d
    

    The tutorial code can be found on my github

    git clone git@github.com:rglsk/fastapi-sqlalchemy-1.4-async.git
    

    Database setup

    According to the structure, the db directory presents as follows:

    │   ├── db
    │   │   ├── __init__.py
    │   │   ├── base.py
    │   │   ├── base_class.py
    │   │   ├── errors.py
    │   │   ├── migrations
    │   │   │   ├── README
    │   │   │   ├── __pycache__
    │   │   │   ├── env.py
    │   │   │   ├── script.py.mako
    │   │   │   └── versions
    │   │   ├── repositories
    │   │   │   ├── __init__.py
    │   │   │   ├── base.py
    │   │   │   └── coupons.py
    │   │   ├── session.py
    │   │   └── tables
    │   │       ├── __init__.py
    │   │       └── coupons.py
    

    Local database

    version: "3.7"
    
    services:
      postgres:
        image: postgres:12.5
        environment:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: postgres
        volumes:
          - postgresql_data:/var/lib/postgresql/data/
        expose:
          - 5432
        ports:
          - 5432:5432
    
    volumes:
      postgresql_data:
    

    Declarative base

    To define a mapped class and start to create tables it is required to have the Base. In the given example as_declarative decorator is used, it just adapts a given class into a declarative_base(). Additionally, every table is forced to include the primary key which is a UUID (it is my personal preference).

    # base_class.py
    
    import uuid
    
    from sqlalchemy import Column
    from sqlalchemy.dialects.postgresql import UUID
    from sqlalchemy.ext.declarative import as_declarative, declared_attr
    
    
    @as_declarative()
    class Base:
        id: uuid.UUID = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
        __name__: str
    
        # Generate __tablename__ automatically
        @declared_attr
        def __tablename__(cls) -> str:
            return cls.__name__.lower()
    

    Note

    The declarative_base() function is now a specialization of the more generic registry class. The function also moves to the sqlalchemy.orm package from the declarative.ext package.

    Tables

    Every table created in the project needs to inherit from our declarative base.

    # tables/coupons.py
    
    from sqlalchemy import Column, Integer, String
    
    from app.db.base_class import Base
    
    
    class Coupon(Base):
        __tablename__ = "coupon"
    
        code = Column(String, nullable=False, unique=True)
        init_count = Column(Integer, nullable=False)
        remaining_count = Column(Integer, nullable=False)
    

    Database session

    Wanting to have a connection with database in place we need to define a session. Project needs to work asynchronous so create_async_engine is going to be used. It works the same as traditional Engine API but additionally it makes it async.

    # session.py
    
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker
    
    from app.core.config import settings
    
    
    engine = create_async_engine(
        settings.async_database_url,
        echo=settings.DB_ECHO_LOG,
    )
    async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    

    Migrations

    Long story short, going on production we can not create and drop tables every time we make a request or build. To solve this issue we will use alembic. It is my migration tool by choice, especially if dealing with SQLAlchemy.

    Init migrations

    alemic init app/db/migrations
    

    Alembic ini config

    Everything can be the same as autogenerated, the most important part is to keep the location script path correct.

    # alembic.ini
    
    [alembic]
    script_location = app/db/migrations
    

    Alembic env config

    The most important part of this file is to set the correct database URL. Using docker it will be: postgresql://postgres:postgres@postgres:5432/postgres.

    Please pay attention that Base class is imported from base.py and not from base_class.py. It is because alembic needs to gather all mapped tables.

    from app.core.config import settings  # noqa
    from app.db.base import Base  # noqa
    
    config = context.config
    config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
    target_metadata = Base.metadata
    
    fileConfig(config.config_file_name)
    

    Import all the table models into by alembic

    # app/db/base.py
    
    from app.db.base_class import Base  # noqa: F401
    from app.db.tables.coupons import Coupon  # noqa: F401
    

    Generate migrations

    docker-compose run app alembic revision --autogenerate
    

    Run migrations

    docker-compose run app alembic upgrade head
    

    Repositories

    As a bit of Domain-Driven Design, I love to use a repository pattern in Python codebase. It is a code layer between service and database that help with handling database operations. Unfortunately, it is an additional work time to code them and add unit tests for everything. I can tell it is worth it! It keeps the code in very good structure, grouped by layers, and it allows not to bind projects with a specific ORM library or even database.

    For this tutorial, the BaseRepository includes only two basic methods. All of them are async methods awaiting commit.

    # app/db/repositories/base.py
    
    import abc
    from typing import Generic, TypeVar, Type
    from uuid import uuid4, UUID
    
    from sqlalchemy.ext.asyncio import AsyncSession
    
    from app.db.errors import DoesNotExist
    from app.models.schema.base import BaseSchema
    
    IN_SCHEMA = TypeVar("IN_SCHEMA", bound=BaseSchema)
    SCHEMA = TypeVar("SCHEMA", bound=BaseSchema)
    TABLE = TypeVar("TABLE")
    
    
    class BaseRepository(Generic[IN_SCHEMA, SCHEMA, TABLE], metaclass=abc.ABCMeta):
        def __init__(self, db_session: AsyncSession, *args, **kwargs) -> None:
            self._db_session: AsyncSession = db_session
    
        @property
        @abc.abstractmethod
        def _table(self) -> Type[TABLE]:
            ...
    
        @property
        @abc.abstractmethod
        def _schema(self) -> Type[SCHEMA]:
            ...
    
        async def create(self, in_schema: IN_SCHEMA) -> SCHEMA:
            entry = self._table(id=uuid4(), **in_schema.dict())
            self._db_session.add(entry)
            await self._db_session.commit()
            return self._schema.from_orm(entry)
    
        async def get_by_id(self, entry_id: UUID) -> SCHEMA:
            entry = await self._db_session.get(self._table, entry_id)
            if not entry:
                raise DoesNotExist(
                    f"{self._table.__name__}<id:{entry_id}> does not exist"
                )
            return self._schema.from_orm(entry)
    

    Usage of BaseRepository:

    # app/repositories/coupons.py
    
    from typing import Type
    
    from app.db.repositories.base import BaseRepository
    from app.db.tables.coupons import Coupon
    from app.models.schema.coupons import InCouponSchema, CouponSchema
    
    
    class CouponsRepository(BaseRepository[InCouponSchema, CouponSchema, Coupon]):
        @property
        def _in_schema(self) -> Type[InCouponSchema]:
            return InCouponSchema
    
        @property
        def _schema(self) -> Type[CouponSchema]:
            return CouponSchema
    
        @property
        def _table(self) -> Type[Coupon]:
            return Coupon
    

    API handlers

    │   ├── api
    │   │   ├── __init__.py
    │   │   ├── dependencies
    │   │   │   ├── __init__.py
    │   │   │   └── db.py
    │   │   └── routes
    │   │       ├── __init__.py
    │   │       ├── api.py
    │   │       └── coupons.py
    

    Using FastAPI we need to talk about dependency injection! Long story short it means we are using a mechanism in our code to declare things that it requires to work and use (called dependencies) and then FastAPI will do the magic and inject them when needed.

    Our only dependency in this project, but an important one is a database. It creates an async session, yields it, and at the end commits this session.

    # api/dependencies/db.py
    
    from sqlalchemy.ext.asyncio import AsyncSession
    
    from app.db.session import async_session
    
    
    async def get_db() -> AsyncSession:
        """
        Dependency function that yields db sessions
        """
        async with async_session() as session:
            yield session
            await session.commit()
    

    The usage is happening thanks to the FastAPI buildin class called Depends which injects our dependency into the handler.

    # api/routes/coupons.py
    
    from fastapi import APIRouter, Depends
    from sqlalchemy.ext.asyncio import AsyncSession
    from starlette import status
    
    from app.api.dependencies.db import get_db
    from app.db.repositories.coupons import CouponsRepository
    from app.models.schema.coupons import OutCouponSchema, InCouponSchema
    
    @router.post("/", status_code=status.HTTP_201_CREATED, response_model=OutCouponSchema)
    async def create_coupon(
        payload: InCouponSchema, db: AsyncSession = Depends(get_db)
    ) -> OutCouponSchema:
        coupons_repository = CouponsRepository(db)
        coupon = await coupons_repository.create(payload)
        return OutCouponSchema(**coupon.dict())
    

    Tests

    Now we are finally good to start the real fun!

    Having async methods in our codebase to run our tests properly we need to set up the event loop in scope=session.

    # tests/conftest.py
    
    @pytest.fixture(scope="session")
    def event_loop(request) -> Generator:
        """Create an instance of the default event loop for each test case."""
        loop = asyncio.get_event_loop_policy().new_event_loop()
        yield loop
        loop.close()
    

    Next, we need to create a fixture with a new db_session which will run only for our tests, create and drop tables and allow to execute database operations.

    @pytest.fixture()
    async def db_session() -> AsyncSession:
        async with engine.begin() as connection:
            await connection.run_sync(Base.metadata.drop_all)
            await connection.run_sync(Base.metadata.create_all)
            async with async_session(bind=connection) as session:
                yield session
                await session.flush()
                await session.rollback()
    

    Having only the above session we could already start to create our entries in the tests. Unfortunately, when we will try to connect to DB in our handler we will have the issue.

    To solve it we need to override the

    @pytest.fixture()
    def override_get_db(db_session: AsyncSession) -> Callable:
        async def _override_get_db():
            yield db_session
    
        return _override_get_db
    
    
    @pytest.fixture()
    def app(override_get_db: Callable) -> FastAPI:
        from app.api.dependencies.db import get_db
        from app.main import app
    
        app.dependency_overrides[get_db] = override_get_db
        return app
    

    Finally, we need to create an async testing client for which httpx lib will help us.

    from typing import AsyncGenerator
    
    import pytest
    from fastapi import FastAPI
    
    from httpx import AsyncClient
    
    @pytest.fixture()
    async def async_client(app: FastAPI) -> AsyncGenerator:
        async with AsyncClient(app=app, base_url="http://test") as ac:
            yield ac
    

    Having our fixtures configured there is nothing more please than create our first test. The important part in testing is to remember to define pytest.mark.asyncio. It can be done as it is for example using pytestmark or by attaching it as the decorator to every test method.

    from unittest import mock
    
    import pytest
    from httpx import AsyncClient
    from sqlalchemy.ext.asyncio import AsyncSession
    from starlette import status
    
    from app.db.repositories.coupons import CouponsRepository
    from app.models.schema.coupons import InCouponSchema
    
    pytestmark = pytest.mark.asyncio
    
    
    async def test_coupon_create(
        async_client: AsyncClient, db_session: AsyncSession
    ) -> None:
        coupons_repository = CouponsRepository(db_session)
        payload = {
            "code": "PIOTR",
            "init_count": 100,
        }
    
        response = await async_client.post("/v1/coupons/", json=payload)
        coupon = await coupons_repository.get_by_id(response.json()["id"])
    
        assert response.status_code == status.HTTP_201_CREATED
        assert response.json() == {
            "code": payload["code"],
            "init_count": payload["init_count"],
            "remaining_count": payload["init_count"],
            "id": str(coupon.id),
        }
    

    Summary

    This setup works like a charm. It was battle-tested on the big load services. Of course, there were some issues like connection pooling and closed connections in the middle of the request (but this is a topic for the next blog post!).

    My overall feelings are quite good, it takes some time to set up everything and well test it, but from my perspective, it is worth it. Does it give you much to have your DB connections async? It depends on the use-case…

    FastAPI tips and tricks