Skip to content

Conversation

@Spartan1-1-7
Copy link

Add LangGraph APort Integration

This PR adds secure checkpoint verification for LangGraph workflows using APort agent identity system. The integration provides checkpoint-level security for state transitions with support for multiple policies and graceful fallback handling.

Main files include checkpoint_guard.py for the core integration, example workflows in the workflows directory, showcase.py for interactive demonstration, and a complete test suite. The implementation allows developers to protect any LangGraph workflow node with APort verification using a simple decorator pattern.

All tests pass and the code is production ready with comprehensive documentation and deployment guides included.

Copy link
Contributor

@uchibeke uchibeke left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall Assessment

Great work on this integration! The decorator pattern is the correct approach for pre-act verification in LangGraph workflows. However, there are critical issues with the API integration that need to be fixed before this can be merged.


✅ What Works Well

  1. Correct Pattern: The decorator-based approach (@guard.require_verification()) aligns perfectly with APort's pre-act verification model
  2. Node-Level Authorization: Intercepting execution before nodes run is exactly what's needed
  3. Comprehensive Documentation: Excellent README, deployment guide, and examples
  4. Test Coverage: Good test suite structure
  5. Error Handling: Proper fallback strategies with strict/non-strict modes

🚨 Critical Issues to Fix

1. Incorrect API Endpoint Usage

Problem: The mock implementation uses a generic verify() method that doesn't match the actual APort API.

Current Implementation (in src/client.py):

# Mock SDK
async def verify(self, policy: str, agent_id: str, context: Dict[str, Any] = None):
    # This doesn't match the real APort API!

Correct Implementation (based on official SDKs):

The APort API uses: POST /api/verify/policy/{policy_id}

Request body:

{
  "agent_id": "agt_user_123",
  "context": {
    "amount": 100,
    "currency": "USD"
  },
  "idempotency_key": "optional-key"
}

Response:

{
  "decision_id": "dec_xxx",
  "allow": true,
  "reasons": [],
  "expires_in": 60,
  "created_at": "2023-01-01T00:00:00Z"
}

Required Changes:

  1. Update MockAPortSDK class in src/client.py:
class MockAPortSDK:
    """Mock APort SDK matching real API structure."""

    def __init__(self, api_key: str, base_url: str = "https://api.aport.io"):
        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        logger.info(f"[MOCK] Initialized APort client with base URL: {base_url}")

    async def verify_policy(
        self,
        agent_id: str,
        policy_id: str,
        context: Dict[str, Any] = None,
        idempotency_key: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Mock policy verification matching real APort API.
        Real endpoint: POST /api/verify/policy/{policy_id}
        """
        logger.info(f"[MOCK] POST {self.base_url}/api/verify/policy/{policy_id}")
        logger.info(f"[MOCK] Agent: {agent_id}, Context: {context}")

        # Simulate API delay
        await asyncio.sleep(0.05)

        # Mock logic: deny agents ending in "_denied"
        allow = not agent_id.endswith("_denied")

        return {
            "decision_id": f"dec_mock_{hash(agent_id + policy_id) % 10000}",
            "allow": allow,
            "reasons": [] if allow else [{
                "code": "MOCK_DENIAL",
                "message": f"Mock denial for agent {agent_id}",
                "severity": "error"
            }],
            "expires_in": 60,
            "created_at": datetime.utcnow().isoformat() + "Z"
        }
  1. Update APortClient.verify_checkpoint() method:
async def verify_checkpoint(
    self,
    policy: str,
    agent_id: str,
    checkpoint_id: str,
    state: Dict[str, Any],
    context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Verify agent authorization for a checkpoint transition."""
    try:
        # Build verification context
        verification_context = {
            "checkpoint_id": checkpoint_id,
            "state_keys": list(state.keys()) if state else [],
            "timestamp": datetime.utcnow().isoformat(),
            **(context or {})
        }

        logger.info(f"Verifying checkpoint {checkpoint_id} for agent {agent_id}")

        # Call APort policy verification API
        result = await self.client.verify_policy(
            agent_id=agent_id,
            policy_id=policy,
            context=verification_context
        )

        # Check decision
        if not result.get("allow", False):
            reasons = result.get("reasons", [])
            raise VerificationError(
                f"Agent {agent_id} verification failed for checkpoint {checkpoint_id}",
                details={"reasons": reasons, "decision_id": result.get("decision_id")},
                agent_id=agent_id
            )

        logger.info(f"Checkpoint {checkpoint_id} verification successful for agent {agent_id}")

        return {
            "verified": True,
            "agent_id": agent_id,
            "policy": policy,
            "checkpoint_id": checkpoint_id,
            "decision_id": result.get("decision_id"),
            "expires_in": result.get("expires_in"),
            "created_at": result.get("created_at")
        }

    except VerificationError:
        raise
    except Exception as e:
        logger.error(f"Checkpoint verification error: {e}")
        raise APortError(f"Checkpoint verification failed: {str(e)}")
  1. Add real SDK integration (when use_mock=False):
def __init__(self, api_key: Optional[str] = None, ...):
    # ... existing code ...

    if not self.use_mock:
        try:
            # Use real Python SDK (when available)
            import requests
            self.client = RealAPortClient(self.api_key, self.base_url)
        except ImportError:
            logger.warning("Using mock client - install real SDK for production")
            self.client = MockAPortSDK(self.api_key, self.base_url)

class RealAPortClient:
    """Real APort API client implementation."""

    def __init__(self, api_key: str, base_url: str = "https://api.aport.io"):
        self.api_key = api_key
        self.base_url = base_url.rstrip('/')
        self.headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Bearer {api_key}",
            "User-Agent": "aport-langgraph-integration/0.1.0"
        }

    async def verify_policy(
        self,
        agent_id: str,
        policy_id: str,
        context: Dict[str, Any] = None,
        idempotency_key: Optional[str] = None
    ) -> Dict[str, Any]:
        """Call real APort policy verification API."""
        url = f"{self.base_url}/api/verify/policy/{policy_id}"

        headers = self.headers.copy()
        if idempotency_key:
            headers["Idempotency-Key"] = idempotency_key

        payload = {
            "agent_id": agent_id,
            "context": context or {},
        }
        if idempotency_key:
            payload["idempotency_key"] = idempotency_key

        try:
            import aiohttp
            async with aiohttp.ClientSession() as session:
                async with session.post(url, json=payload, headers=headers, timeout=5) as response:
                    result = await response.json()

                    if response.status >= 400:
                        raise APortError(
                            f"APort API error: {result.get('message', 'Unknown error')}",
                            status_code=response.status,
                            details=result
                        )

                    return result
        except aiohttp.ClientError as e:
            raise APortError(f"Network error: {str(e)}")

2. Use Standard Policy Pack IDs

Problem: The PR uses custom policy IDs like workflow.basic.v1 that don't exist in the APort policy registry.

Standard Policy Packs (from aport-policies repo):

  • payments.refund.v1 - Refund authorization
  • payments.charge.v1 - Payment processing
  • data.export.v1 - Data export controls
  • messaging.v1 - Messaging rate limits
  • repo.v1 - Repository operations

Update Examples:

# ❌ Current (non-existent policies)
guard = APortCheckpointGuard(
    api_key="demo_api_key",
    default_policy="workflow.basic.v1",  # Doesn't exist!
)

@guard.require_verification(policy="workflow.process.v1")
async def process_task_node(state, config=None):
    ...

# ✅ Corrected (use real policies)
guard = APortCheckpointGuard(
    api_key="demo_api_key",
    default_policy="data.export.v1",  # Real policy
)

@guard.require_verification(policy="data.export.v1")
async def export_data_node(state, config=None):
    # Export user data after verification
    ...

@guard.require_verification(policy="payments.refund.v1")
async def process_refund_node(state, config=None):
    # Process refund after verification
    ...

Update workflows in:

  • workflows/basic_workflow.py
  • workflows/multi_stage_workflow.py
  • workflows/error_handling.py
  • showcase.py

3. Add Proper Context for Policy Evaluation

Problem: Current context is too generic. Real policies need specific fields.

Example: For payments.refund.v1, the context should include:

@guard.require_verification(
    policy="payments.refund.v1",
    agent_id_extractor=lambda state: state.get("agent_id")
)
async def process_refund_node(state, config=None):
    """Process refund with proper context."""
    # Extract refund details from state
    context = {
        "amount": state.get("refund_amount"),
        "currency": state.get("currency", "USD"),
        "order_id": state.get("order_id"),
        "reason": state.get("refund_reason")
    }

    # Verification happens automatically with this context
    return {"status": "refunded"}

The guard's require_verification decorator should pass state-specific context:

# In checkpoint_guard.py
async def wrapper(state: Dict[str, Any], config: Optional[RunnableConfig] = None):
    # ... agent ID extraction ...

    # Extract context from state for policy evaluation
    verification_context = {
        "checkpoint_id": f"node_{node_name}",
        "node_name": node_name,
        # Include relevant state data for policy evaluation
        **{k: v for k, v in state.items() if k not in ["_aport_verification", "agent_id"]}
    }

    result = await self.client.verify_checkpoint(
        policy=verification_policy,
        agent_id=agent_id,
        checkpoint_id=f"node_{node_name}",
        state=state,
        context=verification_context  # Pass full context
    )

4. Fix Terminology in Documentation

Problem: Documentation calls this "checkpoint integration" but it's actually "node authorization".

Update README.md:

# LangGraph APort Integration

A comprehensive integration that adds **APort pre-act verification** to LangGraph
workflow nodes, enabling secure and policy-driven state transitions in AI agent workflows.

## Overview

This integration allows you to protect LangGraph state machine nodes with APort's
agent identity verification system. It provides **node-level security** where each
node execution can be verified against policies before running.

### What This Is

-**Pre-act verification guards** for LangGraph nodes
-**Policy-based authorization** before state transitions
-**Runtime checks** using APort agent passports

### What This Is NOT

- ❌ Not a LangGraph checkpoint saver implementation (BaseCheckpointSaver)
- ❌ Not a persistence layer
- ❌ Not state storage

The term "checkpoint" in this integration refers to **decision points** in the
workflow where verification occurs, not LangGraph's checkpoint persistence system.

📋 Testing Requirements

Before merging, please ensure:

  1. Mock tests pass with updated API structure
  2. Integration tests use real policy IDs
  3. Add test for API error handling:
async def test_policy_violation():
    """Test that policy violations are properly handled."""
    guard = APortCheckpointGuard(api_key="test", use_mock=True)

    # Agent with _denied suffix should fail
    result = await guard.client.verify_policy(
        agent_id="agt_user_denied",
        policy_id="payments.refund.v1",
        context={"amount": 1000, "currency": "USD"}
    )

    assert result["allow"] == False
    assert len(result["reasons"]) > 0

📚 References

Official APort SDK Examples:

Key API Endpoint:

POST /api/verify/policy/{policy_id}
Headers: Authorization: Bearer {api_key}
Body: { "agent_id": "...", "context": {...} }
Response: { "decision_id": "...", "allow": true/false, "reasons": [...] }

Standard Policy Packs:


✅ Action Items

  • Update MockAPortSDK.verify()verify_policy() with correct signature
  • Add RealAPortClient class for production use
  • Update APortClient.verify_checkpoint() to use verify_policy()
  • Replace custom policy IDs with standard policy packs
  • Update all workflow examples to use real policies
  • Fix terminology: "checkpoint integration" → "node authorization"
  • Add context extraction for policy-specific fields
  • Update tests to match new API structure
  • Add requirements.txt entry: aiohttp>=3.9.0

💡 After These Changes

Once fixed, this will be an excellent example of:

  1. ✅ Pre-act verification for LangGraph workflows
  2. ✅ Proper use of APort policy packs
  3. ✅ Runtime authorization before sensitive operations

The decorator pattern is perfect for this use case! Just need to align with the actual APort API.


Great work overall! These changes will make this production-ready and a valuable addition to the aport-integrations repository. 🚀

@Spartan1-1-7
Copy link
Author

hi @uchibeke , i fix LangGraph APort Integration API Structure and Policies

  • Updated MockAPortSDK to use verify_policy() method matching real APort API (POST /api/verify/policy/{policy_id})
  • Replaced fake policies with real ones: payments.refund.v1, data.export.v1, messaging.v1, admin.access.v1
  • Added RealAPortClient for production deployment with aiohttp
  • Added policy-specific context for each verification
  • Updated all examples and documentation

Please review it when you are able to and if any further changes are required or any more issues are found, i will be happy to work on it.
Thank You

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants