| Component | Technology | License |
|---|---|---|
| Language | Python 3.11+ | PSF |
| Messaging | pyzmq (ZeroMQ bindings) | LGPL/BSD |
| Database | SQLite3 (built into Python) | Public Domain |
| Containers | Docker + Docker Compose | Apache 2.0 |
| Serialization | JSON (stdlib) | - |
| Config | JSON (stdlib json) | - |
| Logging | Python logging (stdlib) | - |
| Testing/Metrics | time (stdlib), pytest | MIT |
```
distribuidos/
├── config/
│   └── city_config.json          # Grid size, sensors, rules, timings, ports, network
├── common/
│   ├── __init__.py
│   ├── models.py                 # Data classes for events, commands
│   ├── constants.py              # Ports, topics, thresholds, states, timings
│   ├── config_loader.py          # Config singleton loader
│   ├── db_utils.py               # SQLite helper (create tables, insert, query)
│   └── monitoring_commands.py    # Shared monitoring CLI commands and formatters
├── pc1/
│   ├── __init__.py
│   ├── sensors/
│   │   ├── __init__.py
│   │   ├── camera_sensor.py      # PUB - Queue length events (Lq)
│   │   ├── inductive_sensor.py   # PUB - Vehicle count events (Cv)
│   │   └── gps_sensor.py         # PUB - Traffic density events (Dt)
│   ├── broker.py                 # SUB (from sensors) -> PUB (to PC2) - standard
│   ├── broker_threaded.py        # Multithreaded broker variant (inproc PUSH/PULL)
│   └── start_pc1.py              # Launcher: broker + all sensors as subprocesses
├── pc2/
│   ├── __init__.py
│   ├── analytics_service.py      # SUB (from broker) + REP (from PC3) + PUSH (to DBs)
│   ├── traffic_light_control.py  # Receives commands, manages semaphore states
│   ├── db_replica.py             # PULL - receives data for replica SQLite DB
│   ├── health_checker.py         # FailoverState + health check daemon thread
│   ├── monitoring_fallback.py    # Fallback monitoring CLI on PC2 during failover
│   └── start_pc2.py              # Launcher: db_replica + semaphore + analytics
├── pc3/
│   ├── __init__.py
│   ├── monitoring_service.py     # REQ/REP queries + direct commands to analytics
│   ├── db_primary.py             # PULL - receives data for primary SQLite DB + health REP
│   └── start_pc3.py              # Launcher: db_primary subprocess + monitoring foreground
├── docker-compose.yml
├── Dockerfile.pc1
├── Dockerfile.pc2
├── Dockerfile.pc3
├── .dockerignore
├── requirements.txt
├── requirements-dev.txt          # Extends requirements.txt + ruff
├── ruff.toml                     # Linter/formatter configuration
├── .coveragerc                   # Coverage config for pytest-cov
├── .github/
│   └── workflows/
│       ├── ci.yml                  # CI: ruff check + format + pytest
│       ├── test-coverage.yml       # CI: pytest with coverage report + artifact
│       ├── config-consistency.yml  # CI: validates city_config.json structure
│       ├── docker-smoke.yml        # CI: Docker build + import validation
│       ├── docker-integration.yml  # CI: full ZMQ integration + failover test
│       └── docker-sensor-count.yml # CI: validates SENSOR_COUNT env var
├── PLAN.md                       # This file - master development plan
├── phase3_plan.md                # Phase 3 design decisions
├── phase4_plan.md                # Phase 4 design decisions
├── phase5_plan.md                # Phase 5 design decisions
├── monitoring_fallback.md        # Bug report: cross-PC import fix
├── README.md                     # Project readme with usage instructions
├── ideas.md                      # Post-project enhancement ideas (GUI, ESP32)
├── perf/
│   ├── __init__.py
│   ├── run_scenarios.py          # Performance test scenario runner (Docker-based)
│   └── generate_graphs.py        # Matplotlib chart generation from results
└── tests/
    ├── test_common.py            # 32 tests - models, config, DB utils, validation
    ├── test_sensors.py           # 23 tests - sensor events, broker modes, threaded broker
    ├── test_analytics.py         # 35 tests - analytics, rules, semaphore, green wave DB
    ├── test_monitoring.py        # 33 tests - DB primary, monitoring CLI, fallback, errors
    ├── test_failover.py          # 15 tests - failover state, health checker, recovery
    └── test_performance.py       # 29 tests - latency, sensor count, graphs, launcher args
```
- Create the directory structure above
- Create `requirements.txt` with: `pyzmq`, `pytest`, `matplotlib`
- Create base Dockerfiles for each PC
- Define the 4x4 grid (rows A-D, columns 1-4)
- Map which intersections have which sensors
- Define ZMQ ports and addresses
- Define traffic rules and thresholds
Example config (see `config/city_config.json` for the full version):

```json
{
  "city": {
    "name": "Ciudad Simulada",
    "grid": { "rows": ["A","B","C","D"], "columns": [1,2,3,4] }
  },
  "sensors": {
    "cameras": [{"sensor_id": "CAM-A1", "interseccion": "INT-A1"}, ...],
    "inductive_loops": [{"sensor_id": "ESP-A2", "interseccion": "INT-A2"}, ...],
    "gps": [{"sensor_id": "GPS-A1", "interseccion": "INT-A1"}, ...]
  },
  "rules": {
    "normal": { "conditions": { "Q_max": 5, "Vp_min": 35, "D_max": 20 } },
    "congestion": { "conditions": { "Q_min": 10, "Vp_max": 20, "D_min": 40 } },
    "green_wave": { "conditions": { "trigger": "user_command" } }
  },
  "timings": {
    "normal_cycle_sec": 15,
    "congestion_extension_sec": 10,
    "green_wave_duration_sec": 30,
    "sensor_default_interval_sec": 10,
    "inductive_interval_sec": 30,
    "health_check_interval_sec": 5,
    "health_check_timeout_ms": 2000,
    "health_check_max_retries": 3
  },
  "zmq_ports": {
    "sensor_camera_pub": 5555,
    "sensor_inductive_pub": 5556,
    "sensor_gps_pub": 5557,
    "broker_pub": 5560,
    "analytics_rep": 5561,
    "semaphore_control_pull": 5562,
    "db_primary_pull": 5563,
    "db_replica_pull": 5564,
    "health_check_rep": 5565
  },
  "network": {
    "pc1_host": "pc1",
    "pc2_host": "pc2",
    "pc3_host": "pc3"
  }
}
```

- Define data classes for: `CameraEvent`, `InductiveEvent`, `GPSEvent`, `SemaphoreCommand`, `MonitoringQuery`, `MonitoringResponse`
- JSON serialization/deserialization helpers
- Config loader utility (`common/config_loader.py`)
- Create tables: `sensor_events`, `semaphore_states`, `congestion_history`, `priority_actions`
- Helper functions: `insert_event()`, `query_by_time_range()`, `query_by_intersection()`, `get_semaphore_state()`
- Performance metric helper: `get_event_count_in_interval()`
- System summary: `get_system_summary()`
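A minimal sketch of what `insert_event()` and `query_by_intersection()` in `db_utils.py` could look like. The schema below is illustrative (the real module defines four tables), and storing the full event as a JSON `payload` column is an assumption:

```python
import json
import sqlite3

def create_tables(conn: sqlite3.Connection) -> None:
    # Illustrative single-table schema; db_utils.py defines four tables.
    conn.execute(
        """CREATE TABLE IF NOT EXISTS sensor_events (
               id INTEGER PRIMARY KEY AUTOINCREMENT,
               sensor_id TEXT NOT NULL,
               interseccion TEXT NOT NULL,
               timestamp TEXT NOT NULL,
               payload TEXT NOT NULL
           )"""
    )
    conn.commit()

def insert_event(conn: sqlite3.Connection, event: dict) -> None:
    # Keep the raw event as JSON alongside the indexed columns.
    conn.execute(
        "INSERT INTO sensor_events (sensor_id, interseccion, timestamp, payload) "
        "VALUES (?, ?, ?, ?)",
        (event["sensor_id"], event["interseccion"], event["timestamp"], json.dumps(event)),
    )
    conn.commit()

def query_by_intersection(conn: sqlite3.Connection, interseccion: str) -> list[dict]:
    rows = conn.execute(
        "SELECT payload FROM sensor_events WHERE interseccion = ?", (interseccion,)
    ).fetchall()
    return [json.loads(r[0]) for r in rows]
```

An in-memory connection (`sqlite3.connect(":memory:")`) is enough to exercise these helpers in unit tests.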
Each sensor type runs as a single process handling all sensors of that type via a shared ZMQ PUB socket:
- Camera sensor (`camera_sensor.py`): publishes on topic `"camara"`. Generates random `volumen` (0-20 vehicles) and `velocidad_promedio` (5-50 km/h) at configured intervals.
- Inductive loop sensor (`inductive_sensor.py`): publishes on topic `"espira"`. Generates random `vehiculos_contados` (0-30) every 30 seconds.
- GPS sensor (`gps_sensor.py`): publishes on topic `"gps"`. Generates random `velocidad_promedio` and derives `nivel_congestion` (ALTA < 10, NORMAL 10-40, BAJA > 40).
All sensors:
- Accept CLI params or read config for: intersection ID, generation interval
- Print each generated event to stdout
- Attach proper timestamps
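The event shape all sensors share can be sketched like this; `make_camera_event` is a hypothetical helper, with field ranges taken from the camera sensor description above:

```python
import random
from datetime import datetime, timezone

def make_camera_event(sensor_id: str, interseccion: str) -> dict:
    """Build one EVENTO_LONGITUD_COLA (Lq) payload with a UTC timestamp."""
    return {
        "sensor_id": sensor_id,
        "tipo_sensor": "camara",
        "interseccion": interseccion,
        "volumen": random.randint(0, 20),             # 0-20 vehicles
        "velocidad_promedio": random.randint(5, 50),  # 5-50 km/h
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
```

In the real sensor process, each event would be serialized with `json.dumps()` and sent as the payload of a topic-prefixed PUB message.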
Sensor event formats:
Camera event (EVENTO_LONGITUD_COLA - Lq):

```json
{
  "sensor_id": "CAM-C5",
  "tipo_sensor": "camara",
  "interseccion": "INT-C5",
  "volumen": 10,
  "velocidad_promedio": 25,
  "timestamp": "2026-02-09T15:10:00Z"
}
```

Inductive loop event (EVENTO_CONTEO_VEHICULAR - Cv):

```json
{
  "sensor_id": "ESP-C5",
  "tipo_sensor": "espira_inductiva",
  "interseccion": "INT-C5",
  "vehiculos_contados": 12,
  "intervalo_segundos": 30,
  "timestamp_inicio": "2026-02-09T15:20:00Z",
  "timestamp_fin": "2026-02-09T15:20:30Z"
}
```

GPS event (EVENTO_DENSIDAD_DE_TRAFICO - Dt):

```json
{
  "sensor_id": "GPS-C5",
  "tipo_sensor": "gps",
  "interseccion": "INT-C5",
  "nivel_congestion": "ALTA",
  "velocidad_promedio": 18,
  "timestamp": "2026-02-09T15:20:10Z"
}
```

Congestion level rules for GPS:
- ALTA: `velocidad_promedio` < 10
- NORMAL: 10 <= `velocidad_promedio` <= 40
- BAJA: `velocidad_promedio` > 40
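These thresholds map directly to a small classifier; `nivel_congestion` is an illustrative name for the function inside `gps_sensor.py`:

```python
def nivel_congestion(velocidad_promedio: float) -> str:
    """Classify GPS average speed into a congestion level per the rules above."""
    if velocidad_promedio < 10:
        return "ALTA"
    if velocidad_promedio <= 40:
        return "NORMAL"
    return "BAJA"
```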
- SUB socket subscribing to topics `"camara"`, `"espira"`, `"gps"` (from local sensors)
- PUB socket forwarding all received events to PC2
- Uses `zmq.Poller` for efficient multi-socket polling
- Print forwarded messages to stdout
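The Poller-based forwarding can be sketched as a single-step helper; `forward_once` is hypothetical (the real broker loops forever), and it assumes topic-prefixed multipart messages:

```python
import zmq

def forward_once(sub: zmq.Socket, pub: zmq.Socket, timeout_ms: int = 1000) -> bool:
    """Poll the SUB socket and forward one [topic, payload] message to the PUB socket."""
    poller = zmq.Poller()
    poller.register(sub, zmq.POLLIN)
    events = dict(poller.poll(timeout_ms))
    if sub in events:
        msg = sub.recv_multipart()  # [topic, payload]
        pub.send_multipart(msg)     # re-publish unchanged for PC2 subscribers
        return True
    return False
```

The single-step form keeps the helper unit-testable over `inproc://` transports; the production loop would register both SUB sockets (or one SUB subscribed to all three topics) and poll indefinitely.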
- Same broker, but uses `threading` to handle each topic subscription in a separate thread
- Uses an inproc PUSH/PULL pattern to safely share messages between threads
- Needed for the performance experiments in the final delivery
- Entrypoint script that launches the broker + all sensor processes as subprocesses
- Supports `--broker-mode standard|threaded` and an `--interval` override
- Graceful shutdown: terminates all children on SIGINT/SIGTERM
- Environment variable support: `BROKER_MODE`, `SENSOR_INTERVAL`
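The launcher's lifecycle can be sketched with `subprocess` and `signal`; the function names are illustrative, not the actual `start_pc1.py` API:

```python
import signal
import subprocess
import sys

def launch_children(cmds: list[list[str]]) -> list[subprocess.Popen]:
    """Start each command as a child process (broker first, then sensors)."""
    return [subprocess.Popen(cmd) for cmd in cmds]

def terminate_all(children: list[subprocess.Popen]) -> None:
    """Graceful shutdown: SIGTERM every child, then wait for each to exit."""
    for p in children:
        p.terminate()
    for p in children:
        p.wait(timeout=10)

def install_handlers(children: list[subprocess.Popen]) -> None:
    """Translate SIGINT/SIGTERM on the launcher into a clean teardown."""
    def _handler(signum, frame):
        terminate_all(children)
        sys.exit(0)
    signal.signal(signal.SIGINT, _handler)
    signal.signal(signal.SIGTERM, _handler)
```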
This is the core brain of the system. Multiple ZMQ sockets:
- SUB socket: Subscribes to broker's PUB (receives all sensor events)
- PUSH socket (x2): Sends data to primary DB (PC3) and replica DB (PC2)
- PUSH to semaphore control: Sends light-change commands (PUSH/PULL pattern)
- REP socket: Responds to monitoring queries from PC3 (REQ/REP)
Logic:
- Receive sensor event
- Evaluate rules per intersection:
  - Normal traffic: `Q < 5 AND Vp > 35 AND D < 20` -> standard 15s red cycle
  - Congestion detected: `Q >= 10 OR Vp <= 20 OR D >= 40` -> extend green by 10s on the congested direction
  - Green wave (priority): user-triggered via monitoring -> force all semaphores on a route to green
- If a state change is needed, send a command to the semaphore control service
- PUSH event data to both databases
- Print all decisions and actions to stdout
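The rule evaluation reduces to a pure function. Note the conditions leave a gap between "normal" and "congestion" (e.g. Q = 7, Vp = 30, D = 25), so the `UNCHANGED` fallback for in-between readings is an assumption of this sketch:

```python
def evaluate_intersection(q: int, vp: float, d: float) -> str:
    """Classify traffic state from queue length (Q), avg speed (Vp), density (D)."""
    # Congestion wins: crossing any single threshold triggers it (OR)
    if q >= 10 or vp <= 20 or d >= 40:
        return "CONGESTION"
    # Normal requires all three conditions to hold (AND)
    if q < 5 and vp > 35 and d < 20:
        return "NORMAL"
    # Readings in the gap between the two rules: keep the current cycle
    return "UNCHANGED"
```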
- Maintains in-memory state of all semaphores (intersection -> color)
- PULL or SUB socket: Receives commands from analytics
- Executes light changes: red->green, green->red
- Prints every state change: `"[INT-B3] RED -> GREEN (reason: congestion detected)"`
- PULL socket: Receives data from analytics service
- Inserts into local SQLite replica database
- This replica activates as primary if PC3 fails
- PULL socket: Receives data from analytics service (PC2)
- Inserts into primary SQLite database
- Prints operations to stdout
- Health check REP daemon thread (port 5565) for Phase 5
- Interactive CLI for the user
- REQ socket to analytics service (PC2) for queries and commands
Supported operations:
- Estado actual de una interseccion (e.g., INT-B3)
- Historial de congestion entre dos timestamps
- Forzar ola verde en una via (ambulancia)
- Cambiar semaforo de una interseccion especifica
- Estado general del sistema
- Health check
- Prints all operations and responses
- Launches db_primary as background subprocess
- Runs monitoring_service in foreground for interactive stdin access
- Graceful shutdown of background process
- PC2's analytics service periodically sends a heartbeat REQ to PC3's DB
- If no response after N attempts (e.g., 3 retries with 2s timeout), declare PC3 as failed
- Implement using the ZMQ `RCVTIMEO` socket option (Lazy Pirate pattern with socket recreation)
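A sketch of the Lazy Pirate check, assuming a PING/PONG string exchange (the exact wire format is not pinned down here). The key points are `RCVTIMEO` for the bounded wait and recreating the REQ socket after a timeout, since a REQ that never got its reply cannot be reused:

```python
import zmq

def check_health(ctx: zmq.Context, endpoint: str,
                 timeout_ms: int = 2000, retries: int = 3) -> bool:
    """Return True if the peer answers PING with PONG within the retry budget."""
    for _ in range(retries):
        sock = ctx.socket(zmq.REQ)
        sock.setsockopt(zmq.RCVTIMEO, timeout_ms)
        sock.setsockopt(zmq.LINGER, 0)  # don't block close() on unsent messages
        sock.connect(endpoint)
        try:
            sock.send_string("PING")
            return sock.recv_string() == "PONG"
        except zmq.Again:
            pass  # timed out: the REQ state machine is stuck, recreate the socket
        finally:
            sock.close()
    return False
```

With the plan's defaults (2 s timeout, 3 retries) a dead PC3 is declared failed after at most ~6 seconds.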
- When PC3 failure detected:
- Analytics redirects all PUSH writes to PC2's replica only
- PUSH socket to PC3 is disconnected to prevent ZMQ buffering
- Print alert: `"[FAILOVER] PC3 is down. Using replica DB on PC2."`
- Operation continues transparently
- Fallback monitoring CLI on PC2 (`monitoring_fallback.py`)
- Automatic recovery when PC3 comes back (health checker detects PONG)
- Separate ZMQ context for PC3-bound sockets: when PC3's Docker container stops, DNS resolution blocks ZMQ I/O threads. Using a dedicated `pc3_context` isolates these from the main context's SUB/REP sockets
- Thread-safe `_send_to_primary()` callable: `push_to_dbs()` accepts `Callable[[str], None] | None` instead of a raw socket. The callable holds a lock during send so the health checker's failover callback can't close the socket mid-send. Uses `zmq.NOBLOCK` to never block the main loop
- 15 unit tests covering FailoverState, HealthChecker, push_to_dbs, full integration
- All 167 tests passing (`pytest tests/ -v`)
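The lock-holding callable can be sketched as a closure; `make_primary_sender` is an illustrative factory name (the plan only specifies the `_send_to_primary()` shape), and dropping on `zmq.Again` is an assumed policy:

```python
import threading
import zmq

def make_primary_sender(sock: zmq.Socket, lock: threading.Lock):
    """Wrap a PUSH socket in a lock-holding, never-blocking send callable."""
    def _send_to_primary(payload: str) -> None:
        with lock:  # the failover callback takes the same lock before closing sock
            try:
                sock.send_string(payload, flags=zmq.NOBLOCK)
            except zmq.Again:
                pass  # HWM reached (peer gone): drop rather than block the main loop
    return _send_to_primary
```

On failover, the health checker would acquire the same lock, close the socket, and swap the callable for `None`, so the main loop never touches a half-closed socket.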
```yaml
services:
  pc1:
    build: { context: ., dockerfile: Dockerfile.pc1 }
    container_name: pc1-sensors-broker
    networks: [traffic-net]
    depends_on:
      pc2: { condition: service_started }
    environment:
      - BROKER_MODE=${BROKER_MODE:-standard}
      - SENSOR_INTERVAL=${SENSOR_INTERVAL:-0}
    restart: unless-stopped
  pc2:
    build: { context: ., dockerfile: Dockerfile.pc2 }
    container_name: pc2-analytics
    networks: [traffic-net]
    volumes: [pc2-data:/data]
    restart: unless-stopped
  pc3:
    build: { context: ., dockerfile: Dockerfile.pc3 }
    container_name: pc3-monitoring
    networks: [traffic-net]
    volumes: [pc3-data:/data]
    depends_on:
      pc2: { condition: service_started }
    stdin_open: true
    tty: true
    restart: unless-stopped

networks:
  traffic-net:
    driver: bridge

volumes:
  pc2-data:
  pc3-data:
```

- Based on `python:3.11-slim`
- Install `pyzmq`
- Copy the respective PC code + common modules
- Entrypoint scripts that launch all processes for that PC
- Build and verify all 3 images
- Integration test: `docker compose up`, verify cross-container ZMQ communication
- Failover test: `docker stop pc3`, verify the system continues on the replica
- Recovery test: `docker start pc3`, verify auto-recovery
- Sensor count control: `--sensor-count N` in `start_pc1.py` + `--count N` in each sensor script; `SENSOR_COUNT` env var in `docker-compose.yml`; slices config lists to the first N sensors per type
- Latency instrumentation: `created_at: float` field on `SemaphoreCommand` (wall-clock `time.time()`); `traffic_light_control.py` computes and logs `[LATENCY] INT-XX: N.NN ms` on every state change
- Throughput: `get_event_count_in_interval()` already in `db_utils.py`
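The latency measurement itself is a one-liner on top of `created_at`; `log_latency` is an illustrative helper name:

```python
import time

def log_latency(interseccion: str, created_at: float) -> float:
    """Compute and log command latency in ms from the wall-clock created_at stamp."""
    latency_ms = (time.time() - created_at) * 1000
    print(f"[LATENCY] {interseccion}: {latency_ms:.2f} ms")
    return latency_ms
```

Because `created_at` is wall-clock time, cross-container measurements assume the containers share the host clock, which holds under Docker Compose on a single machine.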
| Scenario | Sensors | Interval | Design |
|---|---|---|---|
| 1A | 1 of each type (3 total) | 10s | Standard broker |
| 1B | 1 of each type (3 total) | 10s | Multithreaded broker |
| 2A | 2 of each type (6 total) | 5s | Standard broker |
| 2B | 2 of each type (6 total) | 5s | Multithreaded broker |
- `perf/run_scenarios.py`: automated scenario runner using Docker Compose
  - Runs each scenario for a configurable duration (default 2 min)
  - Collects throughput via `docker exec` into the replica DB
  - Collects latency by parsing `[LATENCY]` log lines
  - Saves results to `benchmark_results/results.json`
- `perf/generate_graphs.py`: matplotlib chart generation
  - Throughput grouped bar chart (standard vs threaded, by sensor count)
  - Latency grouped bar chart (with min/max error bars)
  - Per-scenario throughput and latency charts
  - Output to `benchmark_results/graphs/`
- 29 unit tests in `tests/test_performance.py` (167 total tests passing)
- System models: architectural, interaction, fault, security + McCumber cube
- UML diagrams: deployment, component, class, sequence
- Explanation of:
- a) How processes obtain initial resource definitions (sensors, grid size, semaphores)
- b) Traffic rules for 3 states
- c) Query types available to users
- d) Examples of direct commands from Monitoring to Analytics
- Test protocol document
- Performance metrics methodology
- Working implementation: PC1 + PC2 services + DB writes to PC3
- Source code of implemented functionality
- Complete working system across 3 Docker containers
- Source code (.zip) + README with execution instructions
- Complement first delivery documentation
- Documented source files
- 10-minute demo video covering:
- a) Component distribution across machines
- b) Parameters for all process types
- c) City grid distribution among sensors and semaphore assignment
- d) Libraries and patterns used
- e) Fault handling
- Performance report (max 5 pages): hw/sw specs, measurement tools, tables, graphs, analysis
```
Sensor (PUB) --[camara/espira/gps]--> Broker (SUB/PUB) --[all topics]--> Analytics (SUB)
                                                                             |
                                         PUSH/PULL --------+-----> DB Primary (PC3)
                                         PUSH/PULL --------+-----> DB Replica (PC2)
                                         PUSH/PULL --------+-----> Semaphore Control (PC2)
                                         REP <---REQ-------+-----> Monitoring (PC3)
                                         REQ/REP ----------+-----> Health Check (PC3)
```
| State | Condition | Action |
|---|---|---|
| Normal | Q < 5 AND Vp > 35 AND D < 20 | Standard 15s red/green cycle |
| Congestion | Q >= 10 OR Vp <= 20 OR D >= 40 | Extend green +10s on congested direction, reduce cross-street green |
| Green Wave | User command (ambulance) | Force all semaphores on specified route to GREEN for 30s |
- All roads are one-way
- Semaphore changes are only green->red and red->green (no yellow)
- City grid is 4x4 (rows A-D, columns 1-4) = 16 intersections
- Normal red-to-green wait time: 15 seconds
| Phase | Days | Description |
|---|---|---|
| 1 | 1-2 | Foundation, config, models, DB schema |
| 2 | 3-5 | PC1: Sensors + Broker |
| 3 | 5-8 | PC2: Analytics + Semaphore Control + Replica |
| 4 | 8-10 | PC3: Primary DB + Monitoring CLI |
| 5 | 10-12 | Fault tolerance (health check + failover) |
| 6 | 12-13 | Docker Compose integration |
| 7 | 13-15 | Performance experiments |
| 8 | Ongoing | Documentation & deliverables |