Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions core/src/database/postgres/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,23 @@ pub enum PostgresConnectionError {

#[derive(thiserror::Error, Debug)]
pub enum PostgresError {
#[error("PgError {0}")]
PgError(#[from] PgError),
#[error("Postgres database error: {0}")]
PgError(String),

#[error("Connection pool error: {0}")]
ConnectionPoolError(#[from] RunError<tokio_postgres::Error>),
}

impl From<tokio_postgres::error::Error> for PostgresError {
fn from(err: PgError) -> Self {
if let Some(db_err) = err.as_db_error() {
PostgresError::PgError(db_err.message().to_string())
} else {
PostgresError::PgError(err.to_string())
}
}
}

#[allow(unused)]
pub struct PostgresTransaction<'a> {
pub transaction: PgTransaction<'a>,
Expand All @@ -66,17 +76,17 @@ impl PostgresTransaction<'_> {
query: &str,
params: &[&(dyn ToSql + Sync)],
) -> Result<u64, PostgresError> {
self.transaction.execute(query, params).await.map_err(PostgresError::PgError)
self.transaction.execute(query, params).await.map_err(PostgresError::from)
}

#[allow(unused)]
pub async fn commit(self) -> Result<(), PostgresError> {
self.transaction.commit().await.map_err(PostgresError::PgError)
self.transaction.commit().await.map_err(PostgresError::from)
}

#[allow(unused)]
pub async fn rollback(self) -> Result<(), PostgresError> {
self.transaction.rollback().await.map_err(PostgresError::PgError)
self.transaction.rollback().await.map_err(PostgresError::from)
}
}

Expand Down Expand Up @@ -170,7 +180,7 @@ impl PostgresClient {

pub async fn batch_execute(&self, sql: &str) -> Result<(), PostgresError> {
let conn = self.pool.get().await?;
conn.batch_execute(sql).await.map_err(PostgresError::PgError)
conn.batch_execute(sql).await.map_err(PostgresError::from)
}

pub async fn execute<T>(
Expand All @@ -182,7 +192,7 @@ impl PostgresClient {
T: ?Sized + ToStatement,
{
let conn = self.pool.get().await?;
conn.execute(query, params).await.map_err(PostgresError::PgError)
conn.execute(query, params).await.map_err(PostgresError::from)
}

pub async fn prepare(
Expand All @@ -191,7 +201,7 @@ impl PostgresClient {
parameter_types: &[PgType],
) -> Result<Statement, PostgresError> {
let conn = self.pool.get().await?;
conn.prepare_typed(query, parameter_types).await.map_err(PostgresError::PgError)
conn.prepare_typed(query, parameter_types).await.map_err(PostgresError::from)
}

pub async fn with_transaction<F, Fut, T, Q>(
Expand All @@ -206,13 +216,13 @@ impl PostgresClient {
Q: ?Sized + ToStatement,
{
let mut conn = self.pool.get().await.map_err(PostgresError::ConnectionPoolError)?;
let transaction = conn.transaction().await.map_err(PostgresError::PgError)?;
let transaction = conn.transaction().await.map_err(PostgresError::from)?;

let count = transaction.execute(query, params).await.map_err(PostgresError::PgError)?;
let count = transaction.execute(query, params).await.map_err(PostgresError::from)?;

let result = f(count).await?;

transaction.commit().await.map_err(PostgresError::PgError)?;
transaction.commit().await.map_err(PostgresError::from)?;

Ok(result)
}
Expand All @@ -226,7 +236,7 @@ impl PostgresClient {
T: ?Sized + ToStatement,
{
let conn = self.pool.get().await?;
let rows = conn.query(query, params).await.map_err(PostgresError::PgError)?;
let rows = conn.query(query, params).await.map_err(PostgresError::from)?;
Ok(rows)
}

Expand All @@ -239,7 +249,7 @@ impl PostgresClient {
T: ?Sized + ToStatement,
{
let conn = self.pool.get().await?;
let row = conn.query_one(query, params).await.map_err(PostgresError::PgError)?;
let row = conn.query_one(query, params).await.map_err(PostgresError::from)?;
Ok(row)
}

Expand All @@ -252,7 +262,7 @@ impl PostgresClient {
T: ?Sized + ToStatement,
{
let conn = self.pool.get().await?;
let row = conn.query_opt(query, params).await.map_err(PostgresError::PgError)?;
let row = conn.query_opt(query, params).await.map_err(PostgresError::from)?;
Ok(row)
}

Expand All @@ -265,15 +275,15 @@ impl PostgresClient {
T: ?Sized + ToStatement,
{
let mut conn = self.pool.get().await?;
let transaction = conn.transaction().await.map_err(PostgresError::PgError)?;
let transaction = conn.transaction().await.map_err(PostgresError::from)?;

for params in params_list {
let params_refs: Vec<&(dyn ToSql + Sync)> =
params.iter().map(|param| param.as_ref() as &(dyn ToSql + Sync)).collect();
transaction.execute(query, &params_refs).await.map_err(PostgresError::PgError)?;
transaction.execute(query, &params_refs).await.map_err(PostgresError::from)?;
}

transaction.commit().await.map_err(PostgresError::PgError)?;
transaction.commit().await.map_err(PostgresError::from)?;
Ok(())
}

Expand All @@ -284,7 +294,7 @@ impl PostgresClient {
{
let conn = self.pool.get().await?;

conn.copy_in(statement).await.map_err(PostgresError::PgError)
conn.copy_in(statement).await.map_err(PostgresError::from)
}

// Internal method used by insert_bulk for large datasets (>100 rows).
Expand Down
1 change: 1 addition & 0 deletions documentation/docs/pages/docs/changelog.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-------------------------------------------------
- fix: reorg safe range calculation for out-of-range an error message
- fix: Upgrade dependecy minor versions to get latest alloy fixes
- fix: Improve error dislay for pg error

### Breaking changes
-------------------------------------------------
Expand Down