A production-ready federated learning system enabling privacy-preserving distributed ML training across edge clients without centralizing raw data.
              ┌─────────────────┐
              │    Rust gRPC    │
              │   Coordinator   │  ← Manages clients, rounds, model distribution
              │  (Port 50051)   │
              └────────┬────────┘
                       │
                       ├──────────────────────┐
                       │                      │
              ┌────────▼────────┐    ┌────────▼────────┐
              │  C++ Gradient   │    │   Prometheus    │
              │   Aggregator    │    │     Metrics     │
              │  (Port 50052)   │    │   (Port 9090)   │
              └────────┬────────┘    └─────────────────┘
                       │
       ┌───────────┬───┴───────┬───────────┬───────────┐
       │           │           │           │           │
  ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐
  │ Client0 │ │ Client1 │ │ Client2 │ │ Client3 │ │ Client4 │
  │ PyTorch │ │ PyTorch │ │ PyTorch │ │ PyTorch │ │ PyTorch │
  └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘
- Differential Privacy: Gradient clipping and Gaussian noise injection
- No Raw Data Sharing: Only model updates leave client devices
- Byzantine Fault Tolerance: Median-based outlier detection (see the sketch after this list)
- Gradient Compression: Quantization and top-K sparsification
- Efficient Communication: gRPC with Protocol Buffers
- Parallel Aggregation: Multi-threaded C++ aggregator
- Prometheus Metrics: Real-time training statistics
- Grafana Dashboards: Visualize progress and client health
- Distributed Tracing: Track requests across services
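As a rough illustration of the median-based outlier detection listed above, here is a minimal Python sketch. The production path is the multi-threaded C++ aggregator, so this is not the project's actual code, and the 2.0 cutoff factor is an assumption chosen for illustration.

import numpy as np

def robust_aggregate(updates, outlier_factor=2.0):
    """Coordinate-wise median aggregation with a crude outlier filter.

    updates: list of 1-D numpy arrays (flattened client gradients).
    outlier_factor: clients farther than outlier_factor times the median
        distance from the coordinate-wise median are dropped (assumed value).
    """
    stacked = np.stack(updates)              # shape: (num_clients, num_params)
    median = np.median(stacked, axis=0)      # robust central estimate

    # Distance of each client's update from the median update.
    dists = np.linalg.norm(stacked - median, axis=1)
    cutoff = outlier_factor * np.median(dists) + 1e-12

    kept = stacked[dists <= cutoff]          # discard suspected Byzantine clients
    return np.median(kept, axis=0), int(len(updates) - len(kept))

# Example: one client submits a wildly scaled (malicious) update.
honest = [np.random.randn(10) * 0.1 for _ in range(4)]
malicious = [np.ones(10) * 100.0]
aggregated, dropped = robust_aggregate(honest + malicious)
print(f"dropped {dropped} suspected Byzantine client(s)")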
Prerequisites:
- Rust 1.70+
- C++17 compiler (g++ or clang)
- CMake 3.15+
- Python 3.8+
- gRPC and Protocol Buffers
Build the coordinator:
cd coordinator
cargo build --release

Build the aggregator:
cd aggregator
mkdir build && cd build
cmake ..
make

Set up the Python clients:
cd python_client
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# Generate protobuf code
cd ../proto
python -m grpc_tools.protoc -I. --python_out=../python_client \
    --grpc_python_out=../python_client coordinator.proto
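The protoc invocation above writes coordinator_pb2.py (message classes) and coordinator_pb2_grpc.py (service stubs) into python_client/. An optional sanity check that the generated modules import cleanly; the stub class names it prints depend on what coordinator.proto actually defines:

# Run from python_client/ with the venv active.
import coordinator_pb2        # generated message classes
import coordinator_pb2_grpc   # generated client stubs / server servicers

# List the generated stub classes; their names mirror the services in coordinator.proto.
print([name for name in dir(coordinator_pb2_grpc) if name.endswith("Stub")])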
Terminal 1 - Start Coordinator:
cd coordinator
cargo run --release

Terminal 2 - Start Aggregator:
cd aggregator/build
./aggregator_server

Terminals 3-7 - Start 5 Clients:
cd python_client
source venv/bin/activate
# In separate terminals
python client.py --client-id client_0
python client.py --client-id client_1
python client.py --client-id client_2
python client.py --client-id client_3
python client.py --client-id client_4

Monitor training:
- Metrics: http://localhost:9090/metrics
- Health: http://localhost:9090/health
Edit coordinator/src/coordinator.rs:
fn default_config() -> TrainingConfig {
    TrainingConfig {
        total_rounds: 10,             // Number of federated rounds
        local_epochs: 5,              // Epochs per client per round
        batch_size: 32,               // Training batch size
        learning_rate: 0.01,          // SGD learning rate
        enable_compression: true,     // Gradient compression
        enable_differential_privacy: true,
        dp_config: Some(DpParameters {
            epsilon: 2.0,             // Privacy budget
            delta: 1e-5,
            noise_multiplier: 1.1,
            max_grad_norm: 1.0,       // Gradient clipping threshold
        }),
        aggregation_threshold: 3,     // Min clients per round
    }
}

Adjust privacy parameters in python_client/client.py:
self.dp = DifferentialPrivacy(
    epsilon=2.0,         # Lower = more privacy, less accuracy
    delta=1e-5,          # Failure probability
    max_grad_norm=1.0    # Gradient clipping threshold
)
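For reference, the clip-then-noise step behind these parameters usually looks like the sketch below, assuming the Gaussian noise scale is noise_multiplier * max_grad_norm (with noise_multiplier taken from the coordinator config above). The project's DifferentialPrivacy helper in client.py may differ in details such as per-sample clipping or privacy accounting.

import torch

def privatize_gradient(grad: torch.Tensor,
                       max_grad_norm: float = 1.0,
                       noise_multiplier: float = 1.1) -> torch.Tensor:
    """Sketch: clip a flattened gradient to max_grad_norm, then add Gaussian noise."""
    norm = grad.norm(p=2)
    scale = (max_grad_norm / (norm + 1e-12)).clamp(max=1.0)   # shrink only if too large
    clipped = grad * scale
    noise = torch.randn_like(clipped) * noise_multiplier * max_grad_norm
    return clipped + noise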
Project structure:
├── coordinator/                  # Rust gRPC coordination service
│   ├── src/
│   │   ├── main.rs               # Server entry point
│   │   ├── coordinator.rs        # Core coordination logic
│   │   ├── client_manager.rs     # Client registration & tracking
│   │   └── metric.rs             # Prometheus metrics
│   ├── Cargo.toml
│   └── build.rs
│
├── aggregator/                   # C++ gradient aggregation
│   ├── include/
│   │   ├── aggregator.h          # Aggregation interface
│   │   └── byzantine_detector.h  # Byzantine fault tolerance
│   ├── src/
│   │   ├── main.cpp              # gRPC server
│   │   ├── aggregator.cpp        # Median aggregation
│   │   ├── byzantine_detector.cpp
│   │   └── compression.cpp       # Gradient compression
│   └── CMakeLists.txt
│
├── python_client/                # PyTorch training clients
│   ├── client.py                 # Main client implementation
│   ├── requirements.txt
│   └── README.md
│
├── proto/                        # Protocol Buffer definitions
│   └── coordinator.proto
│
└── README.md
Available metrics:
- federated_active_clients - Number of connected clients
- federated_current_round - Current training round
- federated_model_accuracy - Global model accuracy
- federated_gradients_received_total - Total gradient submissions
- federated_byzantine_clients_detected_total - Malicious clients detected
- federated_aggregation_seconds - Aggregation time histogram
# Average accuracy over time
avg(federated_model_accuracy)
# Client participation rate
rate(federated_client_participation_total[5m])
# Aggregation latency (p95)
histogram_quantile(0.95, federated_aggregation_seconds_bucket)
Import the dashboard JSON:
{
  "dashboard": {
    "title": "Federated Learning",
    "panels": [
      {
        "title": "Model Accuracy",
        "targets": [{ "expr": "federated_model_accuracy" }]
      },
      {
        "title": "Active Clients",
        "targets": [{ "expr": "federated_active_clients" }]
      },
      {
        "title": "Aggregation Time",
        "targets": [{ "expr": "rate(federated_aggregation_seconds_sum[5m])" }]
      }
    ]
  }
}

# Rust tests
cd coordinator
cargo test
# C++ tests (if implemented)
cd aggregator/build
ctest

# Start all services and run a quick training round
./scripts/integration_test.sh

On a MacBook Pro M1 with 5 clients:
- Round Time: ~15-20 seconds
- Aggregation: <100ms for 7840 parameters
- Communication: ~50ms per gradient submission
- Memory: ~200MB per client, ~100MB coordinator
- Differential Privacy: Provides (ε, δ)-DP guarantees
- Byzantine Tolerance: Detects up to 20% malicious clients
- Secure Communication: Use TLS in production (currently insecure for demo; see the sketch below)
- Authentication: Session tokens (use proper auth in production)
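For the TLS point above, the client-side change is roughly the following sketch; the certificate path and hostname are placeholders, and the coordinator/aggregator servers would need matching TLS configuration:

import grpc

# Placeholder: CA certificate that signed the coordinator's server certificate.
with open("certs/ca.pem", "rb") as f:
    credentials = grpc.ssl_channel_credentials(root_certificates=f.read())

# Replace grpc.insecure_channel("localhost:50051") with a TLS-protected channel.
channel = grpc.secure_channel("coordinator.example.com:50051", credentials)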
Deploy with Docker Compose:
version: "3.8"
services:
  coordinator:
    build: ./coordinator
    ports:
      - "50051:50051"
      - "9090:9090"
  aggregator:
    build: ./aggregator
    ports:
      - "50052:50052"
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9091:9090"
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"

See k8s/ directory for deployment manifests.
# Check coordinator is running
curl http://localhost:9090/health
# Check port is open
lsof -i :50051
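If the port is open but clients still fail to register, a small Python probe can confirm gRPC connectivity end-to-end (sketch; assumes the coordinator listens on localhost:50051):

import grpc

channel = grpc.insecure_channel("localhost:50051")
try:
    # Blocks until the channel connects, or raises after the timeout.
    grpc.channel_ready_future(channel).result(timeout=5)
    print("coordinator gRPC endpoint is reachable")
except grpc.FutureTimeoutError:
    print("could not reach the coordinator on port 50051")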
# Check aggregator logs
cd aggregator/build
./aggregator_server --verbose

If model accuracy is low:
- Increase local_epochs (more training per round)
- Increase epsilon (less DP noise, but less privacy)
- Increase aggregation_threshold (more clients per round)