From 0b152bd63aeaff10a5aa594e8300ac284518ccfb Mon Sep 17 00:00:00 2001
From: Adam Boguszewski
Date: Fri, 26 Nov 2021 12:36:01 +0100
Subject: [PATCH] Create an interface for consuming data

Fixes #5.

Create Consumer and ConsumerFactory traits.
Implement CDCRow struct that represents data passed to the consumer.
Implement CDCRowSchema that contains info about column order in the CDC table.
Create tests for the Consumer module.
---
 scylla-cdc/Cargo.toml      |   2 +
 scylla-cdc/src/consumer.rs | 468 +++++++++++++++++++++++++++++++++++++
 scylla-cdc/src/lib.rs      |   1 +
 3 files changed, 471 insertions(+)
 create mode 100644 scylla-cdc/src/consumer.rs

diff --git a/scylla-cdc/Cargo.toml b/scylla-cdc/Cargo.toml
index a343400..87b051f 100644
--- a/scylla-cdc/Cargo.toml
+++ b/scylla-cdc/Cargo.toml
@@ -11,6 +11,8 @@ scylla = "0.3.0"
 tokio = { version = "1.1.0", features = ["rt", "io-util", "net", "time", "macros", "sync"] }
 chrono = "0.4.19"
 futures = "0.3.17"
+uuid = "0.8.2"
+num_enum = "0.5.4"
 
 [dev-dependencies]
 hex = "0.4.3"
diff --git a/scylla-cdc/src/consumer.rs b/scylla-cdc/src/consumer.rs
new file mode 100644
index 0000000..e0b0fd9
--- /dev/null
+++ b/scylla-cdc/src/consumer.rs
@@ -0,0 +1,468 @@
+use num_enum::TryFromPrimitive;
+use scylla::frame::response::result::{ColumnSpec, CqlValue, Row};
+use std::collections::HashMap;
+
+pub trait Consumer {
+    fn consume_cdc(&mut self, data: CDCRow);
+}
+
+pub trait ConsumerFactory {
+    fn new_consumer(&self) -> Box<dyn Consumer>;
+}
+
+#[derive(Debug, Eq, PartialEq, TryFromPrimitive)]
+#[repr(i8)]
+pub enum OperationType {
+    PreImage,
+    RowUpdate,
+    RowInsert,
+    RowDelete,
+    PartitionDelete,
+    RowRangeDelInclLeft,
+    RowRangeDelExclLeft,
+    RowRangeDelInclRight,
+    RowRangeDelExclRight,
+    PostImage,
+}
+
+pub struct CDCRowSchema {
+    // The usize values are indices of the given values in the Row.columns vector.
+    pub(crate) stream_id: usize,
+    pub(crate) time: usize,
+    pub(crate) batch_seq_no: usize,
+    pub(crate) end_of_batch: usize,
+    pub(crate) operation: usize,
+    pub(crate) ttl: usize,
+
+    // These HashMaps map names of columns in the observed table to matching columns in the CDC table.
+    // The usize value is an index of the column in an internal vector inside the CDCRow struct.
+
+    // Maps the name of a column to the matching column in the CDC table.
+    pub(crate) mapping: HashMap<String, usize>,
+    // Maps the name of a column to the column that tells if the value in this column was deleted.
+    pub(crate) deleted_mapping: HashMap<String, usize>,
+    // Maps the name of a collection column to the column that tells
+    // if elements from this collection were deleted.
+    pub(crate) deleted_el_mapping: HashMap<String, usize>,
+}
+
+const STREAM_ID_NAME: &str = "cdc$stream_id";
+const TIME_NAME: &str = "cdc$time";
+const BATCH_SEQ_NO_NAME: &str = "cdc$batch_seq_no";
+const END_OF_BATCH_NAME: &str = "cdc$end_of_batch";
+const OPERATION_NAME: &str = "cdc$operation";
+const TTL_NAME: &str = "cdc$ttl";
+const IS_DELETED_PREFIX: &str = "cdc$deleted_";
+const ARE_ELEMENTS_DELETED_PREFIX: &str = "cdc$deleted_elements_";
+
+impl CDCRowSchema {
+    pub fn new(specs: &[ColumnSpec]) -> CDCRowSchema {
+        let mut stream_id = 0;
+        let mut time = 0;
+        let mut batch_seq_no = 0;
+        let mut end_of_batch = 0;
+        let mut operation = 0;
+        let mut ttl = 0;
+        let mut mapping: HashMap<String, usize> = HashMap::new();
+        let mut deleted_mapping: HashMap<String, usize> = HashMap::new();
+        let mut deleted_el_mapping: HashMap<String, usize> = HashMap::new();
+
+        let mut j = 0;
+
+        // The hashmaps hold indices into a new vector that does not contain the hardcoded cdc$ columns.
+        for (i, spec) in specs.iter().enumerate() {
+            match spec.name.as_str() {
+                // spec.name is public since driver version 0.3.0; it was not accessible in 0.2.1.
+                STREAM_ID_NAME => stream_id = i,
+                TIME_NAME => time = i,
+                BATCH_SEQ_NO_NAME => batch_seq_no = i,
+                END_OF_BATCH_NAME => end_of_batch = i,
+                OPERATION_NAME => operation = i,
+                TTL_NAME => ttl = i,
+                x => {
+                    if let Some(stripped) = x.strip_prefix(ARE_ELEMENTS_DELETED_PREFIX) {
+                        deleted_el_mapping.insert(stripped.to_string(), j);
+                    } else if let Some(stripped) = x.strip_prefix(IS_DELETED_PREFIX) {
+                        deleted_mapping.insert(stripped.to_string(), j);
+                    } else {
+                        mapping.insert(x.to_string(), j);
+                    }
+
+                    j += 1;
+                }
+            }
+        }
+
+        CDCRowSchema {
+            stream_id,
+            time,
+            batch_seq_no,
+            end_of_batch,
+            operation,
+            ttl,
+            mapping,
+            deleted_mapping,
+            deleted_el_mapping,
+        }
+    }
+}
+
+pub struct CDCRow<'schema> {
+    pub stream_id: Vec<u8>,
+    pub time: uuid::Uuid,
+    pub batch_seq_no: i32,
+    pub end_of_batch: bool,
+    pub operation: OperationType,
+    // Can be NULL in the database.
+    pub ttl: Option<i64>,
+    data: Vec<Option<CqlValue>>,
+    // Maps an element's name to its index in the data vector.
+    schema: &'schema CDCRowSchema,
+}
+
+impl CDCRow<'_> {
+    pub fn from_row(row: Row, schema: &CDCRowSchema) -> CDCRow {
+        // If the CDC read was successful, these default values will not be used.
+        let mut stream_id = vec![];
+        let mut time = uuid::Uuid::default();
+        let mut batch_seq_no = i32::MAX;
+        let mut end_of_batch = false;
+        let mut operation = OperationType::PreImage;
+        let mut ttl = None;
+
+        let data_count =
+            schema.mapping.len() + schema.deleted_mapping.len() + schema.deleted_el_mapping.len();
+        let mut data: Vec<Option<CqlValue>> = Vec::with_capacity(data_count);
+
+        for (i, column) in row.columns.into_iter().enumerate() {
+            if i == schema.stream_id {
+                stream_id = column.unwrap().into_blob().unwrap();
+            } else if i == schema.time {
+                time = column.unwrap().as_uuid().unwrap();
+            } else if i == schema.batch_seq_no {
+                batch_seq_no = column.unwrap().as_int().unwrap();
+            } else if i == schema.end_of_batch {
+                end_of_batch = column.unwrap().as_boolean().unwrap();
+            } else if i == schema.operation {
+                operation = OperationType::try_from(column.unwrap().as_tinyint().unwrap()).unwrap();
+            } else if i == schema.ttl {
+                ttl = column.map(|ttl| ttl.as_bigint().unwrap());
+            } else {
+                data.push(column);
+            }
+        }
+
+        CDCRow {
+            stream_id,
+            time,
+            batch_seq_no,
+            end_of_batch,
+            operation,
+            ttl,
+            data,
+            schema,
+        }
+    }
+
+    /// Gets a value from the column that corresponds to a column of the logged table.
+    /// Returns None if the value is null.
+    /// Panics if the column does not exist in this table.
+    /// To check if such a column exists, use the column_exists() method.
+    pub fn get_value(&self, name: &str) -> &Option<CqlValue> {
+        self.schema
+            .mapping
+            .get(name)
+            .map(|id| &self.data[*id])
+            .unwrap()
+    }
+
+    /// Tells whether a value was deleted in this operation.
+    /// Panics if the column does not exist in this table
+    /// or the column is a part of the primary key (because these values can't be deleted).
+    /// To check if such a column exists, use the column_deletable() method.
+    pub fn is_value_deleted(&self, name: &str) -> bool {
+        self.schema
+            .deleted_mapping
+            .get(name)
+            .map(|id| self.data[*id].is_some())
+            .unwrap()
+    }
+
+    /// Gets the deleted elements of a collection column.
+    /// Returns an empty slice if the value is null.
+    /// Panics if the column does not exist in this table or is not a collection.
+    /// To check if such a column exists, use the collection_exists() method.
+    pub fn get_deleted_elements(&self, name: &str) -> &[CqlValue] {
+        let val = self
+            .schema
+            .deleted_el_mapping
+            .get(name)
+            .map(|id| self.data[*id].as_ref().map(|val| val.as_set().unwrap()))
+            .unwrap();
+        match val {
+            Some(vec) => vec,
+            None => &[],
+        }
+    }
+
+    pub fn column_exists(&self, name: &str) -> bool {
+        self.schema.mapping.contains_key(name)
+    }
+
+    pub fn column_deletable(&self, name: &str) -> bool {
+        self.schema.deleted_mapping.contains_key(name)
+    }
+
+    pub fn collection_exists(&self, name: &str) -> bool {
+        self.schema.deleted_el_mapping.contains_key(name)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    // Because we plan to extract a setup common to all tests,
+    // the setup for this module is based on the generation fetcher's tests.
+
+    use super::*;
+    use scylla::batch::Consistency;
+    use scylla::query::Query;
+    use scylla::{Session, SessionBuilder};
+
+    // These tests should be indifferent to things like the number of Scylla nodes,
+    // so if run separately, they can be tested on a single Scylla instance.
+
+    const TEST_SINGLE_VALUE_TABLE: &str = "ConsumerTest.single_value";
+    const TEST_SINGLE_COLLECTION_TABLE: &str = "ConsumerTest.single_collection";
+    const TEST_KEYSPACE: &str = "ConsumerTest";
+    const CDC_CONFIG: &str = "{'enabled': 'true'}";
+    const TEST_SINGLE_VALUE_CDC_TABLE: &str = "ConsumerTest.single_value_scylla_cdc_log";
+    const TEST_SINGLE_COLLECTION_CDC_TABLE: &str = "ConsumerTest.single_collection_scylla_cdc_log";
+
+    fn construct_single_value_table_query() -> String {
+        format!(
+            "
+    CREATE TABLE IF NOT EXISTS {}(
+        pk int,
+        ck int,
+        v int,
+        PRIMARY KEY(pk, ck)) WITH cdc = {};",
+            TEST_SINGLE_VALUE_TABLE, CDC_CONFIG
+        )
+    }
+
+    async fn populate_single_value_table(session: &Session) {
+        // We want to use TTL, because we check its value in some tests.
+        session
+            .query(
+                format!(
+                    "INSERT INTO {} (pk, ck, v) VALUES ({}, {}, {}) USING TTL {};",
+                    TEST_SINGLE_VALUE_TABLE, 1, 2, 3, 86400
+                ),
+                (),
+            )
+            .await
+            .unwrap();
+    }
+
+    fn construct_single_collection_table_query() -> String {
+        format!(
+            "
+    CREATE TABLE IF NOT EXISTS {}(
+        pk int,
+        ck int,
+        vs set<int>,
+        PRIMARY KEY(pk, ck)) WITH cdc = {};",
+            TEST_SINGLE_COLLECTION_TABLE, CDC_CONFIG
+        )
+    }
+
+    async fn populate_single_collection_table(session: &Session) {
+        session
+            .query(
+                format!(
+                    "INSERT INTO {} (pk, ck, vs) VALUES (?, ?, ?);",
+                    TEST_SINGLE_COLLECTION_TABLE
+                ),
+                (1, 2, vec![1, 2]),
+            )
+            .await
+            .unwrap();
+    }
+
+    // This is copied from stream_generations::tests, because we plan to standardize this.
+    async fn create_test_db(session: &Session) {
+        // These tests don't rely on how the cluster looks, so we can test on one node.
+        let mut query = Query::new(format!(
+            "CREATE KEYSPACE IF NOT EXISTS {} WITH replication
+            = {{'class':'SimpleStrategy', 'replication_factor': 1}};",
+            TEST_KEYSPACE
+        ));
+        query.set_consistency(Consistency::All);
+
+        session.query(query, &[]).await.unwrap();
+        session.await_schema_agreement().await.unwrap();
+
+        // Create test tables containing sample data for the tests.
+        for query in vec![
+            construct_single_value_table_query(),
+            construct_single_collection_table_query(),
+        ] {
+            session.query(query, &[]).await.unwrap();
+        }
+        session.await_schema_agreement().await.unwrap();
+
+        // Delete all leftovers from previous tests.
+        for table in vec![
+            TEST_SINGLE_VALUE_TABLE.to_string(),
+            TEST_SINGLE_COLLECTION_TABLE.to_string(),
+            TEST_SINGLE_VALUE_CDC_TABLE.to_string(),
+            TEST_SINGLE_COLLECTION_CDC_TABLE.to_string(),
+        ] {
+            session
+                .query(format!("TRUNCATE {};", table), &[])
+                .await
+                .unwrap();
+        }
+    }
+
+    async fn setup() -> anyhow::Result<Session> {
+        let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
+
+        let session = SessionBuilder::new().known_node(uri).build().await?;
+
+        create_test_db(&session).await;
+        populate_single_value_table(&session).await;
+        populate_single_collection_table(&session).await;
+
+        Ok(session)
+    }
+
+    // Tests if the fields are properly set.
+    #[tokio::test]
+    async fn test_query() {
+        let session = setup().await.unwrap();
+        let result = session
+            .query(
+                format!("SELECT * FROM {};", TEST_SINGLE_VALUE_CDC_TABLE),
+                (),
+            )
+            .await
+            .unwrap();
+
+        let row = result.rows.unwrap().remove(0);
+        let schema = CDCRowSchema::new(&result.col_specs);
+        let cdc_row = CDCRow::from_row(row, &schema);
+
+        // Test against the default values set in CDCRow::from_row.
+        assert!(!cdc_row.stream_id.is_empty());
+        assert_ne!(cdc_row.time, uuid::Uuid::default());
+        assert_eq!(cdc_row.batch_seq_no, 0);
+        assert!(cdc_row.end_of_batch);
+        assert_eq!(cdc_row.operation, OperationType::RowInsert);
+        assert!(cdc_row.ttl.is_some());
+
+        assert_eq!(
+            cdc_row.get_value("v").as_ref().unwrap().as_int().unwrap(),
+            3
+        );
+        assert!(!cdc_row.is_value_deleted("v"));
+        assert!(!cdc_row.collection_exists("v"));
+    }
+
+    #[tokio::test]
+    async fn test_get_deleted() {
+        let session = setup().await.unwrap();
+        session
+            .query(
+                format!(
+                    "DELETE v FROM {} WHERE pk = {} AND ck = {};",
+                    TEST_SINGLE_VALUE_TABLE, 1, 2
+                ),
+                (),
+            )
+            .await
+            .unwrap();
+        // We must allow filtering in order to search by cdc$operation.
+        let result = session
+            .query(format!("SELECT * FROM {} WHERE \"cdc$operation\" = {} AND pk = {} AND ck = {} ALLOW FILTERING;",
+                           TEST_SINGLE_VALUE_CDC_TABLE, OperationType::RowUpdate as i8, 1, 2), ())
+            .await
+            .unwrap();
+        let row = result.rows.unwrap().remove(0);
+        let schema = CDCRowSchema::new(&result.col_specs);
+        let cdc_row = CDCRow::from_row(row, &schema);
+
+        assert!(cdc_row.is_value_deleted("v"))
+    }
+
+    #[tokio::test]
+    async fn test_get_deleted_elements() {
+        let session = setup().await.unwrap();
+        session
+            .query(
+                format!(
+                    "UPDATE {} SET vs = vs - {{{}}} WHERE pk = {} AND ck = {}",
+                    TEST_SINGLE_COLLECTION_TABLE, 2, 1, 2
+                ),
+                (),
+            )
+            .await
+            .unwrap();
+        // We must allow filtering in order to search by cdc$operation.
+        let result = session
+            .query(format!("SELECT * FROM {} WHERE \"cdc$operation\" = {} AND pk = {} AND ck = {} ALLOW FILTERING;",
+                           TEST_SINGLE_COLLECTION_CDC_TABLE, OperationType::RowUpdate as i8, 1, 2), ())
+            .await
+            .unwrap();
+
+        let row = result.rows.unwrap().remove(0);
+        let schema = CDCRowSchema::new(&result.col_specs);
+        let cdc_row = CDCRow::from_row(row, &schema);
+
+        let vec = cdc_row.get_deleted_elements("vs");
+
+        assert_eq!(vec.len(), 1);
+        assert_eq!(vec[0].as_int().unwrap(), 2);
+    }
+
+    // Unit test for the schema.
+    #[tokio::test]
+    async fn test_create_schema() {
+        let session = setup().await.unwrap();
+        // Select the columns in a custom order to test whether the schema maps them correctly.
+        let result = session
+            .query(
+                format!(
+                    "SELECT ck, pk, v, \"cdc$deleted_v\", \
+                    \"cdc$time\", \"cdc$stream_id\", \"cdc$batch_seq_no\", \
+                    \"cdc$ttl\", \"cdc$end_of_batch\", \"cdc$operation\" \
+                    FROM {};",
+                    TEST_SINGLE_VALUE_CDC_TABLE
+                ),
+                (),
+            )
+            .await
+            .unwrap();
+
+        let schema = CDCRowSchema::new(&result.col_specs);
+
+        // Check the fixed cdc$ columns.
+        assert_eq!(schema.stream_id, 5);
+        assert_eq!(schema.time, 4);
+        assert_eq!(schema.batch_seq_no, 6);
+        assert_eq!(schema.end_of_batch, 8);
+        assert_eq!(schema.operation, 9);
+        assert_eq!(schema.ttl, 7);
+
+        // Check the values from the observed table.
+        assert_eq!(*schema.mapping.get("pk").unwrap(), 1_usize);
+        assert_eq!(*schema.mapping.get("ck").unwrap(), 0_usize);
+        assert_eq!(*schema.mapping.get("v").unwrap(), 2_usize);
+
+        // Check deleted_*.
+        assert_eq!(*schema.deleted_mapping.get("v").unwrap(), 3_usize);
+
+        // Check the maps' sizes.
+        assert_eq!(schema.mapping.len(), 3);
+        assert_eq!(schema.deleted_mapping.len(), 1);
+        assert_eq!(schema.deleted_el_mapping.len(), 0);
+    }
+}
diff --git a/scylla-cdc/src/lib.rs b/scylla-cdc/src/lib.rs
index 7b33efc..3ace3da 100644
--- a/scylla-cdc/src/lib.rs
+++ b/scylla-cdc/src/lib.rs
@@ -1,3 +1,4 @@
+pub mod consumer;
 pub mod stream_generations;
 
 #[cfg(test)]
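
Note (illustrative, not part of the patch): a minimal sketch of how a downstream crate might implement the traits introduced above. The PrintlnConsumer and PrintlnConsumerFactory names are hypothetical and exist only for this example; the code assumes the patch is applied and the crate is used as scylla_cdc.

    use scylla_cdc::consumer::{CDCRow, Consumer, ConsumerFactory};

    // A hypothetical consumer that prints basic metadata of every CDC row it receives.
    struct PrintlnConsumer;

    impl Consumer for PrintlnConsumer {
        fn consume_cdc(&mut self, data: CDCRow) {
            // `operation` and `time` are fixed cdc$ columns parsed by CDCRow::from_row.
            println!("operation: {:?}, time: {}", data.operation, data.time);
            // Columns of the observed table are accessed by name; check existence first
            // to avoid the panic documented on get_value().
            if data.column_exists("v") {
                println!("v = {:?}", data.get_value("v"));
            }
        }
    }

    // A hypothetical factory that hands out a fresh consumer, e.g. one per stream.
    struct PrintlnConsumerFactory;

    impl ConsumerFactory for PrintlnConsumerFactory {
        fn new_consumer(&self) -> Box<dyn Consumer> {
            Box::new(PrintlnConsumer)
        }
    }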