Skip to content

Commit 393b731

Browse files
abonandermpywbonsairobo
authored
Merge of #3427 (by @mpyw) and #3614 (by @bonsairobo) (#3765)
* feat: Implement `get_transaction_depth` for drivers * test: Verify `get_transaction_depth()` on postgres * Refactor: `TransactionManager` delegation without BC SQLite implementation is currently WIP * Fix: Avoid breaking changes on `AnyConnectionBackend` * Refactor: Remove verbose `SqliteConnection` typing * Feat: Implementation for SQLite I have included `AtomicUsize` in `WorkerSharedState`. Ideally, it is not desirable to execute `load` and `fetch_add` in two separate steps, but we decided to allow it here since there is only one thread writing. To prevent writing from other threads, the field itself was made private, and a getter method was provided with `pub(crate)`. * Refactor: Same approach for `cached_statements_size` ref: a66787d * Fix: Add missing `is_in_transaction` for backend * Doc: Remove verbose "synchronously" word * Fix: Remove useless `mut` qualifier * feat: add Connection::begin_with This patch completes the plumbing of an optional statement from these methods to `TransactionManager::begin` without any validation of the provided statement. There is a new `Error::InvalidSavePoint` which is triggered by any attempt to call `Connection::begin_with` when we are already inside of a transaction. * feat: add Pool::begin_with and Pool::try_begin_with * feat: add Error::BeginFailed and validate that custom "begin" statements are successful * chore: add tests of Error::BeginFailed * chore: add tests of Error::InvalidSavePointStatement * chore: test begin_with works for all SQLite "BEGIN" statements * chore: improve comment on Connection::begin_with * feat: add default impl of `Connection::begin_with` This makes the new method a non-breaking change. * refactor: combine if statement + unwrap_or_else into one match * feat: use in-memory SQLite DB to avoid conflicts across tests run in parallel * feedback: remove public wrapper for sqlite3_txn_state Move the wrapper directly into the test that uses it instead. * fix: cache Status on MySqlConnection * fix: compilation errors * fix: format * fix: postgres test * refactor: delete `Connection::get_transaction_depth` * fix: tests --------- Co-authored-by: mpyw <[email protected]> Co-authored-by: Duncan Fairbanks <[email protected]>
1 parent 2f10c29 commit 393b731

File tree

29 files changed

+494
-61
lines changed

29 files changed

+494
-61
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ rand_xoshiro = "0.6.0"
189189
hex = "0.4.3"
190190
tempfile = "3.10.1"
191191
criterion = { version = "0.5.1", features = ["async_tokio"] }
192+
libsqlite3-sys = { version = "0.30.1" }
192193

193194
# If this is an unconditional dev-dependency then Cargo will *always* try to build `libsqlite3-sys`,
194195
# even when SQLite isn't the intended test target, and fail if the build environment is not set up for compiling C code.

sqlx-core/src/acquire.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl<'a, DB: Database> Acquire<'a> for &'_ Pool<DB> {
9393
let conn = self.acquire();
9494

9595
Box::pin(async move {
96-
Transaction::begin(MaybePoolConnection::PoolConnection(conn.await?)).await
96+
Transaction::begin(MaybePoolConnection::PoolConnection(conn.await?), None).await
9797
})
9898
}
9999
}
@@ -121,7 +121,7 @@ macro_rules! impl_acquire {
121121
'c,
122122
Result<$crate::transaction::Transaction<'c, $DB>, $crate::error::Error>,
123123
> {
124-
$crate::transaction::Transaction::begin(self)
124+
$crate::transaction::Transaction::begin(self, None)
125125
}
126126
}
127127
};

sqlx-core/src/any/connection/backend.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::describe::Describe;
33
use either::Either;
44
use futures_core::future::BoxFuture;
55
use futures_core::stream::BoxStream;
6+
use std::borrow::Cow;
67
use std::fmt::Debug;
78

89
pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static {
@@ -26,14 +27,40 @@ pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static {
2627
fn ping(&mut self) -> BoxFuture<'_, crate::Result<()>>;
2728

2829
/// Begin a new transaction or establish a savepoint within the active transaction.
29-
fn begin(&mut self) -> BoxFuture<'_, crate::Result<()>>;
30+
///
31+
/// If this is a new transaction, `statement` may be used instead of the
32+
/// default "BEGIN" statement.
33+
///
34+
/// If we are already inside a transaction and `statement.is_some()`, then
35+
/// `Error::InvalidSavePoint` is returned without running any statements.
36+
fn begin(&mut self, statement: Option<Cow<'static, str>>) -> BoxFuture<'_, crate::Result<()>>;
3037

3138
fn commit(&mut self) -> BoxFuture<'_, crate::Result<()>>;
3239

3340
fn rollback(&mut self) -> BoxFuture<'_, crate::Result<()>>;
3441

3542
fn start_rollback(&mut self);
3643

44+
/// Returns the current transaction depth.
45+
///
46+
/// Transaction depth indicates the level of nested transactions:
47+
/// - Level 0: No active transaction.
48+
/// - Level 1: A transaction is active.
49+
/// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
50+
fn get_transaction_depth(&self) -> usize {
51+
unimplemented!("get_transaction_depth() is not implemented for this backend. This is a provided method to avoid a breaking change, but it will become a required method in version 0.9 and later.");
52+
}
53+
54+
/// Checks if the connection is currently in a transaction.
55+
///
56+
/// This method returns `true` if the current transaction depth is greater than 0,
57+
/// indicating that a transaction is active. It returns `false` if the transaction depth is 0,
58+
/// meaning no transaction is active.
59+
#[inline]
60+
fn is_in_transaction(&self) -> bool {
61+
self.get_transaction_depth() != 0
62+
}
63+
3764
/// The number of statements currently cached in the connection.
3865
fn cached_statements_size(&self) -> usize {
3966
0

sqlx-core/src/any/connection/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use futures_core::future::BoxFuture;
2+
use std::borrow::Cow;
23

34
use crate::any::{Any, AnyConnectOptions};
45
use crate::connection::{ConnectOptions, Connection};
@@ -87,7 +88,17 @@ impl Connection for AnyConnection {
8788
where
8889
Self: Sized,
8990
{
90-
Transaction::begin(self)
91+
Transaction::begin(self, None)
92+
}
93+
94+
fn begin_with(
95+
&mut self,
96+
statement: impl Into<Cow<'static, str>>,
97+
) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
98+
where
99+
Self: Sized,
100+
{
101+
Transaction::begin(self, Some(statement.into()))
91102
}
92103

93104
fn cached_statements_size(&self) -> usize {

sqlx-core/src/any/transaction.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use futures_util::future::BoxFuture;
2+
use std::borrow::Cow;
23

34
use crate::any::{Any, AnyConnection};
5+
use crate::database::Database;
46
use crate::error::Error;
57
use crate::transaction::TransactionManager;
68

@@ -9,8 +11,11 @@ pub struct AnyTransactionManager;
911
impl TransactionManager for AnyTransactionManager {
1012
type Database = Any;
1113

12-
fn begin(conn: &mut AnyConnection) -> BoxFuture<'_, Result<(), Error>> {
13-
conn.backend.begin()
14+
fn begin<'conn>(
15+
conn: &'conn mut AnyConnection,
16+
statement: Option<Cow<'static, str>>,
17+
) -> BoxFuture<'conn, Result<(), Error>> {
18+
conn.backend.begin(statement)
1419
}
1520

1621
fn commit(conn: &mut AnyConnection) -> BoxFuture<'_, Result<(), Error>> {
@@ -24,4 +29,8 @@ impl TransactionManager for AnyTransactionManager {
2429
fn start_rollback(conn: &mut AnyConnection) {
2530
conn.backend.start_rollback()
2631
}
32+
33+
fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
34+
conn.backend.get_transaction_depth()
35+
}
2736
}

sqlx-core/src/connection.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use crate::database::{Database, HasStatementCache};
22
use crate::error::Error;
33

4-
use crate::transaction::Transaction;
4+
use crate::transaction::{Transaction, TransactionManager};
55
use futures_core::future::BoxFuture;
66
use log::LevelFilter;
7+
use std::borrow::Cow;
78
use std::fmt::Debug;
89
use std::str::FromStr;
910
use std::time::Duration;
@@ -49,6 +50,33 @@ pub trait Connection: Send {
4950
where
5051
Self: Sized;
5152

53+
/// Begin a new transaction with a custom statement.
54+
///
55+
/// Returns a [`Transaction`] for controlling and tracking the new transaction.
56+
///
57+
/// Returns an error if the connection is already in a transaction or if
58+
/// `statement` does not put the connection into a transaction.
59+
fn begin_with(
60+
&mut self,
61+
statement: impl Into<Cow<'static, str>>,
62+
) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
63+
where
64+
Self: Sized,
65+
{
66+
Transaction::begin(self, Some(statement.into()))
67+
}
68+
69+
/// Returns `true` if the connection is currently in a transaction.
70+
///
71+
/// # Note: Automatic Rollbacks May Not Be Counted
72+
/// Certain database errors (such as a serializable isolation failure)
73+
/// can cause automatic rollbacks of a transaction
74+
/// which may not be indicated in the return value of this method.
75+
#[inline]
76+
fn is_in_transaction(&self) -> bool {
77+
<Self::Database as Database>::TransactionManager::get_transaction_depth(self) != 0
78+
}
79+
5280
/// Execute the function inside a transaction.
5381
///
5482
/// If the function returns an error, the transaction will be rolled back. If it does not

sqlx-core/src/error.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ pub enum Error {
117117
#[cfg(feature = "migrate")]
118118
#[error("{0}")]
119119
Migrate(#[source] Box<crate::migrate::MigrateError>),
120+
121+
#[error("attempted to call begin_with at non-zero transaction depth")]
122+
InvalidSavePointStatement,
123+
124+
#[error("got unexpected connection status after attempting to begin transaction")]
125+
BeginFailed,
120126
}
121127

122128
impl StdError for Box<dyn DatabaseError> {}

sqlx-core/src/pool/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection<DB
191191
self,
192192
) -> futures_core::future::BoxFuture<'c, Result<crate::transaction::Transaction<'c, DB>, Error>>
193193
{
194-
crate::transaction::Transaction::begin(&mut **self)
194+
crate::transaction::Transaction::begin(&mut **self, None)
195195
}
196196
}
197197

sqlx-core/src/pool/mod.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
//! [`Pool::acquire`] or
5555
//! [`Pool::begin`].
5656
57+
use std::borrow::Cow;
5758
use std::fmt;
5859
use std::future::Future;
5960
use std::pin::{pin, Pin};
@@ -368,20 +369,54 @@ impl<DB: Database> Pool<DB> {
368369

369370
/// Retrieves a connection and immediately begins a new transaction.
370371
pub async fn begin(&self) -> Result<Transaction<'static, DB>, Error> {
371-
Transaction::begin(MaybePoolConnection::PoolConnection(self.acquire().await?)).await
372+
Transaction::begin(
373+
MaybePoolConnection::PoolConnection(self.acquire().await?),
374+
None,
375+
)
376+
.await
372377
}
373378

374379
/// Attempts to retrieve a connection and immediately begins a new transaction if successful.
375380
pub async fn try_begin(&self) -> Result<Option<Transaction<'static, DB>>, Error> {
376381
match self.try_acquire() {
377-
Some(conn) => Transaction::begin(MaybePoolConnection::PoolConnection(conn))
382+
Some(conn) => Transaction::begin(MaybePoolConnection::PoolConnection(conn), None)
378383
.await
379384
.map(Some),
380385

381386
None => Ok(None),
382387
}
383388
}
384389

390+
/// Retrieves a connection and immediately begins a new transaction using `statement`.
391+
pub async fn begin_with(
392+
&self,
393+
statement: impl Into<Cow<'static, str>>,
394+
) -> Result<Transaction<'static, DB>, Error> {
395+
Transaction::begin(
396+
MaybePoolConnection::PoolConnection(self.acquire().await?),
397+
Some(statement.into()),
398+
)
399+
.await
400+
}
401+
402+
/// Attempts to retrieve a connection and, if successful, immediately begins a new
403+
/// transaction using `statement`.
404+
pub async fn try_begin_with(
405+
&self,
406+
statement: impl Into<Cow<'static, str>>,
407+
) -> Result<Option<Transaction<'static, DB>>, Error> {
408+
match self.try_acquire() {
409+
Some(conn) => Transaction::begin(
410+
MaybePoolConnection::PoolConnection(conn),
411+
Some(statement.into()),
412+
)
413+
.await
414+
.map(Some),
415+
416+
None => Ok(None),
417+
}
418+
}
419+
385420
/// Shut down the connection pool, immediately waking all tasks waiting for a connection.
386421
///
387422
/// Upon calling this method, any currently waiting or subsequent calls to [`Pool::acquire`] and

sqlx-core/src/transaction.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,16 @@ pub trait TransactionManager {
1616
type Database: Database;
1717

1818
/// Begin a new transaction or establish a savepoint within the active transaction.
19-
fn begin(
20-
conn: &mut <Self::Database as Database>::Connection,
21-
) -> BoxFuture<'_, Result<(), Error>>;
19+
///
20+
/// If this is a new transaction, `statement` may be used instead of the
21+
/// default "BEGIN" statement.
22+
///
23+
/// If we are already inside a transaction and `statement.is_some()`, then
24+
/// `Error::InvalidSavePoint` is returned without running any statements.
25+
fn begin<'conn>(
26+
conn: &'conn mut <Self::Database as Database>::Connection,
27+
statement: Option<Cow<'static, str>>,
28+
) -> BoxFuture<'conn, Result<(), Error>>;
2229

2330
/// Commit the active transaction or release the most recent savepoint.
2431
fn commit(
@@ -32,6 +39,14 @@ pub trait TransactionManager {
3239

3340
/// Starts to abort the active transaction or restore from the most recent snapshot.
3441
fn start_rollback(conn: &mut <Self::Database as Database>::Connection);
42+
43+
/// Returns the current transaction depth.
44+
///
45+
/// Transaction depth indicates the level of nested transactions:
46+
/// - Level 0: No active transaction.
47+
/// - Level 1: A transaction is active.
48+
/// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
49+
fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize;
3550
}
3651

3752
/// An in-progress database transaction or savepoint.
@@ -83,11 +98,12 @@ where
8398
#[doc(hidden)]
8499
pub fn begin(
85100
conn: impl Into<MaybePoolConnection<'c, DB>>,
101+
statement: Option<Cow<'static, str>>,
86102
) -> BoxFuture<'c, Result<Self, Error>> {
87103
let mut conn = conn.into();
88104

89105
Box::pin(async move {
90-
DB::TransactionManager::begin(&mut conn).await?;
106+
DB::TransactionManager::begin(&mut conn, statement).await?;
91107

92108
Ok(Self {
93109
connection: conn,
@@ -237,7 +253,7 @@ impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<'
237253

238254
#[inline]
239255
fn begin(self) -> BoxFuture<'t, Result<Transaction<'t, DB>, Error>> {
240-
Transaction::begin(&mut **self)
256+
Transaction::begin(&mut **self, None)
241257
}
242258
}
243259

sqlx-mysql/src/any.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use sqlx_core::database::Database;
1616
use sqlx_core::describe::Describe;
1717
use sqlx_core::executor::Executor;
1818
use sqlx_core::transaction::TransactionManager;
19+
use std::borrow::Cow;
1920
use std::{future, pin::pin};
2021

2122
sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql);
@@ -37,8 +38,11 @@ impl AnyConnectionBackend for MySqlConnection {
3738
Connection::ping(self)
3839
}
3940

40-
fn begin(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
41-
MySqlTransactionManager::begin(self)
41+
fn begin(
42+
&mut self,
43+
statement: Option<Cow<'static, str>>,
44+
) -> BoxFuture<'_, sqlx_core::Result<()>> {
45+
MySqlTransactionManager::begin(self, statement)
4246
}
4347

4448
fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
@@ -53,6 +57,10 @@ impl AnyConnectionBackend for MySqlConnection {
5357
MySqlTransactionManager::start_rollback(self)
5458
}
5559

60+
fn get_transaction_depth(&self) -> usize {
61+
MySqlTransactionManager::get_transaction_depth(self)
62+
}
63+
5664
fn shrink_buffers(&mut self) {
5765
Connection::shrink_buffers(self);
5866
}

sqlx-mysql/src/connection/establish.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ impl MySqlConnection {
2727
inner: Box::new(MySqlConnectionInner {
2828
stream,
2929
transaction_depth: 0,
30+
status_flags: Default::default(),
3031
cache_statement: StatementCache::new(options.statement_cache_capacity),
3132
log_settings: options.log_settings.clone(),
3233
}),

sqlx-mysql/src/connection/executor.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ impl MySqlConnection {
166166
// this indicates either a successful query with no rows at all or a failed query
167167
let ok = packet.ok()?;
168168

169+
self.inner.status_flags = ok.status;
170+
169171
let rows_affected = ok.affected_rows;
170172
logger.increase_rows_affected(rows_affected);
171173
let done = MySqlQueryResult {
@@ -208,6 +210,8 @@ impl MySqlConnection {
208210
if packet[0] == 0xfe && packet.len() < 9 {
209211
let eof = packet.eof(self.inner.stream.capabilities)?;
210212

213+
self.inner.status_flags = eof.status;
214+
211215
r#yield!(Either::Left(MySqlQueryResult {
212216
rows_affected: 0,
213217
last_insert_id: 0,

0 commit comments

Comments
 (0)