diff --git a/pkg/sql/inspect/index_consistency_check.go b/pkg/sql/inspect/index_consistency_check.go
index 592991ca1ebd..cb167890c2b9 100644
--- a/pkg/sql/inspect/index_consistency_check.go
+++ b/pkg/sql/inspect/index_consistency_check.go
@@ -14,6 +14,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
@@ -21,16 +22,26 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/lexbase"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
+	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/idxtype"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/spanutils"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/redact"
 )
 
+var indexConsistencyHashEnabled = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"sql.inspect.index_consistency_hash.enabled",
+	"if false, the index consistency check skips the hash precheck and always runs the full join",
+	true,
+)
+
 // indexConsistencyCheckApplicability is a lightweight version that only implements applicability logic.
 type indexConsistencyCheckApplicability struct {
 	tableID descpb.ID
@@ -45,6 +56,21 @@ func (c *indexConsistencyCheckApplicability) AppliesTo(
 	return spanContainsTable(c.tableID, codec, span)
 }
 
+// checkState represents the state of an index consistency check.
+type checkState int
+
+const (
+	// checkNotStarted indicates Start() has not been called yet.
+	checkNotStarted checkState = iota
+	// checkHashMatched indicates the hash precheck passed - no corruption detected,
+	// so the full check can be skipped.
+	checkHashMatched
+	// checkRunning indicates the full check is actively running and may produce more results.
+	checkRunning
+	// checkDone indicates the check has finished (iterator exhausted or error occurred).
+	checkDone
+)
+
 // indexConsistencyCheck verifies consistency between a table's primary index
 // and a specified secondary index by streaming rows from both sides of a
 // query. It reports an issue if a key exists in the primary but not the
@@ -56,11 +82,11 @@ type indexConsistencyCheck struct {
 	indexID descpb.IndexID
 	asOf    hlc.Timestamp
 
-	tableDesc     catalog.TableDescriptor
-	secIndex      catalog.Index
-	priIndex      catalog.Index
-	rowIter       isql.Rows
-	exhaustedIter bool
+	tableDesc catalog.TableDescriptor
+	secIndex  catalog.Index
+	priIndex  catalog.Index
+	rowIter   isql.Rows
+	state     checkState
 
 	// columns is a list of the columns returned by one side of the
 	// queries join. The actual resulting rows from the RowContainer is
@@ -77,7 +103,7 @@ var _ inspectCheckApplicability = (*indexConsistencyCheck)(nil)
 
 // Started implements the inspectCheck interface.
 func (c *indexConsistencyCheck) Started() bool {
-	return c.rowIter != nil
+	return c.state != checkNotStarted
 }
 
 // Start implements the inspectCheck interface.
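Reviewer note: the checkState enum above replaces the old exhaustedIter boolean with a four-state lifecycle. The following is a minimal, self-contained Go sketch of how those states are meant to interact; only the state names and transitions come from this diff, while the check struct and driver loop are illustrative stand-ins, not CockroachDB code.

package main

import "fmt"

type checkState int

const (
	checkNotStarted checkState = iota
	checkHashMatched
	checkRunning
	checkDone
)

type check struct {
	state checkState
	rows  int // rows the full check still has to scan (hypothetical)
}

// start models Start(): a matching hash precheck short-circuits to
// checkHashMatched; otherwise the full check begins running.
func (c *check) start(hashesMatch bool) {
	if hashesMatch {
		c.state = checkHashMatched
		return
	}
	c.state = checkRunning
}

// next models Next(): it reports nothing after a hash match, and flips
// the state to checkDone once the iterator is exhausted.
func (c *check) next() bool {
	if c.state == checkHashMatched {
		return false
	}
	if c.rows == 0 {
		c.state = checkDone
		return false
	}
	c.rows--
	return true
}

// done models Done(): true for both terminal states.
func (c *check) done() bool {
	return c.state == checkHashMatched || c.state == checkDone
}

func main() {
	for _, matched := range []bool{true, false} {
		c := &check{rows: 2}
		c.start(matched)
		n := 0
		for !c.done() {
			if c.next() {
				n++
			}
		}
		fmt.Printf("hash matched=%v -> rows scanned by full check=%d\n", matched, n)
	}
}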
@@ -141,6 +167,10 @@ func (c *indexConsistencyCheck) Start(
 		return res
 	}
 
+	pkColNames := colNames(pkColumns)
+	otherColNames := colNames(otherColumns)
+	allColNames := colNames(c.columns)
+
 	// Generate query bounds from the span to limit the query to the specified range
 	var predicate string
 	var queryArgs []interface{}
@@ -188,7 +218,6 @@ func (c *indexConsistencyCheck) Start(
 	}
 
 	// Generate SQL predicate from the bounds
-	pkColNames := colNames(pkColumns)
 	// Encode column names for SQL usage
 	encodedPkColNames := make([]string, len(pkColNames))
 	for i, colName := range pkColNames {
@@ -216,8 +245,27 @@ func (c *indexConsistencyCheck) Start(
 		}
 	}
 
+	if indexConsistencyHashEnabled.Get(&c.flowCtx.Cfg.Settings.SV) && len(allColNames) > 0 {
+		match, hashErr := c.hashesMatch(ctx, allColNames, predicate, queryArgs)
+		if hashErr != nil {
+			// If hashing fails, we usually fall back to the full check. But if the
+			// error stems from query construction, that's an internal bug and shouldn't
+			// be ignored.
+			if isQueryConstructionError(hashErr) {
+				return errors.WithAssertionFailure(hashErr)
+			}
+			log.Dev.Infof(ctx, "hash precheck for index consistency failed; falling back to full check: %v", hashErr)
+		}
+		if match {
+			// Hashes match, no corruption detected - skip the full check.
+			c.state = checkHashMatched
+			return nil
+		}
+	}
+
+	joinColNames := colNames(joinColumns)
 	checkQuery := c.createIndexCheckQuery(
-		colNames(pkColumns), colNames(otherColumns), colNames(joinColumns),
+		pkColNames, otherColNames, joinColNames,
 		c.tableDesc.GetID(), c.secIndex, c.priIndex.GetID(), predicate,
 	)
@@ -247,6 +295,7 @@ func (c *indexConsistencyCheck) Start(
 	// do that here because the results of the iterator are used in the Next()
 	// function.
 	c.rowIter = it
+	c.state = checkRunning
 	return nil
 }
 
@@ -254,6 +303,11 @@ func (c *indexConsistencyCheck) Start(
 func (c *indexConsistencyCheck) Next(
 	ctx context.Context, cfg *execinfra.ServerConfig,
 ) (*inspectIssue, error) {
+	// If hashes matched, there's no corruption to report.
+	if c.state == checkHashMatched {
+		return nil, nil
+	}
+
 	if c.rowIter == nil {
 		return nil, errors.AssertionFailedf("nil rowIter unexpected")
 	}
@@ -263,7 +317,7 @@ func (c *indexConsistencyCheck) Next(
 		// Close the iterator to prevent further usage. The close may emit the
 		// internal error too, but we only need to capture it once.
 		_ = c.Close(ctx)
-		c.exhaustedIter = true
+		c.state = checkDone
 
 		// Convert internal errors to inspect issues rather than failing the entire job.
 		// This allows us to capture and log data corruption or encoding errors as
@@ -284,7 +338,7 @@ func (c *indexConsistencyCheck) Next(
 		}, nil
 	}
 	if !ok {
-		c.exhaustedIter = true
+		c.state = checkDone
 		return nil, nil
 	}
 
@@ -351,12 +405,8 @@ func (c *indexConsistencyCheck) Next(
 
 // Done implements the inspectCheck interface.
 func (c *indexConsistencyCheck) Done(context.Context) bool {
-	// If we never started (rowIter is nil), we're done
-	if c.rowIter == nil {
-		return true
-	}
-	// Otherwise, we're done when the iterator is exhausted
-	return c.exhaustedIter
+	done := c.state == checkHashMatched || c.state == checkDone
+	return done
 }
 
 // Close implements the inspectCheck interface.
@@ -675,6 +725,93 @@ func (c *indexConsistencyCheck) createIndexCheckQuery(
 	)
 }
 
+type hashResult struct {
+	rowCount int64
+	hash     string
+}
+
+// hashesMatch performs a fast comparison of primary and secondary indexes by
+// computing row counts and hash values. Returns true if both indexes have
+// identical row counts and hash values, indicating no corruption.
+func (c *indexConsistencyCheck) hashesMatch(
+	ctx context.Context, columnNames []string, predicate string, queryArgs []interface{},
+) (bool, error) {
+	primary, err := c.computeHashAndRowCount(ctx, c.priIndex, columnNames, predicate, queryArgs)
+	if err != nil {
+		return false, errors.Wrapf(err, "computing hash for primary index %s", c.priIndex.GetName())
+	}
+	secondary, err := c.computeHashAndRowCount(ctx, c.secIndex, columnNames, predicate, queryArgs)
+	if err != nil {
+		return false, errors.Wrapf(err, "computing hash for secondary index %s", c.secIndex.GetName())
+	}
+	// Hashes match only if both row count and hash value are identical.
+	return primary.rowCount == secondary.rowCount && primary.hash == secondary.hash, nil
+}
+
+// computeHashAndRowCount executes a hash query for the specified index and
+// returns the row count and XOR aggregate hash value.
+func (c *indexConsistencyCheck) computeHashAndRowCount(
+	ctx context.Context,
+	index catalog.Index,
+	columnNames []string,
+	predicate string,
+	queryArgs []interface{},
+) (hashResult, error) {
+	query := buildIndexHashQuery(c.tableDesc.GetID(), index, columnNames, predicate)
+	queryWithAsOf := fmt.Sprintf("SELECT * FROM (%s) AS OF SYSTEM TIME %s", query, c.asOf.AsOfSystemTime())
+
+	qos := getInspectQoS(&c.flowCtx.Cfg.Settings.SV)
+	row, err := c.flowCtx.Cfg.DB.Executor().QueryRowEx(
+		ctx, "inspect-index-consistency-hash", nil, /* txn */
+		sessiondata.InternalExecutorOverride{
+			User:             username.NodeUserName(),
+			QualityOfService: &qos,
+		},
+		queryWithAsOf,
+		queryArgs...,
+	)
+	if err != nil {
+		return hashResult{}, err
+	}
+	if len(row) != 2 {
+		return hashResult{}, errors.AssertionFailedf("hash query returned unexpected column count: %d", len(row))
+	}
+	return hashResult{
+		rowCount: int64(tree.MustBeDInt(row[0])),
+		hash:     string(tree.MustBeDBytes(row[1])),
+	}, nil
+}
+
+// buildIndexHashQuery constructs a query that computes row count and XOR
+// aggregate hash for the specified index and columns.
+func buildIndexHashQuery(
+	tableID descpb.ID, index catalog.Index, columnNames []string, predicate string,
+) string {
+	hashExpr := hashInputExpression(columnNames)
+	whereClause := buildWhereClause(predicate, nil /* nullFilters */)
+	return fmt.Sprintf(`
+SELECT
+  count(*) AS row_count,
+  crdb_internal.datums_to_bytes(xor_agg(fnv64(%s))) AS hash_value
+FROM [%d AS t]@{FORCE_INDEX=[%d]}%s`,
+		hashExpr,
+		tableID,
+		index.GetID(),
+		whereClause,
+	)
+}
+
+// hashInputExpression creates a hash-friendly expression by encoding column
+// values to bytes with NULL coalesced to empty bytes.
+func hashInputExpression(columnNames []string) string {
+	args := make([]string, len(columnNames))
+	for i, col := range columnNames {
+		args[i] = colRef("t", col)
+	}
+	encoded := fmt.Sprintf("crdb_internal.datums_to_bytes(%s)", strings.Join(args, ", "))
+	return fmt.Sprintf("COALESCE(%s, ''::BYTES)", encoded)
+}
+
 // encodeColumnName properly encodes a column name for use in SQL.
 func encodeColumnName(columnName string) string {
 	var buf bytes.Buffer
@@ -766,3 +903,20 @@ func buildWhereClause(predicate string, nullFilters []string) string {
 
 	return buf.String()
 }
+
+// isQueryConstructionError checks if the given error is due to
+// invalid syntax or references in the query construction.
+func isQueryConstructionError(err error) bool {
+	code := pgerror.GetPGCode(err)
+	switch code {
+	case pgcode.Syntax,
+		pgcode.UndefinedColumn,
+		pgcode.UndefinedTable,
+		pgcode.UndefinedFunction,
+		pgcode.DatatypeMismatch,
+		pgcode.InvalidColumnReference:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/pkg/sql/inspect/index_consistency_check_test.go b/pkg/sql/inspect/index_consistency_check_test.go
index 7cd86e700986..4d3eb045b960 100644
--- a/pkg/sql/inspect/index_consistency_check_test.go
+++ b/pkg/sql/inspect/index_consistency_check_test.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -119,6 +120,8 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	skip.UnderRace(t, "slow test")
+
 	issueLogger := &testIssueCollector{}
 	ctx := context.Background()
 	const numNodes = 3
@@ -144,7 +147,7 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
 	ie := s.InternalExecutor().(*sql.InternalExecutor)
 	r := sqlutils.MakeSQLRunner(db)
 
-	for _, tc := range []struct {
+	testCases := []struct {
 		// desc is a description of the test case.
 		desc string
 		// splitRangeDDL is the DDL to split the table into multiple ranges. The
@@ -325,13 +328,30 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
 			// No corruptionTargetIndex specified, no corruption
 			missingIndexEntrySelector: "", // No corruption
 		},
-	} {
-		t.Run(tc.desc, func(t *testing.T) {
-			issueLogger.reset()
-			r.ExecMultiple(t,
-				`DROP DATABASE IF EXISTS test`,
-				`CREATE DATABASE test`,
-				`CREATE TABLE test.t (
+	}
+	hashConfigs := []struct {
+		name    string
+		enabled bool
+	}{
+		{name: "hash-enabled", enabled: true},
+		{name: "hash-disabled", enabled: false},
+	}
+	for _, hashCfg := range hashConfigs {
+		hashCfg := hashCfg
+		t.Run(hashCfg.name, func(t *testing.T) {
+			r.Exec(t, "SET CLUSTER SETTING sql.inspect.index_consistency_hash.enabled = $1", hashCfg.enabled)
+			t.Cleanup(func() {
+				r.Exec(t, "SET CLUSTER SETTING sql.inspect.index_consistency_hash.enabled = true")
+			})
+
+			for _, tc := range testCases {
+				tc := tc
+				t.Run(tc.desc, func(t *testing.T) {
+					issueLogger.reset()
+					r.ExecMultiple(t,
+						`DROP DATABASE IF EXISTS test`,
+						`CREATE DATABASE test`,
+						`CREATE TABLE test.t (
 a INT,
 b INT,
 c INT NOT NULL,
@@ -341,7 +361,7 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
 PRIMARY KEY (a, d),
 FAMILY fam0 (a, b, c, d, e, f)
 )`,
-				`INSERT INTO test.t (a, b, c, d, e, f)
+						`INSERT INTO test.t (a, b, c, d, e, f)
 SELECT
 gs1 AS a,
 gs1 * 10 AS b,
@@ -350,93 +370,93 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
 'e_' || gs1::STRING AS e,
 gs1 * 1.5 AS f
 FROM generate_series(1, 1000) AS gs1;`,
-			)
+					)
 
-			// Split the values and relocate leases so that the INSPECT job ends up
-			// running in parallel across multiple nodes.
+					r.ExecMultiple(t, tc.splitRangeDDL)
+					ranges, err := db.Query(`
WITH r AS (SHOW RANGES FROM TABLE test.t WITH DETAILS) SELECT range_id FROM r ORDER BY 1`)
-			require.NoError(t, err)
-			t.Cleanup(func() {
-				require.NoError(t, ranges.Close())
-			})
-			for i := 0; ranges.Next(); i++ {
-				var rangeID int
-				err = ranges.Scan(&rangeID)
-				require.NoError(t, err)
-				relocate := fmt.Sprintf("ALTER RANGE %d RELOCATE LEASE TO %d", rangeID, (i%numNodes)+1)
-				r.Exec(t, relocate)
-			}
+					require.NoError(t, err)
+					t.Cleanup(func() {
+						require.NoError(t, ranges.Close())
+					})
+					for i := 0; ranges.Next(); i++ {
+						var rangeID int
+						err = ranges.Scan(&rangeID)
+						require.NoError(t, err)
+						relocate := fmt.Sprintf("ALTER RANGE %d RELOCATE LEASE TO %d", rangeID, (i%numNodes)+1)
+						r.Exec(t, relocate)
+					}
 
-			r.ExecMultiple(t, tc.indexDDL...)
+					r.ExecMultiple(t, tc.indexDDL...)
 
-			// Execute any post-index SQL
-			if tc.postIndexSQL != "" {
-				r.Exec(t, tc.postIndexSQL)
-			}
+					// Execute any post-index SQL
+					if tc.postIndexSQL != "" {
+						r.Exec(t, tc.postIndexSQL)
+					}
 
-			// Get timestamp before corruption if needed
-			var expectedASOFTime time.Time
+					// Get timestamp before corruption if needed
+					var expectedASOFTime time.Time
 
-			if tc.useTimestampBeforeCorruption {
-				// Get timestamp before corruption
-				r.QueryRow(t, "SELECT now()::timestamp").Scan(&expectedASOFTime)
-				expectedASOFTime = expectedASOFTime.UTC()
+					if tc.useTimestampBeforeCorruption {
+						// Get timestamp before corruption
+						r.QueryRow(t, "SELECT now()::timestamp").Scan(&expectedASOFTime)
+						expectedASOFTime = expectedASOFTime.UTC()
 
-				// Sleep for 1 millisecond to ensure corruption happens after timestamp
-				// This should be long enough to guarantee a different timestamp
-				time.Sleep(1 * time.Millisecond)
-			}
+						// Sleep for 1 millisecond to ensure corruption happens after timestamp
+						// This should be long enough to guarantee a different timestamp
+						time.Sleep(1 * time.Millisecond)
+					}
 
-			tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "test", "t")
+					tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "test", "t")
 
-			// Select target index based on corruptionTargetIndex with bounds checking
-			indexes := tableDesc.PublicNonPrimaryIndexes()
-			targetIndexPos := tc.corruptionTargetIndex
-			if targetIndexPos < 0 || targetIndexPos >= len(indexes) {
-				targetIndexPos = 0 // Default to first index for safety/backward compatibility
-			}
-			secIndex := indexes[targetIndexPos]
-
-			// Apply test-specific corruption based on configured selectors:
-			// - If missingIndexEntrySelector is set, we delete the secondary index entries
-			//   for all rows matching the predicate. This simulates missing index entries
-			//   (i.e., primary rows with no corresponding secondary index key).
-			// - If danglingIndexEntryInsertQuery is set, we evaluate the query to produce
-			//   synthetic rows and manually insert their secondary index entries without
-			//   inserting matching primary rows. This simulates dangling entries
-			//   (i.e., secondary index keys pointing to non-existent primary keys).
-			if tc.missingIndexEntrySelector != "" {
-				rows, err := ie.QueryBufferedEx(ctx, "missing-index-entry-query-filter", nil /* no txn */, sessiondata.NodeUserSessionDataOverride,
-					`SELECT * FROM test.t WHERE `+tc.missingIndexEntrySelector)
-				require.NoError(t, err)
-				require.Greater(t, len(rows), 0,
-					"filter '%s' matched no rows - check that values exist in table data",
-					tc.missingIndexEntrySelector)
-				t.Logf("Corrupting %d rows that match filter by deleting secondary index keys: %s", len(rows), tc.missingIndexEntrySelector)
-				for _, row := range rows {
-					err = deleteSecondaryIndexEntry(ctx, codec, row, kvDB, tableDesc, secIndex)
-					require.NoError(t, err)
-				}
-			}
-			if tc.danglingIndexEntryInsertQuery != "" {
-				rows, err := ie.QueryBufferedEx(ctx, "dangling-index-insert-query", nil, /* no txn */
-					sessiondata.NodeUserSessionDataOverride, tc.danglingIndexEntryInsertQuery)
-				require.NoError(t, err)
-				require.Greater(t, len(rows), 0,
-					"danglingIndexEntryInsertQuery '%s' returned no rows - check query syntax",
-					tc.danglingIndexEntryInsertQuery)
-				t.Logf("Corrupting %d rows by inserting keys into secondary index returned by this query: %s",
-					len(rows), tc.danglingIndexEntryInsertQuery)
-				for _, row := range rows {
-					err = insertSecondaryIndexEntry(ctx, codec, row, kvDB, tableDesc, secIndex)
-					require.NoError(t, err)
-				}
-			}
-			r.Exec(t,
-				`INSERT INTO test.t (a, b, c, d, e, f)
+					// Select target index based on corruptionTargetIndex with bounds checking
+					indexes := tableDesc.PublicNonPrimaryIndexes()
+					targetIndexPos := tc.corruptionTargetIndex
+					if targetIndexPos < 0 || targetIndexPos >= len(indexes) {
+						targetIndexPos = 0 // Default to first index for safety/backward compatibility
+					}
+					secIndex := indexes[targetIndexPos]
+
+					// Apply test-specific corruption based on configured selectors:
+					// - If missingIndexEntrySelector is set, we delete the secondary index entries
+					//   for all rows matching the predicate. This simulates missing index entries
+					//   (i.e., primary rows with no corresponding secondary index key).
+					// - If danglingIndexEntryInsertQuery is set, we evaluate the query to produce
+					//   synthetic rows and manually insert their secondary index entries without
+					//   inserting matching primary rows. This simulates dangling entries
+					//   (i.e., secondary index keys pointing to non-existent primary keys).
+					if tc.missingIndexEntrySelector != "" {
+						rows, err := ie.QueryBufferedEx(ctx, "missing-index-entry-query-filter", nil /* no txn */, sessiondata.NodeUserSessionDataOverride,
+							`SELECT * FROM test.t WHERE `+tc.missingIndexEntrySelector)
+						require.NoError(t, err)
+						require.Greater(t, len(rows), 0,
+							"filter '%s' matched no rows - check that values exist in table data",
+							tc.missingIndexEntrySelector)
+						t.Logf("Corrupting %d rows that match filter by deleting secondary index keys: %s", len(rows), tc.missingIndexEntrySelector)
+						for _, row := range rows {
+							err = deleteSecondaryIndexEntry(ctx, codec, row, kvDB, tableDesc, secIndex)
+							require.NoError(t, err)
+						}
+					}
+					if tc.danglingIndexEntryInsertQuery != "" {
+						rows, err := ie.QueryBufferedEx(ctx, "dangling-index-insert-query", nil, /* no txn */
+							sessiondata.NodeUserSessionDataOverride, tc.danglingIndexEntryInsertQuery)
+						require.NoError(t, err)
+						require.Greater(t, len(rows), 0,
+							"danglingIndexEntryInsertQuery '%s' returned no rows - check query syntax",
+							tc.danglingIndexEntryInsertQuery)
+						t.Logf("Corrupting %d rows by inserting keys into secondary index returned by this query: %s",
+							len(rows), tc.danglingIndexEntryInsertQuery)
+						for _, row := range rows {
+							err = insertSecondaryIndexEntry(ctx, codec, row, kvDB, tableDesc, secIndex)
+							require.NoError(t, err)
+						}
+					}
+					r.Exec(t,
+						`INSERT INTO test.t (a, b, c, d, e, f)
 SELECT
 gs1 AS a,
 gs1 * 10 AS b,
@@ -445,116 +465,118 @@ func TestDetectIndexConsistencyErrors(t *testing.T) {
 'e_' || gs1::STRING AS e,
 gs1 * 1.5 AS f
 FROM generate_series(1001, 2000) AS gs1;`,
-			)
+					)
 
-			_, err = db.Exec(`SET enable_inspect_command=true`)
-			require.NoError(t, err)
+					_, err = db.Exec(`SET enable_inspect_command=true`)
+					require.NoError(t, err)
 
-			// If not using timestamp before corruption, get current timestamp
-			if !tc.useTimestampBeforeCorruption {
-				// Convert relative timestamp to absolute timestamp using CRDB
-				r.QueryRow(t, "SELECT (now() + '-1us')::timestamp").Scan(&expectedASOFTime)
-				expectedASOFTime = expectedASOFTime.UTC()
-			}
+					// If not using timestamp before corruption, get current timestamp
+					if !tc.useTimestampBeforeCorruption {
+						// Convert relative timestamp to absolute timestamp using CRDB
+						r.QueryRow(t, "SELECT (now() + '-1us')::timestamp").Scan(&expectedASOFTime)
+						expectedASOFTime = expectedASOFTime.UTC()
+					}
 
-			// Use the absolute timestamp in nanoseconds for inspect command
-			absoluteTimestamp := fmt.Sprintf("'%d'", expectedASOFTime.UnixNano())
-			inspectQuery := fmt.Sprintf(`INSPECT TABLE test.t AS OF SYSTEM TIME %s WITH OPTIONS INDEX ALL`, absoluteTimestamp)
-			_, err = db.Query(inspectQuery)
-			if tc.expectedErrRegex == "" {
-				require.NoError(t, err)
-				require.Equal(t, 0, issueLogger.numIssuesFound())
-				return
-			}
+					// Use the absolute timestamp in nanoseconds for inspect command
+					absoluteTimestamp := fmt.Sprintf("'%d'", expectedASOFTime.UnixNano())
+					inspectQuery := fmt.Sprintf(`INSPECT TABLE test.t AS OF SYSTEM TIME %s WITH OPTIONS INDEX ALL`, absoluteTimestamp)
+					_, err = db.Query(inspectQuery)
+					if tc.expectedErrRegex == "" {
+						require.NoError(t, err)
+						require.Equal(t, 0, issueLogger.numIssuesFound())
+						return
+					}
 
-			require.Error(t, err)
-			require.Regexp(t, tc.expectedErrRegex, err.Error())
-			var pqErr *pq.Error
-			require.True(t, errors.As(err, &pqErr), "expected pq.Error, got %T", err)
-			require.NotEmpty(t, pqErr.Hint, "expected error to have a hint")
-			require.Regexp(t, "SHOW INSPECT ERRORS FOR JOB [0-9]+ WITH DETAILS", pqErr.Hint)
-
-			numExpected := len(tc.expectedIssues)
-			numFound := issueLogger.numIssuesFound()
-
-			dumpAllFoundIssues := func() {
-				t.Log("Dumping all found issues:")
-				for i := 0; i < numFound; i++ {
-					t.Logf("  [%d] %s", i, issueLogger.issue(i))
-				}
-			}
+					require.Error(t, err)
+					require.Regexp(t, tc.expectedErrRegex, err.Error())
+					var pqErr *pq.Error
+					require.True(t, errors.As(err, &pqErr), "expected pq.Error, got %T", err)
+					require.NotEmpty(t, pqErr.Hint, "expected error to have a hint")
+					require.Regexp(t, "SHOW INSPECT ERRORS FOR JOB [0-9]+ WITH DETAILS", pqErr.Hint)
 
-			// The number of expected and actual issues must match exactly.
-			// If they don't, dump all found issues for debugging and fail.
-			if numExpected != numFound {
-				t.Logf("Mismatch in number of issues: expected %d, found %d", numExpected, numFound)
-				dumpAllFoundIssues()
-				t.Fatalf("expected %d issues, but found %d", numExpected, numFound)
-			}
+					numExpected := len(tc.expectedIssues)
+					numFound := issueLogger.numIssuesFound()
 
-			// For each expected issue, ensure it was found.
-			for i, expectedIssue := range tc.expectedIssues {
-				foundIssue := issueLogger.findIssue(expectedIssue.ErrorType, expectedIssue.PrimaryKey)
-				if foundIssue == nil {
-					t.Logf("Expected issue not found: %q", expectedIssue)
-					dumpAllFoundIssues()
-					t.Fatalf("expected issue %d (%q) not found", i, expectedIssue)
-				}
-				require.NotEqual(t, 0, foundIssue.DatabaseID, "expected issue to have a database ID: %s", expectedIssue)
-				require.NotEqual(t, 0, foundIssue.SchemaID, "expected issue to have a schema ID: %s", expectedIssue)
-				require.NotEqual(t, 0, foundIssue.ObjectID, "expected issue to have an object ID: %s", expectedIssue)
-				require.Equal(t, expectedASOFTime, foundIssue.AOST.UTC())
-
-				// Additional validation for internal errors
-				if foundIssue.ErrorType == "internal_error" {
-					require.NotNil(t, foundIssue.Details, "internal error should have details")
-
-					// Validate patterns if provided for this specific issue
-					if tc.expectedInternalErrorPatterns != nil && i < len(tc.expectedInternalErrorPatterns) &&
-						tc.expectedInternalErrorPatterns[i] != nil {
-						expectedPatterns := tc.expectedInternalErrorPatterns[i]
-
-						// Validate each expected pattern
-						for detailKey, expectedPattern := range expectedPatterns {
-							redactableKey := redact.RedactableString(detailKey)
-							require.Contains(t, foundIssue.Details, redactableKey, "internal error should contain detail key: %s", detailKey)
-
-							detailValue, ok := foundIssue.Details[redactableKey].(string)
-							require.True(t, ok, "detail value for key %s should be a string", detailKey)
-							require.Regexp(t, expectedPattern, detailValue,
-								"detail %s should match pattern %s, got: %s", detailKey, expectedPattern, detailValue)
+					dumpAllFoundIssues := func() {
+						t.Log("Dumping all found issues:")
+						for i := 0; i < numFound; i++ {
+							t.Logf("  [%d] %s", i, issueLogger.issue(i))
 						}
 					}
-				}
 
-				// Validate Details if provided in expected issue
-				if expectedIssue.Details != nil {
-					require.NotNil(t, foundIssue.Details, "issue should have details when expected")
+					// The number of expected and actual issues must match exactly.
+					// If they don't, dump all found issues for debugging and fail.
+					if numExpected != numFound {
+						t.Logf("Mismatch in number of issues: expected %d, found %d", numExpected, numFound)
+						dumpAllFoundIssues()
+						t.Fatalf("expected %d issues, but found %d", numExpected, numFound)
+					}
+
+					// For each expected issue, ensure it was found.
+					for i, expectedIssue := range tc.expectedIssues {
+						foundIssue := issueLogger.findIssue(expectedIssue.ErrorType, expectedIssue.PrimaryKey)
+						if foundIssue == nil {
+							t.Logf("Expected issue not found: %q", expectedIssue)
+							dumpAllFoundIssues()
+							t.Fatalf("expected issue %d (%q) not found", i, expectedIssue)
+						}
+						require.NotEqual(t, 0, foundIssue.DatabaseID, "expected issue to have a database ID: %s", expectedIssue)
+						require.NotEqual(t, 0, foundIssue.SchemaID, "expected issue to have a schema ID: %s", expectedIssue)
+						require.NotEqual(t, 0, foundIssue.ObjectID, "expected issue to have an object ID: %s", expectedIssue)
+						require.Equal(t, expectedASOFTime, foundIssue.AOST.UTC())
+
+						// Additional validation for internal errors
+						if foundIssue.ErrorType == "internal_error" {
+							require.NotNil(t, foundIssue.Details, "internal error should have details")
+
+							// Validate patterns if provided for this specific issue
+							if tc.expectedInternalErrorPatterns != nil && i < len(tc.expectedInternalErrorPatterns) &&
+								tc.expectedInternalErrorPatterns[i] != nil {
+								expectedPatterns := tc.expectedInternalErrorPatterns[i]
+
+								// Validate each expected pattern
+								for detailKey, expectedPattern := range expectedPatterns {
+									redactableKey := redact.RedactableString(detailKey)
+									require.Contains(t, foundIssue.Details, redactableKey, "internal error should contain detail key: %s", detailKey)
+
+									detailValue, ok := foundIssue.Details[redactableKey].(string)
+									require.True(t, ok, "detail value for key %s should be a string", detailKey)
+									require.Regexp(t, expectedPattern, detailValue,
+										"detail %s should match pattern %s, got: %s", detailKey, expectedPattern, detailValue)
+								}
+							}
+						}
+
+						// Validate Details if provided in expected issue
+						if expectedIssue.Details != nil {
+							require.NotNil(t, foundIssue.Details, "issue should have details when expected")
 
-					// Check that all expected detail keys and values match
-					for expectedKey, expectedValue := range expectedIssue.Details {
-						require.Contains(t, foundIssue.Details, expectedKey,
-							"issue should contain detail key: %s", expectedKey)
+							// Check that all expected detail keys and values match
+							for expectedKey, expectedValue := range expectedIssue.Details {
+								require.Contains(t, foundIssue.Details, expectedKey,
+									"issue should contain detail key: %s", expectedKey)
 
-						actualValue := foundIssue.Details[expectedKey]
-						require.Equal(t, expectedValue, actualValue,
-							"detail %s should be %v, got %v", expectedKey, expectedValue, actualValue)
+								actualValue := foundIssue.Details[expectedKey]
+								require.Equal(t, expectedValue, actualValue,
+									"detail %s should be %v, got %v", expectedKey, expectedValue, actualValue)
+							}
+						}
+					}
-					}
-				}
-			}
 
-			// Validate job status matches expected outcome
-			var jobID int64
-			var jobStatus string
-			var fractionCompleted float64
-			r.QueryRow(t, `SELECT job_id, status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobID, &jobStatus, &fractionCompleted)
-
-			if tc.expectedErrRegex == "" {
-				require.Equal(t, "succeeded", jobStatus, "expected job to succeed when no issues found")
-				require.InEpsilon(t, 1.0, fractionCompleted, 0.01, "progress should reach 100%% on successful completion")
-				requireCheckCountsMatch(t, r, jobID)
-			} else {
-				require.Equal(t, "failed", jobStatus, "expected job to fail when inconsistencies found")
+					// Validate job status matches expected outcome
+					var jobID int64
+					var jobStatus string
+					var fractionCompleted float64
+					r.QueryRow(t, `SELECT job_id, status, fraction_completed FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY job_id DESC LIMIT 1`).Scan(&jobID, &jobStatus, &fractionCompleted)
+
+					if tc.expectedErrRegex == "" {
+						require.Equal(t, "succeeded", jobStatus, "expected job to succeed when no issues found")
+						require.InEpsilon(t, 1.0, fractionCompleted, 0.01, "progress should reach 100%% on successful completion")
+						requireCheckCountsMatch(t, r, jobID)
+					} else {
+						require.Equal(t, "failed", jobStatus, "expected job to fail when inconsistencies found")
+					}
+				})
+			}
 		})
 	}
 }
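Reviewer note: for anyone unfamiliar with the count(*) / xor_agg(fnv64(...)) pattern used in buildIndexHashQuery, here is a small self-contained Go model of the precheck idea. The row encoding and hashing below are stand-ins (plain strings and hash/fnv rather than crdb_internal.datums_to_bytes and the SQL fnv64 builtin), so treat it as an illustration of the technique, not the actual implementation. The key property is that XOR aggregation is commutative and associative, so the fingerprint is independent of scan order, which is what lets a primary and a secondary index be compared even though they return rows in different orders; the row count guards the degenerate case where a pair of identical rows would XOR-cancel to the same hash.

package main

import (
	"fmt"
	"hash/fnv"
)

// fnv64 hashes one encoded row; this stands in for the SQL fnv64() call.
func fnv64(row string) uint64 {
	h := fnv.New64()
	h.Write([]byte(row))
	return h.Sum64()
}

// fingerprint mirrors the count(*) and xor_agg(fnv64(...)) aggregation:
// XOR all per-row hashes together and count the rows.
func fingerprint(rows []string) (count int, hash uint64) {
	for _, r := range rows {
		hash ^= fnv64(r)
	}
	return len(rows), hash
}

func main() {
	primary := []string{"1|a", "2|b", "3|c"}
	secondary := []string{"3|c", "1|a", "2|b"} // same rows, different order

	pc, ph := fingerprint(primary)
	sc, sh := fingerprint(secondary)
	fmt.Println("match:", pc == sc && ph == sh) // true: order does not matter

	// A dangling or missing entry changes the count, the hash, or both.
	// Here the row count stays the same but the hash diverges.
	corrupted := append(secondary[:2:2], "2|x")
	cc, ch := fingerprint(corrupted)
	fmt.Println("match:", pc == cc && ph == ch) // false
}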