Previously on my blog I was setting up a backend service with async sqlalchemy core using a database library. The reason for that was simple, SQLAlchemy did not support asyncio in ORM yet. Now, since SQLAlchemy 1.4 is here, we can do the proper setup using only this package!
This tutorial will present how to set up a production-ready application running on FastAPI, PostgreSQL, SQLAlchemy 1.4, and alembic. Everything using asyncio.
First, I want to share a structure. The project will look like this:
├── Dockerfile
├── README.md
├── alembic.ini
├── app
│ ├── __init__.py
│ ├── api
│ ├── core
│ ├── db
│ ├── main.py
│ └── models
├── docker-compose.yml
├── poetry.lock
├── pyproject.toml
└── tests
├── __init__.py
├── app
└── conftest.py
Starting from the top, at the root of the project we will store config files like docker, migrations, poetry, etc.
Next, our Python application module is placed in app
directory.
Lastly, tests
module is located at the same directory level. The reason to keep it outside the app
module is similar to why poetry separates the dependency.
On various builds created from code, tests are not needed on the final image or server.
Build and run a system in background
docker-compose up -d
The tutorial code can be found on my github
git clone git@github.com:rglsk/fastapi-sqlalchemy-1.4-async.git
According to the structure, the db
directory presents as follows:
│ ├── db
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── base_class.py
│ │ ├── errors.py
│ │ ├── migrations
│ │ │ ├── README
│ │ │ ├── __pycache__
│ │ │ ├── env.py
│ │ │ ├── script.py.mako
│ │ │ └── versions
│ │ ├── repositories
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ └── coupons.py
│ │ ├── session.py
│ │ └── tables
│ │ ├── __init__.py
│ │ └── coupons.py
version: "3.7"
services:
postgres:
image: postgres:12.5
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
volumes:
- postgresql_data:/var/lib/postgresql/data/
expose:
- 5432
ports:
- 5432:5432
volumes:
postgresql_data:
To define a mapped class and start to create tables it is required to have the Base.
In the given example as_declarative
decorator is used, it just adapts a given class into a declarative_base()
.
Additionally, every table is forced to include the primary key which is a UUID (it is my personal preference).
# base_class.py
import uuid
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.declarative import as_declarative, declared_attr
@as_declarative()
class Base:
id: uuid.UUID = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
__name__: str
# Generate __tablename__ automatically
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()
Note
The declarative_base()
function is now a specialization of the more generic registry class. The function also moves to the sqlalchemy.orm package from the declarative.ext package.
Every table created in the project needs to inherit from our declarative base.
# tables/coupons.py
from sqlalchemy import Column, Integer, String
from app.db.base_class import Base
class Coupon(Base):
__tablename__ = "coupon"
code = Column(String, nullable=False, unique=True)
init_count = Column(Integer, nullable=False)
remaining_count = Column(Integer, nullable=False)
Wanting to have a connection with database in place we need to define a session. Project needs to work asynchronous so create_async_engine
is going to be used.
It works the same as traditional Engine API but additionally it makes it async.
# session.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_async_engine(
settings.async_database_url,
echo=settings.DB_ECHO_LOG,
)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Long story short, going on production we can not create and drop tables every time we make a request or build. To solve this issue we will use alembic. It is my migration tool by choice, especially if dealing with SQLAlchemy.
Init migrations
alemic init app/db/migrations
Alembic ini config
Everything can be the same as autogenerated, the most important part is to keep the location script path correct.
# alembic.ini
[alembic]
script_location = app/db/migrations
Alembic env config
The most important part of this file is to set the correct database URL. Using docker it will be: postgresql://postgres:postgres@postgres:5432/postgres
.
Please pay attention that Base class is imported from base.py
and not from base_class.py
. It is because alembic needs to gather all mapped tables.
from app.core.config import settings # noqa
from app.db.base import Base # noqa
config = context.config
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
target_metadata = Base.metadata
fileConfig(config.config_file_name)
Import all the table models into by alembic
# app/db/base.py
from app.db.base_class import Base # noqa: F401
from app.db.tables.coupons import Coupon # noqa: F401
Generate migrations
docker-compose run app alembic revision --autogenerate
Run migrations
docker-compose run app alembic upgrade head
As a bit of Domain-Driven Design, I love to use a repository pattern in Python codebase. It is a code layer between service and database that help with handling database operations. Unfortunately, it is an additional work time to code them and add unit tests for everything. I can tell it is worth it! It keeps the code in very good structure, grouped by layers, and it allows not to bind projects with a specific ORM library or even database.
For this tutorial, the BaseRepository
includes only two basic methods. All of them are async methods awaiting commit.
# app/db/repositories/base.py
import abc
from typing import Generic, TypeVar, Type
from uuid import uuid4, UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.errors import DoesNotExist
from app.models.schema.base import BaseSchema
IN_SCHEMA = TypeVar("IN_SCHEMA", bound=BaseSchema)
SCHEMA = TypeVar("SCHEMA", bound=BaseSchema)
TABLE = TypeVar("TABLE")
class BaseRepository(Generic[IN_SCHEMA, SCHEMA, TABLE], metaclass=abc.ABCMeta):
def __init__(self, db_session: AsyncSession, *args, **kwargs) -> None:
self._db_session: AsyncSession = db_session
@property
@abc.abstractmethod
def _table(self) -> Type[TABLE]:
...
@property
@abc.abstractmethod
def _schema(self) -> Type[SCHEMA]:
...
async def create(self, in_schema: IN_SCHEMA) -> SCHEMA:
entry = self._table(id=uuid4(), **in_schema.dict())
self._db_session.add(entry)
await self._db_session.commit()
return self._schema.from_orm(entry)
async def get_by_id(self, entry_id: UUID) -> SCHEMA:
entry = await self._db_session.get(self._table, entry_id)
if not entry:
raise DoesNotExist(
f"{self._table.__name__}<id:{entry_id}> does not exist"
)
return self._schema.from_orm(entry)
Usage of BaseRepository
:
# app/repositories/coupons.py
from typing import Type
from app.db.repositories.base import BaseRepository
from app.db.tables.coupons import Coupon
from app.models.schema.coupons import InCouponSchema, CouponSchema
class CouponsRepository(BaseRepository[InCouponSchema, CouponSchema, Coupon]):
@property
def _in_schema(self) -> Type[InCouponSchema]:
return InCouponSchema
@property
def _schema(self) -> Type[CouponSchema]:
return CouponSchema
@property
def _table(self) -> Type[Coupon]:
return Coupon
│ ├── api
│ │ ├── __init__.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ └── db.py
│ │ └── routes
│ │ ├── __init__.py
│ │ ├── api.py
│ │ └── coupons.py
Using FastAPI we need to talk about dependency injection! Long story short it means we are using a mechanism in our code to declare things that it requires to work and use (called dependencies) and then FastAPI will do the magic and inject them when needed.
Our only dependency in this project, but an important one is a database. It creates an async session, yields it, and at the end commits this session.
# api/dependencies/db.py
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import async_session
async def get_db() -> AsyncSession:
"""
Dependency function that yields db sessions
"""
async with async_session() as session:
yield session
await session.commit()
The usage is happening thanks to the FastAPI buildin class called Depends
which injects our dependency into the handler.
# api/routes/coupons.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from app.api.dependencies.db import get_db
from app.db.repositories.coupons import CouponsRepository
from app.models.schema.coupons import OutCouponSchema, InCouponSchema
@router.post("/", status_code=status.HTTP_201_CREATED, response_model=OutCouponSchema)
async def create_coupon(
payload: InCouponSchema, db: AsyncSession = Depends(get_db)
) -> OutCouponSchema:
coupons_repository = CouponsRepository(db)
coupon = await coupons_repository.create(payload)
return OutCouponSchema(**coupon.dict())
Now we are finally good to start the real fun!
Having async methods in our codebase to run our tests properly we need to set up the event loop in scope=session
.
# tests/conftest.py
@pytest.fixture(scope="session")
def event_loop(request) -> Generator:
"""Create an instance of the default event loop for each test case."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
Next, we need to create a fixture with a new db_session which will run only for our tests, create and drop tables and allow to execute database operations.
@pytest.fixture()
async def db_session() -> AsyncSession:
async with engine.begin() as connection:
await connection.run_sync(Base.metadata.drop_all)
await connection.run_sync(Base.metadata.create_all)
async with async_session(bind=connection) as session:
yield session
await session.flush()
await session.rollback()
Having only the above session we could already start to create our entries in the tests. Unfortunately, when we will try to connect to DB in our handler we will have the issue.
To solve it we need to override the
@pytest.fixture()
def override_get_db(db_session: AsyncSession) -> Callable:
async def _override_get_db():
yield db_session
return _override_get_db
@pytest.fixture()
def app(override_get_db: Callable) -> FastAPI:
from app.api.dependencies.db import get_db
from app.main import app
app.dependency_overrides[get_db] = override_get_db
return app
Finally, we need to create an async testing client for which httpx lib will help us.
from typing import AsyncGenerator
import pytest
from fastapi import FastAPI
from httpx import AsyncClient
@pytest.fixture()
async def async_client(app: FastAPI) -> AsyncGenerator:
async with AsyncClient(app=app, base_url="http://test") as ac:
yield ac
Having our fixtures configured there is nothing more please than create our first test.
The important part in testing is to remember to define pytest.mark.asyncio
.
It can be done as it is for example using pytestmark
or by attaching it as the decorator to every test method.
from unittest import mock
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
from app.db.repositories.coupons import CouponsRepository
from app.models.schema.coupons import InCouponSchema
pytestmark = pytest.mark.asyncio
async def test_coupon_create(
async_client: AsyncClient, db_session: AsyncSession
) -> None:
coupons_repository = CouponsRepository(db_session)
payload = {
"code": "PIOTR",
"init_count": 100,
}
response = await async_client.post("/v1/coupons/", json=payload)
coupon = await coupons_repository.get_by_id(response.json()["id"])
assert response.status_code == status.HTTP_201_CREATED
assert response.json() == {
"code": payload["code"],
"init_count": payload["init_count"],
"remaining_count": payload["init_count"],
"id": str(coupon.id),
}
This setup works like a charm. It was battle-tested on the big load services. Of course, there were some issues like connection pooling and closed connections in the middle of the request (but this is a topic for the next blog post!).
My overall feelings are quite good, it takes some time to set up everything and well test it, but from my perspective, it is worth it. Does it give you much to have your DB connections async? It depends on the use-case…
orm_mode = True