@@ -1497,6 +1497,10 @@ func (og *operationGenerator) createTableAs(ctx context.Context, tx pgx.Tx) (*op
1497
1497
if opStmt .expectedExecErrors .empty () {
1498
1498
opStmt .potentialExecErrors .merge (getValidGenerationErrors ())
1499
1499
}
1500
+ // Limit any CTAS statements to a maximum time of 1 minute.
1501
+ opStmt .statementTimeout = time .Minute
1502
+ opStmt .potentialExecErrors .add (pgcode .QueryCanceled )
1503
+ og .potentialCommitErrors .add (pgcode .QueryCanceled )
1500
1504
1501
1505
opStmt .sql = fmt .Sprintf (`CREATE TABLE %s AS %s FETCH FIRST %d ROWS ONLY` ,
1502
1506
destTableName , selectStatement .String (), MaxRowsToConsume )
@@ -3091,6 +3095,8 @@ type opStmt struct {
3091
3095
// potentialExecErrors errors that could be potentially seen on execution.
3092
3096
potentialExecErrors errorCodeSet
3093
3097
queryResultCallback opStmtQueryResultCallback
3098
+ // statementTimeout if this statement has a timeout.
3099
+ statementTimeout time.Duration
3094
3100
}
3095
3101
3096
3102
// String implements Stringer
@@ -3207,6 +3213,16 @@ func (og *operationGenerator) WrapWithErrorState(err error, op *opStmt) error {
3207
3213
func (s * opStmt ) executeStmt (ctx context.Context , tx pgx.Tx , og * operationGenerator ) error {
3208
3214
var err error
3209
3215
var rows pgx.Rows
3216
+ // Apply any timeout for this statement
3217
+ if s .statementTimeout > 0 {
3218
+ _ , err = tx .Exec (ctx , fmt .Sprintf ("SET LOCAL statement_timeout='%s'" , s .statementTimeout .String ()))
3219
+ if err != nil {
3220
+ return errors .Mark (
3221
+ og .WrapWithErrorState (errors .Wrap (err , "***UNEXPECTED ERROR; Unable to set statement timeout." ), s ),
3222
+ errRunInTxnFatalSentinel ,
3223
+ )
3224
+ }
3225
+ }
3210
3226
// Statement doesn't produce any result set that needs to be validated.
3211
3227
if s .queryResultCallback == nil {
3212
3228
_ , err = tx .Exec (ctx , s .sql )
@@ -3280,6 +3296,16 @@ func (s *opStmt) executeStmt(ctx context.Context, tx pgx.Tx, og *operationGenera
3280
3296
return err
3281
3297
}
3282
3298
}
3299
+ // Reset any timeout for this statement
3300
+ if s .statementTimeout > 0 {
3301
+ _ , err = tx .Exec (ctx , "SET LOCAL statement_timeout=0" )
3302
+ if err != nil {
3303
+ return errors .Mark (
3304
+ og .WrapWithErrorState (errors .Wrap (err , "***UNEXPECTED ERROR; Unable to reset statement timeout." ), s ),
3305
+ errRunInTxnFatalSentinel ,
3306
+ )
3307
+ }
3308
+ }
3283
3309
return nil
3284
3310
}
3285
3311
0 commit comments