It has already been half a year since I started my first FastAPI project, and it looks great!
At the beginning, I was interviewing with one of the companies and received a recruitment assignment: build a small two-hour project with Python and a framework of my choice.
I chose FastAPI because I had never used it before but had always wanted to learn it (I think it is a good idea to challenge yourself with new technology). In my previous company, I did not have much time to try a new tech stack, so every occasion is good.
Here is my project, if anybody is interested. That project used a basic sync setup, following the best practices from tiangolo's FastAPI documentation.
This article focuses on configuring FastAPI with an asyncio PostgreSQL setup (using SQLAlchemy <1.4 together with the databases package) and on how to test it. The setup includes Alembic to perform migrations.
The description covers the most important parts with short explanations.
To check out the whole codebase, please visit the example GitHub project.
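For reference, the stack used below boils down to a handful of packages. The article does not pin exact versions, so treat this requirements sketch as an approximation:

```text
fastapi
databases[postgresql]
sqlalchemy<1.4
alembic
uvicorn
httpx
pytest
pytest-asyncio
```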
Everything will run on Docker:
```yaml
version: '3'

services:
  postgres:
    image: postgres
    restart: always
    ports:
      - "5432:5432"
    expose:
      - "5432"
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - postgresql_data:/var/lib/postgresql/data/

volumes:
  postgresql_data:
```
```python
import databases
from sqlalchemy.ext.declarative import declarative_base


def get_db() -> databases.Database:
    database_url = config.DATABASE_URL
    options = {
        "min_size": config.DB_MIN_SIZE,
        "max_size": config.DB_MAX_SIZE,
        "force_rollback": config.DB_FORCE_ROLL_BACK,
    }
    return databases.Database(database_url, **options)


database = get_db()
Base = declarative_base()
metadata = Base.metadata
```
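Throughout the snippets a `config` module is referenced but never shown; a minimal stand-in could look like this (the names mirror the snippet above, while the defaults and env-var handling are my assumptions, not the project's actual code):

```python
import os

# Hypothetical sketch of the `config` module referenced above -- the real one
# lives in the linked example project. Values fall back to local-dev defaults.
SERVICE_NAME = os.getenv("SERVICE_NAME", "articles-service")
DESCRIPTION = os.getenv("DESCRIPTION", "Example async FastAPI service")
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
DATABASE_URL = os.getenv(
    "DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/postgres"
)
DB_MIN_SIZE = int(os.getenv("DB_MIN_SIZE", "1"))
DB_MAX_SIZE = int(os.getenv("DB_MAX_SIZE", "10"))
DB_FORCE_ROLL_BACK = os.getenv("DB_FORCE_ROLL_BACK", "false").lower() == "true"
```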
```python
import logging
from typing import Callable

from fastapi import FastAPI

application = FastAPI(
    title=config.SERVICE_NAME,
    description=config.DESCRIPTION,
    debug=config.DEBUG,
)


def create_start_app_handler(app: FastAPI) -> Callable:
    async def start_app() -> None:
        logging.info("Connecting to the database")
        await database.connect()
        logging.info("Database connection - successful")

    return start_app


def create_stop_app_handler(app: FastAPI) -> Callable:
    async def stop_app() -> None:
        logging.info("Closing connection to the database")
        await database.disconnect()
        logging.info("Database connection - closed")

    return stop_app


application.add_event_handler("startup", create_start_app_handler(application))
application.add_event_handler("shutdown", create_stop_app_handler(application))
```
```shell
alembic init alembic
```
`alembic/env.py`:

```python
config = context.config
config.set_main_option("sqlalchemy.url", app_config.DATABASE_URL)

target_metadata = metadata
```
To demonstrate the functionality, the app will include an Article table defined with SQLAlchemy imperative mapping.
```python
from sqlalchemy import Column, DateTime, String, Table, Text, func
from sqlalchemy.dialects.postgresql import UUID

from app.db.base import metadata

Article = Table(
    "article",
    metadata,
    Column(
        "id",
        UUID(),
        primary_key=True,
    ),
    Column("title", String(65)),
    Column("slug", String(65), nullable=False, unique=True),
    Column("text", Text),
    Column("created_at", DateTime(timezone=True)),
)
```
Import the tables in `alembic/env.py` so Alembic can pick them up for autogenerated revisions.
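Concretely, that means adding an import near the top of `alembic/env.py`. The module path below is an assumption for illustration; adjust it to wherever your tables live:

```python
# alembic/env.py -- importing the tables module registers the Table objects
# on `metadata`, so `--autogenerate` can see them
from app.db.base import metadata
from app.db.tables import Article  # noqa: F401  (path is hypothetical)

target_metadata = metadata
```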
Create the migration files and apply them:

```shell
alembic revision --autogenerate
alembic upgrade head
```
At this point, the application should have the latest database schema in place and a database connection available to every endpoint.
```python
import datetime
from uuid import UUID

from pydantic import Field

from app.models.base import BaseSchema


class ArticleBase(BaseSchema):
    title: str
    slug: str
    text: str


class ArticleIn(ArticleBase):
    # default_factory ensures the timestamp is taken at instantiation time,
    # not once at import time
    created_at: datetime.datetime = Field(
        default_factory=lambda: datetime.datetime.now(tz=datetime.timezone.utc)
    )


class ArticleOut(ArticleBase):
    id: UUID
    created_at: datetime.datetime
```
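`BaseSchema` is imported from `app.models.base` but not shown in the article; a minimal stand-in is just a shared pydantic base class (a sketch under that assumption — the real one may enable extra configuration such as ORM support):

```python
from pydantic import BaseModel


class BaseSchema(BaseModel):
    """Shared base class for the app's pydantic schemas.

    Sketch only: the project's real BaseSchema may add configuration
    (e.g. ORM support or custom JSON encoders).
    """
```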
```python
class ArticleRepository(BaseRepository):
    @property
    def _table(self) -> sqlalchemy.Table:
        return Article

    @property
    def _schema_out(self) -> Type[ArticleOut]:
        return ArticleOut

    @property
    def _schema_in(self) -> Type[ArticleIn]:
        return ArticleIn

    async def _list(self) -> List[Mapping]:
        query = self._table.select()
        return await self._db.fetch_all(query=query)

    async def list(self) -> List[ArticleOut]:
        rows = await self._list()
        return [self._schema_out(**dict(row.items())) for row in rows]
```
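The test further down calls `repo.create()`, which the article does not show; inside the repository it could be sketched roughly like this (generating the id on the application side and the exact return shape are my assumptions — the real implementation is in the linked example project):

```python
from __future__ import annotations

import uuid


# Hypothetical sketch of a create() method as it might sit on the repository;
# the actual BaseRepository implementation is in the example project.
async def create(self, payload: ArticleIn) -> ArticleOut:
    new_id = uuid.uuid4()  # assumed: id generated app-side, not by the DB
    query = self._table.insert().values(id=new_id, **payload.dict())
    await self._db.execute(query=query)
    return self._schema_out(id=new_id, **payload.dict())
```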
```python
router = APIRouter()


@router.get("/")
async def articles_list() -> List[ArticleOut]:
    article_repo: ArticleRepository = ArticleRepository()
    articles = await article_repo.list()
    return articles
```
```python
articles_router = APIRouter()
articles_router.include_router(articles.router, prefix="/articles")

application.include_router(api.api_router, prefix="/api/v1")
```
```shell
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
```

```shell
curl -X 'GET' \
  'http://127.0.0.1:8000/api/v1/articles/' \
  -H 'accept: application/json'
```
```python
@pytest.fixture(autouse=True)
async def db() -> AsyncGenerator:
    await database.connect()
    yield
    await database.disconnect()


@pytest.fixture()
async def async_client() -> AsyncGenerator:
    async with AsyncClient(app=app, base_url="http://test") as ac:
        yield ac


@pytest.fixture()
def article_data() -> ArticleIn:
    return ArticleIn(title="Test article", slug="Test slug", text="Test text")
```
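These async fixtures and tests assume pytest-asyncio picks up coroutines without per-test markers, which requires enabling auto mode in the pytest configuration (the article does not show its pytest config, so this is an assumption):

```ini
# pytest.ini
[pytest]
asyncio_mode = auto
```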
```python
async def test_articles_list(async_client: AsyncClient, article_data: ArticleIn):
    repo = ArticleRepository()
    article: ArticleOut = await repo.create(article_data)

    response = await async_client.get("/api/v1/articles")

    assert response.json() == [
        {
            "id": str(article.id),
            "created_at": article.created_at.isoformat(),
            "slug": article.slug,
            "text": article.text,
            "title": article.title,
        }
    ]
```
Feel free to check out the whole code example here.
This is a small, modified example of how it can be used. I have started using this configuration in production environments, and it works like a charm. Unfortunately, not all packages support asyncio yet, but most of my use cases are covered, e.g. async requests to other services.
Going forward, it is worth investing more time in preparing a cookiecutter for the project setup (maybe I will share mine in the next article).
I have also started looking into more useful design patterns, so I can recommend this amazing repository to all of you. I hope that for some of you it will open new horizons!