Application Factory
Part 1, Chapter 4
In this chapter, we'll refactor the current project structure using the factory pattern to make testing and scaling easier.
Objectives
- Create an application factory pattern for initializing a FastAPI app
- Configure Celery to work with the application factory pattern
- Manage changes to the database with SQLAlchemy and Alembic
App Factory
Add a new folder called "project". Then, add an __init__.py file:
from fastapi import FastAPI


def create_app() -> FastAPI:
    app = FastAPI()

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app
create_app is a factory function that returns a FastAPI app for us to use. Because it's a function, it can be called multiple times, and each call builds a fresh app instance.
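Here is a stdlib-only sketch that makes "called multiple times" concrete. MiniApp is a hypothetical stand-in for FastAPI (it only keeps a route table) so the example runs without third-party packages. Each create_app() call builds an independent object, which is what lets a test create and modify its own app without affecting the one main.py serves.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


@dataclass
class MiniApp:
    """Hypothetical stand-in for FastAPI: it only keeps a path -> handler table."""
    routes: Dict[str, Callable[[], dict]] = field(default_factory=dict)


def create_app() -> MiniApp:
    # Mirrors project/__init__.py: build a new app, register routes, return it
    app = MiniApp()

    def root() -> dict:
        return {"message": "Hello World"}

    app.routes["/"] = root
    return app


dev_app = create_app()
test_app = create_app()

# Modifying one instance (e.g., inside a test) leaves the other untouched
test_app.routes["/debug"] = lambda: {"debug": True}

print(dev_app is test_app)      # False
print(sorted(dev_app.routes))   # ['/']
print(sorted(test_app.routes))  # ['/', '/debug']
```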
Update main.py like so to create a FastAPI app using the above factory function:
from project import create_app
app = create_app()
Test:
(venv)$ uvicorn main:app --reload
INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO: Started reloader process [96439] using watchgod
INFO: Started server process [96482]
INFO: Waiting for application startup.
INFO: Application startup complete.
Visit http://localhost:8000 to ensure the app still works.
Database Support
Dependencies
Add the following dependencies to requirements.txt:
SQLAlchemy==2.0.25
alembic==1.13.1
Install:
(venv)$ pip install -r requirements.txt
Notes:
- Since Celery was developed before asyncio and doesn't support it very well, for now we're using SQLAlchemy as our ORM because it can be used with both FastAPI and Celery.
- Alembic is a database migration tool for SQLAlchemy.
Config
Next, within "project", create a config.py file:
import os
import pathlib
from functools import lru_cache


class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}


class DevelopmentConfig(BaseConfig):
    pass


class ProductionConfig(BaseConfig):
    pass


class TestingConfig(BaseConfig):
    pass


@lru_cache()
def get_settings():
    config_cls_dict = {
        "development": DevelopmentConfig,
        "production": ProductionConfig,
        "testing": TestingConfig
    }

    config_name = os.environ.get("FASTAPI_CONFIG", "development")
    config_cls = config_cls_dict[config_name]
    return config_cls()


settings = get_settings()
Notes:
- In get_settings, we used the FASTAPI_CONFIG env variable to control which configuration to use. For example, DevelopmentConfig will be used during development and TestingConfig will be used during testing.
- I do not recommend pydantic's BaseSettings here because it might cause Celery to raise a [ERROR/MainProcess] pidbox command error: KeyError('__signature__') error when we launch Flower.
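One consequence of @lru_cache() worth knowing: get_settings only reads FASTAPI_CONFIG on its first call, and every later call returns the same cached object. The sketch below copies the structure of config.py (with the config classes trimmed to empty bodies) to show this, and uses get_settings.cache_clear(), a standard functools.lru_cache method, to force a re-read.

```python
import os
from functools import lru_cache


class BaseConfig:
    pass


class DevelopmentConfig(BaseConfig):
    pass


class ProductionConfig(BaseConfig):
    pass


class TestingConfig(BaseConfig):
    pass


@lru_cache()
def get_settings():
    config_cls_dict = {
        "development": DevelopmentConfig,
        "production": ProductionConfig,
        "testing": TestingConfig,
    }
    config_name = os.environ.get("FASTAPI_CONFIG", "development")
    return config_cls_dict[config_name]()


os.environ["FASTAPI_CONFIG"] = "development"
first = get_settings()

# Changing the env var alone has no effect: the cached instance comes back
os.environ["FASTAPI_CONFIG"] = "testing"
print(get_settings() is first)  # True

# Clearing the cache makes the next call re-read FASTAPI_CONFIG
get_settings.cache_clear()
print(type(get_settings()).__name__)  # TestingConfig
```

Note that the module-level settings = get_settings() line in config.py runs once, at import time. In practice, FASTAPI_CONFIG should therefore be set before project.config is first imported (for example, exported in the shell before starting uvicorn or pytest); clearing the cache later won't update modules that already hold a reference to settings.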
Your project structure should now look like this:
├── main.py
├── project
│ ├── __init__.py
│ └── config.py
└── requirements.txt
Import SQLAlchemy
Create a new file called project/database.py:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from project.config import settings
# https://fastapi.tiangolo.com/tutorial/sql-databases/#create-the-sqlalchemy-engine
engine = create_engine(
    settings.DATABASE_URL, connect_args=settings.DATABASE_CONNECT_DICT
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
Config Alembic
Init Alembic:
(venv)$ alembic init alembic
Your project structure will now look like this:
├── alembic # new
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
├── alembic.ini # new
├── main.py
├── project
│ ├── __init__.py
│ ├── config.py
│ └── database.py
└── requirements.txt
Update alembic/env.py:
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from project import create_app # new
from project.config import settings # new
from project.database import Base # new
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL)) # new
fastapi_app = create_app() # new
target_metadata = Base.metadata # new
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
...
Notes:
- We used config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL)) to set the database connection string.
- Then, we used create_app to create a new fastapi_app instance to ensure the relevant models are loaded.
- Finally, we added target_metadata = Base.metadata so that new models are discovered by Alembic.
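Why does env.py need create_app at all? With SQLAlchemy's declarative base, a model's table is added to Base.metadata when its class statement runs, that is, when the module defining it is imported. Autogenerate compares target_metadata against the database, so a model module that was never imported is invisible to it. The sketch below is a simplified, stdlib-only imitation of that registration; MetaData and Base here are hypothetical stand-ins, not SQLAlchemy's classes.

```python
class MetaData:
    """Hypothetical stand-in for sqlalchemy.MetaData: a name -> table registry."""

    def __init__(self) -> None:
        self.tables = {}


class Base:
    metadata = MetaData()

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Runs when a subclass's class statement executes, i.e. when the
        # module defining the model is imported. Real SQLAlchemy stores a
        # Table object here; we store the class to keep things short.
        cls.metadata.tables[cls.__tablename__] = cls


tables_before_import = sorted(Base.metadata.tables)


# Stands in for `from . import models` running in project/users/__init__.py
class User(Base):
    __tablename__ = "users"


tables_after_import = sorted(Base.metadata.tables)

print(tables_before_import)  # []
print(tables_after_import)   # ['users']
```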
To create an empty db.sqlite3 file, run:
(venv)$ python
>>> from main import app
>>> from project.database import Base, engine
>>> Base.metadata.create_all(bind=engine)
>>> exit()
(venv)$ ls db.sqlite3
db.sqlite3
Let's go ahead and migrate the database even though we don't have any models yet:
(venv)$ alembic revision --autogenerate
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Generating alembic/versions/399011ac3c75_.py ... done
(venv)$ alembic upgrade head
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> 399011ac3c75, empty message
Application Structure
If you have experience with Django or Flask, you've probably used "Django Apps" or "Flask Blueprints" to break up larger applications by grouping common functionality into reusable components.
Next, we'll do the same thing for FastAPI.
Create a "users" folder inside "project". Add an __init__.py file to it:
from fastapi import APIRouter
users_router = APIRouter(
    prefix="/users",
)
from . import models # noqa
Add a User model to a new file called project/users/models.py:
from sqlalchemy import Column, Integer, String

from project.database import Base


class User(Base):

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(128), unique=True, nullable=False)
    email = Column(String(128), unique=True, nullable=False)

    def __init__(self, username, email, *args, **kwargs):
        self.username = username
        self.email = email
Update project/__init__.py:
from fastapi import FastAPI


def create_app() -> FastAPI:
    app = FastAPI()

    from project.users import users_router    # new
    app.include_router(users_router)          # new

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app
So, when from project.users import users_router is called, the code in project/users/__init__.py will run, which imports models.py as well.
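The import side effect described above is plain Python package behavior. The sketch below builds a throwaway package on disk (the name demo_users is hypothetical) whose __init__.py mirrors project/users/__init__.py, then shows that importing a name from the package also loads its models submodule.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway package shaped like project/users
root = Path(tempfile.mkdtemp())
package = root / "demo_users"
package.mkdir()
(package / "__init__.py").write_text(
    "users_router = object()  # placeholder for APIRouter(prefix='/users')\n"
    "from . import models  # noqa\n"
)
(package / "models.py").write_text("LOADED = True\n")

sys.path.insert(0, str(root))
importlib.invalidate_caches()

print("demo_users.models" in sys.modules)  # False: nothing imported yet

from demo_users import users_router  # noqa: E402  (executes demo_users/__init__.py)

print("demo_users.models" in sys.modules)  # True: models came along
```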
Your project structure should now look like this:
├── alembic
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ └── 399011ac3c75_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│ ├── __init__.py
│ ├── config.py
│ ├── database.py
│ └── users
│ ├── __init__.py
│ └── models.py
└── requirements.txt
Notes:
- main.py - uses create_app to create a new FastAPI app
- project/__init__.py - factory function
- project/config.py - FastAPI config
- "project/users" - relevant models and routes for Users
Database Operations
Next, let's create a new database migration and create the table for the above User model:
(venv)$ alembic revision --autogenerate
# INFO [alembic.autogenerate.compare] Detected added table 'users'
(venv)$ alembic upgrade head
# Create users table
We can interact with the database inside the Python shell:
(venv)$ python
Then, within the shell, run:
>>> from main import app
>>> from project.database import SessionLocal
>>> from project.users.models import User
>>> user = User(username='test1', email='test1@example.com')
>>> session = SessionLocal()
>>> session.add(user)
>>> session.commit()
>>>
>>> new_session = SessionLocal()
>>> new_session.query(User).first().username
'test1'
>>> exit()
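The last query works because session.commit() ended the transaction: a row written in an uncommitted transaction isn't visible to other connections, which is what a second session would use. SQLAlchemy's Session manages the transaction for us, but the visibility rule comes from the database itself. Here is a rough stdlib analogy using sqlite3, where two raw connections to one file stand in for session and new_session (the table definition is hand-written to match the User model).

```python
import os
import sqlite3
import tempfile

# Two connections to one database file play the roles of session and new_session
path = os.path.join(tempfile.mkdtemp(), "db.sqlite3")
writer = sqlite3.connect(path)
reader = sqlite3.connect(path)

writer.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "username VARCHAR(128) UNIQUE NOT NULL, email VARCHAR(128) UNIQUE NOT NULL)"
)
writer.commit()

# sqlite3 implicitly opens a transaction before the INSERT
writer.execute(
    "INSERT INTO users (username, email) VALUES (?, ?)",
    ("test1", "test1@example.com"),
)
before_commit = reader.execute("SELECT username FROM users").fetchall()

writer.commit()
after_commit = reader.execute("SELECT username FROM users").fetchall()

print(before_commit)  # []
print(after_commit)   # [('test1',)]
```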
Add Celery
Update BaseConfig in project/config.py, adding CELERY_BROKER_URL and CELERY_RESULT_BACKEND:
class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}

    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")              # NEW
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")      # NEW
Create a new file called project/celery_utils.py:
from celery import current_app as current_celery_app

from project.config import settings


def create_celery():
    celery_app = current_celery_app
    celery_app.config_from_object(settings, namespace="CELERY")

    return celery_app
Notes:
- create_celery is a factory function that configures and then returns a Celery app instance.
- Rather than creating a new Celery instance, we used current_app so that shared tasks will work as expected.
- celery_app.config_from_object(settings, namespace="CELERY") means all Celery-related configuration keys should be prefixed with CELERY_. For example, to configure broker_url, we should use CELERY_BROKER_URL.
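To illustrate the CELERY namespace rule, here's a hypothetical helper, namespaced_settings, that mimics the renaming: it strips the CELERY_ prefix, lowercases the rest, and ignores settings without the prefix, such as DATABASE_URL. It's a sketch of the naming convention only, not Celery's implementation.

```python
def namespaced_settings(obj, namespace="CELERY"):
    """Hypothetical helper: map CELERY_BROKER_URL -> broker_url, and so on.

    Attributes without the prefix (e.g. DATABASE_URL) are ignored.
    """
    prefix = f"{namespace}_"
    return {
        name[len(prefix):].lower(): getattr(obj, name)
        for name in dir(obj)
        if name.startswith(prefix)
    }


class BaseConfig:
    DATABASE_URL = "sqlite:///db.sqlite3"
    CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
    CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/0"


print(namespaced_settings(BaseConfig()))
# {'broker_url': 'redis://127.0.0.1:6379/0', 'result_backend': 'redis://127.0.0.1:6379/0'}
```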
Update project/__init__.py:
from fastapi import FastAPI

from project.celery_utils import create_celery   # new


def create_app() -> FastAPI:
    app = FastAPI()

    # do this before loading routes              # new
    app.celery_app = create_celery()             # new

    from project.users import users_router
    app.include_router(users_router)

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app
Create a new file called project/users/tasks.py:
import time

from celery import shared_task


@shared_task
def divide(x, y):
    time.sleep(5)
    return x / y
Notes:
- Many resources on the web recommend using celery.task (the task decorator on a Celery app instance). This can cause circular imports, since every module that defines tasks has to import the Celery instance.
- We used shared_task to make our code reusable, which, again, is why create_celery uses current_app instead of creating a new Celery instance. Now, we can copy this file anywhere in the app and it will work as expected.
Update project/users/__init__.py to import the above tasks.py:
from fastapi import APIRouter
users_router = APIRouter(
    prefix="/users",
)
from . import models, tasks # noqa
Now, the Celery tasks will be found when we launch the worker.
Update main.py:
from project import create_app
app = create_app()
celery = app.celery_app
Your project structure should now look like this:
├── alembic
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 399011ac3c75_.py
│ └── b48ac95a682c_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│ ├── __init__.py
│ ├── celery_utils.py
│ ├── config.py
│ ├── database.py
│ └── users
│ ├── __init__.py
│ ├── models.py
│ └── tasks.py
└── requirements.txt
Manual Test
Run a worker in one terminal window:
(venv)$ celery -A main.celery worker --loglevel=info
[config]
.> app: default:0x10f681940 (.default.Loader)
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. project.users.tasks.divide
From the tasks section above, we can see project.users.tasks.divide, which means that the Celery worker found the task successfully.
Enter the Python shell in a new terminal:
(venv)$ python
Send some tasks to the Celery worker:
>>> from main import app
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)
Back in the first terminal window, you should see the logs from the worker:
[2024-01-04 16:08:33,989: INFO/MainProcess] Task project.users.tasks.divide[65d78422-1d71-4284-87e5-cf7d2437b57e] received
[2024-01-04 16:08:39,002: INFO/ForkPoolWorker-16] Task project.users.tasks.divide[65d78422-1d71-4284-87e5-cf7d2437b57e] succeeded in 5.010942939989036s: 0.5
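From the caller's side, divide.delay(1, 2) behaves much like submitting work to an executor: the call returns a handle right away, and the result is produced later by a separate worker. The stdlib analogy below uses concurrent.futures in place of Celery and Redis (and a much shorter sleep). With Celery, the handle is an AsyncResult, and calling task.get() on it blocks until the worker has stored the result in the result backend, which is one reason CELERY_RESULT_BACKEND is configured.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def divide(x, y):
    time.sleep(0.1)  # the real task sleeps for 5 seconds
    return x / y


# The executor plays the role of the Celery worker (and Redis) in this analogy
with ThreadPoolExecutor(max_workers=1) as worker:
    start = time.monotonic()
    future = worker.submit(divide, 1, 2)  # like divide.delay(1, 2): returns a handle at once
    submit_took = time.monotonic() - start
    result = future.result()  # blocks until the work is done, like task.get()

print(f"submit returned after {submit_took:.3f}s")
print(result)  # 0.5
```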