Application Factory
Part 1, Chapter 4
In this chapter, we'll refactor the current project structure using the application factory pattern to make testing and scaling easier.
Objectives
- Create an application factory pattern for initializing a Flask app
- Configure Celery to work with the application factory pattern
- Utilize Flask-SQLAlchemy to interact with a relational database
- Manage changes to the database with Flask-Migrate
Refactor
Update requirements.txt:
```
Flask-Migrate==4.0.7
Flask-SQLAlchemy==3.1.1
```
Install:

```sh
(venv)$ pip install -r requirements.txt
```
App Factory
Add a new folder called "project". Then, add an __init__.py file:
```python
import os

from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy


# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()


def create_app():
    # instantiate the app
    app = Flask(__name__)

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app
```
Notes:
- After instantiating the two Flask extensions, `init_app(app)` configures each extension to work with the Flask app.
- `create_app` is a factory function, which can be called multiple times, that returns a Flask app for us to use.
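The factory idea itself isn't Flask-specific. Here's a rough, framework-free sketch of the pattern (the `SimpleApp` and `build_app` names are invented for illustration): each call to the factory returns a fresh, independently configured instance.

```python
# Minimal, framework-free sketch of the application factory pattern.
# SimpleApp and build_app are illustrative names, not Flask APIs.

class SimpleApp:
    def __init__(self):
        self.config = {}


def build_app(testing=False):
    # each call returns a brand-new, independently configured instance
    app = SimpleApp()
    app.config["TESTING"] = testing
    return app


dev_app = build_app()
test_app = build_app(testing=True)
```

Because each call produces its own instance, a test suite can spin up an app with test-specific config without touching any other app instance — which is exactly why the factory pattern makes testing easier.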
Update app.py like so to create a Flask app using the above factory function:
```python
from project import create_app

app = create_app()
```
Config
Next, within "project", create a config.py file:
```python
import os
from pathlib import Path


class BaseConfig:
    """Base configuration"""

    BASE_DIR = Path(__file__).parent.parent

    TESTING = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")


class DevelopmentConfig(BaseConfig):
    """Development configuration"""

    DEBUG = True


class ProductionConfig(BaseConfig):
    """Production configuration"""

    DEBUG = False


config = {
    "development": DevelopmentConfig,
    "production": ProductionConfig,
}
```
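To see how the `config` mapping, class inheritance, and the `FLASK_CONFIG` fallback interact, here's a stripped-down, Flask-free sketch (the `pick_config` helper is invented for illustration; it mirrors the lookup that `create_app` will perform below):

```python
import os


class BaseConfig:
    TESTING = False


class DevelopmentConfig(BaseConfig):
    DEBUG = True


class ProductionConfig(BaseConfig):
    DEBUG = False


config = {
    "development": DevelopmentConfig,
    "production": ProductionConfig,
}


def pick_config(name=None):
    # explicit argument wins, then the FLASK_CONFIG env var,
    # then the "development" default
    return config[name or os.environ.get("FLASK_CONFIG", "development")]
```

Settings not overridden in a subclass, like `TESTING`, are inherited from `BaseConfig`, so each environment class only declares what differs.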
To wire up the config, update project/__init__.py like so:
```python
import os

from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.config import config  # new


# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()


def create_app(config_name=None):  # updated
    # new
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # new
    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app
```
Blueprint
From the Flask docs:
> A Blueprint is a way to organize a group of related views and other code. Rather than registering views and other code directly with an application, they are registered with a blueprint. Then the blueprint is registered with the application when it is available in the factory function.
Create a "users" folder inside "project". Add an __init__.py file to it:
```python
from flask import Blueprint

users_blueprint = Blueprint("users", __name__, url_prefix="/users", template_folder="templates")

from . import models  # noqa
```
Add a `User` model to a new file called project/users/models.py:
```python
from project import db


class User(db.Model):
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(128), unique=True, nullable=False)
    email = db.Column(db.String(128), unique=True, nullable=False)

    def __init__(self, username, email, *args, **kwargs):
        self.username = username
        self.email = email
```
Add the blueprint to the factory function in project/__init__.py:
```python
import os

from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.config import config


# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()


def create_app(config_name=None):
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)

    # new
    # register blueprints
    from project.users import users_blueprint
    app.register_blueprint(users_blueprint)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app
```
Your project structure should now look like this:
```
├── app.py
├── project
│   ├── __init__.py
│   ├── config.py
│   └── users
│       ├── __init__.py
│       └── models.py
└── requirements.txt
```
Notes:
- app.py - uses `create_app` to create a new Flask app
- project/__init__.py - Flask extensions and factory function
- project/config.py - Flask config
- "project/users" - relevant models and views for the Users blueprint
Database Operations
Next, let's create a new database migration and create the table for the above `User` model:
```sh
(venv)$ FLASK_APP=app.py flask db init
# inits the migrations, creates a "migrations" directory

(venv)$ FLASK_APP=app.py flask db migrate -m "Initial migration."
# creates the initial migration file inside "migrations/versions"

(venv)$ FLASK_APP=app.py flask db upgrade
# creates the sqlite database (BASE_DIR/db.sqlite3) along with the users table
```
Next, let's interact with the database inside the Flask shell:
```sh
(venv)$ FLASK_APP=app.py flask shell
```
Then, within the shell, run:
```python
>>> from project.users.models import User
>>> user = User(username='test1', email='[email protected]')
>>> db.session.add(user)
>>> db.session.commit()
>>>
>>> User.query.all()
[<User 1>]
>>> User.query.first().username
'test1'
```
Curious as to why we don't have to import `db` via `from project import db`? We added it to the shell context with `shell_context_processor` in the `create_app` function.
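Conceptually, `shell_context_processor` just registers a function whose returned dict gets merged into the shell's namespace. Here's a toy, Flask-free sketch of that mechanism (the `_processors` list and `build_shell_namespace` helper are invented for illustration, not Flask's actual internals):

```python
# Toy re-implementation of the shell-context idea; not Flask's real code.
_processors = []


def shell_context_processor(func):
    # record the function so its return value can be merged in later
    _processors.append(func)
    return func


@shell_context_processor
def ctx():
    return {"app": "app-instance", "db": "db-instance"}


def build_shell_namespace():
    # merge every registered processor's dict into one namespace
    namespace = {}
    for func in _processors:
        namespace.update(func())
    return namespace
```

Flask does essentially this when `flask shell` starts, which is why `app` and `db` are available without an import.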
Add Celery
Flask-CeleryExt makes it easy to integrate Celery and Flask so that Celery tasks have access to Flask's app context.
Add Flask-CeleryExt to the requirements.txt file:
```
Flask-CeleryExt==0.5.0
```
Install:
```sh
(venv)$ pip install -r requirements.txt
```
Update project/config.py, adding `CELERY_BROKER_URL` and `CELERY_RESULT_BACKEND` to `BaseConfig`:
```python
class BaseConfig:
    """Base configuration"""

    BASE_DIR = Path(__file__).parent.parent

    TESTING = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")  # new
    CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")  # new
```
Create a new file called project/celery_utils.py:
```python
from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")

    return celery
```
Notes:
- `make_celery` is a factory function that configures and then returns a Celery app instance.
- Rather than creating a new Celery instance, we used `current_app` so that shared tasks work as expected.
- `celery.config_from_object(app.config, namespace="CELERY")` indicates that all Celery-related configuration keys should be written in uppercase and prefixed with `CELERY_`. For example, to configure the `broker_url`, you should use `CELERY_BROKER_URL`.
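The namespace mechanics can be pictured as a simple key filter. The sketch below only approximates what Celery does internally with `config_from_object` (the `extract_namespaced` helper is invented for illustration):

```python
def extract_namespaced(config, namespace="CELERY"):
    # keep only keys like CELERY_BROKER_URL, strip the prefix,
    # and lowercase the remainder to match Celery's setting names
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in config.items()
        if key.startswith(prefix)
    }


settings = extract_namespaced({
    "DEBUG": True,                                      # ignored: not namespaced
    "CELERY_BROKER_URL": "redis://127.0.0.1:6379/0",    # becomes broker_url
    "CELERY_RESULT_BACKEND": "redis://127.0.0.1:6379/0" # becomes result_backend
})
```

The upshot: Flask-only settings like `DEBUG` never leak into Celery's configuration, because only the prefixed keys are picked up.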
Update project/__init__.py to instantiate the `FlaskCeleryExt` extension:
```python
import os

from flask import Flask
from flask_celeryext import FlaskCeleryExt  # new
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.celery_utils import make_celery  # new
from project.config import config


# instantiate the extensions
db = SQLAlchemy()
migrate = Migrate()
ext_celery = FlaskCeleryExt(create_celery_app=make_celery)  # new


def create_app(config_name=None):
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)
    ext_celery.init_app(app)  # new

    # register blueprints
    from project.users import users_blueprint
    app.register_blueprint(users_blueprint)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app
```
When we instantiated `ext_celery`, we passed the custom application factory `make_celery` to it. If we hadn't done this, `FlaskCeleryExt` would have created a Celery app for us automatically, which is not recommended here.
Create a new file called project/users/tasks.py:
```python
import time

from celery import shared_task


@shared_task
def divide(x, y):
    time.sleep(5)
    return x / y
```
Notes:
- Many resources on the web recommend using `celery.task`. In some cases, this can cause circular imports since you'd have to import the Celery instance.
- We used `shared_task` to make our code reusable, which, again, requires `current_app` in `make_celery` instead of creating a new Celery instance. Now, we can copy this file anywhere in the app and it will work as expected.
Update project/users/__init__.py:
```python
from flask import Blueprint

users_blueprint = Blueprint("users", __name__, url_prefix="/users", template_folder="templates")

from . import models, tasks  # noqa
```
This will ensure that users/tasks.py will be loaded when Flask registers the blueprint and the tasks will be found by the Celery worker.
Update app.py:
```python
from project import create_app, ext_celery

app = create_app()
celery = ext_celery.celery
```
Your project structure should now look like this:
```
├── app.py
├── db.sqlite3
├── migrations
│   ├── README
│   ├── alembic.ini
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       └── 5d3d0f517ebc_initial_migration.py
├── project
│   ├── __init__.py
│   ├── celery_utils.py
│   ├── config.py
│   └── users
│       ├── __init__.py
│       ├── models.py
│       └── tasks.py
└── requirements.txt
```
Manual Test
Run a worker in one terminal window:
```sh
(venv)$ celery -A app.celery worker --loglevel=info

[config]
.> app:         default:0x110bdea60 (.default.Loader)
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 16 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . project.users.tasks.divide
```
Enter the Flask shell in a new terminal:
```sh
(venv)$ FLASK_APP=app.py flask shell
```
Send some tasks to the Celery worker:
```python
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)
```
Back in the first terminal window, you should see the logs from the worker:
```
[2024-08-20 11:53:45,557: INFO/MainProcess] Task project.users.tasks.divide[e91bbd39-7f1e-460c-8b0e-4bd92e070d6a] received
[2024-08-20 11:53:50,572: INFO/ForkPoolWorker-16] Task project.users.tasks.divide[e91bbd39-7f1e-460c-8b0e-4bd92e070d6a] succeeded in 5.013083801000903s: 0.5
```