Application Factory

Part 1, Chapter 4


In this chapter, we'll refactor the current project structure using the application factory pattern to make testing and scaling easier.

Objectives

  1. Create an application factory pattern for initializing a Flask app
  2. Configure Celery to work with the application factory pattern
  3. Utilize Flask-SQLAlchemy to interact with a relational database
  4. Manage changes to the database with Flask-Migrate

Refactor

Update requirements.txt:

Flask-Migrate
Flask-SQLAlchemy

Install

(env)$ pip install -r requirements.txt

App Factory

Add a new folder called "project". Then, add an __init__.py file:

import os

from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy


# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()


def create_app():

    # instantiate the app
    app = Flask(__name__)

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app

Notes:

  1. After instantiating the two Flask extensions, init_app(app)configures the extensions to work with the Flask app
  2. create_app is a factory function, which can be called multiple times, that returns a Flask app for us to use

Update app.py like so to create a Flask app using the above factory function:

from project import create_app

app = create_app()

Config

Next, within "project", create a config.py file:

import os
from pathlib import Path


class BaseConfig:
    """Base configuration"""
    BASE_DIR = Path(__file__).parent.parent

    TESTING = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL", f"sqlite:////{BASE_DIR}/db.sqlite3")


class DevelopmentConfig(BaseConfig):
    """Development configuration"""
    DEBUG = True


class ProductionConfig(BaseConfig):
    """Production configuration"""
    DEBUG = False


config = {
    "development": DevelopmentConfig,
    "production": ProductionConfig,
}

To wire up the config, update project/__init__.py like so:

import os

from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.config import config  # new


# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()


def create_app(config_name=None):  # updated
    # new
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # new
    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app

Blueprint

From the Flask docs:

Flask uses a concept of blueprints for making application components and supporting common patterns within an application or across applications. Blueprints can greatly simplify how large applications work and provide a central means for Flask extensions to register operations on applications.

Create a "users" folder inside "project". Add an __init__.py file to it:

from flask import Blueprint

users_blueprint = Blueprint("users", __name__, url_prefix="/users", template_folder="templates")

from . import models  # noqa

Add a User model to a new file called project/users/models.py:

from project import db


class User(db.Model):

    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(128), unique=True, nullable=False)
    email = db.Column(db.String(128), unique=True, nullable=False)

    def __init__(self, username, email, *args, **kwargs):
        self.username = username
        self.email = email

Add the blueprint to the factory function in project/__init__.py:

import os

from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.config import config


# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()


def create_app(config_name=None):
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)

    # new
    # register blueprints
    from project.users import users_blueprint
    app.register_blueprint(users_blueprint)


    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app

Your project structure should now look like this:

├── app.py
├── project
│   ├── __init__.py
│   ├── config.py
│   └── users
│       ├── __init__.py
│       └── models.py
└── requirements.txt

Notes:

  1. app.py - uses create_app to create a new Flask app
  2. project/__init__.py - Flask extensions and factory function
  3. project/config.py - Flask config
  4. "project/users" - relevant models and views for the Users blueprint

Database Operations

Next, let's create a new database migration and create the table for the above User model:

(env)$ FLASK_APP=app.py flask db init
# inits the migrations, creates a "migrations" directory

(env)$ FLASK_APP=app.py flask db migrate -m "Initial migration."
# creates the initial migration file inside "migrations/versions"

(env)$ FLASK_APP=app.py flask db upgrade
# creates the sqlite database (project/db.sqlite3) along with the users table

Next, let's interact with the database inside the Flask shell:

(env)$ FLASK_APP=app.py flask shell

Then, within the shell, run:

>>> from project.users.models import User
>>> user = User(username='test1', email='[email protected]')
>>> db.session.add(user)
>>> db.session.commit()
>>>
>>> User.query.all()
[<User 1>]
>>> User.query.first().username
'test1'

Curious as to why we don't have to import db via from project import db? We added it to the shell context with shell_context_processor in the create_app function.

Add Celery

Flask-CeleryExt makes it easy to integrate Celery and Flask so that Celery tasks have access to Flask's app context.

Add Flask-CeleryExt to the requirements.txt file:

Flask-CeleryExt

Install:

(env)$ pip install -r requirements.txt

Update project/config.py, adding CELERY_BROKER_URL and CELERY_RESULT_BACKEND to BaseConfig:

class BaseConfig:
    """Base configuration"""
    BASE_DIR = Path(__file__).parent.parent

    TESTING = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL', f'sqlite:////{BASE_DIR}/db.sqlite3')

    CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
    CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")

Create a new file called project/celery_utils.py:

from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")

    return celery

Notes:

  1. make_celery is a factory function that configures and then returns a Celery app instance.
  2. Rather than creating a new Celery instance, we used current_app so that shared tasks work as expected.
  3. celery.config_from_object(app.config, namespace="CELERY") indicates that all Celery-related configuration keys should be written in uppercase and prefixed with CELERY_. For example, to configure the broker_url, your should use CELERY_BROKER_URL.

Update project/__init__.py:

import os

from flask import Flask
from flask_celeryext import FlaskCeleryExt  # new
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.celery_utils import make_celery  # new
from project.config import config


# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()
ext_celery = FlaskCeleryExt(create_celery_app=make_celery)  # new


def create_app(config_name=None):
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)
    ext_celery.init_app(app)  # new

    # register blueprints
    from project.users import users_blueprint
    app.register_blueprint(users_blueprint)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app

When we instantiated ext_celery, we passed the custom application factory make_celery to it. If we hadn't done this, the FlaskCeleryExt would create Celery app for us automatically, which is not recommended since we'll be running shared tasks.

Create a new file called project/users/tasks.py:

from celery import shared_task


@shared_task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y

Notes:

  1. Many resources on the web recommend using celery.task. This can cause circular imports since you'll have to import the Celery instance.
  2. We used shared_task to make our code reusable, which, again, requires current_app in make_celery instead of creating a new Celery instance. Now, we can copy this file anywhere in the app and it will work as expected.

Update project/users/__init__.py:

from flask import Blueprint

users_blueprint = Blueprint("users", __name__, url_prefix="/users", template_folder="templates")

from . import models, tasks  # noqa

Now, the Celery tasks will be found when we launch the worker.

Update app.py:

from project import create_app, ext_celery

app = create_app()
celery = ext_celery.celery

Your project structure should now look like this:

├── app.py
├── migrations
│   ├── README
│   ├── __pycache__
│   │   └── env.cpython-39.pyc
│   ├── alembic.ini
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       └── 465cfae61f7a_initial_migration.py
├── project
│   ├── __init__.py
│   ├── config.py
│   ├── db.sqlite3
│   └── users
│       ├── __init__.py
│       ├── models.py
│       └── tasks.py
└── requirements.txt

Manual Test

Run a worker in one terminal window:

(env)$ celery worker -A app.celery --loglevel=info

[config]
.> app:         default:0x110bdea60 (.default.Loader)
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 16 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . project.users.tasks.divide

Enter the Flask shell in a new terminal:

(env)$ FLASK_APP=app.py flask shell

Send some tasks to the Celery worker:

>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)

Back in the first terminal window, you should see the logs from the worker:

[2021-03-10 16:51:49,788: INFO/MainProcess] Received task: project.users.tasks.divide[a984913b-1011-4fc8-8997-b9449abf9fca]
[2021-03-10 16:51:54,798: INFO/ForkPoolWorker-16] Task project.users.tasks.divide[a984913b-1011-4fc8-8997-b9449abf9fca] succeeded in 5.008229221999983s: 0.5



Mark as Completed