From ceab3560af949e0805140f5c932775aba5555082 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Tue, 24 Jun 2025 19:43:10 +0000 Subject: [PATCH] workload/schemachanger: apply statement_timeout out to CTAS Previously, the schema changer workload never used statement_timeouts since schema changes are normally not predictable operations. However, since we added CREATE TABLE AS support, its possible to have multiple joins where a CREATE TABLE AS can take more then 5 minutes. This patch modifies the operation generator support adding statement timeouts for individual statements. Fixes: #148342 Release note: None --- .../schemachange/operation_generator.go | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/pkg/workload/schemachange/operation_generator.go b/pkg/workload/schemachange/operation_generator.go index 985f14d6138e..d5dfa9e5d9d9 100644 --- a/pkg/workload/schemachange/operation_generator.go +++ b/pkg/workload/schemachange/operation_generator.go @@ -1496,6 +1496,10 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*op if opStmt.expectedExecErrors.empty() { opStmt.potentialExecErrors.merge(getValidGenerationErrors()) } + // Limit any CTAS statements to a maximum time of 1 minute. + opStmt.statementTimeout = time.Minute + opStmt.potentialExecErrors.add(pgcode.QueryCanceled) + og.potentialCommitErrors.add(pgcode.QueryCanceled) opStmt.sql = fmt.Sprintf(`CREATE TABLE %s AS %s FETCH FIRST %d ROWS ONLY`, destTableName, selectStatement.String(), MaxRowsToConsume) @@ -3094,6 +3098,8 @@ type opStmt struct { potentialExecErrors errorCodeSet // queryResultCallback handles the results of the query execution. queryResultCallback opStmtQueryResultCallback + // statementTimeout if this statement has a timeout. + statementTimeout time.Duration } // String implements Stringer @@ -3210,6 +3216,16 @@ func (og *operationGenerator) WrapWithErrorState(err error, op *opStmt) error { func (s *opStmt) executeStmt(ctx context.Context, tx pgx.Tx, og *operationGenerator) error { var err error var rows pgx.Rows + // Apply any timeout for this statement + if s.statementTimeout > 0 { + _, err = tx.Exec(ctx, fmt.Sprintf("SET LOCAL statement_timeout='%s'", s.statementTimeout.String())) + if err != nil { + return errors.Mark( + og.WrapWithErrorState(errors.Wrap(err, "***UNEXPECTED ERROR; Unable to set statement timeout."), s), + errRunInTxnFatalSentinel, + ) + } + } // Statement doesn't produce any result set that needs to be validated. if s.queryResultCallback == nil { _, err = tx.Exec(ctx, s.sql) @@ -3283,6 +3299,16 @@ func (s *opStmt) executeStmt(ctx context.Context, tx pgx.Tx, og *operationGenera return err } } + // Reset any timeout for this statement + if s.statementTimeout > 0 { + _, err = tx.Exec(ctx, "SET LOCAL statement_timeout=0") + if err != nil { + return errors.Mark( + og.WrapWithErrorState(errors.Wrap(err, "***UNEXPECTED ERROR; Unable to reset statement timeout."), s), + errRunInTxnFatalSentinel, + ) + } + } return nil }