Skip to content

Commit cbb8776

Browse files
authored
Merge pull request #16 from Ponewor/15-add-anyhow-crate
Convert boxed errors into anyhow errors
2 parents 1974fbf + 8b9ebb4 commit cbb8776

File tree

2 files changed

+11
-15
lines changed

2 files changed

+11
-15
lines changed

scylla-cdc/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9+
anyhow = "1.0.48"
910
scylla = "0.3.0"
1011
tokio = { version = "1.1.0", features = ["rt", "io-util", "net", "time", "macros", "sync"] }
1112
chrono = "0.4.19"

scylla-cdc/src/stream_generations.rs

+10-15
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
use anyhow;
12
use futures::stream::StreamExt;
23
use scylla::batch::Consistency;
34
use scylla::cql_to_rust::{FromCqlVal, FromCqlValError};
45
use scylla::frame::response::result::{CqlValue, Row};
56
use scylla::frame::value::{Timestamp, Value, ValueTooBig};
67
use scylla::query::Query;
78
use scylla::{FromRow, IntoTypedRows, Session};
8-
use std::error::Error;
99
use std::sync::Arc;
1010

1111
#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, FromRow)]
@@ -73,7 +73,7 @@ impl GenerationFetcher {
7373

7474
/// In case of a success returns a vector containing all the generations in the database.
7575
/// Propagates all errors.
76-
pub async fn fetch_all_generations(&self) -> Result<Vec<GenerationTimestamp>, Box<dyn Error>> {
76+
pub async fn fetch_all_generations(&self) -> anyhow::Result<Vec<GenerationTimestamp>> {
7777
let mut generations = Vec::new();
7878

7979
let mut query =
@@ -115,7 +115,7 @@ impl GenerationFetcher {
115115
pub async fn fetch_generation_by_timestamp(
116116
&self,
117117
time: &chrono::Duration,
118-
) -> Result<Option<GenerationTimestamp>, Box<dyn Error>> {
118+
) -> anyhow::Result<Option<GenerationTimestamp>> {
119119
let query =
120120
new_distributed_system_query(self.get_generation_by_timestamp_query(), &self.session)
121121
.await?;
@@ -146,7 +146,7 @@ impl GenerationFetcher {
146146
pub async fn fetch_next_generation(
147147
&self,
148148
generation: &GenerationTimestamp,
149-
) -> Result<Option<GenerationTimestamp>, Box<dyn Error>> {
149+
) -> anyhow::Result<Option<GenerationTimestamp>> {
150150
let query =
151151
new_distributed_system_query(self.get_next_generation_query(), &self.session).await?;
152152

@@ -172,7 +172,7 @@ impl GenerationFetcher {
172172
pub async fn fetch_stream_ids(
173173
&self,
174174
generation: &GenerationTimestamp,
175-
) -> Result<Vec<Vec<StreamID>>, Box<dyn Error>> {
175+
) -> anyhow::Result<Vec<Vec<StreamID>>> {
176176
let mut result_vec = Vec::new();
177177

178178
let mut query =
@@ -195,9 +195,7 @@ impl GenerationFetcher {
195195
}
196196

197197
// Return single row containing generation.
198-
fn return_single_row(
199-
row: Option<Vec<Row>>,
200-
) -> Result<Option<GenerationTimestamp>, Box<dyn Error>> {
198+
fn return_single_row(row: Option<Vec<Row>>) -> anyhow::Result<Option<GenerationTimestamp>> {
201199
if let Some(row) = row {
202200
if let Some(generation) = row.into_typed::<GenerationTimestamp>().next() {
203201
return Ok(Some(generation?));
@@ -209,7 +207,7 @@ impl GenerationFetcher {
209207
}
210208

211209
// Returns current cluster size in case of a success.
212-
async fn get_cluster_size(session: &Session) -> Result<usize, Box<dyn Error>> {
210+
async fn get_cluster_size(session: &Session) -> anyhow::Result<usize> {
213211
// We are using default consistency here since the system keyspace is special and
214212
// the coordinator which handles the query will only read local data
215213
// and will not contact other nodes, so the query will work with any cluster size larger than 0.
@@ -225,18 +223,15 @@ async fn get_cluster_size(session: &Session) -> Result<usize, Box<dyn Error>> {
225223
}
226224

227225
// Choose appropriate consistency level depending on the cluster size.
228-
async fn select_consistency(session: &Session, query: &mut Query) -> Result<(), Box<dyn Error>> {
226+
async fn select_consistency(session: &Session, query: &mut Query) -> anyhow::Result<()> {
229227
query.set_consistency(match get_cluster_size(session).await? {
230228
1 => Consistency::One,
231229
_ => Consistency::Quorum,
232230
});
233231
Ok(())
234232
}
235233

236-
async fn new_distributed_system_query(
237-
stmt: String,
238-
session: &Session,
239-
) -> Result<Query, Box<dyn Error>> {
234+
async fn new_distributed_system_query(stmt: String, session: &Session) -> anyhow::Result<Query> {
240235
let mut query = Query::new(stmt);
241236
select_consistency(session, &mut query).await?;
242237

@@ -369,7 +364,7 @@ mod tests {
369364
}
370365

371366
// Create setup for tests.
372-
async fn setup() -> Result<GenerationFetcher, Box<dyn Error>> {
367+
async fn setup() -> anyhow::Result<GenerationFetcher> {
373368
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
374369

375370
let session = SessionBuilder::new().known_node(uri).build().await?;

0 commit comments

Comments
 (0)