Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 170 additions & 16 deletions pkg/sql/inspect/index_consistency_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,33 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/lexbase"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/spanutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

var indexConsistencyHashEnabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.inspect.index_consistency_hash.enabled",
"if false, the index consistency check skips the hash precheck and always runs the full join",
true,
)

// indexConsistencyCheckApplicability is a lightweight version that only implements applicability logic.
type indexConsistencyCheckApplicability struct {
tableID descpb.ID
Expand All @@ -44,6 +55,21 @@ func (c *indexConsistencyCheckApplicability) AppliesTo(
return spanContainsTable(c.tableID, codec, span)
}

// checkState represents the state of an index consistency check.
type checkState int

const (
// checkNotStarted indicates Start() has not been called yet.
checkNotStarted checkState = iota
// checkHashMatched indicates the hash precheck passed - no corruption detected,
// so the full check can be skipped.
checkHashMatched
// checkRunning indicates the full check is actively running and may produce more results.
checkRunning
// checkDone indicates the check has finished (iterator exhausted or error occurred).
checkDone
)

// indexConsistencyCheck verifies consistency between a table's primary index
// and a specified secondary index by streaming rows from both sides of a
// query. It reports an issue if a key exists in the primary but not the
Expand All @@ -55,11 +81,11 @@ type indexConsistencyCheck struct {
indexID descpb.IndexID
asOf hlc.Timestamp

tableDesc catalog.TableDescriptor
secIndex catalog.Index
priIndex catalog.Index
rowIter isql.Rows
exhaustedIter bool
tableDesc catalog.TableDescriptor
secIndex catalog.Index
priIndex catalog.Index
rowIter isql.Rows
state checkState

// columns is a list of the columns returned by one side of the
// queries join. The actual resulting rows from the RowContainer is
Expand All @@ -76,7 +102,7 @@ var _ inspectCheckApplicability = (*indexConsistencyCheck)(nil)

// Started implements the inspectCheck interface.
func (c *indexConsistencyCheck) Started() bool {
return c.rowIter != nil
return c.state != checkNotStarted
}

// Start implements the inspectCheck interface.
Expand Down Expand Up @@ -132,6 +158,10 @@ func (c *indexConsistencyCheck) Start(
return res
}

pkColNames := colNames(pkColumns)
otherColNames := colNames(otherColumns)
allColNames := colNames(c.columns)

// Generate query bounds from the span to limit the query to the specified range
var predicate string
var queryArgs []interface{}
Expand Down Expand Up @@ -167,6 +197,7 @@ func (c *indexConsistencyCheck) Start(

// Nothing to do if no rows exist in the span.
if !hasRows {
c.state = checkDone
return nil
}

Expand All @@ -175,7 +206,6 @@ func (c *indexConsistencyCheck) Start(
}

// Generate SQL predicate from the bounds
pkColNames := colNames(pkColumns)
// Encode column names for SQL usage
encodedPkColNames := make([]string, len(pkColNames))
for i, colName := range pkColNames {
Expand All @@ -202,8 +232,26 @@ func (c *indexConsistencyCheck) Start(
queryArgs = append(queryArgs, datum)
}

if indexConsistencyHashEnabled.Get(&c.flowCtx.Cfg.Settings.SV) && len(allColNames) > 0 {
match, hashErr := c.hashesMatch(ctx, allColNames, predicate, queryArgs)
if hashErr != nil {
// If hashing fails, we usually fall back to the full check. But if the
// error stems from query construction, that's an internal bug and shouldn't
// be ignored.
if isQueryConstructionError(hashErr) {
return hashErr
}
log.Dev.Infof(ctx, "hash precheck for index consistency failed; falling back to full check: %v", hashErr)
}
if match {
// Hashes match, no corruption detected - skip the full check.
c.state = checkHashMatched
return nil
}
}

checkQuery := c.createIndexCheckQuery(
colNames(pkColumns), colNames(otherColumns), c.tableDesc.GetID(), c.secIndex, c.priIndex.GetID(), predicate,
pkColNames, otherColNames, c.tableDesc.GetID(), c.secIndex, c.priIndex.GetID(), predicate,
)

// Wrap the query with AS OF SYSTEM TIME to ensure it uses the specified timestamp
Expand Down Expand Up @@ -232,13 +280,19 @@ func (c *indexConsistencyCheck) Start(
// do that here because the results of the iterator are used in the Next()
// function.
c.rowIter = it
c.state = checkRunning
return nil
}

// Next implements the inspectCheck interface.
func (c *indexConsistencyCheck) Next(
ctx context.Context, cfg *execinfra.ServerConfig,
) (*inspectIssue, error) {
// If hashes matched, there's no corruption to report.
if c.state == checkHashMatched {
return nil, nil
}

if c.rowIter == nil {
return nil, errors.AssertionFailedf("nil rowIter unexpected")
}
Expand All @@ -248,7 +302,7 @@ func (c *indexConsistencyCheck) Next(
// Close the iterator to prevent further usage. The close may emit the
// internal error too, but we only need to capture it once.
_ = c.Close(ctx)
c.exhaustedIter = true
c.state = checkDone

// Convert internal errors to inspect issues rather than failing the entire job.
// This allows us to capture and log data corruption or encoding errors as
Expand All @@ -269,7 +323,7 @@ func (c *indexConsistencyCheck) Next(
}, nil
}
if !ok {
c.exhaustedIter = true
c.state = checkDone
return nil, nil
}

Expand Down Expand Up @@ -336,12 +390,8 @@ func (c *indexConsistencyCheck) Next(

// Done implements the inspectCheck interface.
func (c *indexConsistencyCheck) Done(context.Context) bool {
// If we never started (rowIter is nil), we're done
if c.rowIter == nil {
return true
}
// Otherwise, we're done when the iterator is exhausted
return c.exhaustedIter
done := c.state == checkHashMatched || c.state == checkDone
return done
}

// Close implements the inspectCheck interface.
Expand Down Expand Up @@ -660,6 +710,93 @@ func (c *indexConsistencyCheck) createIndexCheckQuery(
)
}

type hashResult struct {
rowCount int64
hash string
}

// hashesMatch performs a fast comparison of primary and secondary indexes by
// computing row counts and hash values. Returns true if both indexes have
// identical row counts and hash values, indicating no corruption.
func (c *indexConsistencyCheck) hashesMatch(
ctx context.Context, columnNames []string, predicate string, queryArgs []interface{},
) (bool, error) {
primary, err := c.computeHashAndRowCount(ctx, c.priIndex, columnNames, predicate, queryArgs)
if err != nil {
return false, errors.Wrapf(err, "computing hash for primary index %s", c.priIndex.GetName())
}
secondary, err := c.computeHashAndRowCount(ctx, c.secIndex, columnNames, predicate, queryArgs)
if err != nil {
return false, errors.Wrapf(err, "computing hash for secondary index %s", c.secIndex.GetName())
}
// Hashes match only if both row count and hash value are identical.
return primary.rowCount == secondary.rowCount && primary.hash == secondary.hash, nil
}

// computeHashAndRowCount executes a hash query for the specified index and
// returns the row count and XOR aggregate hash value.
func (c *indexConsistencyCheck) computeHashAndRowCount(
ctx context.Context,
index catalog.Index,
columnNames []string,
predicate string,
queryArgs []interface{},
) (hashResult, error) {
query := buildIndexHashQuery(c.tableDesc.GetID(), index, columnNames, predicate)
queryWithAsOf := fmt.Sprintf("SELECT * FROM (%s) AS OF SYSTEM TIME %s", query, c.asOf.AsOfSystemTime())

qos := getInspectQoS(&c.flowCtx.Cfg.Settings.SV)
row, err := c.flowCtx.Cfg.DB.Executor().QueryRowEx(
ctx, "inspect-index-consistency-hash", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.NodeUserName(),
QualityOfService: &qos,
},
queryWithAsOf,
queryArgs...,
)
if err != nil {
return hashResult{}, err
}
if len(row) != 2 {
return hashResult{}, errors.AssertionFailedf("hash query returned unexpected column count: %d", len(row))
}
return hashResult{
rowCount: int64(tree.MustBeDInt(row[0])),
hash: string(tree.MustBeDBytes(row[1])),
}, nil
}

// buildIndexHashQuery constructs a query that computes row count and XOR
// aggregate hash for the specified index and columns.
func buildIndexHashQuery(
tableID descpb.ID, index catalog.Index, columnNames []string, predicate string,
) string {
hashExpr := hashInputExpression(columnNames)
whereClause := buildWhereClause(predicate, nil /* nullFilters */)
return fmt.Sprintf(`
SELECT
count(*) AS row_count,
crdb_internal.datums_to_bytes(xor_agg(fnv64(%s))) AS hash_value
FROM [%d AS t]@{FORCE_INDEX=[%d]}%s`,
hashExpr,
tableID,
index.GetID(),
whereClause,
)
}

// hashInputExpression creates a hash-friendly expression by encoding column
// values to bytes with NULL coalesced to empty bytes.
func hashInputExpression(columnNames []string) string {
args := make([]string, len(columnNames))
for i, col := range columnNames {
args[i] = colRef("t", col)
}
encoded := fmt.Sprintf("crdb_internal.datums_to_bytes(%s)", strings.Join(args, ", "))
return fmt.Sprintf("COALESCE(%s, ''::BYTES)", encoded)
}

// encodeColumnName properly encodes a column name for use in SQL.
func encodeColumnName(columnName string) string {
var buf bytes.Buffer
Expand Down Expand Up @@ -751,3 +888,20 @@ func buildWhereClause(predicate string, nullFilters []string) string {

return buf.String()
}

// isQueryConstructionError checks if the given error is due to
// invalid syntax or references in the query construction.
func isQueryConstructionError(err error) bool {
code := pgerror.GetPGCode(err)
switch code {
case pgcode.Syntax,
pgcode.UndefinedColumn,
pgcode.UndefinedTable,
pgcode.UndefinedFunction,
pgcode.DatatypeMismatch,
pgcode.InvalidColumnReference:
return true
default:
return false
}
}
Loading