|
27 | 27 | import io.r2dbc.spi.IsolationLevel; |
28 | 28 | import io.r2dbc.spi.Lifecycle; |
29 | 29 | import io.r2dbc.spi.R2dbcException; |
| 30 | +import io.r2dbc.spi.Result; |
30 | 31 | import io.r2dbc.spi.Statement; |
31 | 32 | import io.r2dbc.spi.TransactionDefinition; |
32 | 33 | import io.r2dbc.spi.ValidationDepth; |
33 | 34 | import org.reactivestreams.Publisher; |
| 35 | +import reactor.core.publisher.Flux; |
34 | 36 | import reactor.core.publisher.Mono; |
35 | 37 |
|
36 | 38 | import java.sql.SQLException; |
| 39 | +import java.sql.Savepoint; |
37 | 40 | import java.time.Duration; |
38 | 41 |
|
39 | 42 | import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED; |
@@ -475,21 +478,23 @@ public ConnectionMetadata getMetadata() { |
475 | 478 | /** |
476 | 479 | * {@inheritDoc} |
477 | 480 | * <p> |
478 | | - * This SPI method is not yet implemented. |
| 481 | + * This SPI method is implemented to execute a "SAVEPOINT ..." command. |
| 482 | + * This is the same as how Oracle JDBC implements |
| 483 | + * {@link java.sql.Connection#setSavepoint(String)}, except that JDBC uses a |
| 484 | + * blocking call to {@code java.sql.Statement.executeUpdate(String)}. This |
| 485 | + * method uses a non-blocking call. |
479 | 486 | * </p> |
480 | | - * @throws UnsupportedOperationException In this release of Oracle |
481 | | - * R2DBC |
482 | 487 | * @throws IllegalStateException If this {@code Connection} is closed |
483 | 488 | */ |
484 | 489 | @Override |
485 | 490 | public Publisher<Void> createSavepoint(String name) { |
486 | 491 | requireNonNull(name, "name is null"); |
487 | 492 | requireOpenConnection(jdbcConnection); |
488 | | - // TODO: Execute SQL to create a savepoint. Examine and understand the |
489 | | - // Oracle JDBC driver's implementation of |
490 | | - // OracleConnection.oracleSetSavepoint(), and replicate it without |
491 | | - // blocking a thread. Consider adding a ReactiveJDBCAdapter API to do this. |
492 | | - throw new UnsupportedOperationException("createSavepoint not supported"); |
| 493 | + return Mono.from(setAutoCommit(false)) |
| 494 | + .then(Flux.from(createStatement("SAVEPOINT " + name) |
| 495 | + .execute()) |
| 496 | + .flatMap(Result::getRowsUpdated) |
| 497 | + .then()); |
493 | 498 | } |
494 | 499 |
|
495 | 500 | /** |
@@ -535,20 +540,22 @@ public Publisher<Void> rollbackTransaction() { |
535 | 540 | /** |
536 | 541 | * {@inheritDoc} |
537 | 542 | * <p> |
538 | | - * This SPI method is not yet implemented. |
| 543 | + * This SPI method is implemented to execute a "ROLLBACK TO " command. This |
| 544 | + * is the same as how Oracle JDBC implements |
| 545 | + * {@link java.sql.Connection#rollback(Savepoint)}, except that JDBC uses a |
| 546 | + * blocking call to {@code java.sql.Statement.executeUpdate(String)}. This |
| 547 | + * method uses a non-blocking call. |
539 | 548 | * </p> |
540 | 549 | * @throws IllegalStateException If this {@code Connection} is closed |
541 | | - * @throws UnsupportedOperationException In version this release of Oracle |
542 | | - * R2DBC |
543 | 550 | */ |
544 | 551 | @Override |
545 | 552 | public Publisher<Void> rollbackTransactionToSavepoint(String name) { |
546 | 553 | requireNonNull(name, "name is null"); |
547 | 554 | requireOpenConnection(jdbcConnection); |
548 | | - // TODO: Use the JDBC connection to rollback to a savepoint without blocking |
549 | | - // a thread. |
550 | | - throw new UnsupportedOperationException( |
551 | | - "rollbackTransactionToSavepoint not supported"); |
| 555 | + return Flux.from(createStatement("ROLLBACK TO " + name) |
| 556 | + .execute()) |
| 557 | + .flatMap(Result::getRowsUpdated) |
| 558 | + .then(); |
552 | 559 | } |
553 | 560 |
|
554 | 561 | /** |
|
0 commit comments