Skip to content

Interface for consuming changes #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 30, 2021

Conversation

procrastinationfighter
Copy link
Contributor

@procrastinationfighter procrastinationfighter commented Nov 12, 2021

Fixes: #5

This pull request adds an interface for consuming changes from CDC table.
The module consumer provides a Consumer trait and a ConsumerFactory trait that define interface used by the reader component. Data from the table is passed to a consumer in a CDCRow object. CDCSchema is used to create new CDCRow objects.

mapping: HashMap<String, usize>,
}

const STREAM_ID_NAME: &'static str = "cdc$stream_id";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think constants have static lifetime by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, I can't decide which option looks better, so I will just leave it for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: Clippy recommended me to remove it.

@procrastinationfighter procrastinationfighter changed the title Create a draft version of the customer interface. Interface for consuming changes Nov 12, 2021
Comment on lines 9 to 18
PreImage = 0,
RowUpdate = 1,
RowInsert = 2,
RowDelete = 3,
PartitionDelete = 4,
RowRangeDelInclLeft = 5,
RowRangeDelExclLeft = 6,
RowRangeDelInclRight = 7,
RowRangeDelExclRight = 8,
PostImage = 9,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this comment is redundant at this point after @piodul's suggestion to use num_enum but for the record:
you don't have to be explicit with integer values here, they are assigned this way by default.

@@ -0,0 +1,113 @@
use std::collections::HashMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably wanted to name the file consumer.rs, didn't you?

@procrastinationfighter
Copy link
Contributor Author

I've applied some changes to the code - it compiles now, however it was not tested yet, so it's still marked as a draft - I will mark this request as ready for review as soon as I add the tests.

Comment on lines 136 to 152
// I have no idea why, but I couldn't use match here - variables were not accessible inside the match block.
if i == schema.stream_id {
stream_id = column.unwrap().into_blob().unwrap();
} else if i == schema.time {
time = column.unwrap().as_uuid().unwrap();
} else if i == schema.batch_seq_no {
batch_seq_no = column.unwrap().as_int().unwrap();
} else if i == schema.end_of_batch {
end_of_batch = column.unwrap().as_boolean().unwrap()
} else if i == schema.operation {
operation = OperationType::try_from(column.unwrap().as_tinyint().unwrap()).unwrap();
} else if i == schema.ttl {
ttl = column.map(|ttl| ttl.as_bigint().unwrap());
} else {
data.push(column);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why, but when I tried to do something like that:
match i { schema.stream_id = > (...) }
it just didn't work - as if the variable schema was out of scope.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you can put variables into match arms - only constants known at compile time are suitable.

let mut i = 0;

// .iter().enumerate() can't be used here, because it doesn't take ownership of taken value.
for column in row.columns.into_iter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't have to use into_iter here.

let mut i = 0;

// .iter().enumerate() can't be used here, because it doesn't take ownership of taken value.
for column in row.columns.into_iter() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't have to use into_iter here.

Comment on lines 136 to 152
// I have no idea why, but I couldn't use match here - variables were not accessible inside the match block.
if i == schema.stream_id {
stream_id = column.unwrap().into_blob().unwrap();
} else if i == schema.time {
time = column.unwrap().as_uuid().unwrap();
} else if i == schema.batch_seq_no {
batch_seq_no = column.unwrap().as_int().unwrap();
} else if i == schema.end_of_batch {
end_of_batch = column.unwrap().as_boolean().unwrap()
} else if i == schema.operation {
operation = OperationType::try_from(column.unwrap().as_tinyint().unwrap()).unwrap();
} else if i == schema.ttl {
ttl = column.map(|ttl| ttl.as_bigint().unwrap());
} else {
data.push(column);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you can put variables into match arms - only constants known at compile time are suitable.

Comment on lines 117 to 119
let mapping = schema.mapping.clone();
let deleted_mapping = schema.deleted_mapping.clone();
let deleted_el_mapping = schema.deleted_el_mapping.clone();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's really sad that construction of a CDCRow requires cloning three hashmaps.

Let's change the CDCRow so that it keeps a reference to a CDCRowSchema:

pub struct CDCRow<'schema>
    // ...
    schema: &'schema CDCRowSchema,

Then, in the getter methods you can refer to the mappings through the schema reference.

@procrastinationfighter procrastinationfighter marked this pull request as ready for review November 18, 2021 19:12
@procrastinationfighter
Copy link
Contributor Author

procrastinationfighter commented Nov 18, 2021

I applied suggested fixes and created some tests for the module. I also have an unfinished test that checks if CDCRow is created properly (without connecting to the database, with prepared Row and CDCRowSchema), but I didn't finish it, because I changed my opinion about its usefulness and writing it was tiring. If you think that such a test is a good idea, I will finish it.

Copy link
Collaborator

@piodul piodul left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for changing my mind on the interface so late... but after I saw how many unwraps there are in tests I think the return types get_value, is_value_deleted and get_deleted_elements should be simplified even if it means that some information about columns existing or not will be unavailable. I think that the users will usually know what their schema is, so they which column names to expect and which cdc$deleted_ and cdc$deleted_elements_ special columns to expect. They will have to unwrap anyway (I don't think they will want to handle it in some other way), so we can do it for them. I imagine the following interface:

// We need this one `Option` here because it represents if the value is null or not
// and that is a case some users would definitely want to check for
pub fn get_value(&self, name: &str) -> &Option<CqlValue> {
    // Panic if the column does not exist
    // Otherwise return a reference to its value
}

pub fn is_value_deleted(&self, name: &str) -> bool {
    // Panic if the column does not exist
    // Otherwise return if the value was deleted
}

pub fn get_deleted_elements(&self, name: &str) -> &[CqlValue] {
    // Panic if the column does not exist
    // If the value is null, return an empty slice: &[]
    // Otherwise return the slice with deleted elements
}

If I'm wrong and somebody does need this information, we can add another method for them which checks if the column exists:

pub fn column_exists(&self, name: &str) -> bool {
	// ...
}

The panicking behavior should be documented in the docstring comments for those methods.

Apart from that and some other small comments I left, I think it LGTM.

}

pub trait ConsumerFactory {
fn new_consumer() -> Box<dyn Consumer>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should be non-static, i.e. take &self as the first parameter.

session.query(query, &[]).await.unwrap();
session.await_schema_agreement().await.unwrap();

// Create test tables containing information about generations and streams.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is incorrect - the code below just creates tables with CDC enabled, it does not create mock tables containing information about streams and generations (as the tables created in stream_generations.rs).

// We must allow filtering in order to search by cdc$operation.
let result = session
.query(format!("SELECT * FROM {} WHERE \"cdc$operation\" = {} AND pk = {} AND ck = {} ALLOW FILTERING;",
TEST_SINGLE_VALUE_CDC_TABLE, 1, 1, 2), ()) // 1 is row update
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using 1 as cdc$operation, please use OperationType::RowUpdate as i8 to be more clear.

// Test against the default values in CDCRow::from_row
assert!(cdc_row.stream_id.len() > 0);
assert_ne!(cdc_row.time, uuid::Uuid::default());
assert_ne!(cdc_row.batch_seq_no, i32::MAX);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can assert that the batch_seq_no is equal to 0 here.

}

#[tokio::test]
async fn test_get_item() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this test necessary? The same things (and more) are checked in the test_query.

Comment on lines 333 to 334
// The operation type is insert in this case.
assert_ne!(cdc_row.operation, OperationType::PreImage);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// The operation type is insert in this case.

You can express such comments as assertions, you know :) Now you are only asserting that the operation type is not PreImage.

/// Returns None if there is no collection column with such name.
/// Returns Some(None) if there is such collection, but nothing was deleted from it.
/// Otherwise returns Some(Some(x)) where x is a reference to vector containing deleted values.
pub fn get_deleted_elements(&self, name: &str) -> Option<Option<&Vec<CqlValue>>> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can simplify a bit here, similarly to is_value_deleted. I don't think that the set of deleted elements can be empty, so we can consider null to be an empty Vec in order to remove one layer of Option. Additionally, you should change &Vec<CqlValue> to &[CqlValue] - always prefer slices to vec references.

Also, take a look at my summary comment of this review - I included more suggestions which will affect the final shape of this function.

@piodul
Copy link
Collaborator

piodul commented Nov 19, 2021

@kbr- please review, too.

@piodul
Copy link
Collaborator

piodul commented Nov 23, 2021

@kbr- ping

@piodul
Copy link
Collaborator

piodul commented Nov 24, 2021

@kbr- ping^2

@kbr-
Copy link

kbr- commented Nov 25, 2021

Please fix the git log according to these instructions:
#8 (comment)

also the PR cover letter says:

This is definitely not finished, as you can see, there are many TODOs and it does not compile.

I'm guessing this is no longer true, so please update it.

pub(crate) end_of_batch: usize,
pub(crate) operation: usize,
pub(crate) ttl: usize,
pub(crate) mapping: HashMap<String, usize>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please describe in a comment the nature of this mapping (i.e. it maps what to what)

}

pub struct CDCRowSchema {
pub(crate) stream_id: usize,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These values are indices of these columns in the CDC log table schema, or something else? A comment should mention this. (I didn't understand what these usizes mean until I read the implementation of new)

let mut data: Vec<Option<CqlValue>> = Vec::with_capacity(data_count);

for (i, column) in row.columns.into_iter().enumerate() {
if i == schema.stream_id {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps using guards would be more elegant:
https://doc.rust-lang.org/rust-by-example/flow_control/match/guard.html
(just a suggestion, depends on your preference)

mod tests {
// Because we are planning to extract a common setup to all tests,
// the setup for this module is based on generation fetcher's tests,
// which has already been merged to the main branch.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last part of this commit is really unnecessary (about merging stuff into main)

Comment on lines +207 to +205
match val {
Some(vec) => vec,
None => &[],
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this is probably equivalent to val.unwrap_or(&[])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about that. It does not compile with message: expected struct Vec, found array of 0 elements. val is of type Option<&Vec<CqlValue>> and if I understand the generics correctly, unwrap_or should take as an argument of the same type as its return type: https://doc.rust-lang.org/std/option/enum.Option.html#method.unwrap_or

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, then nevermind. It looks like Rust implicitly converts &Vec<CqlValue> to &[CqlValue] in your code, but it can't do it in unwrap_or. I think that if you placed an as_ref somewhere higher in the code then unwrap_or would work too (so that we explicitly perform this conversion), but I won't insist on changing that.

}
}

pub fn column_exists(&self, name: &str) -> bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the newly added methods belong to CDCRowSchema rather than CDCRow, so please move them there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created these functions with thought that they will serve the consumer, for example, to debug. As far as I know, the consumer isn't going to see any CDCRowSchema, because this is an internal struct that is used to create new CDCRow objects. If these functions were moved to CDCRowSchema, they would be useless in my opinion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I see nothing wrong with exposing the CDCRowSchema to the user, (for example through CDCRow::get_schema() or something) and I think that those methods make more sense to be there, but maybe it will make the interface harder to use... I'm not sure, so I won't insist.

Create Consumer and ConsumerFactory traits.

Implement CDCRow struct that represents data passed to the consumer.

Implement CDCRowSchema that contains info about column order in the CDC table.

Create tests for Consumer module.
@piodul piodul merged commit 0935aa2 into scylladb:main Nov 30, 2021
@procrastinationfighter procrastinationfighter deleted the consumer branch December 14, 2021 22:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Design and implement an interface for consuming changes from CDC log
5 participants