diff --git a/pkg/workload/schemachange/operation_generator.go b/pkg/workload/schemachange/operation_generator.go index f6bf561990ab..63668882c18e 100644 --- a/pkg/workload/schemachange/operation_generator.go +++ b/pkg/workload/schemachange/operation_generator.go @@ -1546,6 +1546,10 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*op if opStmt.expectedExecErrors.empty() { opStmt.potentialExecErrors.merge(getValidGenerationErrors()) } + // Limit any CTAS statements to a maximum time of 1 minute. + opStmt.statementTimeout = time.Minute + opStmt.potentialExecErrors.add(pgcode.QueryCanceled) + og.potentialCommitErrors.add(pgcode.QueryCanceled) opStmt.sql = fmt.Sprintf(`CREATE TABLE %s AS %s FETCH FIRST %d ROWS ONLY`, destTableName, selectStatement.String(), MaxRowsToConsume) @@ -3052,6 +3056,8 @@ type opStmt struct { // potentialExecErrors errors that could be potentially seen on execution. potentialExecErrors errorCodeSet queryResultCallback opStmtQueryResultCallback + // statementTimeout if this statement has a timeout. + statementTimeout time.Duration } // String implements Stringer @@ -3168,6 +3174,16 @@ func (og *operationGenerator) WrapWithErrorState(err error, op *opStmt) error { func (s *opStmt) executeStmt(ctx context.Context, tx pgx.Tx, og *operationGenerator) error { var err error var rows pgx.Rows + // Apply any timeout for this statement + if s.statementTimeout > 0 { + _, err = tx.Exec(ctx, fmt.Sprintf("SET LOCAL statement_timeout='%s'", s.statementTimeout.String())) + if err != nil { + return errors.Mark( + og.WrapWithErrorState(errors.Wrap(err, "***UNEXPECTED ERROR; Unable to set statement timeout."), s), + errRunInTxnFatalSentinel, + ) + } + } // Statement doesn't produce any result set that needs to be validated. if s.queryResultCallback == nil { _, err = tx.Exec(ctx, s.sql) @@ -3241,6 +3257,16 @@ func (s *opStmt) executeStmt(ctx context.Context, tx pgx.Tx, og *operationGenera return err } } + // Reset any timeout for this statement + if s.statementTimeout > 0 { + _, err = tx.Exec(ctx, "SET LOCAL statement_timeout=0") + if err != nil { + return errors.Mark( + og.WrapWithErrorState(errors.Wrap(err, "***UNEXPECTED ERROR; Unable to reset statement timeout."), s), + errRunInTxnFatalSentinel, + ) + } + } return nil }