2 changes: 1 addition & 1 deletion backend/Dockerfile
@@ -36,7 +36,7 @@ RUN pip install --no-cache-dir uv

# Create a non-root user and group
RUN groupadd -r appgroup && useradd --no-log-init -r -g appgroup -d /app -s /sbin/nologin appuser

RUN mkdir -p /data && chown -R appuser:appgroup /data
WORKDIR /app

# Copy dependency definition files and your submodule source code
8 changes: 5 additions & 3 deletions backend/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "vqlforge"
version = "0.2"
version = "0.25"
description = "VQLForge Backend"
readme = "README.md"
requires-python = ">=3.12"
@@ -18,7 +18,6 @@ dependencies = [
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-fastapi-client>=0.6.0",
"pytest-mock>=3.12.0",
]

@@ -28,4 +27,7 @@ build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src"]
[tool.hatch.metadata]
allow-direct-references = true
allow-direct-references = true

[tool.hatch.paths]
sqlglot = "src/sqlglot-vql"
120 changes: 108 additions & 12 deletions backend/src/api/forge.py
@@ -1,30 +1,93 @@
"""
API routes for the VQL Forge agentic process.

This module provides a FastAPI endpoint for converting SQL to VQL in an
iterative, agent-like process. It uses Server-Sent Events (SSE) to stream
progress back to the client.
"""

import logging
import json
from fastapi import APIRouter, HTTPException, Request
from fastapi import APIRouter
from fastapi.responses import StreamingResponse

from src.schemas.validation import VqlValidationApiResponse
from src.schemas.agent import AgenticModeRequest, AgenticModeResponse, AgentStep
from src.schemas.validation import VqlValidateRequest
from src.services.translation_service import run_translation
from src.services.validation_service import run_validation
from src.utils.ai_analyzer import explain_vql_differences
from src.config import settings

logger = logging.getLogger(__name__)
router = APIRouter()


async def format_sse(data: dict, event: str | None = None) -> str:
"""Formats a dictionary into an SSE message string."""
"""Format a dictionary into a Server-Sent Event (SSE) message string.

Args:
data: The dictionary payload to send.
event: An optional event name for the SSE message.

Returns:
A string formatted as a compliant SSE message.
"""
payload = json.dumps(data)
if event:
return f"event: {event}\ndata: {payload}\n\n"
return f"data: {payload}\n\n"


@router.post("/forge", tags=["VQL Forge"])
async def agentic_sql_to_vql_forge_stream(request: AgenticModeRequest):
def format_explanation_as_markdown(explanation: str) -> str:
"""Format the AI explanation into better structured markdown.

This function enhances a plain text explanation by attempting to structure
it as markdown. If markdown markers like '##' or '-' are already present,
it returns the original string to avoid double formatting.

Args:
explanation: The raw explanation string from the AI.

Returns:
A markdown-formatted explanation string.
"""
Handles the agentic SQL-to-VQL process using a streaming response (SSE)
to provide real-time updates of the agent's progress with a limited number of correction loops.
if not explanation:
return explanation

# If the explanation is already well-formatted, return as-is
if any(marker in explanation for marker in ['##', '- ', '* ', '\n- ', '\n* ']):
return explanation

# Otherwise, try to structure it better
lines = explanation.split('\n')
formatted_lines = []

for line in lines:
line = line.strip()
if not line:
formatted_lines.append('')
continue
else:
formatted_lines.append(line)

return '\n'.join(formatted_lines)


@router.post("/forge", tags=["VQL Forge"])
async def agentic_sql_to_vql_forge_stream(request: AgenticModeRequest,) -> StreamingResponse:
"""Handle the agentic SQL-to-VQL process via a streaming response.

This endpoint uses Server-Sent Events (SSE) to provide real-time updates
of the agent's progress. The process involves an initial translation,
followed by a series of validation and correction loops until the VQL is
valid or the maximum number of attempts is reached.

Args:
request: The request containing the SQL query, its dialect, and the VDB.

Returns:
A StreamingResponse that sends SSE events to the client.
"""
async def event_generator():
process_log: list[AgentStep] = []
@@ -59,17 +122,43 @@ async def event_generator():
loop_count = i + 1

# Validation Step
validation_step_name = "Validate" if i == 0 else f"Re-Validate (Attempt {loop_count})"
validation_step_name = "Validate" if i == 0 else f"Re-Validate (Step {loop_count})"
validation_step = AgentStep(step_name=validation_step_name,
details=f"Validating VQL (Attempt {loop_count})...", success=True)
details=f"Validating VQL (Step {loop_count})...", success=True)
process_log.append(validation_step)
yield await format_sse(validation_step.model_dump(), event="step")

validation_result = await run_validation(current_vql)
validation_request: VqlValidateRequest = VqlValidateRequest(
sql=request.sql, vql=current_vql, vdb=request.vdb, dialect=request.dialect)
validation_result: VqlValidationApiResponse = await run_validation(validation_request)

if validation_result.validated:
validation_step.details = "Validation successful."
yield await format_sse(validation_step.model_dump(), event="step")

# Explain Differences
explain_step = AgentStep(
step_name="Explain",
details="Analyzing differences between source SQL and final VQL...",
success=True
)
process_log.append(explain_step)
yield await format_sse(explain_step.model_dump(), event="step")

raw_explanation = await explain_vql_differences(
source_sql=request.sql,
source_dialect=request.dialect,
final_vql=current_vql
)

# Format the explanation with better structure
formatted_explanation = format_explanation_as_markdown(raw_explanation)

final_explanation = f"## Key Differences Between Source SQL and Final VQL\n\n{formatted_explanation}"

explain_step.details = final_explanation
yield await format_sse(explain_step.model_dump(), event="step")

final_success_result = AgenticModeResponse(
final_vql=current_vql, is_valid=True, process_log=process_log,
final_message="Agentic process complete. The VQL is valid."
@@ -106,16 +195,23 @@

# AI Analysis & Correction Step
analysis_step = AgentStep(
step_name=f"Analyze (Attempt {loop_count})", details="AI is analyzing the error to find a correction...", success=True)
step_name=f"Analyze (Step {loop_count})",
details="AI is analyzing the error to find a correction...",
success=True
)
process_log.append(analysis_step)
yield await format_sse(analysis_step.model_dump(), event="step")

correction_step = AgentStep(
step_name=f"Correct (Attempt {loop_count})", details="AI provided a corrected VQL.", success=True, output=error_analysis.sql_suggestion)
step_name=f"Correct (Step {loop_count})",
details="AI provided a corrected VQL.",
success=True,
output=error_analysis.sql_suggestion
)
process_log.append(correction_step)
yield await format_sse(correction_step.model_dump(), event="step")

current_vql = error_analysis.sql_suggestion # Update VQL for the next loop
current_vql = error_analysis.sql_suggestion

except Exception as e:
logger.error(f"Error during agentic stream: {e}", exc_info=True)
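
As a rough illustration of how a client might consume this stream, here is a sketch assuming the httpx library and a locally running server; the URL is an assumption, while the request-body field names (sql, dialect, vdb) mirror the AgenticModeRequest fields used above:

import json
import httpx

def consume_forge(sql: str, dialect: str, vdb: str) -> None:
    # Stream the /forge endpoint and print each SSE event as it arrives.
    body = {"sql": sql, "dialect": dialect, "vdb": vdb}
    with httpx.stream("POST", "http://localhost:8000/forge", json=body, timeout=None) as response:
        event_name = None
        for line in response.iter_lines():
            if line.startswith("event: "):
                event_name = line[len("event: "):]
            elif line.startswith("data: "):
                payload = json.loads(line[len("data: "):])
                print(event_name, payload)
            elif not line:
                event_name = None  # a blank line terminates one SSE message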
153 changes: 153 additions & 0 deletions backend/src/api/log_recorder.py
@@ -0,0 +1,153 @@
"""
API endpoints for logging and retrieving validated SQL-to-VQL query pairs.

This module provides routes to log accepted queries and to retrieve those logs
based on various filtering criteria, such as the tables they reference.
"""

import json
import logging
from typing import Any, List

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, func, column, String
from sqlalchemy.orm import Session
from sqlglot import exp, parse_one

from src.schemas.db_log import (
AcceptedQuery,
AcceptedQueryLogListResponse,
AcceptedQueryLogRequest,
)
from src.db.sqlite_session import get_sqlite_session

logger = logging.getLogger(__name__)
router = APIRouter()


@router.post("/log/accepted", status_code=201, tags=["Logging"])
async def log_accepted_query(request: AcceptedQueryLogRequest,
db: Session = Depends(get_sqlite_session)) -> dict[str, Any]:
"""Log a successfully validated and accepted SQL-to-VQL pair.

This endpoint uses sqlglot to parse the source SQL, extracts the table
names, and stores the entire entry in the database.

Args:
request: The request body containing the source SQL, dialect, and target VQL.
db: The SQLAlchemy database session, injected as a dependency.

Raises:
HTTPException: A 500 error if the database write operation fails.

Returns:
A confirmation message and the ID of the newly created log entry.
"""
try:
source_tables = json.dumps([table.name for table in parse_one(request.source_sql).find_all(exp.Table)])

db_log_entry = AcceptedQuery(
source_sql=request.source_sql,
source_dialect=request.source_dialect,
target_vql=request.target_vql,
tables=source_tables
)
db.add(db_log_entry)
db.commit()
db.refresh(db_log_entry)
logger.info(f"Successfully logged accepted query ID: {db_log_entry.id}")
return {"message": "Log entry created successfully.", "id": db_log_entry.id}
except Exception as e:
logger.error(f"Failed to log accepted query: {e}", exc_info=True)
db.rollback()
raise HTTPException(status_code=500, detail=f"Failed to write log to database: {str(e)}")
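
The table-extraction step above is easy to sanity-check in isolation; a small sketch (the sample query is made up):

import json
from sqlglot import exp, parse_one

# Mirrors the extraction in log_accepted_query: parse the SQL and collect
# the name of every table referenced anywhere in the statement.
sql = "SELECT c.name FROM customers c JOIN orders o ON o.customer_id = c.id"
tables = json.dumps([table.name for table in parse_one(sql).find_all(exp.Table)])
print(tables)  # e.g. ["customers", "orders"]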


@router.get("/log/all", response_model=AcceptedQueryLogListResponse, tags=["Logging"])
async def get_all_logs(db: Session = Depends(get_sqlite_session)) -> AcceptedQueryLogListResponse:
"""Retrieve all accepted query logs from the database.
The logs are ordered by timestamp, with the most recent entries first.

Args:
db: The SQLAlchemy database session, injected as a dependency.

Raises:
HTTPException: A 500 error if the database read operation fails.

Returns:
A response object containing a list of all log entries.
"""
try:
logs: List[AcceptedQuery] = db.query(AcceptedQuery).order_by(AcceptedQuery.timestamp.desc()).all()
return AcceptedQueryLogListResponse(results=logs)
except Exception as e:
logger.error(f"Failed to fetch all logs: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Failed to retrieve logs from the database.")
finally:
db.close()


@router.get("/log/filter", response_model=AcceptedQueryLogListResponse, tags=["Logging"])
async def get_logs_by_tables(
tables: List[str] = Query(
...,
description="A list of table names to filter by.",
example=["customers", "orders"],
),
db: Session = Depends(get_sqlite_session),
) -> AcceptedQueryLogListResponse:
"""Retrieve logs by tables using a correctly formed lateral subquery.

This function filters records where the 'tables' JSON array contains any of
the specified table names. It works by creating a LATERAL subquery that
unnests the JSON array for each row and checks for a match. This is the
idiomatic SQLAlchemy approach for this type of query against SQLite.

Args:
tables: A list of table names provided as query parameters.
db: The SQLAlchemy database session.

Raises:
HTTPException: 400 if 'tables' is empty.
HTTPException: 500 if the database query fails.

Returns:
A response object containing matching log entries.
"""
if not tables:
raise HTTPException(
status_code=400, detail="The 'tables' query parameter cannot be empty."
)

try:
# 1. Explicitly define the output columns of the `json_each` function.
# We only need the 'value' column, which contains the table names.
json_table_def = func.json_each(AcceptedQuery.tables).table_valued(
column("value", String),
name="json_table"
)

# 2. Create the correlated EXISTS subquery.
# Because we defined the columns, accessing `.c.value` now works
# correctly and returns a Column object with the `.in_()` method.
subquery = (
select(1)
.where(json_table_def.c.value.in_(tables))
).exists()

# 3. Apply the filter to the main query.
query = (
db.query(AcceptedQuery)
.filter(subquery)
.order_by(AcceptedQuery.timestamp.desc())
.limit(10)
)

logs: List[AcceptedQuery] = query.all()
return AcceptedQueryLogListResponse(results=logs)
except Exception as e:
logger.error(f"Failed to fetch logs by tables: {e}", exc_info=True)
raise HTTPException(
status_code=500, detail="Failed to retrieve logs from the database."
)
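
For reference, the EXISTS construct above can be compiled standalone to inspect the SQL it produces; a sketch, with "tables" standing in for the real mapped column:

from sqlalchemy import select, func, column, String
from sqlalchemy.dialects import sqlite

json_table = func.json_each(column("tables")).table_valued(
    column("value", String), name="json_table"
)
exists_clause = select(1).where(json_table.c.value.in_(["customers", "orders"])).exists()
print(exists_clause.compile(dialect=sqlite.dialect(),
                            compile_kwargs={"literal_binds": True}))
# Expected shape, roughly:
# EXISTS (SELECT 1 FROM json_each(tables) AS json_table
#         WHERE json_table.value IN ('customers', 'orders'))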
3 changes: 2 additions & 1 deletion backend/src/api/router.py
@@ -1,9 +1,10 @@
from fastapi import APIRouter
from src.api import health, translate, validate, vdb_list, forge
from src.api import health, translate, validate, vdb_list, forge, log_recorder

api_router = APIRouter()
api_router.include_router(health.router) # /health
api_router.include_router(translate.router) # /translate
api_router.include_router(validate.router) # /validate
api_router.include_router(vdb_list.router) # /vdbs
api_router.include_router(forge.router) # /forge
api_router.include_router(log_recorder.router) # /log