Developing a Real-time Dashboard with FastAPI, Postgres, and WebSockets

Last updated July 17th, 2025

This tutorial will guide you through building a real-time inventory tracking dashboard using FastAPI, Postgres, and WebSockets. You'll learn how to stream live data updates from the backend to the frontend using Postgres' LISTEN/NOTIFY and FastAPI's WebSocket support.

Objectives

By the end of this tutorial, you should be able to:

  1. Develop a RESTful API with FastAPI for inventory management
  2. Implement real-time data updates using Postgres' LISTEN/NOTIFY
  3. Stream change events to the frontend via FastAPI WebSocket connections
  4. Track full document history of updated or deleted items using Postgres' triggers
  5. Build a responsive dashboard UI using HTML, CSS, and JavaScript

What Are We Building?

We'll create an inventory management API and a real-time dashboard that displays live inventory updates, quantity changes, and item deletions.

Final app:

[Image: Final App]

Project Setup

We'll start by creating the project structure. The entire project lives in a single folder, since the frontend is plain HTML, CSS, and JavaScript served by the same app.

Open your terminal and run:

$ mkdir fastapi-postgresql-realtime-tracker
$ cd fastapi-postgresql-realtime-tracker

Next, let's set up a virtual environment, create our base file and folder structure, and install the dependencies.

Start by creating and activating a virtual environment:

$ python -m venv venv
$ source venv/bin/activate

Create the following files and folders:

fastapi-postgresql-realtime-tracker
├── database.py
├── main.py
├── models.py
├── notify.py
├── requirements.txt
├── setup_database.sql
└── static
    ├── index.html
    └── index.js

Add the packages to the requirements.txt file:

asyncpg==0.30.0
fastapi==0.116.1
psycopg2-binary==2.9.10
pydantic==2.11.7
SQLAlchemy==2.0.41
uvicorn[standard]==0.35.0
websockets==15.0.1

Why these packages?

  • fastapi and uvicorn power the web API.
  • sqlalchemy is our ORM for database access.
  • asyncpg and psycopg2-binary are Postgres drivers (async and sync).
  • pydantic is for data validation.
  • websockets is for WebSocket support.

Finally, install the packages:

(venv)$ pip install -r requirements.txt

Schema

Let's define the schema for our inventory data. This will determine how data is stored in Postgres and validated in our API.

Pydantic schemas are used for validating data along with serializing (Python -> JSON) and de-serializing (JSON -> Python).

Let's start by defining the SQLAlchemy Inventory model. In models.py, add the following:

from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func

from database import Base


class Inventory(Base):
    __tablename__ = "inventory"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)
    quantity = Column(Integer, nullable=False)
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

In the code block above, we defined an inventory table, which holds one record per item. The Inventory model has four fields:

  1. id: The unique ID of an item stored in the inventory
  2. name: The name of the item. This field cannot be null
  3. quantity: The quantity of the item in stock. This field cannot be null either
  4. updated_at: A timezone-aware timestamp that records when the item was last updated

With the inventory model in place, let's define the Pydantic schemas for API validation. Add the following to models.py:

class InventoryBase(BaseModel):
    name: str
    quantity: int


class InventoryCreate(InventoryBase):
    pass


class InventoryUpdate(BaseModel):
    quantity: int


class InventoryResponse(InventoryBase):
    id: int
    updated_at: datetime

    # Pydantic v2 style; the class-based Config is deprecated in v2
    model_config = {"from_attributes": True}

In the code block above, we defined four Pydantic schemas:

  1. InventoryBase: The parent schema for our API validation. It has two required fields: the name of the item and its quantity.
  2. InventoryCreate: A child schema that inherits from InventoryBase without adding fields. It's used in the route for creating a new inventory item.
  3. InventoryUpdate: A standalone schema with a single required field, quantity, which means only the quantity of an inventory item can be updated.
  4. InventoryResponse: A child schema of InventoryBase that describes how inventory data is returned to the user. It adds the id and updated_at fields. The model_config setting with from_attributes = True allows Pydantic to build the schema directly from SQLAlchemy model instances.

Don't forget to update the imports:

from datetime import datetime

from pydantic import BaseModel
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.sql import func

from database import Base
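
To make the validation and serialization behavior concrete, here is a minimal sketch that exercises trimmed copies of the schemas without a database. The sample values and the SimpleNamespace stand-in for a SQLAlchemy row are assumptions made for illustration only.

```python
from datetime import datetime, timezone
from types import SimpleNamespace

from pydantic import BaseModel, ValidationError


class InventoryCreate(BaseModel):
    name: str
    quantity: int


class InventoryResponse(InventoryCreate):
    id: int
    updated_at: datetime

    # from_attributes lets the schema read plain object attributes (e.g. an ORM row).
    model_config = {"from_attributes": True}


# De-serializing: incoming JSON is parsed and validated into a Python object.
item = InventoryCreate.model_validate_json('{"name": "Widget", "quantity": 5}')

# Invalid input is rejected before it ever reaches the database.
try:
    InventoryCreate.model_validate_json('{"name": "Widget", "quantity": "many"}')
    rejected = False
except ValidationError:
    rejected = True

# Hypothetical stand-in for a SQLAlchemy row: any object with matching attributes.
row = SimpleNamespace(
    id=1,
    name="Widget",
    quantity=5,
    updated_at=datetime(2025, 7, 17, tzinfo=timezone.utc),
)

# Serializing: the row is read via attributes and dumped back to JSON.
response = InventoryResponse.model_validate(row)
as_json = response.model_dump_json()
```

FastAPI performs the equivalent of both steps for you: request bodies are validated against the schema in the route signature, and return values are serialized through response_model.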

Database Setup

Now that we have our schema in place, let's install Postgres, configure the application's database connection, and set up the tables and triggers that drive real-time notifications.

Step 1: Install Postgres

If you don't have Postgres installed, follow the official installation guide. Once installed, start the Postgres service and verify it's running:

$ psql postgres

You should see the Postgres prompt. You can also check the version:

$ psql --version

This tutorial uses Postgres v15.4, but any recent version should work.

Step 2: Configure the Database Connection

In database.py, add the following:

import os

from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker


# Database configuration
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost/inventory_db")
ASYNC_DATABASE_URL = os.getenv("ASYNC_DATABASE_URL", "postgresql+asyncpg://localhost/inventory_db")

# Synchronous engine for migrations and initial setup
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Asynchronous engine for FastAPI
async_engine = create_async_engine(ASYNC_DATABASE_URL)
async_session = sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)

Base = declarative_base()

In the code block above, we import create_engine and create_async_engine from SQLAlchemy to create synchronous and asynchronous engine objects for communicating with our Postgres instance. We use sessionmaker to build a session factory for each engine from its database URL. The synchronous engine is for migrations and initial setup, and the asynchronous engine serves our FastAPI application.
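
The two URLs differ only in the driver suffix of the scheme. SQLAlchemy understands the postgresql+asyncpg:// form, while a driver such as asyncpg, when used directly, expects a plain postgresql:// DSN. The sketch below shows one way to derive the plain form; to_plain_dsn is a hypothetical helper, not part of SQLAlchemy or asyncpg.

```python
from urllib.parse import urlsplit, urlunsplit

ASYNC_DATABASE_URL = "postgresql+asyncpg://localhost/inventory_db"


def to_plain_dsn(sqlalchemy_url: str) -> str:
    """Drop the '+driver' suffix from the URL scheme (hypothetical helper)."""
    parts = urlsplit(sqlalchemy_url)
    scheme = parts.scheme.split("+", 1)[0]
    return urlunsplit((scheme, parts.netloc, parts.path, parts.query, parts.fragment))


plain_dsn = to_plain_dsn(ASYNC_DATABASE_URL)
# plain_dsn == "postgresql://localhost/inventory_db"
```

A simple string replacement of "+asyncpg" gives the same result for these URLs; parsing the URL just avoids touching anything outside the scheme.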

Next, add the following helper functions to get database sessions:

def get_db():
    """Dependency to get database session"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


async def get_async_db():
    """Dependency to get async database session"""
    async with async_session() as session:
        yield session

In the code block above, we defined two functions that yield active sessions: get_db for synchronous access and get_async_db for asynchronous access. We'll use them as dependencies in our FastAPI routes to provide database sessions.
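
The try/finally around the yield is what guarantees cleanup: the code before yield runs before the route, and the finally block runs after it. The sketch below illustrates that ordering with a hypothetical FakeSession and uses contextlib.contextmanager to mimic how FastAPI drives a generator dependency.

```python
from contextlib import contextmanager

events = []


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session."""

    def close(self):
        events.append("closed")


def get_db():
    db = FakeSession()
    try:
        events.append("opened")
        yield db  # the route handler runs while the generator is paused here
    finally:
        db.close()  # runs even if the route raises


# FastAPI drives the generator itself; contextmanager mimics that here.
with contextmanager(get_db)() as db:
    events.append("route ran")

# events == ["opened", "route ran", "closed"]
```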

Step 3: Create the Database Setup Script

In the previous step, we initialized the variables and methods necessary for interacting with our database from the application. Now, we'll write a SQL script that sets up our database table and creates the trigger function for notifications. Open setup_database.sql in the project root and add the following SQL commands:

-- Create the inventory table
CREATE TABLE IF NOT EXISTS inventory (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    quantity INTEGER NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

In the code block above, we created the inventory table and defined its associated fields: id, name, quantity and updated_at.

Next, add the following to the same file to create a trigger function to send a real-time notification whenever a row in the inventory table is inserted, updated, or deleted:

-- Create the trigger function for notifications
CREATE OR REPLACE FUNCTION notify_inventory_changes() RETURNS trigger AS $$
DECLARE
  payload JSON;
BEGIN
  IF (TG_OP = 'DELETE') THEN
    payload = json_build_object('event', TG_OP, 'data', row_to_json(OLD));
  ELSE
    payload = json_build_object('event', TG_OP, 'data', row_to_json(NEW));
  END IF;
  PERFORM pg_notify('inventory_channel', payload::text);
  RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

Let's understand what the code above does:

  1. Line 1 (the CREATE OR REPLACE FUNCTION line) defines a new trigger function, notify_inventory_changes, for use in a table trigger. It returns the trigger type, as Postgres requires of trigger functions.
  2. Lines 2 - 3 declare a variable called payload of type JSON, which holds the notification data to be sent.
  3. Lines 4 - 12 hold the core logic for the notification trigger:
    • Line 4 begins the function body, and lines 5 - 6 build the payload from the old row data if the database operation is a DELETE.
    • Lines 7 - 9 build the payload from the new row data if the database operation is an INSERT or an UPDATE.
    • Line 10 sends the payload as a notification on the inventory_channel using Postgres' pg_notify function.
    • Line 11 returns either the new or old row, as required by trigger functions, so the same function works for all three operation types. Line 12 closes the function body.
  4. Line 13 specifies that the function is written in PL/pgSQL, Postgres' procedural language.
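
On the Python side, each notification arrives as a JSON string with an event key and a data key. The sketch below parses a hypothetical sample payload (the row values are made up) and checks its size, since Postgres' default configuration requires NOTIFY payloads to be shorter than 8000 bytes. Both helper names are assumptions for illustration.

```python
import json

# Hypothetical sample of what the trigger sends for an INSERT.
sample_payload = json.dumps({
    "event": "INSERT",
    "data": {
        "id": 1,
        "name": "Widget",
        "quantity": 5,
        "updated_at": "2025-07-17T12:00:00+00:00",
    },
})

NOTIFY_PAYLOAD_LIMIT = 8000  # bytes; payloads must be shorter than this by default


def parse_notification(payload: str):
    """Split a trigger payload into its event name and row data."""
    message = json.loads(payload)
    return message["event"], message["data"]


def fits_in_notify(payload: str) -> bool:
    """Check that a payload is small enough for pg_notify's default limit."""
    return len(payload.encode("utf-8")) < NOTIFY_PAYLOAD_LIMIT


event, row = parse_notification(sample_payload)
```

Rows in this tutorial are small, so the limit is unlikely to matter here. For tables with large columns, a common workaround is to send only the row ID and let the listener fetch the full row.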

Next, let's create the trigger:

-- Create the trigger
DROP TRIGGER IF EXISTS inventory_trigger ON inventory;
CREATE TRIGGER inventory_trigger
AFTER INSERT OR UPDATE OR DELETE ON inventory
FOR EACH ROW EXECUTE FUNCTION notify_inventory_changes();

Here, we dropped the old trigger if it already exists and created a new one, which makes the script safe to run more than once. The trigger fires after an INSERT, UPDATE, or DELETE on the inventory table, once for each row affected.

Routes

Now let's create the API routes to add, update, and delete inventory items. These routes will trigger changes in the database, which in turn will generate real-time notifications.

Step 1: WebSocket Connection Manager

WebSockets provide a persistent, full-duplex communication channel between a client and server over a single TCP connection, enabling real-time data exchange. Unlike traditional HTTP, WebSockets allow both the client and server to send messages to each other at any time without repeatedly reopening connections.

Let's start by configuring the WebSocket connection manager that'll broadcast events from the database in real-time. In main.py, add the following:

import asyncio
import logging
import json
from contextlib import asynccontextmanager
from typing import List

from fastapi import FastAPI, Depends, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session

from database import get_db, get_async_db, engine, Base, ASYNC_DATABASE_URL
from models import Inventory, InventoryCreate, InventoryUpdate, InventoryResponse
from notify import PostgresNotifier


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# WebSocket connection manager
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info(f"WebSocket connected. Total connections: {len(self.active_connections)}")

    def disconnect(self, websocket: WebSocket):
        # broadcast() may already have pruned this connection
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        logger.info(f"WebSocket disconnected. Total connections: {len(self.active_connections)}")

    async def broadcast(self, message: dict):
        disconnected = []
        # Iterate over a copy: the list can change while we await each send
        for connection in list(self.active_connections):
            try:
                await connection.send_text(json.dumps(message))
            except Exception as e:
                logger.error(f"Error sending message to WebSocket: {e}")
                disconnected.append(connection)
        # Remove connections that failed to receive the message
        for connection in disconnected:
            if connection in self.active_connections:
                self.active_connections.remove(connection)

Here, we defined the ConnectionManager class with three methods:

  1. connect(): This method accepts a new WebSocket connection and adds it to the list of active connections.
  2. disconnect(): This method removes a WebSocket connection from the active list when the client disconnects or the connection is closed.
  3. broadcast(): This method sends a message to all connected clients and removes any client whose send fails.
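
The broadcast-and-prune behavior can be tried without a running server. The sketch below is a trimmed-down manager paired with a hypothetical FakeSocket that mimics only the send_text() method of a FastAPI WebSocket; one socket is deliberately broken to show the cleanup.

```python
import asyncio
import json


class FakeSocket:
    """Hypothetical stand-in for fastapi.WebSocket (send_text only)."""

    def __init__(self, broken=False):
        self.broken = broken
        self.sent = []

    async def send_text(self, text):
        if self.broken:
            raise RuntimeError("connection lost")
        self.sent.append(text)


class MiniManager:
    """Trimmed-down connection manager, just enough to show broadcasting."""

    def __init__(self):
        self.active_connections = []

    async def broadcast(self, message):
        dead = []
        for ws in list(self.active_connections):  # copy: list may change mid-loop
            try:
                await ws.send_text(json.dumps(message))
            except Exception:
                dead.append(ws)
        for ws in dead:  # prune sockets that could not be reached
            if ws in self.active_connections:
                self.active_connections.remove(ws)


async def demo():
    good, bad = FakeSocket(), FakeSocket(broken=True)
    manager = MiniManager()
    manager.active_connections += [good, bad]
    await manager.broadcast({"event": "INSERT", "data": {"id": 1}})
    return good, manager.active_connections


good, remaining = asyncio.run(demo())
# Only the healthy socket is left, and it received the message.
```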

Step 2: Application Lifecycle and WebSocket Notification Handling

Now, let's set up the application lifecycle and the logic for handling Postgres notifications and broadcasting them to WebSocket clients.

Add the following to main.py:

# Global instances
manager = ConnectionManager()
notifier = None


async def handle_postgres_notification(data: dict):
    """Handle PostgreSQL notifications and broadcast to WebSocket clients"""
    await manager.broadcast(data)

Here, we defined:

  1. A global instance of the WebSocket ConnectionManager class, along with a notifier placeholder that is populated at startup
  2. A handle_postgres_notification() function that receives each Postgres notification and broadcasts it to the WebSocket clients

Next, add the following lifespan function that runs on our FastAPI startup to main.py:

@asynccontextmanager
async def lifespan(app: FastAPI):
    global notifier

    # Create tables
    Base.metadata.create_all(bind=engine)
    logger.info("Database tables created")

    # Start PostgreSQL listener
    notifier = PostgresNotifier(ASYNC_DATABASE_URL.replace("+asyncpg", ""))
    notifier.add_listener(handle_postgres_notification)

    # Start listening in background
    task = asyncio.create_task(start_postgres_listener())

    yield

    # Shutdown
    task.cancel()
    if notifier:
        await notifier.disconnect()

In the lifespan function above, we instructed our FastAPI application to:

  1. Create tables on startup.
  2. Create the Postgres listener, PostgresNotifier, and register handle_postgres_notification as its callback. We'll define PostgresNotifier in the Database Listener section.
  3. Start a background task in which the listener waits for notifications on inventory_channel.
  4. Cancel the task and disconnect the notifier when the FastAPI application shuts down.
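
The startup/shutdown ordering can be seen in isolation with a small sketch that needs neither FastAPI nor Postgres. Here, listener is a hypothetical background job standing in for the Postgres listener, and the async with block plays the role of the running application. Unlike the function above, this sketch also awaits the cancelled task so that its cleanup finishes before shutdown continues.

```python
import asyncio
from contextlib import asynccontextmanager

log = []


async def listener():
    """Hypothetical background job standing in for the Postgres listener."""
    try:
        log.append("listener started")
        await asyncio.Event().wait()  # idle until cancelled
    except asyncio.CancelledError:
        log.append("listener cancelled")
        raise


@asynccontextmanager
async def lifespan(app):
    task = asyncio.create_task(listener())  # startup: runs before the yield
    yield  # the application serves requests here
    task.cancel()  # shutdown: runs after the yield
    try:
        await task  # wait for the task to finish its cleanup
    except asyncio.CancelledError:
        pass


async def main():
    async with lifespan(app=None):
        await asyncio.sleep(0)  # give the listener task a chance to start
        log.append("serving requests")


asyncio.run(main())
# log == ["listener started", "serving requests", "listener cancelled"]
```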

Next, create an asynchronous function to start the listener:

async def start_postgres_listener():
    """Start the PostgreSQL listener"""
    try:
        await notifier.listen_to_channel('inventory_channel')
        await notifier.start_listening()
    except Exception as e:
        logger.error(f"Error in PostgreSQL listener: {e}")

Step 3: FastAPI App and Static Files

In the previous step, we designed the application lifecycle and WebSocket notification handling. Now, create the FastAPI app and mount the static files for the frontend in main.py:

app = FastAPI(title="Real-Time Inventory Tracker", lifespan=lifespan)

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")

Next, we'll define the API routes.

Step 4: API Routes

Define the following API routes in main.py:

@app.get("/")
async def read_root():
    """Serve the main page"""
    with open("static/index.html", "r") as f:
        html_content = f.read()
    return HTMLResponse(content=html_content)


@app.get("/api/inventory", response_model=List[InventoryResponse])
async def get_inventory(db: AsyncSession = Depends(get_async_db)):
    """Get all inventory items"""
    result = await db.execute(select(Inventory).order_by(Inventory.updated_at.desc()))
    items = result.scalars().all()
    return items


@app.post("/api/inventory", response_model=InventoryResponse)
async def create_inventory_item(
    item: InventoryCreate,
    db: AsyncSession = Depends(get_async_db)
):
    """Create a new inventory item"""
    db_item = Inventory(**item.model_dump())  # .dict() is deprecated in Pydantic v2
    db.add(db_item)
    await db.commit()
    await db.refresh(db_item)
    return db_item


@app.put("/api/inventory/{item_id}", response_model=InventoryResponse)
async def update_inventory_item(
    item_id: int,
    item_update: InventoryUpdate,
    db: AsyncSession = Depends(get_async_db)
):
    """Update an inventory item's quantity"""
    result = await db.execute(select(Inventory).where(Inventory.id == item_id))
    db_item = result.scalar_one_or_none()

    if not db_item:
        raise HTTPException(status_code=404, detail="Item not found")

    db_item.quantity = item_update.quantity
    await db.commit()
    await db.refresh(db_item)
    return db_item


@app.delete("/api/inventory/{item_id}")
async def delete_inventory_item(
    item_id: int,
    db: AsyncSession = Depends(get_async_db)
):
    """Delete an inventory item"""
    result = await db.execute(select(Inventory).where(Inventory.id == item_id))
    db_item = result.scalar_one_or_none()

    if not db_item:
        raise HTTPException(status_code=404, detail="Item not found")

    await db.delete(db_item)
    await db.commit()
    return {"message": "Item deleted successfully"}

In the code above, we defined a route that serves the dashboard page along with routes to list, create, update, and delete inventory items using the asynchronous session we created. Let's add the WebSocket endpoint in the next step to allow us to broadcast changes from the database to connected clients.
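
For reference, here is a minimal sketch of how a client could call these routes using only the standard library. BASE_URL assumes the app is served on uvicorn's default port, build_request and send are hypothetical helpers, and the item ID 1 is a placeholder. The requests are only built here; calling send() requires the server to be running.

```python
import json
from typing import Optional
from urllib.request import Request, urlopen

BASE_URL = "http://localhost:8000"  # assumption: uvicorn's default port


def build_request(method: str, path: str, body: Optional[dict] = None) -> Request:
    """Build a JSON request for one of the inventory routes."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if body is not None else {}
    return Request(BASE_URL + path, data=data, headers=headers, method=method)


def send(request: Request):
    """Send the request and decode the JSON response (needs a running server)."""
    with urlopen(request) as response:
        return json.loads(response.read())


create = build_request("POST", "/api/inventory", {"name": "Widget", "quantity": 5})
update = build_request("PUT", "/api/inventory/1", {"quantity": 3})
delete = build_request("DELETE", "/api/inventory/1")
```

Each successful write fires the database trigger, so a connected dashboard should see a matching INSERT, UPDATE, or DELETE event shortly after the HTTP response.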

Step 5: WebSocket Endpoint

Add the WebSocket endpoint to main.py:

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time updates"""
    await manager.connect(websocket)
    try:
        while True:
            # Keep connection alive
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket)

Here, we defined the WebSocket route to accept WebSocket connections from clients and remove disconnected clients from the connection manager.

Database Listener

To receive real-time updates from Postgres, we need a listener that reacts to LISTEN/NOTIFY events.

Add the following code to notify.py:

import asyncio
import json
import logging
from typing import Callable

import asyncpg


logger = logging.getLogger(__name__)


class PostgresNotifier:
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.connection = None
        self.listeners = []

    async def connect(self):
        """Connect to PostgreSQL"""
        try:
            self.connection = await asyncpg.connect(self.database_url)
            logger.info("Connected to PostgreSQL for notifications")
        except Exception as e:
            logger.error(f"Failed to connect to PostgreSQL: {e}")
            raise

    async def disconnect(self):
        """Disconnect from PostgreSQL"""
        if self.connection:
            await self.connection.close()
            logger.info("Disconnected from PostgreSQL")

We defined the PostgresNotifier class, which takes the database_url as an argument. The connect() method, as the name implies, connects to the database URL provided. Similarly, the disconnect() method closes the connection from the Postgres database.

Next, let's add the methods to add a listener and listen to a channel:

    def add_listener(self, callback: Callable):
        """Add a callback function to handle notifications"""
        self.listeners.append(callback)

    async def listen_to_channel(self, channel: str):
        """Listen to a specific PostgreSQL channel"""
        if not self.connection:
            await self.connect()

        await self.connection.add_listener(channel, self._handle_notification)
        logger.info(f"Listening to channel: {channel}")

Here, we defined two new methods:

  1. add_listener appends a callback function to handle notifications from the triggers.
  2. listen_to_channel listens to a specific Postgres channel for changes. For our application, we'll be listening to the inventory_channel.

Next, let's define methods to handle incoming notifications and control the listening:

    async def _handle_notification(self, connection, pid, channel, payload):
        """Handle incoming notifications"""
        try:
            data = json.loads(payload)
            logger.info(f"Received notification: {data}")

            # Notify all registered listeners
            for listener in self.listeners:
                await listener(data)
        except Exception as e:
            logger.error(f"Error handling notification: {e}")

    async def start_listening(self):
        """Start the listening loop"""
        if not self.connection:
            await self.connect()

        try:
            while True:
                # Keep this task alive; asyncpg invokes the callback as notifications arrive
                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            logger.info("Listening cancelled")
        except Exception as e:
            logger.error(f"Error in listening loop: {e}")
        finally:
            await self.disconnect()

Notes:

  1. The _handle_notification() method takes the connection, process ID, channel name, and payload. The method parses the payload and notifies all registered listeners.
  2. The start_listening() method keeps the listener task running so the process continues to receive notifications until the task is cancelled, at which point it closes the database connection.
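
As a design alternative to waking up every 0.1 seconds, the task can park on an asyncio.Event that is never set and rely on cancellation to end the wait. The sketch below shows the idea under that assumption; idle_until_cancelled and fake_disconnect are hypothetical names, and no database is involved.

```python
import asyncio


async def idle_until_cancelled(cleanup):
    """Park the task without periodic wake-ups, then run cleanup."""
    try:
        await asyncio.Event().wait()  # never set, so this waits until cancelled
    except asyncio.CancelledError:
        pass  # cancellation is the normal way out
    finally:
        await cleanup()


closed = []


async def fake_disconnect():
    # Hypothetical stand-in for closing the database connection.
    closed.append(True)


async def demo():
    task = asyncio.create_task(idle_until_cancelled(fake_disconnect))
    await asyncio.sleep(0)  # let the task start waiting
    task.cancel()
    await task


asyncio.run(demo())
# closed == [True]
```

Either approach works for this tutorial. The polling loop is simply easier to read, while the event-based wait avoids needless wake-ups.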

Creating the Frontend

With the backend in place, let's build a simple frontend that updates in real-time as inventory items are created, updated, or deleted.

Step 1: HTML Structure

Add the following to static/index.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Real-Time Inventory Tracker</title>
    <!-- Styles omitted for brevity, use the styles from the original tutorial -->
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>📦 Real-Time Inventory Tracker</h1>
            <p>Add, update, and track inventory items in real-time</p>
        </div>
        <div class="card">
            <div id="connection-status" class="status disconnected">
                🔴 Disconnected from server
            </div>
            <div class="form-section">
                <h2>Add New Item</h2>
                <form id="add-item-form">
                    <div class="form-group">
                        <input type="text" id="item-name" placeholder="Item name" required>
                        <input type="number" id="item-quantity" placeholder="Quantity" min="0" required>
                        <button type="submit" class="btn">Add Item</button>
                    </div>
                </form>
            </div>
            <div class="inventory-list">
                <h2>Inventory Items</h2>
                <div id="inventory-container">
                    <div class="loading">Loading inventory...</div>
                </div>
            </div>
        </div>
    </div>
    <script src="static/index.js"></script>
</body>
</html>

This code sets up the structure for the inventory dashboard, including a form for adding new items and a container for displaying the inventory list.

Step 2: JavaScript Application Logic

Next, add the following code to static/index.js:

class InventoryApp {
    constructor() {
        this.websocket = null;
        this.inventoryItems = new Map();
        this.isConnected = false;
        this.pendingUpdates = new Set();
        this.init();
    }
    init() {
        this.setupEventListeners();
        this.connectWebSocket();
        this.loadInitialData();
    }
    setupEventListeners() {
        const form = document.getElementById('add-item-form');
        form.addEventListener('submit', (e) => this.handleAddItem(e));
    }
    connectWebSocket() {
        const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
        const wsUrl = `${protocol}//${window.location.host}/ws`;
        this.websocket = new WebSocket(wsUrl);
        this.websocket.onopen = () => {
            this.updateConnectionStatus(true);
        };
        this.websocket.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                this.handleWebSocketMessage(data);
            } catch (error) {
                console.error('Error parsing WebSocket message:', error);
            }
        };
        this.websocket.onclose = () => {
            this.updateConnectionStatus(false);
            setTimeout(() => this.connectWebSocket(), 3000);
        };
        this.websocket.onerror = (error) => {
            console.error('WebSocket error:', error);
        };
    }
    updateConnectionStatus(connected) {
        this.isConnected = connected;
        const statusElement = document.getElementById('connection-status');
        if (connected) {
            statusElement.className = 'status connected';
            statusElement.textContent = '🟢 Connected to server';
        } else {
            statusElement.className = 'status disconnected';
            statusElement.textContent = '🔴 Disconnected from server';
        }
    }
    async loadInitialData() {
        try {
            const response = await fetch('/api/inventory');
            if (response.ok) {
                const items = await response.json();
                this.inventoryItems.clear();
                items.forEach(item => {
                    this.inventoryItems.set(item.id, item);
                });
                this.renderInventory();
            }
        } catch (error) {
            this.showError('Failed to load inventory data');
        }
    }
    handleWebSocketMessage(data) {
        const {event, data: itemData} = data;
        switch (event) {
            case 'INSERT':
                this.inventoryItems.set(itemData.id, itemData);
                this.renderInventory();
                this.showNotification(`Added: ${itemData.name}`, 'success');
                break;
            case 'UPDATE':
                this.inventoryItems.set(itemData.id, itemData);
                this.renderInventory();
                this.showNotification(`Updated: ${itemData.name}`, 'info');
                break;
            case 'DELETE':
                this.inventoryItems.delete(itemData.id);
                this.renderInventory();
                this.showNotification(`Deleted: ${itemData.name}`, 'warning');
                break;
            default:
                break;
        }
    }
    async handleAddItem(event) {
        event.preventDefault();
        const nameInput = document.getElementById('item-name');
        const quantityInput = document.getElementById('item-quantity');
        const name = nameInput.value.trim();
        const quantity = parseInt(quantityInput.value, 10);
        if (!name || Number.isNaN(quantity) || quantity < 0) {
            this.showError('Please enter a valid item name and quantity');
            return;
        }
        try {
            const response = await fetch('/api/inventory', {
                method: 'POST',
                headers: {'Content-Type': 'application/json'},
                body: JSON.stringify({name, quantity}),
            });
            if (response.ok) {
                nameInput.value = '';
                quantityInput.value = '';
            } else {
                const error = await response.json();
                this.showError(error.detail || 'Failed to add item');
            }
        } catch (error) {
            this.showError('Failed to add item');
        }
    }
    async updateItemQuantity(id, newQuantity) {
        if (this.pendingUpdates.has(id)) return;
        this.pendingUpdates.add(id);
        const item = this.inventoryItems.get(id);
        if (item) {
            const originalQuantity = item.quantity;
            item.quantity = newQuantity;
            this.renderInventory();
            try {
                const response = await fetch(`/api/inventory/${id}`, {
                    method: 'PUT',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify({quantity: newQuantity}),
                });
                if (!response.ok) {
                    item.quantity = originalQuantity;
                    this.renderInventory();
                    const error = await response.json();
                    this.showError(error.detail || 'Failed to update item');
                }
            } catch (error) {
                item.quantity = originalQuantity;
                this.renderInventory();
                this.showError('Failed to update item');
            } finally {
                this.pendingUpdates.delete(id);
                // Re-render so the controls are re-enabled once the request settles
                this.renderInventory();
            }
        }
    }
    async deleteItem(id) {
        if (!confirm('Are you sure you want to delete this item?')) return;
        const itemElement = document.querySelector(`[data-item-id="${id}"]`);
        if (itemElement) itemElement.style.opacity = '0.5';
        try {
            const response = await fetch(`/api/inventory/${id}`, {method: 'DELETE'});
            if (!response.ok) {
                if (itemElement) itemElement.style.opacity = '1';
                const error = await response.json();
                this.showError(error.detail || 'Failed to delete item');
            }
        } catch (error) {
            if (itemElement) itemElement.style.opacity = '1';
            this.showError('Failed to delete item');
        }
    }
    renderInventory() {
        const container = document.getElementById('inventory-container');
        if (this.inventoryItems.size === 0) {
            container.innerHTML = `
                <div class="empty-state">
                    <h3>No items in inventory</h3>
                    <p>Add your first item using the form above!</p>
                </div>
            `;
            return;
        }
        const sortedItems = Array.from(this.inventoryItems.values())
            .sort((a, b) => new Date(b.updated_at) - new Date(a.updated_at));
        container.innerHTML = sortedItems.map(item => this.renderInventoryItem(item)).join('');
    }
    renderInventoryItem(item) {
        const updatedAt = new Date(item.updated_at).toLocaleString();
        const isUpdating = this.pendingUpdates.has(item.id);
        return `
            <div class="inventory-item ${isUpdating ? 'updating' : ''}" data-item-id="${item.id}">
                <div class="item-info">
                    <div class="item-name">${this.escapeHtml(item.name)}</div>
                    <div class="item-meta">Last updated: ${updatedAt}</div>
                </div>
                <div class="item-actions">
                    <input
                        type="number"
                        class="quantity-input"
                        value="${item.quantity}"
                        min="0"
                        onchange="app.updateItemQuantity(${item.id}, parseInt(this.value))"
                        ${isUpdating ? 'disabled' : ''}
                    >
                    <button
                        class="btn btn-danger btn-small"
                        onclick="app.deleteItem(${item.id})"
                        ${isUpdating ? 'disabled' : ''}
                    >
                        Delete
                    </button>
                </div>
            </div>
        `;
    }
    escapeHtml(text) {
        const div = document.createElement('div');
        div.textContent = text;
        return div.innerHTML;
    }
    showNotification(message, type = 'info') {
        const notification = document.createElement('div');
        notification.className = `notification ${type}`;
        notification.textContent = message;
        notification.style.cssText = `
            position: fixed;
            top: 20px;
            right: 20px;
            padding: 12px 20px;
            border-radius: 6px;
            color: white;
            font-weight: 600;
            z-index: 1000;
            animation: slideIn 0.3s ease;
            max-width: 300px;
        `;
        switch (type) {
            case 'success':
                notification.style.background = '#28a745';
                break;
            case 'warning':
                notification.style.background = '#ffc107';
                notification.style.color = '#212529';
                break;
            case 'error':
                notification.style.background = '#dc3545';
                break;
            default:
                notification.style.background = '#17a2b8';
        }
        document.body.appendChild(notification);
        setTimeout(() => {
            notification.style.animation = 'slideOut 0.3s ease';
            setTimeout(() => {
                if (notification.parentNode) {
                    notification.parentNode.removeChild(notification);
                }
            }, 300);
        }, 3000);
    }
    showError(message) {
        this.showNotification(message, 'error');
    }
}

// Add CSS animations for notifications
const style = document.createElement('style');
style.textContent = `
    @keyframes slideIn {
        from {
            transform: translateX(100%);
            opacity: 0;
        }
        to {
            transform: translateX(0);
            opacity: 1;
        }
    }
    @keyframes slideOut {
        from {
            transform: translateX(0);
            opacity: 1;
        }
        to {
            transform: translateX(100%);
            opacity: 0;
        }
    }
`;
document.head.appendChild(style);

// Initialize the app
const app = new InventoryApp();

We defined an InventoryApp class to power our real-time inventory dashboard. It manages the WebSocket connection that receives live updates, fetches and displays inventory items, and handles adding, updating, and deleting items. The UI re-renders as change events arrive, and users get a notification for each action.

Real-time Inventory Dashboard

With both the backend and frontend set up, you can now run your application and see real-time updates in action.

Step 1: Set Up the Database

First, make sure your Postgres database is running. Then, create the database and run the setup script:

$ createdb inventory_db
$ psql -d inventory_db -f setup_database.sql

Step 2: Start the FastAPI Application

Start the FastAPI app using Uvicorn:

(venv)$ uvicorn main:app --reload

Visit http://localhost:8000 in your browser. You should see the dashboard page with a connection status indicator and a form to add new inventory items.

Final App

Try adding, updating, or deleting items using the web interface or by making API requests. All changes will be reflected in real-time across all connected browser tabs.
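
If you'd rather exercise the API from a script than from the browser, here is a minimal sketch using only the Python standard library. The /items path and the name/quantity payload are assumptions based on the fields the dashboard renders, so substitute the route and schema actually defined in main.py and models.py. The helper names build_add_item_request and send are hypothetical.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # address Uvicorn serves on in Step 2
ITEMS_PATH = "/items"  # ASSUMPTION: replace with the route defined in main.py


def build_add_item_request(name, quantity, base_url=BASE_URL):
    """Prepare (but do not send) a JSON POST that creates an inventory item."""
    # ASSUMPTION: the API accepts a JSON body with "name" and "quantity".
    body = json.dumps({"name": name, "quantity": quantity}).encode("utf-8")
    return urllib.request.Request(
        base_url + ITEMS_PATH,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send a prepared request and decode the JSON response body."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running and the route adjusted to match your app, calling send(build_add_item_request("Widget", 5)) should make the new item show up in every open dashboard tab.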

Final App

Conclusion

In this tutorial, you built a real-time inventory tracking dashboard using FastAPI, Postgres, and WebSockets. You learned how to:

  • Create a RESTful API with FastAPI for inventory management
  • Use Postgres triggers and LISTEN/NOTIFY for real-time updates
  • Stream change events to the frontend via WebSockets
  • Build a responsive dashboard UI with live event visualization

The key components that make this system work in real-time are:

  • Postgres Triggers: Automatically emit NOTIFY events on data changes
  • asyncpg Integration: Listen to Postgres notifications asynchronously
  • WebSocket Broadcasting: Push updates to all connected clients instantly
  • Event-driven Frontend: React to real-time events and update the UI
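
To make the hand-off between the middle two components concrete, here is a minimal sketch of the fan-out pattern. It is not the code from notify.py. The names Broadcaster and make_listener are hypothetical, and a client is assumed to be any object with an async send_text method, which FastAPI's WebSocket provides. The callback takes the four arguments asyncpg passes to a listener registered with add_listener: connection, pid, channel, and payload.

```python
import asyncio


class Broadcaster:
    """Tracks connected clients and fans each message out to all of them."""

    def __init__(self):
        self.clients = set()

    def connect(self, client):
        self.clients.add(client)

    def disconnect(self, client):
        self.clients.discard(client)

    async def broadcast(self, message):
        # Iterate over a copy so failed clients can be dropped mid-loop.
        for client in list(self.clients):
            try:
                await client.send_text(message)
            except Exception:
                # Treat a failed send as a dropped connection.
                self.disconnect(client)


def make_listener(broadcaster):
    """Build a callback with the signature asyncpg uses for listeners."""

    def on_notify(connection, pid, channel, payload):
        # The callback runs inside the event loop, so schedule the async
        # broadcast as a task instead of blocking on it.
        loop = asyncio.get_running_loop()
        return loop.create_task(broadcaster.broadcast(payload))

    return on_notify
```

In a real app, the returned callback would be registered on an asyncpg connection for whichever channel the trigger notifies, and each WebSocket endpoint would call connect on open and disconnect on close. Dropping a client when a send fails keeps one stale connection from blocking updates to everyone else.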

Looking for more?

  1. Add authentication and user management to track who makes changes
  2. Add data visualization charts to show inventory trends over time

Check out the Test-Driven Development with FastAPI and Docker course to learn more about testing and setting up CI/CD for a FastAPI app.

This tutorial gives you a foundation for building real-time applications with FastAPI and Postgres. Database triggers emit change events, WebSockets push them to every connected client, and the frontend updates the dashboard as they arrive.

Cheers!

Featured Course

Test-Driven Development with FastAPI and Docker

In this course, you'll learn how to build, test, and deploy a text summarization service with Python, FastAPI, and Docker. The service itself will be exposed via a RESTful API and deployed to Heroku with Docker.
