Application Factory

Part 1, Chapter 4


In this chapter, we'll refactor the current project structure using the factory pattern to make testing and scaling easier.

Objectives

  1. Create an application factory pattern for initializing a FastAPI app
  2. Configure Celery to work with the application factory pattern
  3. Manage changes to the database with SQLAlchemy and Alembic

App Factory

Add a new folder called "project". Then, add an __init__.py file:

from fastapi import FastAPI


def create_app() -> FastAPI:
    app = FastAPI()

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

create_app is a factory function, which can be called multiple times, that returns a FastAPI app for us to use.

Update main.py like so to create a FastAPI app using the above factory function:

from project import create_app

app = create_app()

Test:

(env)$ uvicorn main:app --reload

INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [96439] using watchgod
INFO:     Started server process [96482]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Visit http://localhost:8000 to ensure the app still works.

Database Support

Dependencies

Add the following dependencies to requirements.txt:

alembic==1.6.5
SQLAlchemy==1.4.20

Install:

(env)$ pip install -r requirements.txt

Notes:

  1. For now, since Celery doesn't support asyncio very well (since it was developed before asyncio), we're using SQLAlchemy as our ORM because it can be used with both FastAPI and Celery.
  2. Alembic is a database migration tool for SQLAlchemy.

Config

Next, within "project", create a config.py file:

import os
import pathlib
from functools import lru_cache


class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}


class DevelopmentConfig(BaseConfig):
    pass


class ProductionConfig(BaseConfig):
    pass


class TestingConfig(BaseConfig):
    pass


@lru_cache()
def get_settings():
    config_cls_dict = {
        "development": DevelopmentConfig,
        "production": ProductionConfig,
        "testing": TestingConfig
    }

    config_name = os.environ.get("FASTAPI_CONFIG", "development")
    config_cls = config_cls_dict[config_name]
    return config_cls()


settings = get_settings()

Notes:

  1. In get_settings, we used the FASTAPI_CONFIG env variable to control which configuration to use. For example, during development, DevelopmentConfig will be used and TestingConfig will be used during test.
  2. I do not recommend pydantic BaseSettings here because it might cause Celery to raise [ERROR/MainProcess] pidbox command error: KeyError('__signature__') error when we launch Flower

Now project would seem like this:

├── main.py
├── project
│   ├── __init__.py
│   └── config.py
└── requirements.txt

Import SQLAlchemy

Create a new file called project/database.py:

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from project.config import settings

# https://fastapi.tiangolo.com/tutorial/sql-databases/#create-the-sqlalchemy-engine
engine = create_engine(
    settings.DATABASE_URL, connect_args=settings.DATABASE_CONNECT_DICT
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

Config Alembic

Init Alembic:

(env)$ alembic init alembic

Your project structure will now look like this:

├── alembic               # new
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
├── alembic.ini           # new
├── main.py
├── project
│   ├── __init__.py
│   ├── config.py
│   └── database.py
└── requirements.txt

Update alembic/env.py:

from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

from project import create_app                 # new
from project.config import settings            # new
from project.database import Base              # new

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL))        # new

fastapi_app = create_app()    # new

target_metadata = Base.metadata       # new

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.

...

Notes:

  1. We used config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL)) to set the database connection string.
  2. Then, we used create_app to create a a new fastapi_app instance to ensure the relevant models are loaded.
  3. Finally, we added target_metadata = Base.metadata so that new models are discovered by Alembic.

To create an empty db.sqlite3, run:

(env)$ python

>>> from main import app
>>> from project.database import Base, engine
>>> Base.metadata.create_all(bind=engine)
>>> exit()

(env)ls db.sqlite3
db.sqlite3

Let's go ahead and migrate the database even though we don't have any models yet:

(env)$ alembic revision --autogenerate

INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
  Generating /fastapi-celery-project/alembic/versions/4ea12e629032_.py ...  done

(env)$ alembic upgrade head
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 4ea12e629032, empty message

Application Structure

If you have experience with Django or Flask, you've probably used "Django Apps" or "Flask Blueprints" to break up larger applications by grouping common functionality into reusable components.

Next, we'll do the same thing for FastAPI.

Create a "users" folder inside "project". Add an __init__.py file to it:

from fastapi import APIRouter

users_router = APIRouter(
    prefix="/users",
)

from . import models # noqa

Add a User model to a new file called project/users/models.py:

from sqlalchemy import Column, Integer, String

from project.database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(128), unique=True, nullable=False)
    email = Column(String(128), unique=True, nullable=False)

    def __init__(self, username, email, *args, **kwargs):
        self.username = username
        self.email = email

Update project/__init__.py:

from fastapi import FastAPI


def create_app() -> FastAPI:
    app = FastAPI()

    from project.users import users_router                # new
    app.include_router(users_router)                      # new

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

So, when from project.users import users_router is called, the code in project/users/__init__.py will run. models.py will be imported as well.

Your project structure should now look like this:

├── alembic
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       └── 4ea12e629032_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│   ├── __init__.py
│   ├── config.py
│   ├── database.py
│   └── users
│       ├── __init__.py
│       └── models.py
└── requirements.txt

Notes:

  1. main.py - uses create_app to create a new FastAPI app
  2. project/__init__.py - Factory function
  3. project/config.py - FastAPI config
  4. "project/users" - relevant models and routes for Users

Database Operations

Next, let's create a new database migration and create the table for the above User model:

(env)$ alembic revision --autogenerate
# INFO  [alembic.autogenerate.compare] Detected added table 'users'

(env)$ alembic upgrade head
# Create users table

We can interact with the database inside the Python shell:

(env)$ python

Then, within the shell, run:

>>> from main import app
>>> from project.database import SessionLocal
>>> from project.users.models import User

>>> user = User(username='test1', email='[email protected]')
>>> session = SessionLocal()
>>> session.add(user)
>>> session.commit()
>>>
>>> new_session = SessionLocal()
>>> new_session.query(User).first().username
'test1'

>>> exit()

Add Celery

Update BaseConfig in project/config.py, adding CELERY_BROKER_URL and CELERY_RESULT_BACKEND:

class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}

    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")            # NEW
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")    # NEW

Create a new file called project/celery_utils.py:

from celery import current_app as current_celery_app

from project.config import settings


def create_celery():
    celery_app = current_celery_app
    celery_app.config_from_object(settings, namespace="CELERY")

    return celery_app

Notes:

  1. create_celery is a factory function that configures and then returns a Celery app instance.
  2. Rather than creating a new Celery instance, we used current_app so that shared tasks will work as expected.
  3. celery_app.config_from_object(settings, namespace="CELERY") means all celery-related configuration keys should be prefixed with CELERY_. For example, to configure the broker_url, we should use CELERY_BROKER_URL

Update project/__init__.py:

from fastapi import FastAPI

from project.celery_utils import create_celery


def create_app() -> FastAPI:
    app = FastAPI()

    # do this before loading routes
    app.celery_app = create_celery()

    from project.users import users_router
    app.include_router(users_router)

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

Create a new file called project/users/tasks.py:

from celery import shared_task


@shared_task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y

Notes:

  1. Many resources on the web recommend using celery.task. This might cause circular imports since you'll have to import the Celery instance.
  2. We used shared_task to make our code reusable, which, again, requires current_app in create_celery instead of creating a new Celery instance. Now, we can copy this file anywhere in the app and it will work as expected.

Update project/users/__init__.py:

from fastapi import APIRouter

users_router = APIRouter(
    prefix="/users",
)

from . import models, tasks # noqa

Now, the Celery tasks will be found when we launch the worker.

Update main.py:

from project import create_app

app = create_app()
celery = app.celery_app

Your project structure should now look like this:

├── alembic
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       ├── 31d362f0573c_.py
│       └── 4ea12e629032_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│   ├── __init__.py
│   ├── celery_utils.py
│   ├── config.py
│   ├── database.py
│   └── users
│       ├── __init__.py
│       ├── models.py
│       └── tasks.py
└── requirements.txt

Manual Test

Run a worker in one terminal window:

(env)$ celery -A main.celery worker --loglevel=info

[config]
.> app:         default:0x10f681940 (.default.Loader)
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . project.users.tasks.divide

Enter the Python shell in a new terminal:

(env)$ python

Send some tasks to the Celery worker:

>>> from main import app
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)

Back in the first terminal window, you should see the logs from the worker:

[2021-07-10 10:11:40,244: INFO/MainProcess] Task project.users.tasks.divide[efba162e-1fbb-4b67-a338-3a9899363ec6] received
[2021-07-10 10:11:45,253: INFO/ForkPoolWorker-16] Task project.users.tasks.divide[efba162e-1fbb-4b67-a338-3a9899363ec6] succeeded in 5.007629750999996s: 0.5



Mark as Completed