@@ -1496,6 +1496,10 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*op
1496
1496
if opStmt .expectedExecErrors .empty () {
1497
1497
opStmt .potentialExecErrors .merge (getValidGenerationErrors ())
1498
1498
}
1499
+ // Limit any CTAS statements to a maximum time of 1 minute.
1500
+ opStmt .statementTimeout = time .Minute
1501
+ opStmt .potentialExecErrors .add (pgcode .QueryCanceled )
1502
+ og .potentialCommitErrors .add (pgcode .QueryCanceled )
1499
1503
1500
1504
opStmt .sql = fmt .Sprintf (`CREATE TABLE %s AS %s FETCH FIRST %d ROWS ONLY` ,
1501
1505
destTableName , selectStatement .String (), MaxRowsToConsume )
@@ -3094,6 +3098,8 @@ type opStmt struct {
3094
3098
potentialExecErrors errorCodeSet
3095
3099
// queryResultCallback handles the results of the query execution.
3096
3100
queryResultCallback opStmtQueryResultCallback
3101
+ // statementTimeout if this statement has a timeout.
3102
+ statementTimeout time.Duration
3097
3103
}
3098
3104
3099
3105
// String implements Stringer
@@ -3210,6 +3216,16 @@ func (og *operationGenerator) WrapWithErrorState(err error, op *opStmt) error {
3210
3216
func (s * opStmt ) executeStmt (ctx context.Context , tx pgx.Tx , og * operationGenerator ) error {
3211
3217
var err error
3212
3218
var rows pgx.Rows
3219
+ // Apply any timeout for this statement
3220
+ if s .statementTimeout > 0 {
3221
+ _ , err = tx .Exec (ctx , fmt .Sprintf ("SET LOCAL statement_timeout='%s'" , s .statementTimeout .String ()))
3222
+ if err != nil {
3223
+ return errors .Mark (
3224
+ og .WrapWithErrorState (errors .Wrap (err , "***UNEXPECTED ERROR; Unable to set statement timeout." ), s ),
3225
+ errRunInTxnFatalSentinel ,
3226
+ )
3227
+ }
3228
+ }
3213
3229
// Statement doesn't produce any result set that needs to be validated.
3214
3230
if s .queryResultCallback == nil {
3215
3231
_ , err = tx .Exec (ctx , s .sql )
@@ -3283,6 +3299,16 @@ func (s *opStmt) executeStmt(ctx context.Context, tx pgx.Tx, og *operationGenera
3283
3299
return err
3284
3300
}
3285
3301
}
3302
+ // Reset any timeout for this statement
3303
+ if s .statementTimeout > 0 {
3304
+ _ , err = tx .Exec (ctx , "SET LOCAL statement_timeout=0" )
3305
+ if err != nil {
3306
+ return errors .Mark (
3307
+ og .WrapWithErrorState (errors .Wrap (err , "***UNEXPECTED ERROR; Unable to reset statement timeout." ), s ),
3308
+ errRunInTxnFatalSentinel ,
3309
+ )
3310
+ }
3311
+ }
3286
3312
return nil
3287
3313
}
3288
3314
0 commit comments