The Step Detection system is designed with a modular, production-ready architecture that separates concerns and enables scalability, maintainability, and testability.

    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
    │   Client Apps   │    │  Web Frontend   │    │   Mobile Apps   │
    │                 │    │                 │    │                 │
    └─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
              │                      │                      │
              │              ┌───────▼───────┐              │
              │              │ Load Balancer │              │
              │              │    (Nginx)    │              │
              │              └───────┬───────┘              │
              │                      │                      │
              └──────────────────────┼──────────────────────┘
                                     │
                        ┌────────────▼────────────┐
                        │     FastAPI Server      │
                        │   (REST + WebSocket)    │
                        └────────────┬────────────┘
                                     │
                        ┌────────────▼────────────┐
                        │     Step Detection      │
                        │         Package         │
                        │                         │
                        │   ┌─────────────────┐   │
                        │   │   Core Models   │   │
                        │   │                 │   │
                        │   │ ┌─────────────┐ │   │
                        │   │ │ TensorFlow  │ │   │
                        │   │ │  CNN Model  │ │   │
                        │   │ └─────────────┘ │   │
                        │   └─────────────────┘   │
                        └─────────────────────────┘

    src/step_detection/
    ├── __init__.py              # Package exports and initialization
    ├── core/                    # Core business logic
    │   ├── __init__.py
    │   └── detector.py          # StepDetector, SimpleStepCounter
    ├── models/                  # ML model utilities
    │   ├── __init__.py
    │   └── model_utils.py       # Model creation, training, evaluation
    ├── utils/                   # Utility functions
    │   ├── __init__.py
    │   └── data_processor.py    # Data loading, preprocessing
    └── api/                     # API layer
        ├── __init__.py
        └── api.py               # FastAPI application

Each module has a single, well-defined responsibility:
- Core: Business logic for step detection
- Models: Machine learning operations
- Utils: Data processing and utilities
- API: External interface layer
High-level modules don't depend on low-level modules. Both depend on abstractions.

    from typing import Dict

    import tensorflow as tf

    # Abstract interface
    class StepDetectorInterface:
        def process_reading(self, *args) -> Dict: ...
        def get_step_count(self) -> int: ...
        def reset(self) -> None: ...

    # Concrete implementation
    class StepDetector(StepDetectorInterface):
        def __init__(self, model_path: str):
            self.model = tf.keras.models.load_model(model_path)

        def process_reading(self, accel_x, accel_y, accel_z, gyro_x, gyro_y, gyro_z):
            # Implementation details
            ...

Each class has one reason to change:
- StepDetector: Real-time step detection with detailed results
- SimpleStepCounter: Simple step counting
- DataProcessor: Data loading and preprocessing
- ModelUtils: Model creation and training
Software entities are open for extension but closed for modification:

    import tensorflow as tf

    # Base detector that can be extended
    class BaseStepDetector:
        def __init__(self, model_path: str):
            self.model = self.load_model(model_path)

        def load_model(self, path: str):
            """Override in subclasses for different model types"""
            return tf.keras.models.load_model(path)

        def preprocess(self, data):
            """Override for custom preprocessing"""
            return data

    # Extended for different use cases
    class EnhancedStepDetector(BaseStepDetector):
        def preprocess(self, data):
            # Custom preprocessing; apply_filters() is not shown here and
            # must be provided by this subclass
            return self.apply_filters(data)

    Raw Sensor Data → Data Processor     → Preprocessed Data → Model Training → Trained Model
    ↓                 ↓                    ↓                   ↓                ↓
    CSV Files         Feature Extraction   Train/Val Split     CNN Training     .keras File
                      Label Encoding       Normalization       Optimization     Metadata
                      Validation           Augmentation        Evaluation       Thresholds

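
The normalization stage of the training flow above can be sketched in plain Python. This is a minimal illustration that assumes per-channel z-score scaling; the document does not say which scheme the package uses, and the names `fit_normalizer` and `normalize` are hypothetical.

```python
import math
from typing import List, Sequence, Tuple

Row = Sequence[float]


def fit_normalizer(rows: Sequence[Row]) -> Tuple[List[float], List[float]]:
    """Compute the per-column mean and standard deviation of the rows."""
    n = len(rows)
    n_cols = len(rows[0])
    means = [sum(r[c] for r in rows) / n for c in range(n_cols)]
    stds = []
    for c in range(n_cols):
        var = sum((r[c] - means[c]) ** 2 for r in rows) / n
        # A constant column has zero spread; fall back to 1.0 so that
        # normalize() never divides by zero.
        stds.append(math.sqrt(var) or 1.0)
    return means, stds


def normalize(rows: Sequence[Row], means: List[float], stds: List[float]) -> List[List[float]]:
    """Apply previously fitted statistics to any set of rows."""
    return [[(v - m) / s for v, m, s in zip(r, means, stds)] for r in rows]


# Toy data: two channels, the second one constant.
train = [[0.0, 9.0], [2.0, 9.0], [4.0, 9.0]]
means, stds = fit_normalizer(train)
scaled = normalize(train, means, stds)
```

Fitting the statistics on the training split only, then reusing them for validation data and at inference time, keeps information from held-out data out of the model inputs.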

    Sensor Reading → API Endpoint → Detector      → Model     → Post-processing → Response
    ↓                ↓              ↓               ↓           ↓                 ↓
    6D Vector        Validation     Preprocessing   CNN         Threshold         JSON Result
    (ax,ay,az,       Format Check   Normalization   Inference   Application       Step Events
     gx,gy,gz)       Type Check     Reshaping       Softmax     Step Logic        Probabilities

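
The inference flow above (validation, model call, softmax, thresholding) can be sketched as a single function. This is an illustration under stated assumptions, not the package's implementation: the class order in `CLASSES`, the 0.5 threshold, and the `fake_model` stand-in for the CNN are all hypothetical. Only the `step_start` and `step_end` result keys appear elsewhere in this document.

```python
import math
from typing import Callable, Dict, List, Sequence

# Assumed label order of the model output (hypothetical).
CLASSES = ("no_step", "step_start", "step_end")


def softmax(logits: Sequence[float]) -> List[float]:
    """Numerically stable softmax: subtract the maximum before exponentiating."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def detect(
    reading: Sequence[float],
    model: Callable[[Sequence[float]], List[float]],
    threshold: float = 0.5,
) -> Dict:
    """Validate one 6D reading, run the model, and apply the threshold."""
    if len(reading) != 6:
        raise ValueError("expected 6 values: ax, ay, az, gx, gy, gz")
    values = [float(v) for v in reading]  # raises on non-numeric input
    probabilities = dict(zip(CLASSES, softmax(model(values))))
    return {
        "step_start": probabilities["step_start"] >= threshold,
        "step_end": probabilities["step_end"] >= threshold,
        "probabilities": probabilities,
    }


def fake_model(values: Sequence[float]) -> List[float]:
    """Stand-in for the CNN: always returns the same logits."""
    return [0.0, 2.0, 0.0]


out = detect([1.0, 0.5, 9.8, 0.1, 0.2, 0.0], fake_model)
```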

    Client → WebSocket  → API Handler → Step Detector   → Model Inference → Response   → Client
    ↓        ↓            ↓             ↓                 ↓                 ↓            ↓
    JSON     Connection   Validation    Process Reading   Predictions       JSON Reply   Update UI
    Data     Management   Error Check   State Update      Thresholding      Broadcast    Show Steps

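
The handler stage of the real-time flow above can be kept independent of the WebSocket transport, which makes it easy to test. The sketch below assumes the six field names used by the sensor reading model in this document; `CountingDetector`, its toy step rule, and the shape of the reply are hypothetical stand-ins.

```python
import json

REQUIRED = ("accel_x", "accel_y", "accel_z", "gyro_x", "gyro_y", "gyro_z")


class CountingDetector:
    """Hypothetical stand-in for the real detector."""

    def __init__(self):
        self.count = 0

    def process_reading(self, **reading):
        # Toy rule, for illustration only: count a step whenever accel_z > 11.
        is_step = reading["accel_z"] > 11.0
        self.count += int(is_step)
        return {"step_detected": is_step, "step_count": self.count}


def handle_message(raw: str, detector) -> str:
    """Parse, validate, and process one text message; always return JSON."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return json.dumps({"error": "invalid JSON"})
    if not isinstance(payload, dict):
        return json.dumps({"error": "expected a JSON object"})
    missing = [key for key in REQUIRED if key not in payload]
    if missing:
        return json.dumps({"error": f"missing fields: {missing}"})
    try:
        reading = {key: float(payload[key]) for key in REQUIRED}
    except (TypeError, ValueError):
        return json.dumps({"error": "all fields must be numeric"})
    return json.dumps(detector.process_reading(**reading))


demo_detector = CountingDetector()
message = json.dumps({
    "accel_x": 1.0, "accel_y": 0.5, "accel_z": 12.0,
    "gyro_x": 0.1, "gyro_y": 0.2, "gyro_z": 0.0,
})
reply = json.loads(handle_message(message, demo_detector))
```

Returning an error object instead of raising keeps one malformed message from closing the connection.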
Purpose (core/): Contains the core business logic for step detection.
Classes:
- StepDetector
  - Responsibilities: Real-time step detection, session management
  - Dependencies: TensorFlow model, numpy
  - Key Methods:
    - process_reading(): Process a single sensor reading
    - get_session_summary(): Get session statistics
    - reset(): Reset detection state
- SimpleStepCounter
  - Responsibilities: Simple step counting without detailed events
  - Dependencies: TensorFlow model
  - Key Methods:
    - process_reading(): Process a reading and return a boolean
    - get_count(): Get total step count
    - reset(): Reset counter
Design Patterns:
- State Pattern: Managing step detection state (start, in-progress, end)
- Strategy Pattern: Different detection strategies (threshold-based, ML-based)
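
The step states listed above (start, in-progress, end) can be illustrated with a small state machine. This sketch uses a simplified enum-based variant rather than one class per state, and it assumes the model emits a start probability and an end probability per reading. `StepState`, `StepTracker`, and the 0.5 thresholds are hypothetical.

```python
from enum import Enum


class StepState(Enum):
    IDLE = "idle"
    IN_PROGRESS = "in_progress"


class StepTracker:
    """Counts a step only when a start is followed by an end."""

    def __init__(self, start_threshold: float = 0.5, end_threshold: float = 0.5):
        self.state = StepState.IDLE
        self.step_count = 0
        self.start_threshold = start_threshold
        self.end_threshold = end_threshold

    def update(self, p_start: float, p_end: float) -> bool:
        """Feed one prediction; return True when a step completes."""
        if self.state is StepState.IDLE and p_start >= self.start_threshold:
            self.state = StepState.IN_PROGRESS
        elif self.state is StepState.IN_PROGRESS and p_end >= self.end_threshold:
            self.state = StepState.IDLE
            self.step_count += 1
            return True
        return False


tracker = StepTracker()
predictions = [(0.1, 0.1), (0.9, 0.0), (0.2, 0.1), (0.0, 0.8), (0.0, 0.9)]
completed = [tracker.update(p_start, p_end) for p_start, p_end in predictions]
```

The last prediction has a high end probability but no preceding start, so it is ignored; tracking state prevents such stray end signals from being counted as steps.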
Purpose (models/): Machine learning model operations and utilities.
Functions:
- create_cnn_model()
  - Creates and compiles the CNN architecture
  - Configurable input shape and number of classes
  - Returns a compiled Keras model
- train_model()
  - Handles model training with callbacks
  - Early stopping and learning rate scheduling
  - Returns the training history
- evaluate_model()
  - Comprehensive model evaluation
  - Classification metrics and confusion matrix
  - Returns evaluation results
Design Patterns:
- Factory Pattern: Model creation
- Builder Pattern: Complex model configuration
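
The kind of output attributed to evaluate_model() above (a confusion matrix plus classification metrics) can be computed as follows. This is a dependency-free sketch for illustration; the package may rely on a library for this, and the function names here are hypothetical.

```python
from typing import Dict, List, Sequence


def confusion_matrix(y_true: Sequence[int], y_pred: Sequence[int], n_classes: int) -> List[List[int]]:
    """matrix[t][p] counts samples of true class t predicted as class p."""
    matrix = [[0] * n_classes for _ in range(n_classes)]
    for t, p in zip(y_true, y_pred):
        matrix[t][p] += 1
    return matrix


def per_class_metrics(matrix: List[List[int]]) -> Dict[int, Dict[str, float]]:
    """Precision and recall for each class, derived from the matrix."""
    metrics = {}
    for c in range(len(matrix)):
        true_positives = matrix[c][c]
        predicted = sum(row[c] for row in matrix)  # column sum
        actual = sum(matrix[c])                    # row sum
        metrics[c] = {
            "precision": true_positives / predicted if predicted else 0.0,
            "recall": true_positives / actual if actual else 0.0,
        }
    return metrics


y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 1, 1, 1, 2, 0]
cm = confusion_matrix(y_true, y_pred, n_classes=3)
report = per_class_metrics(cm)
```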
Purpose (utils/): Data processing and utility functions.
Functions:
- load_step_data()
  - Loads and combines CSV files
  - Data validation and cleaning
  - Returns a pandas DataFrame
- prepare_data_for_training()
  - Feature extraction and label encoding
  - Train/validation split with stratification
  - Data type optimization
Design Patterns:
- Pipeline Pattern: Data processing pipeline
- Adapter Pattern: Different data format handling
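
The stratified train/validation split mentioned for prepare_data_for_training() keeps the class proportions the same in both subsets, which matters when step events are much rarer than non-step readings. A minimal sketch, assuming a 20% validation fraction and a hypothetical helper name `stratified_split`:

```python
import random
from collections import defaultdict
from typing import Hashable, List, Sequence, Tuple


def stratified_split(
    labels: Sequence[Hashable],
    val_fraction: float = 0.2,
    seed: int = 42,
) -> Tuple[List[int], List[int]]:
    """Return (train_indices, val_indices), split separately per class."""
    by_class = defaultdict(list)
    for index, label in enumerate(labels):
        by_class[label].append(index)

    rng = random.Random(seed)  # local generator keeps the split reproducible
    train_idx, val_idx = [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        # At least one validation sample per class; note that a class with
        # a single sample therefore ends up in validation only.
        n_val = max(1, round(len(indices) * val_fraction))
        val_idx.extend(indices[:n_val])
        train_idx.extend(indices[n_val:])
    return sorted(train_idx), sorted(val_idx)


labels = [0] * 10 + [1] * 5
train_idx, val_idx = stratified_split(labels)
```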
Purpose (api/): External interface layer providing REST and WebSocket APIs.
Components:
- FastAPI Application
  - RESTful endpoints for step detection
  - WebSocket for real-time communication
  - Request/response models with Pydantic
- Middleware
  - CORS handling
  - Request logging
  - Error handling
Design Patterns:
- Facade Pattern: Simplified interface to complex subsystem
- Observer Pattern: WebSocket event handling
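
The Observer pattern named above can be reduced to a small dispatcher that lets handlers subscribe to event types. This is a generic sketch, not the package's WebSocket code; `EventDispatcher` and the `step_completed` event name are hypothetical.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class EventDispatcher:
    """Keeps a list of handlers per event type and notifies them in order."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event_type: str, payload: dict) -> int:
        """Call every handler for event_type; return how many were notified."""
        # Copy the list so a handler may subscribe others while we iterate.
        handlers = list(self._handlers.get(event_type, []))
        for handler in handlers:
            handler(payload)
        return len(handlers)


received = []
dispatcher = EventDispatcher()
dispatcher.subscribe("step_completed", received.append)
dispatcher.subscribe("step_completed", lambda p: received.append({"echo": p["step_count"]}))
notified = dispatcher.publish("step_completed", {"step_count": 3})
```

In a WebSocket setting, each open connection would typically register a handler that forwards the payload to its client, so the detector does not need to know who is listening.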

    # Synchronous communication
    @app.post("/detect_step")
    async def detect_step(reading: SensorReading):
        result = detector.process_reading(**reading.dict())
        return StepDetectionResponse(**result)

    # Asynchronous, bidirectional communication
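    @app.websocket("/ws/realtime")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        try:
            while True:
                data = await websocket.receive_text()
                result = process_sensor_data(data)
                await websocket.send_text(json.dumps(result))
        except WebSocketDisconnect:
            # Client closed the connection; leave the receive loop.
            # (WebSocketDisconnect is importable from fastapi.)
            pass

    # Step detection events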
    class StepEvent:
        def __init__(self, event_type: str, timestamp: str, data: Dict):
            self.event_type = event_type  # "start", "end", "completed"
            self.timestamp = timestamp
            self.data = data

    # Event handlers
    def on_step_start(event: StepEvent):
        logger.info(f"Step started at {event.timestamp}")

    def on_step_completed(event: StepEvent):
        logger.info(f"Step completed: {event.data}")

    # API Layer - HTTP errors
    @app.exception_handler(ValueError)
    async def value_error_handler(request: Request, exc: ValueError):
        return JSONResponse(
            status_code=400,
            content={"detail": str(exc)}
        )

    # Business Logic Layer - Domain errors
    class StepDetectionError(Exception):
        pass

    class ModelNotLoadedError(StepDetectionError):
        pass

    # Infrastructure Layer - System errors
    class ModelLoadError(Exception):
        pass

    def process_reading_with_fallback(self, *args):
        try:
            return self.ml_detector.process_reading(*args)
        except ModelNotLoadedError:
            # Fallback to simple rule-based detection
            return self.rule_based_detector.process_reading(*args)
        except Exception as e:
            logger.error(f"Detection failed: {e}")
            return self.create_error_response()

    # Multiple API instances
    services:
      api-1:
        image: step-detection:latest
        ports: ["8001:8000"]
      api-2:
        image: step-detection:latest
        ports: ["8002:8000"]
      load-balancer:
        image: nginx:alpine
        ports: ["8000:80"]
        depends_on: [api-1, api-2]

    # Singleton pattern for model loading
    class ModelManager:
        _instance = None
        _model = None

        def __new__(cls):
            if cls._instance is None:
                cls._instance = super().__new__(cls)
            return cls._instance

        def get_model(self, model_path: str):
            # Loaded once; later calls return the cached model even if
            # model_path differs
            if self._model is None:
                self._model = tf.keras.models.load_model(model_path)
            return self._model

    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    class AsyncStepDetector:
        def __init__(self, detector):
            self.detector = detector  # wrapped synchronous detector
            self.executor = ThreadPoolExecutor(max_workers=4)

        async def process_reading_async(self, *args):
            # Run the blocking inference call off the event loop
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                self.executor,
                self.detector.process_reading,
                *args
            )
            return result

    # JWT-based authentication
    # Note: JWTAuthentication belongs to older fastapi-users releases; newer
    # releases configure JWT through an authentication backend instead.
    from fastapi_users import FastAPIUsers
    from fastapi_users.authentication import JWTAuthentication

    jwt_authentication = JWTAuthentication(
        secret=SECRET_KEY,
        lifetime_seconds=3600,
        tokenUrl="/auth/login"
    )

    # Protected endpoints
    @app.get("/protected")
    async def protected_endpoint(user: User = Depends(current_user)):
        return {"message": f"Hello {user.email}"}

    # Pydantic models for validation
    from pydantic import BaseModel, confloat

    class SensorReading(BaseModel):
        accel_x: confloat(ge=-50, le=50)
        accel_y: confloat(ge=-50, le=50)
        accel_z: confloat(ge=-50, le=50)
        gyro_x: confloat(ge=-10, le=10)
        gyro_y: confloat(ge=-10, le=10)
        gyro_z: confloat(ge=-10, le=10)

    from slowapi import Limiter
    from slowapi.util import get_remote_address

    limiter = Limiter(key_func=get_remote_address)

    @app.post("/detect_step")
    @limiter.limit("60/minute")
    async def detect_step(request: Request, reading: SensorReading):
        # Protected endpoint
        ...

    # Test individual components
    class TestStepDetector:
        def test_process_reading_valid_input(self):
            detector = StepDetector("test_model.keras")
            result = detector.process_reading(1.0, 0.5, 9.8, 0.1, 0.2, 0.0)
            assert "step_start" in result
            assert "step_end" in result

    # Test component interactions
    class TestAPIIntegration:
        def test_step_detection_endpoint(self):
            response = client.post("/detect_step", json={
                "accel_x": 1.0, "accel_y": 0.5, "accel_z": 9.8,
                "gyro_x": 0.1, "gyro_y": 0.2, "gyro_z": 0.0
            })
            assert response.status_code == 200

    # Test complete workflows
    class TestE2E:
        def test_training_to_inference_pipeline(self):
            # Train model
            model = train_model(training_data)
            # Save and load model
            save_model(model, "test_model.keras")
            detector = StepDetector("test_model.keras")
            # Test inference
            result = detector.process_reading(*test_data)
            assert result is not None

    from prometheus_client import Counter, Histogram

    # Business metrics (prometheus_client requires a description per metric)
    step_detection_requests = Counter(
        'step_detection_requests_total', 'Total step detection requests')
    step_detection_duration = Histogram(
        'step_detection_duration_seconds', 'Step detection request duration')

    # System metrics
    model_inference_time = Histogram(
        'model_inference_time_seconds', 'Model inference time')
    websocket_connections = Counter(
        'websocket_connections_total', 'Total WebSocket connections opened')

    import structlog
    logger = structlog.get_logger()

    def process_reading(self, *args):
        logger.info(
            "processing_sensor_reading",
            accel_magnitude=calculate_magnitude(args[:3]),
            gyro_magnitude=calculate_magnitude(args[3:]),
            session_id=self.session_id
        )

    @app.get("/health")
    async def health_check():
        return {
            "status": "healthy",
            "model_loaded": detector is not None,
            "memory_usage": get_memory_usage(),
            "uptime": get_uptime()
        }

    ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
    │     API     │ │    Model    │ │    Data     │
    │   Gateway   │ │   Service   │ │   Service   │
    └─────────────┘ └─────────────┘ └─────────────┘
           │               │               │
           └───────────────┼───────────────┘
                           │
                    ┌─────────────┐
                    │   Message   │
                    │    Queue    │
                    └─────────────┘

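
The decoupling shown in the diagram above can be illustrated in-process with the standard library. In this sketch `queue.Queue` stands in for a real message broker (the document does not name one), a thread stands in for the model service, and the accel_z rule is a placeholder for model inference.

```python
import queue
import threading


def model_service(requests: queue.Queue, results: queue.Queue) -> None:
    """Consume readings until a None sentinel arrives."""
    while True:
        reading = requests.get()
        if reading is None:
            break
        # Placeholder for model inference (illustrative rule only).
        results.put({"id": reading["id"], "step": reading["accel_z"] > 11.0})


requests_q: queue.Queue = queue.Queue()
results_q: queue.Queue = queue.Queue()
worker = threading.Thread(target=model_service, args=(requests_q, results_q))
worker.start()

# The gateway side only enqueues work; it never calls the model directly.
for reading_id, accel_z in enumerate([9.8, 12.5, 10.1]):
    requests_q.put({"id": reading_id, "accel_z": accel_z})
requests_q.put(None)  # sentinel: tell the worker to stop
worker.join()

results = [results_q.get() for _ in range(3)]
```

Because producers and consumers share only the queue, either side can be scaled or replaced independently, which is the property the microservice layout relies on.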
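    from datetime import datetime
    from typing import Dict, List

    # Event store for step detection history
    class StepEvent:
        def __init__(self, event_type: str, data: Dict, timestamp: datetime):
            self.event_type = event_type
            self.data = data
            self.timestamp = timestamp

    class EventStore:
        def __init__(self):
            self._events: List[StepEvent] = []

        def append(self, event: StepEvent):
            # Store event (in memory here; a real store would persist it)
            self._events.append(event)

        def get_events(self, session_id: str) -> List[StepEvent]:
            # Retrieve events; assumes each event keeps "session_id" in its data
            return [e for e in self._events if e.data.get("session_id") == session_id]

    # Separate read and write models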
    class StepDetectionCommand:
        def execute(self, sensor_data: SensorReading):
            # Write operation
            ...

    class StepDetectionQuery:
        def get_session_summary(self, session_id: str):
            # Read operation
            ...

This architecture provides a solid foundation for building scalable, maintainable, and production-ready step detection systems.