Soliplex Ingester uses SQLModel (built on SQLAlchemy) for database modeling with async support. The system supports both SQLite (development) and PostgreSQL (production).
Database models are defined in src/soliplex/ingester/lib/models.py.
The database URL is set via the DOC_DB_URL environment variable:
DOC_DB_URL="sqlite+aiosqlite:///./db/documents.db"
# or
DOC_DB_URL="postgresql+asyncpg://user:pass@localhost/soliplex"

The Database class manages engine lifecycle and session creation with automatic connection pooling.
from sqlmodel import select

from soliplex.ingester.lib.models import Database, Document
# Initialize once at application startup
await Database.initialize()
# Or with custom URL (for testing)
await Database.initialize("sqlite+aiosqlite:///:memory:")
# Get sessions anywhere in the app
async with Database.session() as session:
    result = await session.exec(select(Document))
    # Transaction auto-commits on success, rollback on exception
# Cleanup at shutdown
await Database.close()
# Reset and reinitialize (primarily for testing)
await Database.reset(url)

from soliplex.ingester.lib.models import get_session, get_engine

async with get_session() as session:
    result = await session.exec(select(Document))

engine = await get_engine()

Represents a batch of documents ingested together.
Table: documentbatch
Fields:
- id (int, primary key) - Auto-increment batch ID
- name (str) - Human-readable batch name
- source (str) - Source system identifier
- start_date (datetime) - When batch processing started
- completed_date (datetime, nullable) - When batch completed
- batch_params (dict[str, str]) - JSON metadata
Computed Fields:
- duration (float) - Processing time in seconds (None if not completed)
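The computed duration field can be pictured as the difference between the two timestamps. The following is only a sketch: BatchTiming is a hypothetical stand-in class, not the real model, and it assumes the value is simply completed_date minus start_date in seconds.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class BatchTiming:
    """Hypothetical stand-in holding only the two timestamp fields."""

    start_date: datetime
    completed_date: Optional[datetime] = None

    @property
    def duration(self) -> Optional[float]:
        # Without a completion timestamp there is no duration to report.
        if self.completed_date is None:
            return None
        return (self.completed_date - self.start_date).total_seconds()


done = BatchTiming(datetime(2025, 1, 15, 10, 0), datetime(2025, 1, 15, 12, 30))
running = BatchTiming(datetime(2025, 1, 15, 10, 0))
print(done.duration)     # 9000.0
print(running.duration)  # None
```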
Example:
{
"id": 1,
"name": "Q4 Financial Reports",
"source": "sharepoint",
"start_date": "2025-01-15T10:00:00",
"completed_date": "2025-01-15T12:30:00",
"batch_params": {"department": "finance"},
"duration": 9000.0
}

Represents a unique document identified by content hash.
Table: document
Fields:
- hash (str, primary key) - SHA256 content hash (format: "sha256-...")
- mime_type (str) - Document MIME type
- file_size (int, nullable) - Size in bytes
- doc_meta (dict[str, str]) - JSON metadata
Relationships:
- Multiple DocumentURI records can reference the same document
Deduplication: Documents are deduplicated by hash. If the same file is ingested multiple times, only one Document record exists.
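A minimal sketch of hash-based deduplication, using an in-memory dict as a hypothetical stand-in for the document table. The part of the hash after the "sha256-" prefix is assumed here to be the hex digest of the raw bytes; the real encoding may differ.

```python
import hashlib


def content_hash(data: bytes) -> str:
    # Assumption: the suffix after "sha256-" is the hex digest of the content.
    return "sha256-" + hashlib.sha256(data).hexdigest()


# Hypothetical in-memory stand-in for the document table, keyed by hash.
documents: dict = {}


def ingest(data: bytes, mime_type: str) -> str:
    doc_hash = content_hash(data)
    # setdefault keeps the first record and ignores repeat ingests of the same bytes.
    documents.setdefault(doc_hash, {"mime_type": mime_type, "file_size": len(data)})
    return doc_hash


first = ingest(b"quarterly report", "text/plain")
second = ingest(b"quarterly report", "text/plain")
print(first == second, len(documents))  # True 1
```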
Example:
{
"hash": "sha256-a1b2c3d4e5f6...",
"mime_type": "application/pdf",
"file_size": 1024000,
"doc_meta": {
"author": "John Doe",
"title": "Q4 Report"
}
}

Maps source URIs to document hashes, allowing multiple URIs to reference the same document.
Table: documenturi
Fields:
- id (int, primary key) - Auto-increment ID
- doc_hash (str, foreign key) - References document.hash
- uri (str) - Source system path/identifier
- source (str) - Source system name
- version (int) - Version number (increments on changes)
- batch_id (int, foreign key, nullable) - Associated batch
Constraints:
- Unique constraint on (uri, source) - One active URI per source
Use Cases:
- Track document locations across source systems
- Detect when a document at a URI has changed (hash mismatch)
- Support document versioning
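The change-detection use case can be sketched as a lookup keyed by (uri, source) that compares the stored hash with the incoming one. This is an illustration only: record_uri is a hypothetical helper, the starting version of 1 and the "unchanged" return value are assumptions, and the real ingestion logic may differ.

```python
# Hypothetical in-memory stand-in for documenturi rows, keyed by (uri, source)
# to mirror the unique constraint.
uris: dict = {}


def record_uri(uri: str, source: str, doc_hash: str) -> str:
    key = (uri, source)
    row = uris.get(key)
    if row is None:
        # First time this URI is seen for this source.
        uris[key] = {"doc_hash": doc_hash, "version": 1}
        return "created"
    if row["doc_hash"] == doc_hash:
        # Same content at the same location: nothing to version.
        return "unchanged"
    # Hash mismatch: the content at this URI changed, so bump the version.
    row["doc_hash"] = doc_hash
    row["version"] += 1
    return "updated"


actions = [
    record_uri("/sharepoint/finance/q4-report.pdf", "sharepoint", "sha256-old"),
    record_uri("/sharepoint/finance/q4-report.pdf", "sharepoint", "sha256-old"),
    record_uri("/sharepoint/finance/q4-report.pdf", "sharepoint", "sha256-new"),
]
print(actions)  # ['created', 'unchanged', 'updated']
```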
Example:
{
"id": 42,
"doc_hash": "sha256-a1b2c3d4e5f6...",
"uri": "/sharepoint/finance/q4-report.pdf",
"source": "sharepoint",
"version": 2,
"batch_id": 1
}

Tracks historical versions of documents at specific URIs.
Table: documenturihistory
Fields:
- id (int, primary key) - Auto-increment ID
- doc_uri_id (int, foreign key) - References documenturi.id
- version (int) - Version number
- hash (str) - Document hash at this version
- process_date (datetime) - When this version was processed
- action (str) - Action taken ("created", "updated", "deleted")
- batch_id (int, foreign key, nullable) - Associated batch
- histmeta (dict[str, str]) - JSON metadata
Use Cases:
- Audit trail of document changes
- Rollback to previous versions
- Compliance and record-keeping
Example:
{
"id": 100,
"doc_uri_id": 42,
"version": 1,
"hash": "sha256-old-hash...",
"process_date": "2025-01-10T10:00:00",
"action": "created",
"batch_id": 1,
"histmeta": {"user": "importer"}
}

Stores raw file bytes and artifacts in the database.
Table: documentbytes
Fields:
- hash (str, primary key) - Document hash
- artifact_type (str, primary key) - Type of artifact
- storage_root (str, primary key) - Storage location identifier
- file_size (int, nullable) - Size in bytes (auto-computed from file_bytes)
- file_bytes (bytes) - Raw binary data
Artifact Types:
- document - Raw document
- parsed_markdown - Extracted markdown
- parsed_json - Structured JSON
- chunks - Text chunks
- embeddings - Vector embeddings
- rag - RAG metadata
Note: For production, consider using file storage instead of database storage for large binaries.
Example:
{
"hash": "sha256-a1b2c3d4e5f6...",
"artifact_type": "parsed_markdown",
"storage_root": "db",
"file_size": 50000,
"file_bytes": "..."
}

Groups related workflow runs together.
Table: rungroup
Fields:
- id (int, primary key) - Auto-increment ID
- name (str, nullable) - Optional group name
- workflow_definition_id (str) - Workflow used
- param_definition_id (str) - Parameter set used
- batch_id (int, foreign key, nullable) - Associated batch
- created_date (datetime) - When group was created
- start_date (datetime) - When first run started
- completed_date (datetime, nullable) - When all runs completed
- status (RunStatus) - Overall group status
- status_date (datetime, nullable) - When status last changed
- status_message (str, nullable) - Status description
- status_meta (dict[str, str]) - JSON metadata
Relationships:
- Has many WorkflowRun records
- Has many LifecycleHistory records
Example:
{
"id": 5,
"name": "Batch 1 Processing",
"workflow_definition_id": "batch",
"param_definition_id": "default",
"batch_id": 1,
"created_date": "2025-01-15T10:00:00",
"start_date": "2025-01-15T10:01:00",
"completed_date": null,
"status": "RUNNING",
"status_date": "2025-01-15T10:30:00",
"status_message": "Processing documents",
"status_meta": {}
}

Represents a single workflow execution for one document.
Table: workflowrun
Fields:
- id (int, primary key) - Auto-increment ID
- workflow_definition_id (str) - Workflow definition ID
- run_group_id (int, foreign key) - Parent group
- batch_id (int, foreign key) - Associated batch
- doc_id (str) - Document hash being processed
- priority (int) - Processing priority (higher = more urgent)
- created_date (datetime) - When run was created
- start_date (datetime) - When first step started
- completed_date (datetime, nullable) - When all steps completed
- status (RunStatus) - Current status
- status_date (datetime, nullable) - When status last changed
- status_message (str, nullable) - Status description
- status_meta (dict[str, str]) - JSON metadata
- run_params (dict[str, str|int|bool]) - Runtime parameters
Computed Fields:
- duration (float) - Processing time in seconds (None if not completed)
Relationships:
- Has many RunStep records
- Belongs to RunGroup
- References Document via doc_id
Example:
{
"id": 100,
"workflow_definition_id": "batch",
"run_group_id": 5,
"batch_id": 1,
"doc_id": "sha256-a1b2c3d4e5f6...",
"priority": 0,
"created_date": "2025-01-15T10:00:00",
"start_date": "2025-01-15T10:01:00",
"completed_date": null,
"status": "RUNNING",
"status_date": "2025-01-15T10:05:00",
"status_message": "Processing step 3 of 5",
"status_meta": {},
"run_params": {},
"duration": null
}

Represents one step within a workflow run.
Table: runstep
Fields:
- id (int, primary key) - Auto-increment ID
- workflow_run_id (int, foreign key) - Parent workflow run
- workflow_step_number (int) - Step sequence number
- workflow_step_name (str) - Step name/identifier
- step_config_id (int, foreign key) - Configuration used
- step_type (WorkflowStepType) - Type of step
- is_last_step (bool) - Whether this is the final step
- created_date (datetime) - When step was created
- priority (int) - Processing priority
- start_date (datetime, nullable) - When step started executing
- status_date (datetime, nullable) - When status last changed
- completed_date (datetime, nullable) - When step completed
- retry (int) - Current retry attempt (0-indexed)
- retries (int) - Maximum retry attempts
- status (RunStatus) - Current status
- status_message (str, nullable) - Status description
- status_meta (dict[str, str]) - JSON metadata
- worker_id (str, nullable) - Worker processing this step
- lease_token (str, nullable) - Per-claim audit token (UUID). Set by claim_next_step, cleared on terminal status. All terminal writes (complete_step/error_step/release_step) are gated on this matching the holder, so a worker reaped between claim and write cannot double-finalize a step that has already been re-claimed.
- resource_key (str, nullable, indexed) - Optional declarative cross-subsystem lock key (typically rag:<abs-db-path>). Stamped on STORE-type steps at run-creation time. The claim layer skips a step whose key is held by a live ResourceLock row, and the worker acquires the lock for the duration of execution.
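The lease_token gating can be illustrated with a conditional UPDATE that only succeeds while the stored token still matches the caller's. This is a sketch against an in-memory SQLite table using the standard sqlite3 module; claim and complete are hypothetical simplifications, not the real claim_next_step or complete_step, and the claim here does not check step status or priority.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE runstep (id INTEGER PRIMARY KEY, status TEXT, lease_token TEXT)")
conn.execute("INSERT INTO runstep (id, status) VALUES (1, 'PENDING')")


def claim(step_id: int) -> str:
    """Hypothetical claim: stamp a fresh token on the step and return it."""
    token = str(uuid.uuid4())
    conn.execute(
        "UPDATE runstep SET status = 'RUNNING', lease_token = ? WHERE id = ?",
        (token, step_id),
    )
    return token


def complete(step_id: int, token: str) -> bool:
    """Finalize only if the caller still holds the lease; clear the token on success."""
    cur = conn.execute(
        "UPDATE runstep SET status = 'COMPLETED', lease_token = NULL "
        "WHERE id = ? AND lease_token = ?",
        (step_id, token),
    )
    # rowcount is 0 when the token no longer matches, so the write was a no-op.
    return cur.rowcount == 1


stale = claim(1)    # first worker claims the step, then is reaped
current = claim(1)  # the step is re-claimed under a new token
stale_ok = complete(1, stale)      # False: the old token no longer matches
current_ok = complete(1, current)  # True: the current holder finalizes
```

Putting the token check in the WHERE clause makes the test and the write a single statement, so there is no window between checking the holder and updating the row.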
Computed Fields:
- duration (float) - Execution time in seconds (None if not completed)
Relationships:
- Belongs to WorkflowRun
- References StepConfig
Example:
{
"id": 500,
"workflow_run_id": 100,
"workflow_step_number": 2,
"workflow_step_name": "parse",
"step_config_id": 10,
"step_type": "parse",
"is_last_step": false,
"created_date": "2025-01-15T10:01:00",
"priority": 0,
"start_date": "2025-01-15T10:02:00",
"status_date": "2025-01-15T10:05:00",
"completed_date": null,
"retry": 0,
"retries": 1,
"status": "RUNNING",
"status_message": "Parsing with Docling",
"status_meta": {},
"worker_id": "worker-abc-123",
"duration": null
}

Stores step configuration for reuse and tracking.
Table: stepconfig
Fields:
- id (int, primary key) - Auto-increment ID
- created_date (datetime, nullable) - When config was created
- step_type (WorkflowStepType) - Type of step
- config_json (dict[str, str|int|bool], nullable) - Step parameters
- cuml_config_json (str, nullable) - Cumulative config from previous steps
Use Cases:
- Deduplicate identical configurations
- Track which configuration was used for each run
- Audit changes to processing parameters
Example:
{
"id": 10,
"created_date": "2025-01-15T09:00:00",
"step_type": "parse",
"config_json": {
"format": "markdown",
"ocr_enabled": true
},
"cuml_config_json": "{\"validate\":{...},\"parse\":{...}}"
}

Represents a complete parameter set configuration.
Table: configset
Fields:
- id (int, primary key) - Auto-increment ID
- yaml_id (str) - Parameter set ID from YAML
- yaml_contents (str) - Full YAML contents
- created_date (datetime, nullable) - When loaded
Relationships:
- Has many ConfigSetItem records (junction table)
- Links to multiple StepConfig records
Use Cases:
- Track which parameter sets were used
- Reproduce exact configurations
- Version control for processing parameters
Junction table linking config sets to step configs.
Table: configsetitem
Fields:
- config_set_id (int, primary key, foreign key) - References configset.id
- config_id (int, primary key, foreign key) - References stepconfig.id
Tracks lifecycle events during workflow execution.
Table: lifecyclehistory
Fields:
- id (int, primary key) - Auto-increment ID
- event (LifeCycleEvent) - Type of event
- handler_name (str, nullable) - Name of the handler processing the event
- run_group_id (int, foreign key) - Associated run group
- workflow_run_id (int, foreign key) - Associated workflow run
- step_id (int, nullable) - Associated step (if applicable)
- start_date (datetime) - When event started
- completed_date (datetime, nullable) - When event completed
- status (RunStatus) - Event status
- status_date (datetime, nullable) - When status changed
- status_message (str, nullable) - Status description
- status_meta (dict[str, str]) - JSON metadata
Event Types:
- group_start / group_end
- item_start / item_end / item_failed
- step_start / step_end / step_failed
Use Cases:
- Audit trail of workflow execution
- Performance monitoring
- Debugging workflow issues
Tracks worker health and activity.
Table: workercheckin
Fields:
- id (str, primary key) - Worker identifier
- first_checkin (datetime) - When worker first registered
- last_checkin (datetime) - Most recent heartbeat
Constraints:
- Unique constraint on id
Use Cases:
- Monitor active workers
- Detect stale/crashed workers
- Worker load balancing
Example:
{
"id": "worker-abc-123",
"first_checkin": "2025-01-15T10:00:00",
"last_checkin": "2025-01-15T10:30:00"
}

Notes:
- A worker that calls Worker.stop() deletes its own row so peers see the departure without waiting for WORKER_CHECKIN_TIMEOUT
- reap_dead_workers(my_id, threshold) deletes rows older than the threshold, always excluding the caller to prevent the self-reaping race
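The reaping rule in the notes above can be sketched with a plain dict of check-in times. reap_stale is a hypothetical helper written for illustration (it takes an explicit now argument to stay deterministic) and is not the real reap_dead_workers.

```python
from datetime import datetime, timedelta

# Hypothetical in-memory stand-in for workercheckin: worker id -> last_checkin.
checkins = {
    "me": datetime(2025, 1, 15, 9, 0),         # caller, itself overdue
    "worker-a": datetime(2025, 1, 15, 10, 29),  # recent heartbeat
    "worker-b": datetime(2025, 1, 15, 10, 0),   # stale heartbeat
}


def reap_stale(my_id: str, threshold: timedelta, now: datetime) -> list:
    cutoff = now - threshold
    # The caller is always excluded, even if its own row looks stale,
    # so a worker can never delete its own check-in row here.
    dead = [wid for wid, last in checkins.items() if wid != my_id and last < cutoff]
    for wid in dead:
        del checkins[wid]
    return dead


reaped = reap_stale("me", timedelta(minutes=10), now=datetime(2025, 1, 15, 10, 30))
print(reaped)            # ['worker-b']
print(sorted(checkins))  # ['me', 'worker-a']
```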
Cross-subsystem lock rendezvous keyed by an opaque resource_key
(typically a resolved RAG-DB path). All RAG-DB writers — workflow
save_to_rag steps, the web vacuum endpoint, the si-diag CLI,
and end_group lifecycle vacuums — coordinate by acquiring rows
here.
Table: resourcelock
Fields:
- resource_key (str, primary key) - Opaque key (e.g. rag:/abs/path/to/db)
- holder_id (str) - Holder identity (lease token for workers, {kind}:{uuid} for direct callers)
- holder_kind (ResourceLockKind) - worker, cli, web, or lifecycle
- step_id (int, nullable) - Set when held by a worker on behalf of a step
- acquired_at (datetime) - When the lock was acquired
- expires_at (datetime, indexed) - TTL boundary; refreshed by holder heartbeats
- holder_meta (dict[str, str]) - JSON metadata
Lifecycle:
- Acquired via operations.acquire_resource_lock(...), which opportunistically sweeps expired rows before attempting insert under the unique primary key
- TTL-refreshed via refresh_resource_lock(...) (workers refresh on heartbeat at half the TTL)
- Released via release_resource_lock(...) (idempotent) or force_release_resource_lock(...) (audit-logged, used by si-diag vacuum --force)
- Dropped automatically by complete_step/error_step/release_step in the same transaction as the step terminal write
- Expired rows are swept by sweep_expired_resource_locks() on a 60-second loop in each worker
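The acquire and release steps of the lifecycle above can be sketched against an in-memory SQLite table with the standard sqlite3 module. Everything here is an assumption made for illustration: acquire and release are hypothetical helpers rather than the real operations functions, the table has only three columns, and timestamps are plain floats instead of datetimes.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE resourcelock ("
    "resource_key TEXT PRIMARY KEY, holder_id TEXT NOT NULL, expires_at REAL NOT NULL)"
)


def acquire(resource_key: str, holder_id: str, now: float, ttl: float) -> bool:
    with conn:  # one transaction: sweep expired rows, then try the insert
        conn.execute("DELETE FROM resourcelock WHERE expires_at <= ?", (now,))
        try:
            conn.execute(
                "INSERT INTO resourcelock VALUES (?, ?, ?)",
                (resource_key, holder_id, now + ttl),
            )
        except sqlite3.IntegrityError:
            # The primary key already exists, so a live holder owns the lock.
            return False
    return True


def release(resource_key: str, holder_id: str) -> None:
    # Deleting a row that is already gone is a no-op, which makes release idempotent.
    with conn:
        conn.execute(
            "DELETE FROM resourcelock WHERE resource_key = ? AND holder_id = ?",
            (resource_key, holder_id),
        )


key = "rag:/abs/path/to/db"
got_a = acquire(key, "worker-a", now=0.0, ttl=300.0)       # True: lock is free
got_b = acquire(key, "cli-b", now=10.0, ttl=300.0)         # False: worker-a holds it
got_b_later = acquire(key, "cli-b", now=301.0, ttl=300.0)  # True: expired row was swept
```

Relying on the primary key for mutual exclusion means two concurrent callers cannot both insert a row for the same key; the loser sees a constraint violation instead of a silent overwrite.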
Example:
{
"resource_key": "rag:/var/lib/soliplex/lancedb/default",
"holder_id": "f1c2b3a4-...",
"holder_kind": "worker",
"step_id": 1234,
"acquired_at": "2026-04-29T10:00:00",
"expires_at": "2026-04-29T10:05:00",
"holder_meta": {"worker_id": "ab7c..."}
}

Workflow and step status values.
class RunStatus(str, Enum):
    PENDING = "PENDING"  # Not yet started
    RUNNING = "RUNNING"  # Currently executing
    COMPLETED = "COMPLETED"  # Finished successfully
    ERROR = "ERROR"  # Failed but still retrying
    FAILED = "FAILED"  # Permanently failed
    CANCELLED = "CANCELLED"  # Cascaded from a sibling step's FAILED

Types of workflow steps.
class WorkflowStepType(str, Enum):
    INGEST = "ingest"
    VALIDATE = "validate"
    PARSE = "parse"
    CHUNK = "chunk"
    EMBED = "embed"
    STORE = "store"
    ENRICH = "enrich"
    ROUTE = "route"

Types of stored artifacts.
class ArtifactType(Enum):
    DOC = "document"
    PARSED_MD = "parsed_markdown"
    PARSED_JSON = "parsed_json"
    CHUNKS = "chunks"
    EMBEDDINGS = "embeddings"
    RAG = "rag"

Workflow lifecycle events.
class LifeCycleEvent(str, Enum):
    GROUP_START = "group_start"
    GROUP_END = "group_end"
    ITEM_START = "item_start"
    ITEM_END = "item_end"
    ITEM_FAILED = "item_failed"
    STEP_START = "step_start"
    STEP_END = "step_end"
    STEP_FAILED = "step_failed"

Identifies the caller holding a ResourceLock row.
class ResourceLockKind(str, Enum):
    WORKER = "worker"  # Held by a workflow worker for a step
    CLI = "cli"  # Held by si-cli / si-diag vacuum
    WEB = "web"  # Held by the /api/v1/lancedb/vacuum endpoint
    LIFECYCLE = "lifecycle"  # Held by an end_group lifecycle vacuum

Workflow Steps to Artifacts:
- INGEST - DOC
- PARSE - PARSED_MD, PARSED_JSON
- CHUNK - CHUNKS
- EMBED - EMBEDDINGS
- STORE - RAG
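The step-to-artifact mapping above could be expressed as a lookup table. The sketch below redefines the two enums locally so it runs on its own; STEP_ARTIFACTS and artifacts_for are hypothetical names, and returning an empty tuple for steps that the list does not mention (VALIDATE, ENRICH, ROUTE) is an assumption.

```python
from enum import Enum


class ArtifactType(Enum):
    DOC = "document"
    PARSED_MD = "parsed_markdown"
    PARSED_JSON = "parsed_json"
    CHUNKS = "chunks"
    EMBEDDINGS = "embeddings"
    RAG = "rag"


class WorkflowStepType(str, Enum):
    INGEST = "ingest"
    VALIDATE = "validate"
    PARSE = "parse"
    CHUNK = "chunk"
    EMBED = "embed"
    STORE = "store"
    ENRICH = "enrich"
    ROUTE = "route"


# Hypothetical lookup table mirroring the list above.
STEP_ARTIFACTS = {
    WorkflowStepType.INGEST: (ArtifactType.DOC,),
    WorkflowStepType.PARSE: (ArtifactType.PARSED_MD, ArtifactType.PARSED_JSON),
    WorkflowStepType.CHUNK: (ArtifactType.CHUNKS,),
    WorkflowStepType.EMBED: (ArtifactType.EMBEDDINGS,),
    WorkflowStepType.STORE: (ArtifactType.RAG,),
}


def artifacts_for(step: WorkflowStepType) -> tuple:
    # Steps without an entry are treated as producing no stored artifact.
    return STEP_ARTIFACTS.get(step, ())


parse_outputs = [a.value for a in artifacts_for(WorkflowStepType.PARSE)]
print(parse_outputs)  # ['parsed_markdown', 'parsed_json']
```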
DocumentBatch
    | (1:N)
DocumentURI --> Document (N:1)
    |
DocumentURIHistory

DocumentBatch
    | (1:N)
RunGroup
    | (1:N)
WorkflowRun --> Document (N:1)
    | (1:N)
RunStep --> StepConfig (N:1)

ConfigSet
    | (N:M via ConfigSetItem)
StepConfig

RunGroup --> LifecycleHistory (1:N)
WorkflowRun --> LifecycleHistory (1:N)
API response model for document information.
class DocumentInfo(BaseModel):
    uri: str | None = None
    source: str | None = None
    file_size: int | None = None
    mime_type: str | None = None

Response model for workflow run with optional steps and document info.
class WorkflowRunWithDetails(BaseModel):
    workflow_run: WorkflowRun
    steps: list[RunStep] | None = None
    document_info: DocumentInfo | None = None

Generic paginated response model.
class PaginatedResponse[T](BaseModel):
    items: list[T]
    total: int
    page: int
    rows_per_page: int
    total_pages: int

si-cli db-init
This creates tables and runs migrations.
alembic upgrade head

from soliplex.ingester.lib.models import Database
# Initialize with default URL from settings
await Database.initialize()
# Or with custom URL
await Database.initialize("sqlite+aiosqlite:///:memory:")

Cascading deletion function for run groups and all dependent records.
Location: src/soliplex/ingester/lib/wf/operations.py
Signature:
async def delete_run_group(run_group_id: int) -> dict[str, int]

Database Compatibility:
- SQLite (via aiosqlite)
- PostgreSQL (via asyncpg)
Behavior:
- Verifies the RunGroup exists (raises NotFoundError if not found)
- Retrieves all WorkflowRun IDs for the RunGroup
- Deletes all RunStep records for those WorkflowRuns
- Deletes all LifecycleHistory records (for both RunGroup and WorkflowRuns)
- Deletes all WorkflowRun records in the group
- Deletes the RunGroup itself
- Returns deletion statistics
All operations occur within a single database transaction to ensure atomicity.
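The deletion order above can be sketched against an in-memory SQLite database using the standard sqlite3 module. The schema is reduced to the key columns, delete_group is a hypothetical stand-in for delete_run_group, and LookupError stands in for NotFoundError; the real function is async and runs through SQLModel.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE rungroup (id INTEGER PRIMARY KEY);
    CREATE TABLE workflowrun (id INTEGER PRIMARY KEY, run_group_id INTEGER);
    CREATE TABLE runstep (id INTEGER PRIMARY KEY, workflow_run_id INTEGER);
    CREATE TABLE lifecyclehistory (
        id INTEGER PRIMARY KEY, run_group_id INTEGER, workflow_run_id INTEGER);
    INSERT INTO rungroup VALUES (5);
    INSERT INTO workflowrun VALUES (100, 5), (101, 5);
    INSERT INTO runstep VALUES (1, 100), (2, 100), (3, 101);
    INSERT INTO lifecyclehistory VALUES (1, 5, 100), (2, 5, 101);
    """
)

RUNS_IN_GROUP = "SELECT id FROM workflowrun WHERE run_group_id = ?"


def delete_group(group_id: int) -> dict:
    with conn:  # commits on success, rolls back if any statement raises
        found = conn.execute("SELECT 1 FROM rungroup WHERE id = ?", (group_id,)).fetchone()
        if found is None:
            raise LookupError(f"run group {group_id} not found")
        # Children first, so no row is left pointing at a deleted parent.
        steps = conn.execute(
            f"DELETE FROM runstep WHERE workflow_run_id IN ({RUNS_IN_GROUP})", (group_id,)
        ).rowcount
        history = conn.execute(
            "DELETE FROM lifecyclehistory WHERE run_group_id = ? "
            f"OR workflow_run_id IN ({RUNS_IN_GROUP})",
            (group_id, group_id),
        ).rowcount
        runs = conn.execute(
            "DELETE FROM workflowrun WHERE run_group_id = ?", (group_id,)
        ).rowcount
        groups = conn.execute("DELETE FROM rungroup WHERE id = ?", (group_id,)).rowcount
    stats = {
        "deleted_runsteps": steps,
        "deleted_lifecyclehistory": history,
        "deleted_workflowruns": runs,
        "deleted_rungroups": groups,
    }
    stats["total_deleted"] = sum(stats.values())
    return stats


result = delete_group(5)
print(result["total_deleted"])  # 8
```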
Usage:
from soliplex.ingester.lib.wf.operations import delete_run_group, NotFoundError
# Delete a run group and all dependent records
result = await delete_run_group(run_group_id=5)
print(f"Deleted {result['deleted_rungroups']} run group(s)")
print(f"Deleted {result['deleted_workflowruns']} workflow run(s)")
print(f"Deleted {result['deleted_runsteps']} run step(s)")
print(f"Deleted {result['deleted_lifecyclehistory']} lifecycle history record(s)")
print(f"Total records deleted: {result['total_deleted']}")

Returns:
{
"deleted_runsteps": 150,
"deleted_lifecyclehistory": 45,
"deleted_workflowruns": 10,
"deleted_rungroups": 1,
"total_deleted": 206
}

Raises:
- NotFoundError - If the RunGroup with the specified ID does not exist
Cascading deletion function for DocumentURI and all dependent records.
Location: src/soliplex/ingester/lib/operations.py
Signature:
async def delete_document_uri_by_uri(uri: str, source: str) -> dict[str, int]

Behavior:
- Finds the DocumentURI by uri and source
- Counts how many DocumentURIs reference the same document hash
- If only one URI references the document (cascade delete):
  - Deletes all RunStep records for WorkflowRuns with this doc_id
  - Deletes all LifecycleHistory records for those WorkflowRuns
  - Deletes all WorkflowRun records with this doc_id
  - Deletes all DocumentBytes artifacts for this hash
  - Deletes file artifacts from storage
  - Deletes the DocumentURIHistory records
  - Deletes the DocumentURI record
  - Deletes the Document record
- If multiple URIs reference the document (preserve document):
  - Deletes only the DocumentURIHistory records for this URI
  - Deletes only the DocumentURI record
  - Preserves the Document and all workflow-related records
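The branch between cascade delete and preserve comes down to a reference count on the document hash. The sketch below uses in-memory stand-ins and a hypothetical delete_uri helper; it models only the URI and document rows and leaves out workflow records, history, and stored artifacts.

```python
# Hypothetical stand-ins: (uri, source) -> doc_hash, and the set of known documents.
uris = {
    ("/a.pdf", "filesystem"): "sha256-x",
    ("/b.pdf", "filesystem"): "sha256-x",  # second URI for the same content
    ("/c.pdf", "filesystem"): "sha256-y",
}
documents = {"sha256-x", "sha256-y"}


def delete_uri(uri: str, source: str) -> dict:
    key = (uri, source)
    if key not in uris:
        raise KeyError(f"no document URI for {key}")
    doc_hash = uris[key]
    # Count references before removing the row, including the one being deleted.
    references = sum(1 for h in uris.values() if h == doc_hash)
    del uris[key]
    deleted_documents = 0
    if references == 1:
        # Last reference gone: the document itself is removed as well.
        documents.discard(doc_hash)
        deleted_documents = 1
    return {"deleted_document_uris": 1, "deleted_documents": deleted_documents}


shared = delete_uri("/a.pdf", "filesystem")  # /b.pdf still references sha256-x
last = delete_uri("/b.pdf", "filesystem")    # final reference to sha256-x
print(shared["deleted_documents"], last["deleted_documents"])  # 0 1
```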
Returns:
{
"deleted_document_uris": 1,
"deleted_uri_history": 3,
"deleted_documents": 1,
"deleted_workflow_runs": 2,
"deleted_run_steps": 10,
"deleted_lifecycle_history": 6,
"total_deleted": 23
}

Usage:
from soliplex.ingester.lib.operations import delete_document_uri_by_uri
from soliplex.ingester.lib.operations import DocumentURINotFoundError
try:
    stats = await delete_document_uri_by_uri(
        uri="/documents/report.pdf",
        source="filesystem"
    )
    print(f"Total deleted: {stats['total_deleted']}")
except DocumentURINotFoundError as e:
    print(f"Error: {e}")

Notes:
- All deletions occur within a single transaction
- Works with both SQLite and PostgreSQL
- Raises DocumentURINotFoundError if the URI/source combination does not exist
- Used by the DELETE /api/v1/document/by-uri endpoint
src/soliplex/ingester/migrations/
alembic.ini (project root)
alembic revision --autogenerate -m "description"
alembic upgrade head
alembic downgrade -1

The following indexes are created by migrations:
- ix_runstep_resource_key on runstep(resource_key) - used by the claim-layer subquery that excludes locked resource keys
- ix_resourcelock_expires_at on resourcelock(expires_at) - used by sweep_expired_resource_locks and the opportunistic sweep in acquire_resource_lock
Consider adding these additional indexes for production:
-- Workflow processing queries
CREATE INDEX idx_runstep_status ON runstep(status, priority DESC);
CREATE INDEX idx_workflowrun_status ON workflowrun(status, batch_id);
CREATE INDEX idx_rungroup_batch ON rungroup(batch_id);
-- Document lookups
CREATE INDEX idx_documenturi_source ON documenturi(source);
-- Worker monitoring
CREATE INDEX idx_runstep_worker ON runstep(worker_id);
CREATE INDEX idx_workercheckin_last ON workercheckin(last_checkin);

sqlite3 db/documents.db ".backup backup.db"
pg_dump -h localhost -U user soliplex > backup.sql

sqlite3 db/documents.db "VACUUM;"
psql -h localhost -U user -d soliplex -c "ANALYZE;"

from soliplex.ingester.lib.models import WorkflowRun, RunStatus, get_session
from sqlmodel import select

async with get_session() as session:
    query = select(WorkflowRun).where(WorkflowRun.status == RunStatus.FAILED)
    results = await session.exec(query)
    failed_runs = results.all()

from sqlmodel import func, select
async with get_session() as session:
    query = select(
        func.count(WorkflowRun.id).label("total"),
        WorkflowRun.status
    ).where(
        WorkflowRun.batch_id == batch_id
    ).group_by(WorkflowRun.status)
    results = await session.exec(query)
    stats = {row.status: row.total for row in results}

from datetime import datetime, timedelta
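from soliplex.ingester.lib.models import WorkerCheckin

# Workers whose last heartbeat is more than 10 minutes old
cutoff = datetime.now() - timedelta(seconds=600)
async with get_session() as session:
    query = select(WorkerCheckin).where(WorkerCheckin.last_checkin < cutoff)
    stale_workers = (await session.exec(query)).all()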