rogulski.it

My name is Piotr, a passionate pythonista and this is my blog!

My experience with FastAPI and async database connection

Posted at — Jun 6, 2021

Introduction

It has already been half a year since I started my first FastAPI project, and it looks great!

It started when I was interviewing with a company and received a recruitment assignment: build a small two-hour project with Python and a framework of my choice.

I chose FastAPI because I had never used it before but had always wanted to learn it (I think it is a good idea to test yourself with a new technology). At my previous company I did not have much time to try a new tech stack, so every opportunity counts.

Here is my project, if anybody is interested. It used a basic sync setup, following the best practices from tiangolo's FastAPI documentation.

About

This article shows how to configure FastAPI with an asyncio-based PostgreSQL setup (SQLAlchemy <1.4 together with the databases package) and how to test it. The setup also includes Alembic for running migrations.

Only the most important parts are covered, each with a short explanation.

To check out the whole codebase, please visit the example GitHub project.

Database configuration

Everything runs on Docker; this compose file starts PostgreSQL:

version: '3'

services:
  postgres:
    image: postgres
    restart: always
    ports:
      - "5432:5432"
    expose:
      - "5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - postgresql_data:/var/lib/postgresql/data/

volumes:
  postgresql_data:
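
The application code in the following sections reads its settings from a config module that is not shown in this article. As a rough sketch of what such a module could look like, assuming the settings come from environment variables: POSTGRES_HOST, POSTGRES_PORT and the env_flag helper are hypothetical names introduced here, and the defaults simply mirror the compose file above.

```python
import os
from typing import Mapping
from urllib.parse import quote


def build_database_url(env: Mapping[str, str]) -> str:
    """Assemble a PostgreSQL URL from the variables the compose file sets."""
    user = env.get("POSTGRES_USER", "postgres")
    password = env.get("POSTGRES_PASSWORD", "postgres")
    name = env.get("POSTGRES_DB", "postgres")
    # POSTGRES_HOST and POSTGRES_PORT are hypothetical; the compose file does not set them.
    host = env.get("POSTGRES_HOST", "localhost")
    port = env.get("POSTGRES_PORT", "5432")
    # Percent-encode the credentials so characters like "@" cannot break the URL.
    return f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}@{host}:{port}/{name}"


def env_flag(env: Mapping[str, str], key: str, default: bool = False) -> bool:
    """Hypothetical helper: read a boolean flag such as "true" or "1"."""
    return env.get(key, str(default)).strip().lower() in {"1", "true", "yes"}


DATABASE_URL = build_database_url(os.environ)
DB_MIN_SIZE = int(os.environ.get("DB_MIN_SIZE", "1"))
DB_MAX_SIZE = int(os.environ.get("DB_MAX_SIZE", "10"))
DB_FORCE_ROLL_BACK = env_flag(os.environ, "DB_FORCE_ROLL_BACK")
```

Building the URL in a function that takes the environment as an argument keeps it easy to check without touching the real environment.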

FastAPI connection with a database

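  1. Create a function that builds the database instance
import databases

# config is the project's settings module (DATABASE_URL, pool sizes, ...)
def get_db() -> databases.Database: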
    database_url = config.DATABASE_URL
    options = {
        "min_size": config.DB_MIN_SIZE,
        "max_size": config.DB_MAX_SIZE,
        "force_rollback": config.DB_FORCE_ROLL_BACK,
    }

    return databases.Database(database_url, **options)
  2. Create the database instance and the metadata used later by the tables and Alembic
from sqlalchemy.ext.declarative import declarative_base

database = get_db()
Base = declarative_base()
metadata = Base.metadata
  3. Initialize the web application
from fastapi import FastAPI

application = FastAPI(
  title=config.SERVICE_NAME,
  description=config.DESCRIPTION,
  debug=config.DEBUG,
)
  4. Create functions to connect to and disconnect from the database
import logging
from typing import Callable

def create_start_app_handler(app: FastAPI) -> Callable:
    async def start_app() -> None:
        logging.info("connecting to a database")
        await database.connect()
        logging.info("Database connection - successful")

    return start_app

def create_stop_app_handler(app: FastAPI) -> Callable:
    async def stop_app() -> None:
        logging.info("Closing connection to database")
        await database.disconnect()
        logging.info("Database connection - closed")

    return stop_app
  5. Register the handlers for the application's startup and shutdown events
    application.add_event_handler("startup", create_start_app_handler(application))
    application.add_event_handler("shutdown", create_stop_app_handler(application))
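
The handler factories above return coroutine functions, which FastAPI awaits once at startup and once at shutdown. To see that shape in isolation, here is a small sketch that runs without a server or a real database. StubDatabase is a hypothetical stand-in for databases.Database, and unlike the code above the handlers receive the database as an argument instead of using the module-level instance.

```python
import asyncio
from typing import Awaitable, Callable, List


class StubDatabase:
    """Hypothetical stand-in for databases.Database; it only records calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []
        self.is_connected = False

    async def connect(self) -> None:
        self.calls.append("connect")
        self.is_connected = True

    async def disconnect(self) -> None:
        self.calls.append("disconnect")
        self.is_connected = False


def create_start_handler(db: StubDatabase) -> Callable[[], Awaitable[None]]:
    async def start_app() -> None:
        await db.connect()

    return start_app


def create_stop_handler(db: StubDatabase) -> Callable[[], Awaitable[None]]:
    async def stop_app() -> None:
        await db.disconnect()

    return stop_app


async def lifecycle(db: StubDatabase) -> bool:
    """Mimic what the framework does: startup, serve, shutdown."""
    on_startup = create_start_handler(db)
    on_shutdown = create_stop_handler(db)
    await on_startup()
    connected_while_serving = db.is_connected  # requests would be handled here
    await on_shutdown()
    return connected_while_serving


stub = StubDatabase()
was_connected = asyncio.run(lifecycle(stub))
```

Passing the database in explicitly is a design choice of this sketch; it makes the handlers easy to exercise with a stub.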

Alembic setup

  1. Initialize migrations
alembic init alembic
  2. Update the generated config in alembic/env.py
config = context.config
config.set_main_option("sqlalchemy.url", app_config.DATABASE_URL)
target_metadata = metadata

Table creation

To demonstrate the functionality, the app includes an Article table, defined imperatively as a SQLAlchemy Table object.

  1. Table definition
from sqlalchemy import Column, String, Table, Text, DateTime, func
from sqlalchemy.dialects.postgresql import UUID

from app.db.base import metadata

Article = Table(
    "article",
    metadata,
    Column(
        "id",
        UUID(),
        primary_key=True,
    ),
    Column("title", String(65)),
    Column("slug", String(65), nullable=False, unique=True),
    Column("text", Text),
    Column("created_at", DateTime(timezone=True)),
)
  2. Import the tables in alembic/env.py so that Alembic can pick them up for the revision

  3. Create the migration files

alembic revision --autogenerate
  4. Run the migrations
alembic upgrade head

At this point the database should have the latest schema applied, and every endpoint can use the database connection.
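
One detail of the table definition is worth spelling out: neither id nor created_at has a default in the columns shown, so the application has to supply both values when it inserts a row (unless the full project adds defaults elsewhere, which is not visible here). A minimal sketch of preparing such values follows; new_article_values is a hypothetical helper, not part of the example project.

```python
import datetime
import uuid
from typing import Any, Dict

MAX_LEN = 65  # mirrors String(65) on the title and slug columns


def new_article_values(title: str, slug: str, text: str) -> Dict[str, Any]:
    """Build the column values for a new article row."""
    if len(title) > MAX_LEN or len(slug) > MAX_LEN:
        raise ValueError(f"title and slug must be at most {MAX_LEN} characters")
    return {
        "id": uuid.uuid4(),  # generated client-side, the column has no default
        "title": title,
        "slug": slug,
        "text": text,
        # Timezone-aware, to match DateTime(timezone=True).
        "created_at": datetime.datetime.now(tz=datetime.timezone.utc),
    }


values = new_article_values("Test article", "test-article", "Test text")
```

A dictionary like this could then be passed to Article.insert().values(**values) inside a repository method.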

API usage

  1. Define Article schemas
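import datetime
from uuid import UUID

from pydantic import Field

from app.models.base import BaseSchema

class ArticleBase(BaseSchema):
    title: str
    slug: str
    text: str

class ArticleIn(ArticleBase):
    # default_factory runs on every instantiation; a plain datetime.now(...)
    # default would be evaluated only once, when the module is imported.
    # Assumes BaseSchema is a pydantic model.
    created_at: datetime.datetime = Field(
        default_factory=lambda: datetime.datetime.now(tz=datetime.timezone.utc)
    )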

class ArticleOut(ArticleBase):
    id: UUID
    created_at: datetime.datetime
  2. Create a repository that handles the connection between the database and the outside world
class ArticleRepository(BaseRepository):
    @property
    def _table(self) -> sqlalchemy.Table:
        return Article

    @property
    def _schema_out(self) -> Type[ArticleOut]:
        return ArticleOut

    @property
    def _schema_in(self) -> Type[ArticleIn]:
        return ArticleIn

    async def _list(self) -> List[Mapping]:
        query = self._table.select()
        return await self._db.fetch_all(query=query)

    async def list(self) -> List:
        rows = await self._list()
        return [self._schema_out(**dict(row.items())) for row in rows]
  3. Create an API handler function
router = APIRouter()

@router.get("/")
async def articles_list() -> List[ArticleOut]:
    article_repo: ArticleRepository = ArticleRepository()
    articles = await article_repo.list()
    return articles
  4. Initialize the routing and include it in the main application
# In the api module
api_router = APIRouter()
api_router.include_router(articles.router, prefix="/articles")

# In the main application module
application.include_router(api.api_router, prefix="/api/v1")
  5. Run the application
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
  6. Perform a request
curl -X 'GET' \
  'http://127.0.0.1:8000/api/v1/articles/' \
  -H 'accept: application/json'
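
BaseRepository itself is not shown in this article, so here is a rough idea of how such a base class could work, as a sketch that runs without PostgreSQL. Everything in it is an assumption: FakeDatabase is a hypothetical stand-in that only mimics the fetch_all call used above, the schema is a dataclass instead of a pydantic model, and the database is passed into the constructor, whereas the repository above is created without arguments.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Type


class FakeDatabase:
    """Hypothetical stand-in exposing only an async fetch_all."""

    def __init__(self, rows: List[Mapping[str, Any]]) -> None:
        self._rows = rows

    async def fetch_all(self, query: Any) -> List[Mapping[str, Any]]:
        return [dict(row) for row in self._rows]


class BaseRepository(ABC):
    """Subclasses only declare their table and output schema."""

    def __init__(self, db: FakeDatabase) -> None:
        self._db = db

    @property
    @abstractmethod
    def _table(self) -> Any: ...

    @property
    @abstractmethod
    def _schema_out(self) -> Type: ...

    async def list(self) -> List:
        # The real code would pass self._table.select(); the fake ignores the query.
        rows = await self._db.fetch_all(query=self._table)
        return [self._schema_out(**dict(row)) for row in rows]


@dataclass
class ArticleOut:
    title: str
    slug: str


class ArticleRepository(BaseRepository):
    @property
    def _table(self) -> Any:
        return "article"

    @property
    def _schema_out(self) -> Type[ArticleOut]:
        return ArticleOut


repo = ArticleRepository(FakeDatabase([{"title": "Hello", "slug": "hello"}]))
articles = asyncio.run(repo.list())
```

The point of the pattern is that the shared query logic lives in the base class, while each concrete repository stays a few lines long.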

Testing

  1. Create async client fixtures
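from typing import AsyncGenerator

import pytest
from httpx import AsyncClient

# Async fixtures and tests need an asyncio plugin for pytest, e.g. pytest-asyncio.
@pytest.fixture(autouse=True)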
async def db() -> AsyncGenerator:
    await database.connect()
    yield
    await database.disconnect()

@pytest.fixture()
async def async_client() -> AsyncGenerator:
    async with AsyncClient(app=app, base_url="http://test") as ac:
        yield ac
  2. Create a test for the endpoint
@pytest.fixture()
def article_data() -> ArticleIn:
    return ArticleIn(title="Test article", slug="Test slug", text="Test text")

async def test_articles_list(async_client: AsyncClient, article_data: ArticleIn):
    repo = ArticleRepository()
    article: ArticleOut = await repo.create(article_data)
    response = await async_client.get("/api/v1/articles/")

    assert response.json() == [
        {
            "id": str(article.id),
            "created_at": article.created_at.isoformat(),
            "slug": article.slug,
            "text": article.text,
            "title": article.title,
        }
    ]
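
The assertion above expects exactly one article in the response, which only holds if every test starts with an empty table. The force_rollback option passed to databases.Database in get_db is presumably what provides that in the test configuration (an assumption, since the test settings are not shown): with force_rollback enabled, databases runs all queries inside a transaction that is rolled back on disconnect. The idea can be sketched with the standard library's sqlite3 instead of PostgreSQL; run_isolated is a hypothetical helper.

```python
import sqlite3
from typing import Callable


def run_isolated(conn: sqlite3.Connection, body: Callable[[sqlite3.Connection], int]) -> int:
    """Run body inside a transaction that is always rolled back afterwards."""
    conn.execute("BEGIN")
    try:
        return body(conn)
    finally:
        conn.execute("ROLLBACK")


def insert_and_count(conn: sqlite3.Connection) -> int:
    conn.execute(
        "INSERT INTO article (slug, title) VALUES (?, ?)",
        ("test-slug", "Test article"),
    )
    return conn.execute("SELECT COUNT(*) FROM article").fetchone()[0]


# isolation_level=None puts sqlite3 in autocommit mode, so transactions are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE article (slug TEXT NOT NULL UNIQUE, title TEXT)")

seen_first_run = run_isolated(conn, insert_and_count)
# The same unique slug can be inserted again because the first insert was rolled back.
seen_second_run = run_isolated(conn, insert_and_count)
left_behind = conn.execute("SELECT COUNT(*) FROM article").fetchone()[0]
```

Each run sees its own row while it executes, and nothing is left behind for the next test.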

Summary

Feel free to check the whole code example here.

This is a small, modified example of how the setup can be used. I have started using this configuration in production environments and it works like a charm. Unfortunately, not all packages support asyncio yet, but most of my use cases are covered, e.g. async requests to other services.

As a next step, it is worth investing time in preparing a cookiecutter for the project setup (maybe I will share mine in the next article).

I have also started looking into more useful design patterns, and I can recommend this amazing repository to all of you. I hope it opens new horizons for some of you!