Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 59 additions & 16 deletions pkg/sql/inspect/inspect_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand All @@ -34,6 +35,11 @@ func inspectTypeCheck(

// inspectRun represents the runtime state of an execution of INSPECT.
type inspectRun struct {
table catalog.TableDescriptor
db catalog.DatabaseDescriptor

namedIndexes tree.TableIndexNames

checks []*jobspb.InspectDetails_Check
asOfTimestamp hlc.Timestamp
}
Expand All @@ -43,40 +49,77 @@ func newInspectRun(
) (inspectRun, error) {
var run inspectRun

switch stmt.Typ {
case tree.InspectTable:
if table, err := p.ResolveExistingObjectEx(ctx, stmt.Table, true /* required */, tree.ResolveRequireTableDesc); err != nil {
return inspectRun{}, err
} else {
run.table = table
}

if db, err := p.Descriptors().ByIDWithLeased(p.Txn()).Get().Database(ctx, run.table.GetParentID()); err != nil {
return inspectRun{}, err
} else {
run.db = db
}
case tree.InspectDatabase:
if db, err := p.Descriptors().ByNameWithLeased(p.Txn()).Get().Database(ctx, stmt.Database.ToUnresolvedName().String()); err != nil {
return inspectRun{}, err
} else {
run.db = db
}
default:
return inspectRun{}, errors.AssertionFailedf("unexpected INSPECT type received, got: %v", stmt.Typ)
}

if len(stmt.Options) == 0 || stmt.Options.HasIndexAll() {
// No options or INDEX ALL specified - inspect all indexes.

switch stmt.Typ {
case tree.InspectTable:
table, err := p.ResolveExistingObjectEx(ctx, stmt.Table, true /* required */, tree.ResolveRequireTableDesc)
checks, err := sql.InspectChecksForTable(ctx, p, run.table)
if err != nil {
return inspectRun{}, err
}

run.checks, err = sql.InspectChecksForTable(ctx, p, table)
if err != nil {
run.checks = checks
case tree.InspectDatabase:
if checks, err := sql.InspectChecksForDatabase(ctx, p, run.db); err != nil {
return inspectRun{}, err
} else {
run.checks = checks
}
case tree.InspectDatabase:
db, err := p.Descriptors().ByName(p.Txn()).Get().Database(ctx, stmt.Database.ToUnresolvedName().String())
default:
return inspectRun{}, errors.AssertionFailedf("unexpected INSPECT type received, got: %v", stmt.Typ)
}
} else {
// Named indexes specified.
switch stmt.Typ {
case tree.InspectTable:
schema, err := p.Descriptors().ByIDWithLeased(p.Txn()).Get().Schema(ctx, run.table.GetParentSchemaID())
if err != nil {
return inspectRun{}, err
}

run.checks, err = sql.InspectChecksForDatabase(ctx, p, db)
if err != nil {
tableName := tree.MakeTableNameWithSchema(
tree.Name(run.db.GetName()), tree.Name(schema.GetName()), tree.Name(run.table.GetName()),
)
if namedIndexes, err := stmt.Options.WithNamedIndexesOnTable(&tableName); err != nil {
return inspectRun{}, err
} else {
run.namedIndexes = namedIndexes
}
case tree.InspectDatabase:
if namedIndexes, err := stmt.Options.WithNamedIndexesInDatabase(run.db.GetName()); err != nil {
return inspectRun{}, err
} else {
run.namedIndexes = namedIndexes
}
default:
return inspectRun{}, errors.AssertionFailedf("unexpected INSPECT type received, got: %v", stmt.Typ)
}
} else {
// Named indexes specified.
checks, err := sql.InspectChecksByIndexNames(ctx, p, stmt.Options.NamedIndexes())
if err != nil {

if checks, err := sql.InspectChecksByIndexNames(ctx, p, run.namedIndexes); err != nil {
return inspectRun{}, err
} else {
run.checks = checks
}
run.checks = checks
}

if stmt.AsOf.Expr != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/inspect_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TriggerInspectJob(
func InspectChecksForDatabase(
ctx context.Context, p PlanHookState, db catalog.DatabaseDescriptor,
) ([]*jobspb.InspectDetails_Check, error) {
tables, err := p.Descriptors().ByName(p.Txn()).Get().GetAllTablesInDatabase(ctx, p.Txn(), db)
tables, err := p.Descriptors().ByNameWithLeased(p.Txn()).Get().GetAllTablesInDatabase(ctx, p.Txn(), db)
if err != nil {
return nil, err
}
Expand Down
140 changes: 134 additions & 6 deletions pkg/sql/logictest/testdata/logic_test/inspect
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,10 @@ statement ok
CREATE INDEX idx_c1 ON bar (c1);
CREATE INDEX idx_c3 ON bar (c3);

statement error pq: index "bar@idx_c1" does not belong to table "foo"
statement error pq: index "bar@idx_c1" is not on table "test.public.foo"
INSPECT TABLE foo WITH OPTIONS INDEX (bar@idx_c1);

# TODO(148365): The table name should be used to disambiguate.
statement error index name "idx_c1" is ambiguous \(found in test.public.foo and test.public.bar\)
INSPECT TABLE bar WITH OPTIONS INDEX (idx_c1);

statement error index "dbfake.foo.idx_c1" does not belong to database "test"
statement error pq: index "dbfake.foo.idx_c1" is not in database "test"
INSPECT DATABASE test WITH OPTIONS INDEX (dbfake.foo.idx_c1);

statement error pq: index name "idx_c1" is ambiguous \(found in test.public.foo and test.public.bar\)
Expand Down Expand Up @@ -221,3 +217,135 @@ statement ok
DROP TABLE t2;

subtest end

subtest database_resolve_indexes

statement ok
CREATE DATABASE dbfoo;
CREATE TABLE dbfoo.public.t2 (c1 INT, INDEX idx_c1 (c1));
CREATE TABLE dbfoo.public.t1 (c1 INT, INDEX idx_c1 (c1));


statement ok
SET enable_inspect_command = true;

statement ok
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (t1@idx_c1);

statement ok
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (t2@idx_c1);

statement ok
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (public.t1@idx_c1);

statement ok
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (dbfoo.t1@idx_c1);

statement ok
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (dbfoo.t2@idx_c1);

statement ok
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (dbfoo.public.t1@idx_c1);

statement error pq: index name "idx_c1" is ambiguous \(found in dbfoo.public.t2 and dbfoo.public.t1\)
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (idx_c1);

statement error pq: index name "idx_c1" is ambiguous \(found in dbfoo.public.t2 and dbfoo.public.t1\)
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (public.idx_c1);

statement error pq: index name "idx_c1" is ambiguous \(found in dbfoo.public.t2 and dbfoo.public.t1\)
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (dbfoo.idx_c1);

statement error pq: index name "idx_c1" is ambiguous \(found in dbfoo.public.t2 and dbfoo.public.t1\)
INSPECT DATABASE dbfoo WITH OPTIONS INDEX (dbfoo.public.idx_c1);

statement ok
DROP DATABASE dbfoo;

subtest schema_catalog_collision

statement ok
CREATE DATABASE ambiguous;
CREATE SCHEMA ambiguous.ambiguous;
CREATE TABLE ambiguous.ambiguous.t1 (c1 INT, INDEX idx_c1 (c1));

# The duplicated names of the schema and the catalog means the database parameter doesn't help with resolution.
statement error pq: index "idx_c1" does not exist
INSPECT DATABASE ambiguous WITH OPTIONS INDEX (ambiguous.idx_c1);

statement ok
DROP DATABASE ambiguous;
subtest end

subtest end

subtest table_resolve_indexes

statement ok
CREATE DATABASE dbfoo;
CREATE DATABASE dbbar;
CREATE SCHEMA dbfoo.s1;
CREATE TABLE dbfoo.public.t1 (c1 INT, INDEX idx_c1 (c1));
CREATE TABLE dbfoo.s1.t1 (c1 INT, c2 INT, INDEX idx_c1 (c1), INDEX idx_c2 (c2));
CREATE TABLE dbbar.public.t1 (c1 INT, c2 INT, INDEX idx_c1 (c1), INDEX idx_c2 (c2));
CREATE TABLE dbbar.public.t2 (c1 INT, c2 INT, INDEX idx_c1 (c1), INDEX idx_c2 (c2));

statement ok
SET enable_inspect_command = true;

subtest all_forms

statement ok
INSPECT TABLE dbfoo.t1 WITH OPTIONS INDEX (t1@idx_c1);

statement error pq: index "s1.t1@idx_c1" is not on table "dbfoo.public.t1"
INSPECT TABLE dbfoo.t1 WITH OPTIONS INDEX (s1.t1@idx_c1);

statement ok
INSPECT TABLE dbfoo.t1 WITH OPTIONS INDEX (dbfoo.t1@idx_c1);

statement error pq: index "dbfoo.s1.t1@idx_c1" is not on table "dbfoo.public.t1"
INSPECT TABLE dbfoo.t1 WITH OPTIONS INDEX (dbfoo.s1.t1@idx_c1);

statement ok
INSPECT TABLE dbfoo.t1 WITH OPTIONS INDEX (idx_c1);

statement error pq: index "s1.idx_c1" is not on table "dbfoo.public.t1"
INSPECT TABLE dbfoo.t1 WITH OPTIONS INDEX (s1.idx_c1);

statement ok
INSPECT TABLE dbfoo.t1 WITH OPTIONS INDEX (dbfoo.idx_c1);

statement error pq: index "dbfoo.s1.idx_c1" is not on table "dbfoo.public.t1"
INSPECT TABLE dbfoo.t1 WITH OPTIONS INDEX (dbfoo.s1.idx_c1);

statement ok
INSPECT TABLE dbfoo.s1.t1 WITH OPTIONS INDEX (t1@idx_c1);

statement ok
INSPECT TABLE dbfoo.s1.t1 WITH OPTIONS INDEX (s1.t1@idx_c1);

statement ok
INSPECT TABLE dbfoo.s1.t1 WITH OPTIONS INDEX (dbfoo.t1@idx_c1);

statement ok
INSPECT TABLE dbfoo.s1.t1 WITH OPTIONS INDEX (dbfoo.s1.t1@idx_c1);

statement ok
INSPECT TABLE dbfoo.s1.t1 WITH OPTIONS INDEX (idx_c1);

statement ok
INSPECT TABLE dbfoo.s1.t1 WITH OPTIONS INDEX (s1.idx_c1);

statement ok
INSPECT TABLE dbfoo.s1.t1 WITH OPTIONS INDEX (dbfoo.idx_c1);

statement ok
INSPECT TABLE dbfoo.s1.t1 WITH OPTIONS INDEX (dbfoo.s1.idx_c1);

subtest end

statement ok
DROP DATABASE dbfoo;

subtest end
84 changes: 64 additions & 20 deletions pkg/sql/sem/tree/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,79 @@ func (n *Inspect) Format(ctx *FmtCtx) {
// InspectOptions corresponds to a comma-delimited list of inspect options.
type InspectOptions []InspectOption

// NamedIndexes flattens the indexes named by option.
func (n *InspectOptions) NamedIndexes() TableIndexNames {
// namedIndexes flattens and copies the indexes named by option.
func (n *InspectOptions) namedIndexes() TableIndexNames {
var names TableIndexNames
for _, option := range *n {
if opt, ok := option.(*InspectOptionIndex); ok {
names = append(names, opt.IndexNames...)
for _, n := range opt.IndexNames {
name := *n
names = append(names, &name)
}
}
}

return names
}

// SetNamedIndexesToTable returns a copy of the named indexes with each table
// set to the specified 3-part name. It errors if the existing table name can't
// be qualified to the new table name.
func (n *InspectOptions) WithNamedIndexesOnTable(tableName *TableName) (TableIndexNames, error) {
tabledIndexes := n.namedIndexes()
for _, index := range tabledIndexes {
if !indexMatchesTable(index, *tableName) {
return nil, pgerror.Newf(pgcode.InvalidName, "index %q is not on table %q", index.String(), tableName)
}
index.Table = *tableName
}
return tabledIndexes, nil
}

// WithNamedIndexesInDatabase returns a copy of the named indexes with each
// table name set to include database specified. It errors if the database was
// previously set and is not a match.
//
// In 2-part names, the schema field is used ambiguously for either the database
// or the schema. To prevent duplication of the database name, the catalog field
// isn't set if the database name is the same as the schema name.
func (n *InspectOptions) WithNamedIndexesInDatabase(databaseName string) (TableIndexNames, error) {
databasedIndexes := n.namedIndexes()
for _, index := range databasedIndexes {
if index.Table.ExplicitCatalog && index.Table.CatalogName != Name(databaseName) {
return nil, pgerror.Newf(pgcode.InvalidName, "index %q is not in database %q", index.String(), databaseName)
}

if index.Table.ExplicitSchema {
if index.Table.SchemaName != Name(databaseName) {
index.Table.CatalogName = Name(databaseName)
index.Table.ExplicitCatalog = true
}
} else {
index.Table.SchemaName = Name(databaseName)
index.Table.ExplicitSchema = true
}
}

return databasedIndexes, nil
}

// indexMatchesTable checks if a TableIndexName matches a 3-part table name.
func indexMatchesTable(index *TableIndexName, table TableName) bool {
if index.Table.ObjectName != "" && table.ObjectName != index.Table.ObjectName {
return false
}
if index.Table.ExplicitCatalog {
return table.CatalogName == index.Table.CatalogName && table.SchemaName == index.Table.SchemaName
}
if index.Table.ExplicitSchema {
// A 2-part name means the first segment may be the schema or the catalog.
return table.CatalogName == index.Table.SchemaName || table.SchemaName == index.Table.SchemaName
}

return true
}

// HasIndexAll checks if the options include an INDEX ALL option.
func (n *InspectOptions) HasIndexAll() bool {
for _, option := range *n {
Expand All @@ -89,23 +150,6 @@ func (n *Inspect) Validate() error {
return err
}

// TODO(155056): Better validate index names from the options with the name
// of the database or table from the command.
// TODO(148365): Check for duplicated index names (including the name of the
// database or table from the command).
for _, index := range n.Options.NamedIndexes() {
switch n.Typ {
case InspectTable:
if index.Table.ObjectName != "" && n.Table.Object() != index.Table.Object() {
return pgerror.Newf(pgcode.InvalidName, "index %q does not belong to table %q", index.String(), n.Table.String())
}
case InspectDatabase:
if index.Table.ExplicitCatalog && n.Database.Object() != index.Table.Catalog() {
return pgerror.Newf(pgcode.InvalidName, "index %q does not belong to database %q", index.String(), n.Database.String())
}
}
}

return nil
}

Expand Down