Skip to content

Commit

Permalink
feat: add error handler
Browse files Browse the repository at this point in the history
nikola-bozin-org committed May 1, 2024
1 parent 31931cd commit 921e999
Showing 1 changed file with 24 additions and 16 deletions.
40 changes: 24 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -9,60 +9,64 @@ use tokio::{task::JoinHandle, time};
/// Unpin - Types that are used with async tasks, ensuring they can be safely pinned in memory.
/// 'static - It should live for an entire duration of an program
pub struct PgDbIdleAgent<T, F>
pub struct PgDbIdleAgent<T, F, E>
where
T: for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + 'static,

F: Fn(&T) + Send + Sync + 'static,
E: Fn(sqlx::Error) + Send + Sync + 'static, // Error handling callback

{
interval_secs: Duration,
pool: PgPool,
query: String,
action: F,
error_handler: E,
_marker: PhantomData<T>, // Add this so compile does not complain about unused parameter T.
}

impl<T, F> PgDbIdleAgent<T, F>
impl<T, F, E> PgDbIdleAgent<T, F, E>
where
T: for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + 'static,

F: Fn(&T) + Send + Sync + 'static,
{
pub fn new(interval_secs: Duration, pool: PgPool, query: String, action: F) -> Self {
E: Fn(sqlx::Error) + Send + Sync + 'static, // Error handling callback


{
pub fn new(interval_secs: Duration, pool: PgPool, query: String, action: F,error_handler:E) -> Self {
Self {
interval_secs,
pool,
query,
action,
error_handler,
_marker: PhantomData, // This is hwo to initialize phantom data.
}
}

pub async fn start(self) -> JoinHandle<()>
where
T: for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Debug,
{
pub async fn start(self) -> JoinHandle<()> {
let mut ticker = time::interval(self.interval_secs);
tokio::task::spawn(async move {
loop {
ticker.tick().await;
self.check_data().await;
if let Err(e) = self.check_data().await {
(self.error_handler)(e);
}
}
})
}

async fn check_data(&self)
async fn check_data(&self) -> Result<(),sqlx::Error>
where
T: for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin + Debug,
T: for<'r> sqlx::FromRow<'r, PgRow> + Send + Sync + Unpin ,
{
println!("Checking data");
let rows: Vec<T> = sqlx::query_as::<_, T>(self.query.as_str())
.fetch_all(&self.pool)
.await
.unwrap();
.await?;
rows.into_iter().for_each(|element| {
(self.action)(&element); // This is how to invoke an action thats a property.
})
});
Ok(())
}
}

@@ -124,12 +128,16 @@ mod tests {
.await
.unwrap();

let tx = pool.begin().await.unwrap();

drop_examples(&pool).await;

create_example_table(&pool).await;

insert_example_data(&pool).await;

tx.commit().await.unwrap();

pool
}

0 comments on commit 921e999

Please sign in to comment.