Skip to content

Commit 0fb9743

Browse files
committed
Changed api to returning stream ids grouped by vnodes.
Allows user to learn about a connection between streams and their vnodes. They can easily flatten the vector if they don't need this knowledge. Allows us to accomplish issue #14 Also changed tests for this method and cleaned up a little.
1 parent 64e8de2 commit 0fb9743

File tree

1 file changed

+17
-9
lines changed

1 file changed

+17
-9
lines changed

scylla-cdc/src/stream_generations.rs

+17-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use futures::stream::StreamExt;
22
use scylla::batch::Consistency;
3-
use scylla::cql_to_rust::{FromCqlVal, FromCqlValError, FromCqlValError::BadCqlType};
3+
use scylla::cql_to_rust::{FromCqlVal, FromCqlValError};
44
use scylla::frame::response::result::{CqlValue, Row};
55
use scylla::frame::value::{Timestamp, Value, ValueTooBig};
66
use scylla::query::Query;
@@ -32,7 +32,10 @@ impl Value for StreamID {
3232

3333
impl FromCqlVal<CqlValue> for StreamID {
3434
fn from_cql(cql_val: CqlValue) -> Result<Self, FromCqlValError> {
35-
let id = cql_val.as_blob().ok_or(BadCqlType)?.to_owned();
35+
let id = cql_val
36+
.as_blob()
37+
.ok_or(FromCqlValError::BadCqlType)?
38+
.to_owned();
3639
Ok(StreamID { id })
3740
}
3841
}
@@ -165,10 +168,11 @@ impl GenerationFetcher {
165168
}
166169

167170
/// Given a generation return identifiers of all streams of this generation.
171+
/// Streams are grouped by vnodes.
168172
pub async fn fetch_stream_ids(
169173
&self,
170174
generation: &GenerationTimestamp,
171-
) -> Result<Vec<StreamID>, Box<dyn Error>> {
175+
) -> Result<Vec<Vec<StreamID>>, Box<dyn Error>> {
172176
let mut result_vec = Vec::new();
173177

174178
let mut query =
@@ -184,10 +188,9 @@ impl GenerationFetcher {
184188

185189
while let Some(next_row) = rows.next().await {
186190
let (ids,) = next_row?;
187-
for id in ids {
188-
result_vec.push(id);
189-
}
191+
result_vec.push(ids);
190192
}
193+
191194
Ok(result_vec)
192195
}
193196

@@ -486,10 +489,15 @@ mod tests {
486489

487490
let stream_ids = fetcher.fetch_stream_ids(&gen).await.unwrap();
488491

489-
let correct_stream_ids: Vec<StreamID> = vec![TEST_STREAM_1, TEST_STREAM_2]
492+
let correct_stream_ids: Vec<Vec<StreamID>> = vec![[TEST_STREAM_1, TEST_STREAM_2]]
490493
.iter()
491-
.map(|stream| StreamID {
492-
id: hex::decode(stream.strip_prefix("0x").unwrap()).unwrap(),
494+
.map(|stream_vec| {
495+
stream_vec
496+
.iter()
497+
.map(|stream| StreamID {
498+
id: hex::decode(stream.strip_prefix("0x").unwrap()).unwrap(),
499+
})
500+
.collect()
493501
})
494502
.collect();
495503

0 commit comments

Comments
 (0)