packages/opencode/src/altimate/native/schema/cache.ts (202 additions, 12 deletions)
@@ -18,7 +18,15 @@ import type {
SchemaCacheWarehouseStatus,
SchemaSearchTableResult,
SchemaSearchColumnResult,
SchemaEntityGroupSummary,
SchemaSearchEntityGroupResult,
} from "../types"
import {
detectEntityGroup,
type TableShape,
DEFAULT_ENTITY_RATIO_THRESHOLD,
DEFAULT_ENTITY_MIN_TABLES,
} from "./entity-groups"

// ---------------------------------------------------------------------------
// DDL
@@ -67,6 +75,24 @@ CREATE INDEX IF NOT EXISTS idx_columns_search ON columns_cache(search_text);
CREATE INDEX IF NOT EXISTS idx_tables_warehouse ON tables_cache(warehouse);
CREATE INDEX IF NOT EXISTS idx_columns_warehouse ON columns_cache(warehouse);
CREATE INDEX IF NOT EXISTS idx_columns_table ON columns_cache(warehouse, schema_name, table_name, column_name);

CREATE TABLE IF NOT EXISTS entity_groups_cache (
id INTEGER PRIMARY KEY AUTOINCREMENT,
warehouse TEXT NOT NULL,
database_name TEXT,
schema_name TEXT NOT NULL,
pattern TEXT NOT NULL DEFAULT 'entity-per-table',
fingerprint TEXT NOT NULL,
table_count INTEGER NOT NULL,
sample_table TEXT,
composite_columns_json TEXT NOT NULL,
table_names_json TEXT NOT NULL,
search_text TEXT NOT NULL,
UNIQUE(warehouse, database_name, schema_name, fingerprint)
);

CREATE INDEX IF NOT EXISTS idx_entity_groups_search ON entity_groups_cache(search_text);
CREATE INDEX IF NOT EXISTS idx_entity_groups_warehouse ON entity_groups_cache(warehouse);
`
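// Illustrative only (not part of this change): a consumer could read a group
// digest straight from the cache with
//
//   SELECT sample_table, table_count, composite_columns_json
//   FROM entity_groups_cache
//   WHERE warehouse = ? AND schema_name = ?
//
// The UNIQUE(warehouse, database_name, schema_name, fingerprint) constraint
// pairs with the INSERT OR REPLACE used during indexing, so re-crawling a
// warehouse rewrites a group row instead of duplicating it.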

// ---------------------------------------------------------------------------
@@ -141,22 +167,36 @@ export class SchemaCache {

/**
* Crawl a warehouse and index all schemas/tables/columns.
*
* For each schema, runs entity-per-table detection: if ≥50% of tables share
* the same column structure and the group spans at least 20 tables (both
* thresholds overridable via `options`), emits a
* single composite digest row in `entity_groups_cache` instead of N
* near-identical per-table rows. Tables outside the group still get their
* normal per-table entries.
*/
async indexWarehouse(
warehouseName: string,
warehouseType: string,
connector: Connector,
options: {
entityRatioThreshold?: number
entityMinTables?: number
} = {},
): Promise<SchemaIndexResult> {
const now = new Date().toISOString()
const ratioThreshold = options.entityRatioThreshold ?? DEFAULT_ENTITY_RATIO_THRESHOLD
const minTables = options.entityMinTables ?? DEFAULT_ENTITY_MIN_TABLES

// Clear existing data
this.db.prepare("DELETE FROM columns_cache WHERE warehouse = ?").run(warehouseName)
this.db.prepare("DELETE FROM tables_cache WHERE warehouse = ?").run(warehouseName)
this.db.prepare("DELETE FROM entity_groups_cache WHERE warehouse = ?").run(warehouseName)

let totalSchemas = 0
let totalTables = 0
let totalColumns = 0
const databaseName: string | null = null
const entityGroupsEmitted: SchemaEntityGroupSummary[] = []

let schemas: string[] = []
try {
@@ -177,6 +217,13 @@
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
)

const insertEntityGroup = this.db.prepare(
`INSERT OR REPLACE INTO entity_groups_cache
(warehouse, database_name, schema_name, pattern, fingerprint, table_count,
sample_table, composite_columns_json, table_names_json, search_text)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
)

// Batch inserts per-table inside a transaction to avoid per-statement disk fsyncs.
// The async connector calls (listTables, describeTable) run outside the transaction;
// only the synchronous SQLite inserts are wrapped.
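// One plausible shape for that helper (an assumption: the real definition is
// hidden above this hunk, and `insertTable`/`insertColumn` are hypothetical
// names), using better-sqlite3's synchronous transaction API, which this
// file's prepare/run/all usage suggests:
//
//   const insertTableBatch = this.db.transaction((tableArgs: any[], colBatch: any[][]) => {
//     insertTable.run(...tableArgs)
//     for (const args of colBatch) insertColumn.run(...args)
//   })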
@@ -200,36 +247,101 @@
continue
}

// Collect every table's columns first so the entity-group detector
// can see the whole schema at once. We also retain each table's type and
// raw column rows for the downstream inserts.
type TableSnapshot = {
name: string
type: string
columns: Array<{ name: string; data_type: string; nullable: boolean }>
}
const snapshots: TableSnapshot[] = []
for (const tableInfo of tables) {
- totalTables++
- const searchText = makeSearchText(databaseName, schemaName, tableInfo.name, tableInfo.type)

let columns: Array<{ name: string; data_type: string; nullable: boolean }> = []
try {
columns = await connector.describeTable(schemaName, tableInfo.name)
} catch {
// continue with empty columns
}
snapshots.push({ name: tableInfo.name, type: tableInfo.type, columns })
}

const shapes: TableShape[] = snapshots.map((s) => ({
table_name: s.name,
columns: s.columns.map((c) => ({ name: c.name, data_type: c.data_type })),
}))
const detection = detectEntityGroup(shapes, { ratioThreshold, minTables })

const inGroup = new Set<string>(
detection.entity_group ? detection.entity_group.table_names : [],
)
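// Illustrative sketch (assumed values, inferred from the fields used below):
// for a schema of one-table-per-ticker, detection.entity_group might be
//
//   {
//     fingerprint: "3f9c...",                     // digest of the shared column shape
//     sample_table: "AAPL",
//     table_names: ["AAPL", "ABBV", /* ... */ "ZTS"],
//     composite_columns: [{ name: "trade_date", data_type: "DATE" }, /* ... */],
//   }
//
// Tables whose shape diverges are absent from table_names and fall through
// to the per-table insert path below.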

for (const snap of snapshots) {
totalTables++

if (inGroup.has(snap.name)) {
// Tables inside the entity group are NOT emitted per-table.
// Their columns are still counted toward totalColumns so the index
// result accurately reflects the warehouse, but they live inside
// the composite digest (and the entity_groups_cache row below).
totalColumns += snap.columns.length
continue
}

// Build column insert args
const searchText = makeSearchText(databaseName, schemaName, snap.name, snap.type)
const columnArgsBatch: any[][] = []
- for (const col of columns) {
+ for (const col of snap.columns) {
totalColumns++
const colSearch = makeSearchText(
- databaseName, schemaName, tableInfo.name, col.name, col.data_type,
+ databaseName, schemaName, snap.name, col.name, col.data_type,
)
columnArgsBatch.push([
- warehouseName, databaseName, schemaName, tableInfo.name,
+ warehouseName, databaseName, schemaName, snap.name,
col.name, col.data_type, col.nullable ? 1 : 0, colSearch,
])
}

// Insert table + all its columns in a single transaction
insertTableBatch(
- [warehouseName, databaseName, schemaName, tableInfo.name, tableInfo.type, searchText],
+ [warehouseName, databaseName, schemaName, snap.name, snap.type, searchText],
columnArgsBatch,
)
}

// Persist the entity group for this schema, if one was detected.
if (detection.entity_group) {
const eg = detection.entity_group
const compositeColumns = eg.composite_columns
const groupSearchText = makeSearchText(
databaseName,
schemaName,
eg.sample_table,
...eg.table_names,
...compositeColumns.map((c) => c.name),
...compositeColumns.map((c) => c.data_type),
)
insertEntityGroup.run(
warehouseName,
databaseName,
schemaName,
"entity-per-table",
eg.fingerprint,
eg.table_names.length,
eg.sample_table,
JSON.stringify(compositeColumns),
JSON.stringify(eg.table_names),
groupSearchText,
)
entityGroupsEmitted.push({
warehouse: warehouseName,
database: databaseName ?? undefined,
schema_name: schemaName,
pattern: "entity-per-table",
table_count: eg.table_names.length,
composite_columns: compositeColumns,
sample_table: eg.sample_table,
table_names: eg.table_names,
})
}
}

// Update warehouse summary
@@ -249,6 +361,7 @@
tables_indexed: totalTables,
columns_indexed: totalColumns,
timestamp: now,
entity_groups: entityGroupsEmitted.length > 0 ? entityGroupsEmitted : undefined,
}
}
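// Usage sketch (hypothetical warehouse/connector names; the option values
// shown are the defaults described in the doc comment above):
//
//   const result = await cache.indexWarehouse("wh_prod", "snowflake", connector, {
//     entityRatioThreshold: 0.5, // >=50% of tables must share one shape
//     entityMinTables: 20,       // smaller groups stay per-table
//   })
//   console.log(result.entity_groups?.length ?? 0, "entity group(s) emitted")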

@@ -317,11 +430,54 @@
}
})

// Search entity groups: a group matches if any query token appears in
// its search_text (which includes the full table_names list, composite
// column names/types, schema, and sample table). This means the agent
// can still find a specific collapsed table by name.
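// For example (hypothetical query): searching "AAPL close" yields tokens
// ["aapl", "close"]; a group whose search_text contains "aapl" matches,
// and the matching_tables filter below narrows the hit to just ["AAPL"].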
const groupRows = this.db.prepare(
`SELECT warehouse, database_name, schema_name, pattern, table_count,
sample_table, composite_columns_json, table_names_json
FROM entity_groups_cache
WHERE ${searchCondition} ${whFilter}
ORDER BY schema_name, sample_table
LIMIT ?`,
).all(...searchParams, ...whParams, limit) as any[]

const entityGroups: SchemaSearchEntityGroupResult[] = groupRows.map((row) => {
const compositeColumns = JSON.parse(row.composite_columns_json) as {
name: string
data_type: string
}[]
const tableNames = JSON.parse(row.table_names_json) as string[]
// Of the group's table names, return the subset that actually matches
// any query token. This makes "find me table AAPL" return just AAPL,
// not all 2754 tickers.
const lowerTokens = tokens.map((t) => t.toLowerCase())
const matching = tableNames.filter((name) => {
const lowerName = name.toLowerCase()
return lowerTokens.some((tok) => lowerName.includes(tok))
})
return {
warehouse: row.warehouse,
database: row.database_name ?? undefined,
schema_name: row.schema_name,
pattern: row.pattern,
table_count: row.table_count,
composite_columns: compositeColumns,
sample_table: row.sample_table,
// If query matched on schema or composite columns rather than a
// specific table, return an empty matching_tables list so callers
// know it was a structural match.
matching_tables: matching,
}
})

return {
tables,
columns,
query,
- match_count: tables.length + columns.length,
+ match_count: tables.length + columns.length + entityGroups.length,
entity_groups: entityGroups.length > 0 ? entityGroups : undefined,
}
}
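// Result sketch (assumed schema/table names): a search for "AAPL" against a
// collapsed schema could come back as
//
//   {
//     tables: [], columns: [], query: "AAPL", match_count: 1,
//     entity_groups: [{
//       schema_name: "market_data", pattern: "entity-per-table",
//       table_count: 2754, sample_table: "AAPL",
//       matching_tables: ["AAPL"], /* ... */
//     }],
//   }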

@@ -340,8 +496,42 @@
columns_count: row.columns_count,
}))

- const totalTables = (this.db.prepare("SELECT COUNT(*) as cnt FROM tables_cache").get() as any).cnt
- const totalColumns = (this.db.prepare("SELECT COUNT(*) as cnt FROM columns_cache").get() as any).cnt
// total_tables includes both per-table rows AND tables collapsed into
// entity-per-table groups (counted via the group's table_count). This
// matches the pre-collapse behaviour so callers see the real warehouse
// table count regardless of digest format.
const perTableCount = (
this.db.prepare("SELECT COUNT(*) as cnt FROM tables_cache").get() as any
).cnt
const collapsedCount = (
this.db
.prepare("SELECT COALESCE(SUM(table_count), 0) as cnt FROM entity_groups_cache")
.get() as any
).cnt
const totalTables = perTableCount + collapsedCount

// total_columns covers physical columns_cache rows. Columns inside
// entity groups are stored once in the group's composite_columns_json
// rather than N times per-table; we add (composite_count * table_count)
// back so the total reflects logical column count.
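// Worked example (hypothetical counts): 200 physical column rows plus one
// group of 2754 tables sharing 12 composite columns gives
// total_columns = 200 + 12 * 2754 = 33,248.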
const perColumnCount = (
this.db.prepare("SELECT COUNT(*) as cnt FROM columns_cache").get() as any
).cnt
const groupRows = this.db
.prepare(
"SELECT table_count, composite_columns_json FROM entity_groups_cache",
)
.all() as any[]
let collapsedColumns = 0
for (const r of groupRows) {
try {
const cols = JSON.parse(r.composite_columns_json) as unknown[]
collapsedColumns += cols.length * r.table_count
} catch {
// ignore malformed rows
}
}
const totalColumns = perColumnCount + collapsedColumns

return {
warehouses,