Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ dependencies = [
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.0.0",
"httpx>=0.24.0",
"black>=23.0.0",
"ruff>=0.1.0",
]
Expand Down
16 changes: 1 addition & 15 deletions src/qwed_a2a/interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,7 @@
self._record(verdict, message.sender_agent_id, start_time)
return verdict

# --- Step 2: Check trusted agent bypass ---
if self.config.trusted_agents and (
message.sender_agent_id in self.config.trusted_agents
):
verdict = self._build_verdict(
trace_id=trace_id,
status=VerdictStatus.FORWARDED,
reason="Sender is on the trusted agents allowlist",
engine="bypass",
message=message,
)
self._record(verdict, message.sender_agent_id, start_time)
return verdict

# --- Step 3: Route to verification engine ---
# --- Step 2: Route to verification engine ---
try:
engine_result = self._route_to_engine(message)
except Exception as exc:
Expand Down Expand Up @@ -286,17 +272,17 @@

# Compiled regex patterns for case-insensitive, whitespace-tolerant detection
_DANGEROUS_PATTERNS: Dict[str, re.Pattern] = {
"eval": re.compile(r"\beval\s*\(", re.IGNORECASE),

Check warning on line 275 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"exec": re.compile(r"\bexec\s*\(", re.IGNORECASE),

Check warning on line 276 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"subprocess": re.compile(

Check warning on line 277 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
r"\b(?:subprocess\s*\.|import\s+subprocess\b|from\s+subprocess\s+import\b)",
re.IGNORECASE,
),
"os.system": re.compile(r"\bos\.system\s*\(", re.IGNORECASE),

Check warning on line 281 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"os.popen": re.compile(r"\bos\.popen\s*\(", re.IGNORECASE),

Check warning on line 282 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"__import__": re.compile(r"__import__\s*\(", re.IGNORECASE),

Check warning on line 283 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"compile": re.compile(r"\bcompile\s*\(", re.IGNORECASE),

Check warning on line 284 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
"importlib": re.compile(r"\bimportlib\s*\.", re.IGNORECASE),

Check warning on line 285 in src/qwed_a2a/interceptor.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

compile() can be part of dynamic code generation. Context=RUNTIME_CODE. Decision reason: Executable runtime path contains a risky but non-blocking pattern.
}

def _verify_code(self, payload: Dict[str, Any]) -> Dict[str, Any]:
Expand Down
201 changes: 201 additions & 0 deletions tests/test_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
"""
Tests for QWED A2A protocol endpoints — FastAPI gateway layer.
Covers: get_interceptor(), configure_interceptor(), _load_trusted_agents(),
/a2a/intercept, /a2a/health, /a2a/metrics routes.
"""

from unittest.mock import MagicMock, patch

import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient

from qwed_a2a.protocol import endpoints as ep
from qwed_a2a.protocol.endpoints import (
_load_trusted_agents,
configure_interceptor,
get_interceptor,
router,
)
from qwed_a2a.protocol.schema import InterceptorConfig


# ─── fixtures ──────────────────────────────────────────────────────────────────


@pytest.fixture(autouse=True)
def reset_interceptor_singleton():
ep._interceptor = None
yield
ep._interceptor = None


@pytest.fixture
def app():
application = FastAPI()
application.include_router(router)
return application


@pytest.fixture
def client(app):
return TestClient(app)


@pytest.fixture
def general_payload():
return {
"sender_agent_id": "agent-alpha",
"receiver_agent_id": "agent-beta",
"payload_type": "general",
"payload": {"msg": "hello"},
}


@pytest.fixture
def financial_payload():
return {
"sender_agent_id": "procurement-agent",
"receiver_agent_id": "treasury-agent",
"payload_type": "financial_transaction",
"payload": {
"data": {
"claimed_total": "100.00",
"line_items": [
{"description": "Widget", "amount": "100.00", "quantity": 1}
],
}
},
}


# ─── _load_trusted_agents ──────────────────────────────────────────────────────


class TestLoadTrustedAgents:
def test_loads_agents_from_env(self, monkeypatch):
monkeypatch.setenv("QWED_A2A_TRUSTED_AGENTS", "agent-a,agent-b")
interceptor = MagicMock()
_load_trusted_agents(interceptor)
assert interceptor.trust.trust_agent.call_count == 2
calls = [c.args[0] for c in interceptor.trust.trust_agent.call_args_list]
assert "agent-a" in calls
assert "agent-b" in calls

def test_ignores_empty_entries(self, monkeypatch):
monkeypatch.setenv("QWED_A2A_TRUSTED_AGENTS", "agent-a,, ,agent-b")
interceptor = MagicMock()
_load_trusted_agents(interceptor)
assert interceptor.trust.trust_agent.call_count == 2

def test_no_env_var_no_agents_registered(self, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)
interceptor = MagicMock()
_load_trusted_agents(interceptor)
interceptor.trust.trust_agent.assert_not_called()

def test_whitespace_stripped_from_agent_ids(self, monkeypatch):
monkeypatch.setenv("QWED_A2A_TRUSTED_AGENTS", " agent-x , agent-y ")
interceptor = MagicMock()
_load_trusted_agents(interceptor)
calls = [c.args[0] for c in interceptor.trust.trust_agent.call_args_list]
assert "agent-x" in calls
assert "agent-y" in calls


# ─── singleton ────────────────────────────────────────────────────────────────


class TestInterceptorSingleton:
def test_get_interceptor_returns_instance(self, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)
assert get_interceptor() is not None

def test_get_interceptor_is_singleton(self, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)
assert get_interceptor() is get_interceptor()

def test_configure_interceptor_replaces_singleton(self, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)
original = get_interceptor()
configure_interceptor(InterceptorConfig())
assert get_interceptor() is not original


# ─── /a2a/health ──────────────────────────────────────────────────────────────


class TestHealthEndpoint:
def test_returns_200(self, client):
assert client.get("/a2a/health").status_code == 200

def test_returns_correct_fields(self, client):
data = client.get("/a2a/health").json()
assert data["status"] == "healthy"

Check warning on line 134 in tests/test_endpoints.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

assert should not be the only input-validation boundary. Context=TEST_CODE. Decision reason: Pattern detected in test code; surfaced as advisory instead of blocking runtime execution.
assert data["service"] == "qwed-a2a"

Check warning on line 135 in tests/test_endpoints.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

assert should not be the only input-validation boundary. Context=TEST_CODE. Decision reason: Pattern detected in test code; surfaced as advisory instead of blocking runtime execution.
assert "version" in data

Check warning on line 136 in tests/test_endpoints.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

assert should not be the only input-validation boundary. Context=TEST_CODE. Decision reason: Pattern detected in test code; surfaced as advisory instead of blocking runtime execution.


# ─── /a2a/metrics ─────────────────────────────────────────────────────────────


class TestMetricsEndpoint:
def test_returns_200(self, client, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)
assert client.get("/a2a/metrics").status_code == 200

def test_returns_dict(self, client, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)
assert isinstance(client.get("/a2a/metrics").json(), dict)


# ─── /a2a/intercept ───────────────────────────────────────────────────────────


class TestInterceptEndpoint:
def test_general_message_returns_200(self, client, general_payload, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)
assert client.post("/a2a/intercept", json=general_payload).status_code == 200

def test_returns_verdict_fields(self, client, general_payload, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)
data = client.post("/a2a/intercept", json=general_payload).json()
assert "status" in data

Check warning on line 163 in tests/test_endpoints.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

assert should not be the only input-validation boundary. Context=TEST_CODE. Decision reason: Pattern detected in test code; surfaced as advisory instead of blocking runtime execution.
assert "audit_trace_id" in data

Check warning on line 164 in tests/test_endpoints.py

View check run for this annotation

QWED Security / QWED Security

QWED: pattern_scan

assert should not be the only input-validation boundary. Context=TEST_CODE. Decision reason: Pattern detected in test code; surfaced as advisory instead of blocking runtime execution.

def test_valid_financial_forwarded(self, client, financial_payload, monkeypatch):
# Both agents must be trusted so the zero-trust boundary allows the request
monkeypatch.setenv(
"QWED_A2A_TRUSTED_AGENTS", "procurement-agent,treasury-agent"
)
resp = client.post("/a2a/intercept", json=financial_payload)
assert resp.status_code == 200
assert resp.json()["status"] == "forwarded"

def test_malformed_body_returns_422(self, client, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)
assert (
client.post("/a2a/intercept", json={"bad": "data"}).status_code == 422
)

def test_runtime_error_returns_503(self, client, general_payload, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)

async def _raise(*a, **kw):
raise RuntimeError("crypto unavailable")

with patch.object(ep.A2AVerificationInterceptor, "intercept", new=_raise):
assert (
client.post("/a2a/intercept", json=general_payload).status_code == 503
)

def test_unexpected_error_returns_500(self, client, general_payload, monkeypatch):
monkeypatch.delenv("QWED_A2A_TRUSTED_AGENTS", raising=False)

async def _raise(*a, **kw):
raise ValueError("unexpected boom")

with patch.object(ep.A2AVerificationInterceptor, "intercept", new=_raise):
assert (
client.post("/a2a/intercept", json=general_payload).status_code == 500
)
32 changes: 28 additions & 4 deletions tests/test_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ async def test_blocked_sender_rejected(self, interceptor, general_message):
assert verdict.status == VerdictStatus.BLOCKED
assert "trust boundary" in verdict.reason.lower()

async def test_trusted_agent_bypass(self, crypto_service, trust_boundary, general_message):
"""Trusted agents should bypass verification."""
async def test_trusted_agent_no_longer_bypasses_verification(
self, crypto_service, trust_boundary, general_message
):
"""Trusted agents should still route through verification engines."""
config = InterceptorConfig(
trusted_agents=[general_message.sender_agent_id]
)
Expand All @@ -108,6 +110,28 @@ async def test_trusted_agent_bypass(self, crypto_service, trust_boundary, genera
crypto_service=crypto_service,
trust_boundary=trust_boundary,
)
verdict = await interceptor.intercept(general_message, trace_id="t_trust_bypass")
verdict = await interceptor.intercept(general_message, trace_id="t_trust_no_bypass")
assert verdict.status == VerdictStatus.FORWARDED
assert verdict.engine_used == "bypass"
assert verdict.engine_used == "passthrough"

async def test_trusted_agent_financial_fraud_is_blocked(
self, crypto_service, trust_boundary, hallucinated_financial_message
):
"""Trusted agents are still verified and blocked on financial hallucinations."""
config = InterceptorConfig(
trusted_agents=[hallucinated_financial_message.sender_agent_id]
)
interceptor = A2AVerificationInterceptor(
config=config,
crypto_service=crypto_service,
trust_boundary=trust_boundary,
)

verdict = await interceptor.intercept(
hallucinated_financial_message, trace_id="t_trust_fin_fraud"
)

assert verdict.status == VerdictStatus.BLOCKED
assert verdict.engine_used == "finance_guard"
assert verdict.reason is not None
assert "hallucination" in verdict.reason.lower()
Loading