This guide covers everything you need to know for developing with the FastAPI Template, including setup, testing, database management, and contribution guidelines.
Before you begin, ensure you have the following installed:
- Python 3.13+: The project requires Python 3.13 or higher
- PostgreSQL: Database server (version 12+)
- Git: Version control system
- uv (recommended): Fast Python package manager, or pip as alternative
- Docker: For containerized development
- PostgreSQL client: For database management (psql, pgAdmin, etc.)
- VS Code: Recommended IDE with Python extensions
git clone <repository-url>
cd FastApi-Template

# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# Create and activate virtual environment with dependencies
uv sync
# Activate the environment
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Create virtual environment
python -m venv .venv
# Activate virtual environment
source .venv/bin/activate # On Windows: .venv\Scripts\activate
# Install dependencies
pip install -r requirements.txt

Create a .env file in the project root:
cp .env.example .env

Edit .env with your configuration:
# Database Configuration
DATABASE_URL=postgresql+asyncpg://username:password@localhost:5432/fastapi_template
# Security Settings
SECRET_KEY=your-super-secret-key-here
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
# Server Configuration
BACKEND_HOST=0.0.0.0
BACKEND_PORT=8799
CURRENT_ENVIRONMENT=local
# CORS Settings (for development)
CORS_ORIGINS=["http://localhost:3000", "http://localhost:8080"]
# Logging
LOG_LEVEL=DEBUG

# Using PostgreSQL command line
createdb fastapi_template
# Or using SQL
psql -c "CREATE DATABASE fastapi_template;"

# macOS
brew install redis
brew services start redis
# Ubuntu/Debian
sudo apt-get install redis-server
sudo systemctl start redis-server
# Windows
# Download from https://github.com/microsoftarchive/redis/releases
# Or use Docker:
docker run -d -p 6379:6379 redis:alpine

# Initialize Alembic (only if starting fresh)
alembic init alembic
# Run existing migrations
alembic upgrade head

# Run the application
python main.py
# Check if it's working
curl http://localhost:8799/docs

# Using the main script (with auto-reload)
python main.py
# Or directly with uvicorn
uvicorn app.main:app --reload --host 0.0.0.0 --port 8799

# Using gunicorn (production)
gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8799

The project includes several tools for maintaining code quality:
# Format all code
black .
# Check formatting without changes
black --check .
# Format specific files
black app/main.py

# Install pre-commit hooks
pre-commit install
# Run hooks manually
pre-commit run --all-files
# Update hooks
pre-commit autoupdate

# Auto-generate migration from model changes
alembic revision --autogenerate -m "Description of changes"
# Create empty migration
alembic revision -m "Description of changes"

# Upgrade to latest
alembic upgrade head
# Upgrade to specific revision
alembic upgrade revision_id
# Downgrade one revision
alembic downgrade -1
# Downgrade to specific revision
alembic downgrade revision_id

# Show current revision
alembic current
# Show migration history
alembic history
# Show head revision(s)
alembic heads

# Drop all tables and recreate
alembic downgrade base
alembic upgrade head

# Run custom seeder (if implemented)
python -m app.core.seeders

tests/
├── __init__.py
├── conftest.py # Test configuration and fixtures
├── test_auth.py # Authentication tests
├── test_users.py # User endpoint tests
├── test_models.py # Model tests
├── test_repositories.py # Repository tests
└── integration/ # Integration tests
├── test_api.py # Full API tests
└── test_database.py # Database integration tests
# Install test dependencies
uv add pytest pytest-asyncio pytest-cov httpx
# Create test database
createdb fastapi_template_test
# Set test environment variable
export DATABASE_URL=postgresql+asyncpg://username:password@localhost:5432/fastapi_template_test

# Run all tests
pytest
# Run with coverage
pytest --cov=app --cov-report=html
# Run specific test file
pytest tests/test_auth.py
# Run specific test function
pytest tests/test_auth.py::test_login
# Run with verbose output
pytest -v
# Run in parallel (with pytest-xdist)
pytest -n auto

import pytest
from httpx import AsyncClient
from app.main import app

@pytest.mark.asyncio
async def test_create_user():
    async with AsyncClient(app=app, base_url="http://test") as client:
        response = await client.post(
            "/api/v1/auth/signup",
            data={
                "username": "testuser",
                "email": "test@example.com",
                "password": "testpass123",
                "first_name": "Test",
                "last_name": "User"
            }
        )
    assert response.status_code == 201
    assert "access_token" in response.json()

# conftest.py
import pytest
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from app.core.config import settings
from app.core.db import get_session, Base
@pytest.fixture
async def async_session():
    engine = create_async_engine(settings.database_url_test)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with AsyncSession(engine) as session:
        yield session
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)

The application uses a production-grade logging system built with Loguru, providing structured, traceable logs across all environments.
from loguru import logger
# Different log levels
logger.debug("Debug information")
logger.info("General information")
logger.warning("Warning message")
logger.error("Error occurred")
logger.critical("Critical error")
# Structured logging with context
logger.info("User created", user_id=user.id, username=user.username)
# Exception logging with full traceback
try:
    risky_operation()
except Exception:
    logger.exception("Operation failed")  # Automatically includes traceback

Console Output (Development):
- Colored output for quick visual scanning
- Shows timestamp, log level, process ID, request ID, and message
- Simplified format for rapid debugging
File Output (Always Active):
- Located in logs/app.log
- Includes full context: module, function, line number
- UTC timestamps for consistency across time zones
- Automatically rotates at 10MB, compresses with gzip
- Retains logs for 3 months
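The rotation and retention policy above can be expressed as a single Loguru sink. The configuration below is an illustrative sketch of that behavior (the path and format string are assumptions, not necessarily the template's exact settings):

```python
from loguru import logger

# Illustrative sink matching the described behavior: rotate at 10 MB,
# keep rotated files for 3 months, gzip-compress them, and log UTC
# timestamps plus process ID, module, function, and line number.
logger.add(
    "logs/app.log",
    format=("{time:YYYY-MM-DD HH:mm:ss.SSS!UTC} | {level: <8} | "
            "PID:{process} | {name}:{function}:{line} | {message}"),
    rotation="10 MB",       # start a new file at 10 MB
    retention="3 months",   # delete rotated files after 3 months
    compression="gz",       # gzip rotated files
    enqueue=True,           # queue-based, multiprocess-safe writing
)
```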
Every HTTP request automatically gets a unique request ID that appears in all related logs:
# All logs within a request context automatically include the request ID
logger.info("Processing user registration")
# Output: ... | ReqID:abc123 | Processing user registration
logger.info("Database query executed")
# Output: ... | ReqID:abc123 | Database query executedTo find all logs for a specific request:
# Search logs for specific request ID
grep "ReqID:abc123" logs/app.log
# Or use log viewer tools to filter by the request_id field

- Local/Development: DEBUG and above
- Staging/Production: INFO and above
Configure via .env:
LOG_LEVEL=DEBUG  # or INFO, WARNING, ERROR, CRITICAL

For production deployments, configure remote log shipping:
# Add to .env for centralized logging
OPENOBSERVE_URL=https://observe.example.com
OPENOBSERVE_TOKEN=your_base64_token
OPENOBSERVE_ORG=your_organization
OPENOBSERVE_STREAM=fastapi_logs
OPENOBSERVE_BATCH_SIZE=10
OPENOBSERVE_FLUSH_INTERVAL=5.0

Benefits:
- Non-blocking: doesn't slow down your application
- Batched: reduces network overhead
- Resilient: continues local logging if remote service unavailable
- Searchable: query logs across all application instances
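The batch/flush semantics behind OPENOBSERVE_BATCH_SIZE and OPENOBSERVE_FLUSH_INTERVAL can be sketched in plain Python. `BatchShipper` and its `send` callback are illustrative stand-ins, not the template's actual shipper:

```python
import time
from queue import Queue, Empty

class BatchShipper:
    """Sketch of batched log shipping: buffer records and flush when
    either the batch size or the flush interval is reached."""

    def __init__(self, send, batch_size=10, flush_interval=5.0):
        self.send = send                      # ships a list of records
        self.batch_size = batch_size          # cf. OPENOBSERVE_BATCH_SIZE
        self.flush_interval = flush_interval  # cf. OPENOBSERVE_FLUSH_INTERVAL
        self.queue = Queue()
        self._last_flush = time.monotonic()

    def emit(self, record):
        self.queue.put(record)  # cheap for the caller: no network I/O here
        self.flush()

    def flush(self, force=False):
        interval_due = time.monotonic() - self._last_flush >= self.flush_interval
        if not (force or interval_due or self.queue.qsize() >= self.batch_size):
            return
        batch = []
        while True:
            try:
                batch.append(self.queue.get_nowait())
            except Empty:
                break
        if batch:
            try:
                self.send(batch)  # one network call per batch
            except Exception:
                pass              # resilient: keep running if shipping fails
        self._last_flush = time.monotonic()

# Demo: collect "shipped" batches in a list instead of a real HTTP call
shipped = []
shipper = BatchShipper(shipped.append, batch_size=3, flush_interval=60.0)
for i in range(7):
    shipper.emit({"msg": f"event {i}"})
shipper.flush(force=True)  # flush the tail on shutdown
print([len(b) for b in shipped])  # → [3, 3, 1]
```

A real shipper would run the flush on a background thread so the interval fires even when no new logs arrive; the structure stays the same.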
When running with multiple workers (--workers 4):
- Each log entry includes process ID (PID) to identify which worker generated it
- Thread-safe queue-based writing prevents log corruption
- All workers write to the same unified log file safely
# Run with multiple workers
uvicorn app.main:app --workers 4 --host 0.0.0.0 --port 8799
# Logs will show:
# ... | PID:12345 | ReqID:abc123 | Message from worker 1
# ... | PID:12346 | ReqID:def456 | Message from worker 2

# Tail logs in real-time
tail -f logs/app.log
# Search for errors
grep "ERROR" logs/app.log
# View logs with timestamps in specific range
grep "2025-01-19" logs/app.log
# Decompress old logs
gunzip logs/app.2025-01-01.log.gz
cat logs/app.2025-01-01.log

Create .vscode/launch.json:
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "FastAPI",
            "type": "python",
            "request": "launch",
            "program": "main.py",
            "console": "integratedTerminal",
            "env": {
                "CURRENT_ENVIRONMENT": "local"
            }
        }
    ]
}

# Enable SQL logging
import logging
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
# Or in settings
echo=True  # In database engine creation

# Use selectinload to efficiently load relationships
from sqlalchemy.orm import selectinload
users = await session.execute(
    select(User).options(selectinload(User.posts))
)

# Use pagination for large datasets
async def get_users_paginated(
    session: AsyncSession,
    skip: int = 0,
    limit: int = 100
):
    result = await session.execute(
        select(User).offset(skip).limit(limit)
    )
    return result.scalars().all()

# Using the CacheManager service
from app.services.cache import cache_manager
async def get_user_profile(user_id: str):
    # Try to get from cache first
    cache_key = f"user:profile:{user_id}"
    cached_data = await cache_manager.get(cache_key)
    if cached_data:
        return cached_data

    # Fetch from database if not cached
    user_data = await fetch_user_from_db(user_id)

    # Store in cache with TTL
    await cache_manager.set(cache_key, user_data, expire=300)
    return user_data

The template includes a production-ready rate limiting system backed by Redis and a sliding-window algorithm.
Apply pre-configured rate limiters to endpoints using FastAPI dependencies:
from fastapi import APIRouter, Depends
from app.api.v1.deps.rate_limit import (
    rate_limit_auth,    # Strict: 10 req/min (for login, signup)
    rate_limit_api,     # Default: 100 req/min (for general API)
    rate_limit_public,  # Lenient: 1000 req/min (for public data)
    rate_limit_user,    # User-based: 300 req/min (for authenticated)
)
router = APIRouter()
# IP-based rate limiting for authentication
@router.post("/login", dependencies=[Depends(rate_limit_auth)])
async def login(credentials: LoginForm):
    # Limited to 10 requests per minute per IP
    pass

# IP-based rate limiting for public API
@router.get("/posts", dependencies=[Depends(rate_limit_api)])
async def list_posts():
    # Limited to 100 requests per minute per IP
    pass

# User-based rate limiting for authenticated endpoints
@router.get("/profile", dependencies=[Depends(rate_limit_user)])
async def get_profile(current_user: User = Depends(get_current_user)):
    # Limited to 300 requests per minute per user
    # Multiple users on the same IP each get their own quota
    pass

Create custom rate limiters for specific endpoints:
from app.api.v1.deps.rate_limit import create_rate_limit
# Custom limit for heavy operations (5 requests per 5 minutes)
heavy_limit = create_rate_limit(limit=5, window=300, prefix="heavy")
@router.post("/export", dependencies=[Depends(heavy_limit)])
async def export_large_file():
    pass

# Custom user-based limit (20 uploads per minute per user)
upload_limit = create_rate_limit(
    limit=20,
    window=60,
    prefix="upload",
    use_user_id=True
)

@router.post("/upload", dependencies=[Depends(upload_limit)])
async def upload_file(current_user: User = Depends(get_current_user)):
    pass

All rate-limited endpoints automatically include headers in responses:
X-RateLimit-Limit: 100 # Maximum requests allowed
X-RateLimit-Remaining: 73 # Requests remaining in window
X-RateLimit-Reset: 1701234567  # Unix timestamp when the limit resets

When the limit is exceeded, clients receive HTTP 429:
HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1701234567
{
    "detail": "Rate limit exceeded. Please slow down your requests."
}

Configure rate limits in .env:
# Rate limiting settings
RATE_LIMIT_ENABLED=true # Enable/disable rate limiting (true/false)
RATE_LIMIT_DEFAULT=100 # General API endpoints (requests per window)
RATE_LIMIT_WINDOW=60 # Time window in seconds
RATE_LIMIT_STRICT=10 # Authentication endpoints
RATE_LIMIT_LENIENT=1000 # Public endpoints
RATE_LIMIT_USER=300      # Authenticated user endpoints

IP-Based (for unauthenticated endpoints):
- Login, signup, password reset: 10 req/min per IP
- Public API endpoints: 100 req/min per IP
- Health checks, documentation: 1000 req/min per IP
User-Based (for authenticated endpoints):
- User profile, settings: 300 req/min per user
- Solves shared IP problem (office/cafe networks)
- Each user gets independent quota regardless of IP
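On the client side, the rate-limit headers make polite backoff straightforward. A small helper (illustrative, not part of the template) that decides how long to wait before retrying:

```python
import time

def seconds_until_reset(headers, now=None):
    """Return how long a client should wait before retrying:
    0 if quota remains, otherwise the time until X-RateLimit-Reset."""
    now = time.time() if now is None else now
    remaining = int(headers.get("X-RateLimit-Remaining", 1))
    reset_at = int(headers.get("X-RateLimit-Reset", 0))
    if remaining > 0:
        return 0.0
    return max(0.0, float(reset_at - now))

# Quota exhausted; the window resets 30 seconds from "now"
hdrs = {"X-RateLimit-Remaining": "0", "X-RateLimit-Reset": "1701234567"}
print(seconds_until_reset(hdrs, now=1701234537))  # → 30.0
```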
Rate limit keys use prefixes to separate quotas by endpoint group:
from app.core.constants import RateLimitPrefix
# Pre-defined prefixes
RateLimitPrefix.AUTH # "ratelimit:auth:" - Authentication
RateLimitPrefix.USER # "ratelimit:user:" - User endpoints
RateLimitPrefix.API # "ratelimit:api:" - General API
RateLimitPrefix.PUBLIC # "ratelimit:public:" - Public endpoints
RateLimitPrefix.EXPORT # "ratelimit:export:" - File exports
RateLimitPrefix.UPLOAD # "ratelimit:upload:" - File uploads
RateLimitPrefix.SEARCH # "ratelimit:search:" - Search queries
RateLimitPrefix.ADMIN    # "ratelimit:admin:" - Admin operations

Add custom prefixes to app/core/constants.py to avoid collisions.
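A full Redis key is then just the group prefix plus a client identifier (IP or user ID). The helper below is illustrative of that composition, not the template's actual key construction; `REPORTS` is a hypothetical custom prefix you might add:

```python
# Illustrative prefix table mirroring RateLimitPrefix; "REPORTS" is a
# hypothetical custom prefix added to app/core/constants.py.
RATE_LIMIT_PREFIXES = {
    "AUTH": "ratelimit:auth:",
    "UPLOAD": "ratelimit:upload:",
    "REPORTS": "ratelimit:reports:",
}

def rate_limit_key(prefix: str, identifier: str) -> str:
    """Combine a group prefix with a client identifier (IP or user ID)
    so each endpoint group tracks an independent quota."""
    return f"{RATE_LIMIT_PREFIXES[prefix]}{identifier}"

print(rate_limit_key("AUTH", "203.0.113.7"))  # → ratelimit:auth:203.0.113.7
print(rate_limit_key("UPLOAD", "user-42"))    # → ratelimit:upload:user-42
```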
Rate limiting can be controlled via the RATE_LIMIT_ENABLED setting:
To disable rate limiting (default for local development):
RATE_LIMIT_ENABLED=false

- No Redis connection required
- All requests allowed (no limits enforced)
- Useful for development without Redis setup
To test rate limiting locally:
RATE_LIMIT_ENABLED=true
CURRENT_ENVIRONMENT=local

- Requires Redis to be running
- Full rate limiting functionality available
- Test rate limits before deploying to production
The rate limiter uses microsecond precision for timestamp tracking:
- Accurately counts multiple requests within the same second
- Prevents duplicate member issues in Redis sorted sets
- Supports high-frequency API usage (e.g., real-time applications)
- Each request gets a unique identifier: {microseconds}:{hash}
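The sorted-set algorithm can be sketched with an in-memory stand-in for the Redis ZADD/ZREMRANGEBYSCORE calls. This is a simplified illustration of the technique, not the template's implementation; the member format follows the {microseconds}:{hash} scheme above:

```python
import hashlib
import time

class SlidingWindowLimiter:
    """In-memory sketch of the Redis sorted-set algorithm: each request is
    stored with its timestamp as the score, entries older than the window
    are pruned, and the remaining count is checked against the limit."""

    def __init__(self, limit, window):
        self.limit = limit
        self.window = window
        self.store = {}  # key -> list of (score, member) pairs

    def allow(self, key, now=None):
        now = time.time() if now is None else now
        entries = self.store.setdefault(key, [])
        # Prune entries outside the window (ZREMRANGEBYSCORE equivalent)
        cutoff = now - self.window
        entries[:] = [(s, m) for s, m in entries if s > cutoff]
        if len(entries) >= self.limit:
            return False
        # Microsecond-precision member: multiple requests within the same
        # second stay distinct, avoiding duplicate members in the set
        micros = int(now * 1_000_000)
        member = f"{micros}:{hashlib.sha1(str(micros).encode()).hexdigest()[:8]}"
        entries.append((now, member))
        return True

limiter = SlidingWindowLimiter(limit=3, window=60.0)
results = [limiter.allow("ratelimit:auth:203.0.113.7", now=1000.0 + i) for i in range(5)]
print(results)  # → [True, True, True, False, False]
```

Unlike fixed-window counters, requests "age out" continuously, so a burst at a window boundary cannot double the effective limit.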
# Use async/await consistently
async def async_operation():
    result = await some_async_function()
    return result

# Use async context managers
async with AsyncSession(engine) as session:
    # Database operations
    pass

- Create schema in app/schemas/
- Create model in app/models/ (if needed)
- Create repository in app/repos/
- Create endpoint in app/api/v1/endpoints/
- Add to router in app/api/v1/router.py
- Write tests
Schema (app/schemas/post.py):
from pydantic import BaseModel
from datetime import datetime
from uuid import UUID
class PostBase(BaseModel):
    title: str
    content: str

class PostCreate(PostBase):
    pass

class PostResponse(PostBase):
    id: UUID
    author_id: UUID
    created_at: datetime
    updated_at: datetime

Model (app/models/post.py):
from uuid import UUID

from sqlalchemy import String, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.models.base import Base

class Post(Base):
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    author_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"))

    # Relationship
    author: Mapped["User"] = relationship("User", back_populates="posts")

Repository (app/repos/post.py):
from typing import List
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.repos.base import BaseRepository
from app.models.post import Post

class PostRepository(BaseRepository[Post]):
    async def get_by_author(
        self,
        session: AsyncSession,
        author_id: UUID
    ) -> List[Post]:
        result = await session.execute(
            select(Post).where(Post.author_id == author_id)
        )
        return result.scalars().all()

Endpoint (app/api/v1/endpoints/post.py):
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.db import get_session
from app.schemas.post import PostCreate, PostResponse
from app.repos.post import PostRepository

router = APIRouter()

@router.post("/", response_model=PostResponse)
async def create_post(
    post_data: PostCreate,
    session: AsyncSession = Depends(get_session),
    current_user: User = Depends(get_current_user),
    post_repo: PostRepository = Depends(get_post_repository)
):
    return await post_repo.create(session, post_data, author_id=current_user.id)

alembic revision --autogenerate -m "Add posts table"
alembic upgrade head

# Always hash passwords
from app.core.auth import get_password_hash, verify_password
hashed = get_password_hash("plain_password")
is_valid = verify_password("plain_password", hashed)

# Use Pydantic for validation
from pydantic import field_validator, EmailStr

class UserCreate(BaseModel):
    username: str
    email: EmailStr

    @field_validator('username')
    @classmethod
    def username_must_be_alphanumeric(cls, v: str) -> str:
        if not v.isalnum():
            raise ValueError('Username must be alphanumeric')
        return v

# Use SQLAlchemy ORM (automatic protection)
# Avoid raw SQL queries
# If needed, use parameterized queries
# Good
result = await session.execute(
    select(User).where(User.username == username)
)

# Avoid
# result = await session.execute(f"SELECT * FROM users WHERE username = '{username}'")

- Follow PEP 8 guidelines
- Use Black for code formatting
- Add type hints to all functions
- Write descriptive docstrings
- Keep functions small and focused
Use conventional commit format:
feat: add user authentication
fix: resolve database connection issue
docs: update API documentation
test: add user repository tests
refactor: improve error handling
- Fork the repository
- Create a feature branch
- Write tests for new functionality
- Ensure all tests pass
- Format code with Black
- Write clear commit messages
- Submit pull request with description
- Code follows project style guidelines
- Tests are included and passing
- Documentation is updated
- No breaking changes (or properly documented)
- Performance implications considered
- Security implications reviewed
The logging system is your first line of defense when troubleshooting issues:
# Check recent errors
grep "ERROR" logs/app.log | tail -20
# Find all logs for a specific request (if you have the request ID)
grep "ReqID:abc123" logs/app.log
# Monitor logs in real-time
tail -f logs/app.log
# Check logs from specific worker process
grep "PID:12345" logs/app.log
# View logs with full exception details
grep -A 10 "ERROR" logs/app.log  # Shows 10 lines after each error

# Check PostgreSQL is running
pg_ctl status
# Check database exists
psql -l | grep fastapi_template
# Verify connection string
echo $DATABASE_URL

# Check current migration state
alembic current
# Reset migrations (development only)
alembic downgrade base
alembic upgrade head
# Fix migration conflicts
alembic merge -m "merge migrations" head1 head2

# Check Python path
echo $PYTHONPATH
# Verify virtual environment
which python
pip list
# Reinstall dependencies
uv sync --reinstall

- Enable SQL query logging to identify slow queries
- Use database indexes for frequently queried fields
- Implement connection pooling
- Add caching for expensive operations
- Profile code with tools like py-spy
- Monitor memory usage with memory_profiler
- Use generators for large datasets
- Implement pagination for API endpoints
- Close database connections properly
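The generator advice above can be made concrete: chunked iteration keeps only a bounded slice of a large result set in memory at a time. A minimal sketch (the function name is illustrative):

```python
def iter_chunks(rows, chunk_size=100):
    """Yield fixed-size lists from any iterable so callers never hold
    the full result set in memory at once."""
    chunk = []
    for row in rows:
        chunk.append(row)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:  # emit the final partial chunk
        yield chunk

# Demo with a small "result set"
chunks = list(iter_chunks(range(5), chunk_size=2))
print(chunks)  # → [[0, 1], [2, 3], [4]]
```

The same pattern pairs naturally with the paginated repository queries shown earlier: each chunk can be fetched, processed, and discarded before the next one is loaded.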
The template includes integration with BackBlaze B2 cloud storage service for file management.
- Create BackBlaze Account
  - Sign up at backblaze.com
  - Create an application key in account settings
- Configure Credentials

Add to your .env file (optional - can be provided at runtime):
# BackBlaze B2 Configuration (Optional)
B2_APPLICATION_KEY_ID=your_key_id_here
B2_APPLICATION_KEY=your_application_key_here
B2_BUCKET_NAME=your_bucket_name

from app.services.back_blaze_b2 import BackBlaze, B2BucketTypeEnum
from app.schemas import ApplicationData, UploadedFileInfo
# Initialize BackBlaze client
app_data = ApplicationData(
    app_id="your_application_key_id",
    app_key="your_application_key"
)
b2_client = BackBlaze(app_data)
# List available buckets
buckets = b2_client.list_buckets()
# Create a new bucket
b2_client.create_bucket("my-new-bucket", B2BucketTypeEnum.ALL_PRIVATE)
# Select a bucket for operations
b2_client.select_bucket("my-bucket-name")
# Upload a file
file_version = b2_client.upload_file(
    local_file_path="/path/to/local/file.pdf",
    b2_file_name="documents/file.pdf",
    file_info=UploadedFileInfo(scanned=True)
)
# Get download URL
download_link = b2_client.get_download_url_by_name("documents/file.pdf")
print(download_link.download_url)
# Get temporary download link (with auth token)
from pydantic import AnyUrl
temp_link = b2_client.get_temporary_download_link(
    url=AnyUrl(download_link.download_url),
    valid_duration_in_seconds=3600  # 1 hour
)
# Delete a file
b2_client.delete_file(
    file_id=file_version.id_,
    file_name="documents/file.pdf"
)

# Update bucket settings
b2_client.update_selected_bucket(
    bucket_type=B2BucketTypeEnum.ALL_PUBLIC
)

# Delete bucket
b2_client.delete_selected_bucket()

- Bucket Management: Create, delete, update, list, and select buckets
- File Operations: Upload, download, and delete files
- URL Generation:
  - Public download URLs for public buckets
  - File ID-based URLs
  - Temporary authenticated URLs for private files
- Method Chaining: Fluent interface for bucket selection
- Metadata: Custom file information with UploadedFileInfo
- Error Handling: Comprehensive exception handling with detailed logging
- ALL_PUBLIC: Files are publicly accessible
- ALL_PRIVATE: Files require authentication
- SNAPSHOT: Snapshot storage
- SHARE: Shared access
- RESTRICTED: Restricted access with authorization rules
from fastapi import APIRouter, Depends, UploadFile, File
from app.core.config import settings
from app.services.back_blaze_b2 import BackBlaze
from app.schemas import ApplicationData

router = APIRouter()

def get_b2_client() -> BackBlaze:
    """Dependency to get BackBlaze client"""
    app_data = ApplicationData(
        app_id=settings.b2_app_id,
        app_key=settings.b2_app_key
    )
    return BackBlaze(app_data).select_bucket(settings.b2_bucket_name)

@router.post("/upload-document")
async def upload_document(
    file: UploadFile = File(...),
    b2_client: BackBlaze = Depends(get_b2_client)
):
    # Save uploaded file temporarily
    temp_path = f"/tmp/{file.filename}"
    with open(temp_path, "wb") as buffer:
        content = await file.read()
        buffer.write(content)

    # Upload to BackBlaze
    result = b2_client.upload_file(
        local_file_path=temp_path,
        b2_file_name=f"uploads/{file.filename}"
    )

    # Get download URL
    download_link = b2_client.get_download_url_by_file_id(result.id_)

    return {
        "file_id": result.id_,
        "file_name": result.file_name,
        "download_url": download_link.download_url
    }

The template includes Firebase integration for user authentication, user management, and push notifications.
- Create Firebase Project
  - Go to the Firebase Console
  - Create a new project or use an existing one
  - Navigate to Project Settings → Service Accounts
  - Generate a new private key (downloads a JSON file)
- Configure Credentials

Add to your .env file, or provide credentials at runtime (see .env.example):
from app.services.firebase import Firebase
from app.schemas.firebase import FirebaseServiceAccount
# Initialize Firebase (uses singleton pattern)
firebase_service = Firebase()
# Get user by ID
user = firebase_service.get_user_by_id("user_uid_123")
print(f"User: {user.email}, Display Name: {user.display_name}")
# Get user by email
user = firebase_service.get_user_by_email("user@example.com")
# Get user by phone number
user = firebase_service.get_user_by_phone_number("+1234567890")
# List all users with pagination
users_page = firebase_service.get_all_users(max_results=1000)
for user in users_page.iterate_all():
    print(f"UID: {user.uid}, Email: {user.email}")
# Create custom token for user
custom_token = firebase_service.create_custom_id_token(
    uid="user_uid_123",
    additional_claims={"role": "admin", "premium": True}
)

# Verify ID token from client
try:
    decoded_token = firebase_service.verify_id_token(id_token="client_token_here")
    uid = decoded_token['uid']
    print(f"Token verified for user: {uid}")
except ConnectionAbortedError as e:
    print(f"Token invalid or expired: {e}")

from app.services.firebase import Firebase
firebase_service = Firebase()
# Validate FCM token
device_token = "fcm_device_token_here"
is_valid = firebase_service.validate_fcm_token(device_token)
# Send notification to single device
success = firebase_service.notify_a_device(
    device_token=device_token,
    title="Welcome!",
    content="Thank you for signing up"
)

# Send notification to multiple devices (automatically batches in chunks of 500)
device_tokens = ["token1", "token2", "token3", ...]  # Can be thousands
success_count = firebase_service.notify_multiple_devices(
    device_tokens=device_tokens,
    title="New Update Available",
    content="Version 2.0 is now available"
)
print(f"Successfully sent to {success_count} devices")

from fastapi import APIRouter, Depends, HTTPException
from app.services.firebase import Firebase
router = APIRouter()
def get_firebase_service() -> Firebase:
    """Dependency to get Firebase service"""
    return Firebase()

@router.post("/auth/verify-token")
async def verify_user_token(
    token: str,
    firebase: Firebase = Depends(get_firebase_service)
):
    try:
        decoded_token = firebase.verify_id_token(token)
        return {
            "uid": decoded_token['uid'],
            "email": decoded_token.get('email'),
            "verified": True
        }
    except ConnectionAbortedError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")

@router.post("/notifications/send")
async def send_push_notification(
    user_id: str,
    title: str,
    message: str,
    firebase: Firebase = Depends(get_firebase_service)
):
    # Get the user's device tokens from your database
    device_tokens = await get_user_device_tokens(user_id)

    success_count = firebase.notify_multiple_devices(
        device_tokens=device_tokens,
        title=title,
        content=message
    )

    return {
        "sent": success_count,
        "total": len(device_tokens)
    }

Firestore integration for document-based data storage alongside your PostgreSQL database.
Uses the same Firebase service account credentials as Firebase Authentication.
from app.services.firestore import Firestore
from app.schemas.firebase import FirebaseServiceAccount
# Initialize Firestore (uses singleton pattern)
service_account = FirebaseServiceAccount(
    type="service_account",
    project_id="your-project-id",
    # ... other credentials
)
firestore_service = Firestore(service_account)

# Add a document
firestore_service.add_document(
    collection_name="users",
    document_id="user123",
    data={
        "name": "John Doe",
        "email": "john@example.com",
        "preferences": {
            "theme": "dark",
            "notifications": True
        }
    }
)

# Get a document
user_data = firestore_service.get_document(
    collection_name="users",
    document_id="user123"
)
if user_data:
    print(f"User: {user_data['name']}")

# Update a document
firestore_service.update_document(
    collection_name="users",
    document_id="user123",
    data={
        "preferences.theme": "light",  # Nested field update
        "last_login": "2025-01-19T10:30:00Z"
    }
)

# Fetch all documents from a collection
all_users = firestore_service.fetch_all_documents("users")
for user in all_users:
    print(f"User: {user['name']}")

# Remove a document
firestore_service.remove_document(
    collection_name="users",
    document_id="user123"
)

from app.core.exceptions.firebase_exceptions import FirebaseDocumentNotFoundError
try:
    firestore_service.update_document(
        collection_name="users",
        document_id="nonexistent",
        data={"status": "active"}
    )
except FirebaseDocumentNotFoundError as e:
    print(f"Document not found: {e}")

from fastapi import APIRouter, Depends, HTTPException
from app.services.firestore import Firestore
from app.core.exceptions.firebase_exceptions import FirebaseDocumentNotFoundError
router = APIRouter()
def get_firestore_service() -> Firestore:
    """Dependency to get Firestore service"""
    from app.core.config import settings
    return Firestore(settings.firebase_credentials)

@router.post("/user-preferences")
async def save_user_preferences(
    user_id: str,
    preferences: dict,
    firestore: Firestore = Depends(get_firestore_service)
):
    firestore.add_document(
        collection_name="user_preferences",
        document_id=user_id,
        data=preferences
    )
    return {"status": "saved"}

@router.get("/user-preferences/{user_id}")
async def get_user_preferences(
    user_id: str,
    firestore: Firestore = Depends(get_firestore_service)
):
    prefs = firestore.get_document(
        collection_name="user_preferences",
        document_id=user_id
    )
    if not prefs:
        raise HTTPException(status_code=404, detail="Preferences not found")
    return prefs

@router.put("/user-preferences/{user_id}")
async def update_user_preferences(
    user_id: str,
    preferences: dict,
    firestore: Firestore = Depends(get_firestore_service)
):
    try:
        firestore.update_document(
            collection_name="user_preferences",
            document_id=user_id,
            data=preferences
        )
        return {"status": "updated"}
    except FirebaseDocumentNotFoundError:
        raise HTTPException(status_code=404, detail="User preferences not found")

When to use Firestore alongside PostgreSQL:
- User Preferences: Store user settings, UI state, personalization
- Real-time Data: Chat messages, notifications, activity feeds
- Session Data: Temporary data that doesn't need relational integrity
- Device Tokens: FCM tokens for push notifications
- Analytics Events: User behavior tracking, event logging
- Cache Layer: Frequently accessed data to reduce database load
When to use PostgreSQL:
- Transactional Data: Orders, payments, critical business data
- Relational Data: Data with complex relationships and foreign keys
- Data Integrity: When ACID compliance is required
- Complex Queries: JOINs, aggregations, full-text search
The template includes Google Cloud Storage integration for file management as an alternative to BackBlaze B2.
- Create GCS Service Account
  - Go to Google Cloud Console
  - Navigate to IAM & Admin → Service Accounts
  - Create a new service account with the Storage Admin role
  - Generate and download a JSON key
- Configure Credentials

Add to your .env file:
GCS_PROJECT_ID=your-project-id
GCS_BUCKET_NAME=your-bucket-name
GCS_CREDENTIALS_JSON={"type": "service_account", "project_id": "...", ...}

from app.services.gcs import GoogleCloudStorage
# Initialize GCS client
gcs_client = GoogleCloudStorage(
    project_id="your-project-id",
    credentials_json=settings.gcs_credentials_json
)

# Select a bucket
gcs_client.select_bucket("my-bucket-name")

# Upload a file
blob = gcs_client.upload_file(
    local_file_path="/path/to/local/file.pdf",
    destination_blob_name="documents/file.pdf",
    content_type="application/pdf"
)

# Download a file
gcs_client.download_file(
    source_blob_name="documents/file.pdf",
    destination_file_path="/path/to/download/file.pdf"
)

# Get a public URL (for public buckets)
public_url = gcs_client.get_public_url("documents/file.pdf")

# Generate a signed URL (for private buckets)
signed_url = gcs_client.generate_signed_url(
    blob_name="documents/file.pdf",
    expiration_minutes=60
)

# Delete a file
gcs_client.delete_file("documents/file.pdf")

# List files in the bucket
files = gcs_client.list_files(prefix="documents/")
for file in files:
    print(f"File: {file.name}, Size: {file.size}")

from fastapi import APIRouter, Depends, UploadFile, File
from app.services.gcs import GoogleCloudStorage
from app.core.config import settings
router = APIRouter()
def get_gcs_client() -> GoogleCloudStorage:
    """Dependency to get GCS client"""
    return GoogleCloudStorage(
        project_id=settings.gcs_project_id,
        credentials_json=settings.gcs_credentials_json
    ).select_bucket(settings.gcs_bucket_name)

@router.post("/upload")
async def upload_file(
    file: UploadFile = File(...),
    gcs: GoogleCloudStorage = Depends(get_gcs_client)
):
    # Save uploaded file temporarily
    temp_path = f"/tmp/{file.filename}"
    with open(temp_path, "wb") as buffer:
        content = await file.read()
        buffer.write(content)

    # Upload to GCS
    blob = gcs.upload_file(
        local_file_path=temp_path,
        destination_blob_name=f"uploads/{file.filename}"
    )

    return {
        "file_name": blob.name,
        "size": blob.size,
        "url": gcs.get_public_url(blob.name)
    }

The template includes Apple Pay integration for verifying in-app purchases and subscriptions via the App Store Server API.
- App Store Connect Configuration
  - Log in to App Store Connect
  - Navigate to Users and Access → Keys → App Store Connect API
  - Generate a new API key and download the .p8 private key file
- Download Apple Root Certificate
  - Download AppleRootCA-G3.cer from Apple PKI
  - Store it securely in your project or server
- Configure Credentials

Add to your .env file:
APPLE_PAY_STORE_PRIVATE_KEY_ID=YOUR_KEY_ID
APPLE_PAY_STORE_PRIVATE_KEY=-----KEY-----
APPLE_PAY_STORE_ISSUER_ID=YOUR_ISSUER_ID
APPLE_PAY_STORE_BUNDLE_ID=com.yourcompany.yourapp
APPLE_PAY_STORE_ROOT_CERTIFICATE_PATH=/path/to/AppleRootCA-G3.cerfrom app.services.payments.apple_pay import ApplePay
from app.schemas.apple_pay import ApplePayStoreCredentials
# Initialize Apple Pay client
credentials = ApplePayStoreCredentials(
private_key_id=settings.apple_pay_store_private_key_id,
private_key=settings.apple_pay_store_private_key,
issuer_id=settings.apple_pay_store_issuer_id,
bundle_id=settings.apple_pay_store_bundle_id,
root_certificate_path=settings.apple_pay_store_root_certificate_path
)
apple_pay = ApplePay(credentials)
# Verify a transaction
transaction_id = "1000000123456789"
try:
transaction_info = apple_pay.get_transaction_info(transaction_id)
print(f"Product ID: {transaction_info.product_id}")
print(f"Purchase Date: {transaction_info.purchase_date}")
print(f"Status: {transaction_info.status}")
except ApplePayVerificationError as e:
print(f"Verification failed: {e}")
# Get subscription status
original_transaction_id = "1000000123456789"
subscription_status = apple_pay.get_subscription_status(original_transaction_id)
for subscription in subscription_status.data:
print(f"Subscription Group: {subscription.subscription_group_identifier}")
print(f"Status: {subscription.status}")
# Get transaction history
history = apple_pay.get_transaction_history(original_transaction_id)
for transaction in history.signed_transactions:
print(f"Transaction: {transaction.transaction_id}")from fastapi import APIRouter, Depends, HTTPException
from app.services.payments.apple_pay import ApplePay
from app.core.config import settings
from app.core.exceptions.apple_pay import ApplePayVerificationError
router = APIRouter()
def get_apple_pay_client() -> ApplePay:
"""Dependency to get Apple Pay client"""
return ApplePay(settings.apple_pay_credentials)
@router.post("/verify-purchase")
async def verify_purchase(
transaction_id: str,
apple_pay: ApplePay = Depends(get_apple_pay_client)
):
try:
transaction = apple_pay.get_transaction_info(transaction_id)
return {
"valid": True,
"product_id": transaction.product_id,
"purchase_date": transaction.purchase_date,
"expires_date": transaction.expires_date
}
except ApplePayVerificationError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/subscription-status/{original_transaction_id}")
async def check_subscription(
original_transaction_id: str,
apple_pay: ApplePay = Depends(get_apple_pay_client)
):
try:
status = apple_pay.get_subscription_status(original_transaction_id)
return {
"active": any(s.status == "ACTIVE" for s in status.data),
"subscriptions": [
{
"group_id": s.subscription_group_identifier,
"status": s.status
}
for s in status.data
]
}
except ApplePayVerificationError as e:
raise HTTPException(status_code=400, detail=str(e))- In-App Purchase Verification: Verify one-time purchases from iOS apps
- Subscription Management: Check subscription status and renewal info
- Transaction History: Retrieve complete purchase history for a user
- Refund Detection: Identify refunded transactions
- Server-to-Server Notifications: Handle App Store Server Notifications (webhooks)
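App Store Server Notifications arrive as a JSON body containing a `signedPayload` field in JWS format (`header.payload.signature`). As an illustration only (the template's actual webhook handler is not shown here), the payload segment can be decoded like this; note that a real handler must first verify the JWS signature chain against the Apple root certificate before trusting anything in the payload:

```python
import base64
import json

def decode_jws_payload(signed_payload: str) -> dict:
    """Decode the payload segment of a JWS string (header.payload.signature).

    WARNING: this skips signature verification entirely. In production the
    certificate chain in the JWS header must be validated against Apple's
    root certificate before the payload is trusted.
    """
    payload_b64 = signed_payload.split(".")[1]
    # Restore the base64url padding that JWS strips
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))
```

The decoded payload carries fields such as `notificationType` (e.g. `DID_RENEW`, `REFUND`), which a webhook endpoint can dispatch on.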
The template includes several security middleware components that are automatically applied to all requests.
CSRF (Cross-Site Request Forgery) protection is implemented via middleware for state-changing requests.
How it works:
- Validates CSRF tokens for POST, PUT, DELETE, PATCH requests
- Tokens are validated against the session or a secure cookie
- Safe methods (GET, HEAD, OPTIONS) are exempt
Frontend Integration:
```javascript
// Include the CSRF token in requests
const csrfToken = document.querySelector('meta[name="csrf-token"]').content;

fetch('/api/v1/users/me', {
    method: 'PUT',
    headers: {
        'Content-Type': 'application/json',
        'X-CSRF-Token': csrfToken
    },
    body: JSON.stringify(userData)
});
```

A security-headers middleware automatically adds the following headers to all responses:

- `X-Frame-Options: DENY` - Prevents clickjacking
- `X-Content-Type-Options: nosniff` - Prevents MIME type sniffing
- `X-XSS-Protection: 1; mode=block` - XSS filter (legacy browsers)
- `Strict-Transport-Security` - HSTS for HTTPS enforcement (production only)
Adds rate limiting information to responses (see Rate Limiting section).
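As a sketch of where such headers come from, a fixed-window counter is enough to derive conventional `X-RateLimit-*` values (class and header names here are illustrative; the template's actual limiter may differ, e.g. by being Redis-backed):

```python
import time

class FixedWindowLimiter:
    """Fixed-window request counter that derives rate-limit headers."""

    def __init__(self, limit: int, window_seconds: int, clock=time.time):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self._counts: dict[tuple[str, int], int] = {}

    def hit(self, key: str) -> dict[str, str]:
        now = int(self.clock())
        window_start = now - (now % self.window)
        bucket = (key, window_start)
        self._counts[bucket] = self._counts.get(bucket, 0) + 1
        remaining = max(self.limit - self._counts[bucket], 0)
        return {
            "X-RateLimit-Limit": str(self.limit),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str(window_start + self.window),
        }
```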
The template implements secure token revocation using Redis-based blacklisting.
```python
from fastapi import HTTPException

from app.services.cache.token_blacklist import token_blacklist

# When a user logs out, the token is added to the blacklist
async def logout(token: str, user_id: int):
    # Add the token to the blacklist with a TTL matching the token's expiration
    await token_blacklist.add_token(token, user_id)

# During authentication, check whether the token is blacklisted
async def validate_token(token: str) -> bool:
    if await token_blacklist.is_blacklisted(token):
        raise HTTPException(status_code=401, detail="Token has been revoked")
    return True
```

- Automatic Expiration: Blacklisted tokens are automatically removed after their natural expiration time
- User-Based Revocation: Revoke all tokens for a specific user (e.g., password change)
- Memory Efficient: Uses Redis sorted sets with automatic cleanup
- Graceful Degradation: If Redis is unavailable, tokens are still validated by expiration
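The automatic-expiration behaviour can be illustrated with an in-memory stand-in (the real implementation uses Redis, and the `ttl_seconds` parameter here is assumed for the sketch, not the template's actual signature):

```python
import time

class InMemoryTokenBlacklist:
    """In-memory stand-in for the Redis blacklist: an entry stops matching
    once its recorded expiry passes, mirroring Redis TTL behaviour."""

    def __init__(self, clock=time.time):
        self._expiry: dict[str, float] = {}
        self.clock = clock

    def add_token(self, token: str, ttl_seconds: float) -> None:
        self._expiry[token] = self.clock() + ttl_seconds

    def is_blacklisted(self, token: str) -> bool:
        expires_at = self._expiry.get(token)
        if expires_at is None:
            return False
        if expires_at <= self.clock():
            # Lazy cleanup: the entry has outlived the token itself
            del self._expiry[token]
            return False
        return True
```

Because a revoked token only needs to stay blacklisted until it would have expired anyway, the blacklist never grows without bound.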
The template includes caching decorators for easy function-level caching.
```python
from app.services.cache.decorators import cached, cache_invalidate

# Cache the function result for 5 minutes
@cached(ttl=300, key_prefix="user")
async def get_user_profile(user_id: int) -> dict:
    # Expensive database query
    return await fetch_user_from_db(user_id)

# Invalidate the cache when the data changes
@cache_invalidate(key_pattern="user:*")
async def update_user_profile(user_id: int, data: dict):
    await save_user_to_db(user_id, data)
```

Custom middleware can hook into every request/response cycle:

```python
import time

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class CustomMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Pre-processing
        start_time = time.time()
        response = await call_next(request)
        # Post-processing
        process_time = time.time() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        return response
```

Background tasks let an endpoint return immediately while work continues after the response is sent:

```python
from fastapi import BackgroundTasks

def send_email(email: str, message: str):
    # Email sending logic
    pass

@router.post("/send-notification/")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(send_email, email, "Welcome!")
    return {"message": "Email sent in background"}
```

WebSocket endpoints are supported as well:

```python
from fastapi import WebSocket

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Message: {data}")
```

This development guide provides a comprehensive foundation for working with the FastAPI template. For specific questions or advanced use cases, refer to the FastAPI documentation or create an issue in the project repository.