diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 402dab961..24a3972b8 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1664,6 +1664,26 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +[[package]] +name = "lz4" +version = "1.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" +dependencies = [ + "libc", + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "macros" version = "0.1.0" @@ -2808,18 +2828,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.193" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.203" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", @@ -3146,18 +3166,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", @@ -3597,6 +3617,20 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "transaction-filter" +version = "0.1.0" +dependencies = [ + "anyhow", + "aptos-protos", + "lz4", + "memchr", + "prost 0.12.3", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "try-lock" version = "0.2.4" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 3f3d86076..aa7762b39 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" -members = ["indexer-metrics", "moving-average", "processor", "server-framework"] +members = ["indexer-metrics", "moving-average", "processor", "server-framework", "transaction-filter"] [workspace.package] authors = ["Aptos Labs "] @@ -67,6 +67,8 @@ jemallocator = { version = "0.5.0", features = [ ] } kanal = { version = "0.1.0-pre8", features = ["async"] } once_cell = "1.10.0" +# SIMD for string search +memchr = "2.7.2" num_cpus = "1.16.0" pbjson = "0.5.1" prometheus = { version = "0.13.0", default-features = false } @@ -86,6 +88,7 @@ sha2 = "0.9.3" sha3 = "0.9.1" strum = { version = "0.24.1", features = ["derive"] } tempfile = "3.3.0" +thiserror = "1.0.61" toml = "0.7.4" tracing-subscriber = { version = "0.3.17", features = ["json", "env-filter"] } tokio = { version = "1.35.1", features = ["full"] } diff --git a/rust/processor/src/utils/util.rs b/rust/processor/src/utils/util.rs index 14f750322..9638da9be 100644 --- a/rust/processor/src/utils/util.rs +++ b/rust/processor/src/utils/util.rs @@ -53,8 +53,11 @@ pub struct MultisigPayloadClean { } /// Standardizes all addresses and table handles to be length 66 (0x-64 length hash) +#[inline] pub fn standardize_address(handle: &str) -> String { - if let Some(handle) = handle.strip_prefix("0x") { + if handle.len() == 66 { + handle.to_string() + } else if let Some(handle) = handle.strip_prefix("0x") { format!("0x{:0>64}", handle) } else { format!("0x{:0>64}", handle) diff --git a/rust/transaction-filter/Cargo.toml b/rust/transaction-filter/Cargo.toml new file mode 100644 index 000000000..b08215c52 --- /dev/null +++ b/rust/transaction-filter/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "transaction-filter" +version = "0.1.0" + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = { workspace = true } +aptos-protos = { workspace = true } +# SIMD for string search. TODO: benchmark this on various real inputs to see if it's worth it +memchr = { workspace = true } + +prost = { workspace = true } + +serde = { workspace = true } +serde_json = { workspace = true } + +thiserror = { workspace = true } + +[dev-dependencies] +# we only decompress the fixture protos in test +lz4 = "1.24.0" + diff --git a/rust/transaction-filter/fixtures/compressed_files_lz4_00008bc1d5adcf862d3967c1410001fb_705101000.pb.lz4 b/rust/transaction-filter/fixtures/compressed_files_lz4_00008bc1d5adcf862d3967c1410001fb_705101000.pb.lz4 new file mode 100644 index 000000000..cee326c26 Binary files /dev/null and b/rust/transaction-filter/fixtures/compressed_files_lz4_00008bc1d5adcf862d3967c1410001fb_705101000.pb.lz4 differ diff --git a/rust/transaction-filter/fixtures/compressed_files_lz4_0013c194ec4fdbfb8db7306170aac083_445907000.pb.lz4 b/rust/transaction-filter/fixtures/compressed_files_lz4_0013c194ec4fdbfb8db7306170aac083_445907000.pb.lz4 new file mode 100644 index 000000000..ab76ef837 Binary files /dev/null and b/rust/transaction-filter/fixtures/compressed_files_lz4_0013c194ec4fdbfb8db7306170aac083_445907000.pb.lz4 differ diff --git a/rust/transaction-filter/fixtures/compressed_files_lz4_f3d880d9700c70d71fefe71aa9218aa9_301616000.pb.lz4 b/rust/transaction-filter/fixtures/compressed_files_lz4_f3d880d9700c70d71fefe71aa9218aa9_301616000.pb.lz4 new file mode 100644 index 000000000..512e84e77 Binary files /dev/null and b/rust/transaction-filter/fixtures/compressed_files_lz4_f3d880d9700c70d71fefe71aa9218aa9_301616000.pb.lz4 differ diff --git a/rust/transaction-filter/src/errors.rs b/rust/transaction-filter/src/errors.rs new file mode 100644 index 000000000..a6289cf40 --- /dev/null +++ b/rust/transaction-filter/src/errors.rs @@ -0,0 +1,81 @@ +use serde::{Serialize, Serializer}; +use std::fmt::Display; +use thiserror::Error as ThisError; + +#[derive(Debug, Serialize)] +pub struct FilterStepTrace { + pub serialized_filter: String, + pub filter_type: String, +} + +impl Display for FilterStepTrace { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}: {}", self.filter_type, self.serialized_filter) + } +} +#[derive(Debug)] +pub struct SerializableError { + pub inner: Box, +} + +/// Custom error that allows for keeping track of the filter type/path that caused the error +#[derive(Debug, Serialize, ThisError)] +pub struct FilterError { + pub filter_path: Vec, + pub error: SerializableError, +} + +impl FilterError { + pub fn new(error: Box) -> Self { + Self { + filter_path: Vec::new(), + error: SerializableError::new(error), + } + } + + pub fn add_trace(&mut self, serialized_filter: String, filter_type: String) { + self.filter_path.push(FilterStepTrace { + serialized_filter, + filter_type, + }); + } +} + +impl From for FilterError { + fn from(error: anyhow::Error) -> Self { + Self::new(error.into()) + } +} + +impl Display for FilterError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let trace_path = self + .filter_path + .iter() + .map(|trace| format!("{}", trace)) + .collect::>() + .join("\n"); + write!( + f, + "Filter Error: {:?}\nTrace Path:\n{}", + self.error.inner, trace_path + ) + } +} + +impl SerializableError { + fn new(error: Box) -> Self { + SerializableError { inner: error } + } +} + +// Implement Serialize for the wrapper +impl Serialize for SerializableError { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // Serialize the error as its string representation + serializer.serialize_str(&self.inner.to_string()) + } +} diff --git a/rust/transaction-filter/src/filter_operator.rs b/rust/transaction-filter/src/filter_operator.rs new file mode 100644 index 000000000..a54f41ca5 --- /dev/null +++ b/rust/transaction-filter/src/filter_operator.rs @@ -0,0 +1,303 @@ +use crate::{ + errors::FilterError, + filters::{ + EventFilter, TransactionRootFilter, UserTransactionRequestFilter, WriteSetChangeFilter, + }, + traits::Filterable, +}; +use aptos_protos::transaction::v1::{transaction::TxnData, Transaction}; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +/// These are filters we would expect to be exposed via API +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +#[serde(tag = "type")] +pub enum APIFilter { + TransactionRootFilter(TransactionRootFilter), + UserTransactionRequestFilter(UserTransactionRequestFilter), + EventFilter(EventFilter), + WriteSetChangeFilter(WriteSetChangeFilter), +} + +impl Filterable for APIFilter { + fn validate_state(&self) -> Result<(), FilterError> { + match self { + APIFilter::TransactionRootFilter(filter) => filter.is_valid(), + APIFilter::UserTransactionRequestFilter(filter) => filter.is_valid(), + APIFilter::EventFilter(filter) => filter.is_valid(), + APIFilter::WriteSetChangeFilter(filter) => filter.is_valid(), + } + } + + fn is_allowed(&self, txn: &Transaction) -> bool { + match self { + APIFilter::TransactionRootFilter(filter) => filter.is_allowed(txn), + APIFilter::UserTransactionRequestFilter(ut_filter) => ut_filter.is_allowed(txn), + APIFilter::EventFilter(events_filter) => { + if let Some(txn_data) = &txn.txn_data { + let events = match txn_data { + TxnData::BlockMetadata(bm) => &bm.events, + TxnData::Genesis(g) => &g.events, + TxnData::StateCheckpoint(_) => return false, + TxnData::User(u) => &u.events, + TxnData::Validator(_) => return false, + }; + events_filter.is_allowed_vec(events) + } else { + false + } + }, + APIFilter::WriteSetChangeFilter(changes_filter) => { + changes_filter.is_allowed_opt_vec(&txn.info.as_ref().map(|inf| &inf.changes)) + }, + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(untagged)] +pub enum FilterOperator { + And(LogicalAnd), + Or(LogicalOr), + Not(LogicalNot), + Filter(APIFilter), +} + +impl FilterOperator { + pub fn new_and(and: Vec) -> Self { + FilterOperator::And(LogicalAnd { and }) + } + + pub fn new_or(or: Vec) -> Self { + FilterOperator::Or(LogicalOr { or }) + } + + pub fn new_not(not: Vec) -> Self { + FilterOperator::Not(LogicalNot { not }) + } + + pub fn new_filter(filter: APIFilter) -> Self { + FilterOperator::Filter(filter) + } +} + +impl Filterable for FilterOperator { + fn validate_state(&self) -> Result<(), FilterError> { + match self { + FilterOperator::And(and) => and.is_valid(), + FilterOperator::Or(or) => or.is_valid(), + FilterOperator::Not(not) => not.is_valid(), + FilterOperator::Filter(filter) => filter.is_valid(), + } + } + + fn is_allowed(&self, item: &Transaction) -> bool { + match self { + FilterOperator::And(and) => and.is_allowed(item), + FilterOperator::Or(or) => or.is_allowed(item), + FilterOperator::Not(not) => not.is_allowed(item), + FilterOperator::Filter(filter) => filter.is_allowed(item), + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct LogicalAnd { + and: Vec, +} + +impl Filterable for LogicalAnd { + fn validate_state(&self) -> Result<(), FilterError> { + for filter in &self.and { + filter.is_valid()?; + } + Ok(()) + } + + fn is_allowed(&self, item: &Transaction) -> bool { + self.and.iter().all(|filter| filter.is_allowed(item)) + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct LogicalOr { + or: Vec, +} + +impl Filterable for LogicalOr { + fn validate_state(&self) -> Result<(), FilterError> { + for filter in &self.or { + filter.is_valid()?; + } + Ok(()) + } + + fn is_allowed(&self, item: &Transaction) -> bool { + self.or.iter().any(|filter| filter.is_allowed(item)) + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct LogicalNot { + not: Vec, +} + +impl Filterable for LogicalNot { + fn validate_state(&self) -> Result<(), FilterError> { + for filter in &self.not { + filter.is_valid()?; + } + Ok(()) + } + + fn is_allowed(&self, item: &Transaction) -> bool { + !self.not.iter().any(|filter| filter.is_allowed(item)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + filters::{ + user_transaction_request::EntryFunctionFilter, + write_set_change_filter::{ + ChangeItemFilter, ModuleChangeFilter, ResourceChangeFilter, TableChangeFilter, + }, + MoveStructTagFilter, PositionalFilter, UserTransactionPayloadFilter, + }, + json_search::{JsonOrStringSearch, JsonSearchTerm}, + test_lib::load_graffio_fixture, + }; + + #[test] + pub fn test_query_parsing() { + let trf = TransactionRootFilter { + success: Some(true), + txn_type: Some(aptos_protos::transaction::v1::transaction::TransactionType::User), + }; + + let utrf = UserTransactionRequestFilter { + sender: Some("0x0011".into()), + payload: Some(UserTransactionPayloadFilter { + function: Some(EntryFunctionFilter { + address: Some("0x001".into()), + module: Some("module".into()), + function: Some("F".into()), + }), + arguments: Some(vec![PositionalFilter { + index: 0, + value: "0x0011".into(), + }]), + }), + }; + + let ef = EventFilter { + data: Some(JsonSearchTerm::new("spins".into(), 5.into()).unwrap()), + struct_type: Some(MoveStructTagFilter { + address: Some("0x0077".into()), + module: Some("roulette".into()), + name: Some("spin".into()), + }), + }; + + let wscf_res = WriteSetChangeFilter { + change: Some(ChangeItemFilter::ResourceChange(ResourceChangeFilter { + resource_type: Some(MoveStructTagFilter { + address: Some("0x001af32".into()), + module: Some("airport".into()), + name: Some("airplane".into()), + }), + address: Some("0x001af32".into()), + data: Some(JsonSearchTerm::new("takeoff".into(), true.into()).unwrap()), + })), + }; + let wscf_table = WriteSetChangeFilter { + change: Some(ChangeItemFilter::TableChange(TableChangeFilter { + handle: Some("0x796857465434253644536475453432453".into()), + key: Some(JsonOrStringSearch::String("table_key".into())), + key_type_str: Some("0x423453466345::some_module::SomeStruct".into()), + })), + }; + let wscf_mod = WriteSetChangeFilter { + change: Some(ChangeItemFilter::ModuleChange(ModuleChangeFilter { + address: Some("0x0000098".into()), + })), + }; + + let write_set_ors = FilterOperator::new_or(vec![ + FilterOperator::Filter(APIFilter::WriteSetChangeFilter(wscf_res)), + FilterOperator::Filter(APIFilter::WriteSetChangeFilter(wscf_table)), + FilterOperator::Filter(APIFilter::WriteSetChangeFilter(wscf_mod)), + ]); + + let event_filter_or_write_set = FilterOperator::new_or(vec![ + FilterOperator::Filter(APIFilter::EventFilter(ef)), + write_set_ors, + ]); + + let transaction_root_and_request_filter = FilterOperator::new_or(vec![ + FilterOperator::Filter(APIFilter::TransactionRootFilter(trf)), + FilterOperator::Filter(APIFilter::UserTransactionRequestFilter(utrf)), + ]); + + let query = FilterOperator::new_or(vec![ + transaction_root_and_request_filter, + event_filter_or_write_set, + ]); + + println!( + "JSON RESULT: \n {}", + serde_json::to_string_pretty(&query).unwrap() + ); + + let txns = load_graffio_fixture(); + + // Benchmark how long it takes to do this 100 times + let start = std::time::Instant::now(); + const LOOPS: i32 = 1000; + for _ in 0..LOOPS { + for txn in &txns.transactions { + query.is_allowed(txn); + } + } + let elapsed = start.elapsed(); + + let total_txn = LOOPS * txns.transactions.len() as i32; + println!( + "BENCH: Took {:?} for {} transactions ({:?} each)", + elapsed, + total_txn, + elapsed / total_txn as u32 + ); + + let ef_econia = EventFilter { + data: None, + struct_type: Some(MoveStructTagFilter { + address: Some("0x00ECONIA".into()), + module: None, + name: None, + }), + }; + let ef_aries = EventFilter { + data: None, + struct_type: Some(MoveStructTagFilter { + address: Some("0x00ARIES".into()), + module: None, + name: None, + }), + }; + let query = FilterOperator::new_or(vec![ + FilterOperator::Filter(APIFilter::EventFilter(ef_econia)), + FilterOperator::Filter(APIFilter::EventFilter(ef_aries)), + ]); + println!( + "JSON RESULT: \n {}", + serde_json::to_string_pretty(&query).unwrap() + ); + + //println!("Filter result for u32: {}", filter.is_allowed(&item_u32)); // true + //println!("Filter result for String: {}", filter.is_allowed(&item_s)); // false + } +} diff --git a/rust/transaction-filter/src/filters/event.rs b/rust/transaction-filter/src/filters/event.rs new file mode 100644 index 000000000..a5fa98ccb --- /dev/null +++ b/rust/transaction-filter/src/filters/event.rs @@ -0,0 +1,51 @@ +use crate::{ + errors::FilterError, filters::MoveStructTagFilter, json_search::JsonSearchTerm, + traits::Filterable, +}; +use anyhow::Error; +use aptos_protos::transaction::v1::{move_type::Content, Event}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct EventFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, + // Only for events that have a struct as their generic + #[serde(skip_serializing_if = "Option::is_none")] + pub struct_type: Option, +} + +impl Filterable for EventFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.data.is_none() && self.struct_type.is_none() { + return Err(Error::msg("At least one of data or struct_type must be set").into()); + }; + + self.data.is_valid()?; + self.struct_type.is_valid()?; + Ok(()) + } + + #[inline] + fn is_allowed(&self, item: &Event) -> bool { + if let Some(struct_type_filter) = &self.struct_type { + if let Some(Content::Struct(struct_tag)) = + &item.r#type.as_ref().and_then(|t| t.content.as_ref()) + { + if !struct_type_filter.is_allowed(struct_tag) { + return false; + } + } else { + return false; + } + } + + if !self.data.is_allowed(&item.data) { + return false; + } + + true + } +} diff --git a/rust/transaction-filter/src/filters/mod.rs b/rust/transaction-filter/src/filters/mod.rs new file mode 100644 index 000000000..3aca4e6cd --- /dev/null +++ b/rust/transaction-filter/src/filters/mod.rs @@ -0,0 +1,14 @@ +pub mod event; +pub mod move_module; +pub mod positional; +pub mod transaction_root; +pub mod user_transaction_request; +pub mod write_set_change_filter; + +// Re-export for easier use +pub use event::EventFilter; +pub use move_module::{MoveModuleFilter, MoveStructTagFilter}; +pub use positional::PositionalFilter; +pub use transaction_root::TransactionRootFilter; +pub use user_transaction_request::{UserTransactionPayloadFilter, UserTransactionRequestFilter}; +pub use write_set_change_filter::WriteSetChangeFilter; diff --git a/rust/transaction-filter/src/filters/move_module.rs b/rust/transaction-filter/src/filters/move_module.rs new file mode 100644 index 000000000..2e23eeb2c --- /dev/null +++ b/rust/transaction-filter/src/filters/move_module.rs @@ -0,0 +1,56 @@ +use crate::{errors::FilterError, traits::Filterable}; +use anyhow::anyhow; +use aptos_protos::transaction::v1::{MoveModuleId, MoveStructTag}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct MoveModuleFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub address: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, +} + +impl Filterable for MoveModuleFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.address.is_none() && self.name.is_none() { + return Err(anyhow!("At least one of address or name must be set").into()); + }; + Ok(()) + } + + #[inline] + fn is_allowed(&self, module_id: &MoveModuleId) -> bool { + self.address.is_allowed(&module_id.address) && self.name.is_allowed(&module_id.name) + } +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct MoveStructTagFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub address: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub module: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, +} + +impl Filterable for MoveStructTagFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.address.is_none() && self.module.is_none() && self.name.is_none() { + return Err(anyhow!("At least one of address, module or name must be set").into()); + }; + Ok(()) + } + + #[inline] + fn is_allowed(&self, struct_tag: &MoveStructTag) -> bool { + self.address.is_allowed(&struct_tag.address) + && self.module.is_allowed(&struct_tag.module) + && self.name.is_allowed(&struct_tag.name) + } +} diff --git a/rust/transaction-filter/src/filters/positional.rs b/rust/transaction-filter/src/filters/positional.rs new file mode 100644 index 000000000..304a962d7 --- /dev/null +++ b/rust/transaction-filter/src/filters/positional.rs @@ -0,0 +1,45 @@ +use crate::{errors::FilterError, traits::Filterable}; +use anyhow::Error; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +/// Allows matching a given value within an array of values, by index +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct PositionalFilter +where + T: PartialEq + Debug, +{ + pub value: T, + pub index: usize, +} + +impl Filterable> for PositionalFilter +where + T: PartialEq + Debug + Serialize, +{ + fn validate_state(&self) -> Result<(), FilterError> { + Ok(()) + } + + fn is_allowed(&self, items: &Vec) -> bool { + items.get(self.index).map_or(false, |v| v == &self.value) + } +} + +impl Filterable> for Vec> +where + T: PartialEq + Debug + Serialize, +{ + fn validate_state(&self) -> Result<(), FilterError> { + if self.is_empty() { + return Err(Error::msg("PositionalFilter must have at least one element").into()); + } + Ok(()) + } + + fn is_allowed(&self, items: &Vec) -> bool { + self.iter() + .all(|arg| items.get(arg.index).map_or(false, |v| v == &arg.value)) + } +} diff --git a/rust/transaction-filter/src/filters/transaction_root.rs b/rust/transaction-filter/src/filters/transaction_root.rs new file mode 100644 index 000000000..bb81556e7 --- /dev/null +++ b/rust/transaction-filter/src/filters/transaction_root.rs @@ -0,0 +1,43 @@ +use crate::{errors::FilterError, traits::Filterable}; +use anyhow::Error; +use aptos_protos::transaction::v1::{transaction::TransactionType, Transaction}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct TransactionRootFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub success: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub txn_type: Option, +} + +impl Filterable for TransactionRootFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.success.is_none() && self.txn_type.is_none() { + return Err(Error::msg("At least one of success or txn_types must be set").into()); + }; + Ok(()) + } + + #[inline] + fn is_allowed(&self, item: &Transaction) -> bool { + if !self + .success + .is_allowed_opt(&item.info.as_ref().map(|i| i.success)) + { + return false; + } + + if let Some(txn_type) = &self.txn_type { + if txn_type + != &TransactionType::try_from(item.r#type).expect("Invalid transaction type") + { + return false; + } + } + + true + } +} diff --git a/rust/transaction-filter/src/filters/user_transaction_request.rs b/rust/transaction-filter/src/filters/user_transaction_request.rs new file mode 100644 index 000000000..a4bf36f64 --- /dev/null +++ b/rust/transaction-filter/src/filters/user_transaction_request.rs @@ -0,0 +1,158 @@ +use crate::{errors::FilterError, filters::PositionalFilter, traits::Filterable}; +use anyhow::{anyhow, Error}; +use aptos_protos::transaction::v1::{ + multisig_transaction_payload, transaction::TxnData, transaction_payload, EntryFunctionId, + EntryFunctionPayload, Transaction, TransactionPayload, +}; +use serde::{Deserialize, Serialize}; + +/// We use this for UserTransactions. +/// We support UserPayload and MultisigPayload +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct UserTransactionRequestFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub sender: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub payload: Option, +} + +impl Filterable for UserTransactionRequestFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.sender.is_none() && self.payload.is_none() { + return Err(Error::msg("At least one of sender or payload must be set").into()); + }; + self.payload.is_valid()?; + Ok(()) + } + + #[inline] + fn is_allowed(&self, txn: &Transaction) -> bool { + let user_request = if let Some(TxnData::User(u)) = txn.txn_data.as_ref() { + if let Some(user_request) = u.request.as_ref() { + user_request + } else { + return false; + } + } else { + return false; + }; + + if let Some(sender_filter) = &self.sender { + if &user_request.sender != sender_filter { + return false; + } + } + + if let Some(payload_filter) = &self.payload { + // Get the entry_function_payload from both UserPayload and MultisigPayload + let entry_function_payload = user_request + .payload + .as_ref() + .and_then(get_entry_function_payload_from_transaction_payload); + if let Some(payload) = entry_function_payload { + // Here we have an actual EntryFunctionPayload + if !payload_filter.is_allowed(payload) { + return false; + } + } + } + + true + } +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct EntryFunctionFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub address: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub module: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub function: Option, +} + +impl Filterable for EntryFunctionFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.address.is_none() && self.module.is_none() && self.function.is_none() { + return Err(anyhow!("At least one of address, name or function must be set").into()); + }; + Ok(()) + } + + #[inline] + fn is_allowed(&self, module_id: &EntryFunctionId) -> bool { + if !self.module.is_allowed(&module_id.name) { + return false; + } + + if self.address.is_some() || self.function.is_some() { + if let Some(module) = &module_id.module.as_ref() { + if !(self.address.is_allowed(&module.address) + && self.function.is_allowed(&module.name)) + { + return false; + } + } else { + return false; + } + } + + true + } +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct UserTransactionPayloadFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub function: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub arguments: Option>>, + // TODO: handle type args? +} + +impl Filterable for UserTransactionPayloadFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.function.is_none() && self.arguments.is_none() { + return Err(Error::msg("At least one of function or arguments must be set").into()); + }; + self.function.is_valid()?; + self.arguments.is_valid()?; + Ok(()) + } + + #[inline] + fn is_allowed(&self, payload: &EntryFunctionPayload) -> bool { + self.function.is_allowed_opt(&payload.function) + && self.arguments.is_allowed(&payload.arguments) + } +} + +/// Get the entry_function_payload from both UserPayload and MultisigPayload +fn get_entry_function_payload_from_transaction_payload( + payload: &TransactionPayload, +) -> Option<&EntryFunctionPayload> { + let z = if let Some(payload) = &payload.payload { + match payload { + transaction_payload::Payload::EntryFunctionPayload(ef_payload) => Some(ef_payload), + transaction_payload::Payload::MultisigPayload(ms_payload) => ms_payload + .transaction_payload + .as_ref() + .and_then(|tp| tp.payload.as_ref()) + .map(|payload| match payload { + multisig_transaction_payload::Payload::EntryFunctionPayload(ef_payload) => { + ef_payload + }, + }), + _ => None, + } + } else { + None + }; + z +} diff --git a/rust/transaction-filter/src/filters/write_set_change_filter.rs b/rust/transaction-filter/src/filters/write_set_change_filter.rs new file mode 100644 index 000000000..a43eace7d --- /dev/null +++ b/rust/transaction-filter/src/filters/write_set_change_filter.rs @@ -0,0 +1,327 @@ +use crate::{ + errors::FilterError, + filters::MoveStructTagFilter, + json_search::{JsonOrStringSearch, JsonSearchTerm}, + traits::Filterable, +}; +use anyhow::Error; +use aptos_protos::transaction::v1::{ + write_set_change::Change, DeleteModule, DeleteResource, DeleteTableItem, WriteModule, + WriteResource, WriteSetChange, WriteTableItem, +}; +use serde::{Deserialize, Serialize}; + +/// This is a wrapper around ChangeItemFilter, which differs because: +/// While `ChangeItemFilter` will return false if the Event does not match the filter, +/// `ChangeItemFilter` will return true- i.e `WriteSetChangeFilter` *only* tries to match if the +/// change type matches its internal change type +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct WriteSetChangeFilter { + // TODO: handle actual changes!!! + #[serde(skip_serializing_if = "Option::is_none")] + pub change: Option, +} + +impl Filterable for WriteSetChangeFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.change.is_none() { + return Err(Error::msg("field change must be set").into()); + }; + self.change.is_valid()?; + Ok(()) + } + + #[inline] + fn is_allowed(&self, item: &WriteSetChange) -> bool { + if let Some(change_filter) = &self.change { + if let Some(change) = item.change.as_ref() { + match change { + Change::DeleteModule(dm) => { + if let ChangeItemFilter::ModuleChange(mcf) = change_filter { + if !mcf.is_allowed(&ModuleChange::DeleteModule(dm)) { + return false; + } + } + }, + Change::WriteModule(wm) => { + if let ChangeItemFilter::ModuleChange(mcf) = change_filter { + if !mcf.is_allowed(&ModuleChange::WriteModule(wm)) { + return false; + } + } + }, + Change::DeleteResource(dr) => { + if let ChangeItemFilter::ResourceChange(rcf) = change_filter { + if !rcf.is_allowed(&ResourceChange::DeleteResource(dr)) { + return false; + } + } + }, + Change::WriteResource(wr) => { + if let ChangeItemFilter::ResourceChange(rcf) = change_filter { + if !rcf.is_allowed(&ResourceChange::WriteResource(wr)) { + return false; + } + } + }, + Change::DeleteTableItem(dti) => { + if let ChangeItemFilter::TableChange(tcf) = change_filter { + if !tcf.is_allowed(&TableChange::DeleteTableItem(dti)) { + return false; + } + } + }, + Change::WriteTableItem(wti) => { + if let ChangeItemFilter::TableChange(tcf) = change_filter { + if !tcf.is_allowed(&TableChange::WriteTableItem(wti)) { + return false; + } + } + }, + } + } else { + return false; + } + } + + true + } +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +#[serde(tag = "type")] +pub enum ChangeItemFilter { + ResourceChange(ResourceChangeFilter), + ModuleChange(ModuleChangeFilter), + TableChange(TableChangeFilter), +} + +impl Filterable for ChangeItemFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + match self { + ChangeItemFilter::ResourceChange(rcf) => rcf.is_valid(), + ChangeItemFilter::ModuleChange(mcf) => mcf.is_valid(), + ChangeItemFilter::TableChange(tcf) => tcf.is_valid(), + } + } + + #[inline] + fn is_allowed(&self, item: &Change) -> bool { + match item { + Change::DeleteModule(dm) => { + if let ChangeItemFilter::ModuleChange(mcf) = self { + return mcf.is_allowed(&ModuleChange::DeleteModule(dm)); + } + false + }, + Change::WriteModule(wm) => { + if let ChangeItemFilter::ModuleChange(mcf) = self { + return mcf.is_allowed(&ModuleChange::WriteModule(wm)); + } + false + }, + Change::DeleteResource(dr) => { + if let ChangeItemFilter::ResourceChange(rcf) = self { + return rcf.is_allowed(&ResourceChange::DeleteResource(dr)); + } + false + }, + Change::WriteResource(wr) => { + if let ChangeItemFilter::ResourceChange(rcf) = self { + return rcf.is_allowed(&ResourceChange::WriteResource(wr)); + } + false + }, + Change::DeleteTableItem(dti) => { + if let ChangeItemFilter::TableChange(tcf) = self { + return tcf.is_allowed(&TableChange::DeleteTableItem(dti)); + } + false + }, + Change::WriteTableItem(wti) => { + if let ChangeItemFilter::TableChange(tcf) = self { + return tcf.is_allowed(&TableChange::WriteTableItem(wti)); + } + false + }, + } + } +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct ResourceChangeFilter { + // todo: handle `generic_type_params` as well + #[serde(skip_serializing_if = "Option::is_none")] + pub resource_type: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub address: Option, + // This is only applicable to WriteResource, but I'm lazy + #[serde(skip_serializing_if = "Option::is_none")] + pub data: Option, +} + +pub enum ResourceChange<'a> { + DeleteResource(&'a DeleteResource), + WriteResource(&'a WriteResource), +} + +impl Filterable> for ResourceChangeFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.resource_type.is_none() && self.address.is_none() { + return Err( + Error::msg("At least one of resource_type, address, or data must be set").into(), + ); + }; + self.resource_type.is_valid()?; + self.data.is_valid()?; + Ok(()) + } + + #[inline] + fn is_allowed(&self, item: &ResourceChange) -> bool { + match &item { + ResourceChange::DeleteResource(dr) => { + if let Some(address) = &self.address { + if address != &dr.address { + return false; + } + } + if let Some(resource_type) = &self.resource_type { + if !resource_type.is_allowed_opt(&dr.r#type) { + return false; + } + } + if self.data.is_some() { + return false; + } + }, + ResourceChange::WriteResource(wr) => { + if let Some(address) = &self.address { + if address != &wr.address { + return false; + } + } + if let Some(resource_type) = &self.resource_type { + if !resource_type.is_allowed_opt(&wr.r#type) { + return false; + } + } + if let Some(data) = &self.data { + if !data.find(&wr.data) { + return false; + } + } + }, + } + true + } +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct ModuleChangeFilter { + #[serde(skip_serializing_if = "Option::is_none")] + pub address: Option, +} +pub enum ModuleChange<'a> { + DeleteModule(&'a DeleteModule), + WriteModule(&'a WriteModule), +} + +impl Filterable> for ModuleChangeFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.address.is_none() { + return Err(Error::msg("At least one of address must be set").into()); + }; + Ok(()) + } + + #[inline] + fn is_allowed(&self, item: &ModuleChange) -> bool { + if let Some(address) = &self.address { + return match &item { + ModuleChange::DeleteModule(dm) => address == &dm.address, + ModuleChange::WriteModule(wm) => address == &wm.address, + }; + } + true + } +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct TableChangeFilter { + pub handle: Option, + pub key: Option, + pub key_type_str: Option, +} + +pub enum TableChange<'a> { + DeleteTableItem(&'a DeleteTableItem), + WriteTableItem(&'a WriteTableItem), +} +impl Filterable> for TableChangeFilter { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + if self.handle.is_none() && self.key.is_none() && self.key_type_str.is_none() { + return Err( + Error::msg("At least one of handle, key, or key_type_str must be set").into(), + ); + }; + Ok(()) + } + + #[inline] + fn is_allowed(&self, item: &TableChange) -> bool { + match &item { + TableChange::DeleteTableItem(dti) => { + if let Some(handle) = &self.handle { + return handle == &dti.handle; + } + if let Some(key_type) = &self.key_type_str { + if !dti + .data + .as_ref() + .map_or(false, |dtd| key_type == &dtd.key_type) + { + return false; + } + } + if let Some(key) = &self.key { + if !dti + .data + .as_ref() + .map_or(false, |dtd| key.is_allowed(&dtd.key)) + { + return false; + } + } + }, + TableChange::WriteTableItem(wti) => { + if let Some(handle) = &self.handle { + if handle != &wti.handle { + return false; + } + } + if let Some(key_type) = &self.key_type_str { + if !wti + .data + .as_ref() + .map_or(false, |wtd| key_type == &wtd.key_type) + { + return false; + } + } + self.key.is_allowed(&wti.key); + }, + } + true + } +} diff --git a/rust/transaction-filter/src/json_search.rs b/rust/transaction-filter/src/json_search.rs new file mode 100644 index 000000000..fdb0faf5d --- /dev/null +++ b/rust/transaction-filter/src/json_search.rs @@ -0,0 +1,406 @@ +use crate::{errors::FilterError, traits::Filterable}; +use memchr::memmem::Finder; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +/** +Find multiple needles in a haystack. JSON parsing is relatively expensive, so we + instead treat the JSON as a string and do simple matching. + This means false positives are possible. + +Currently, we use the `memchr` crate, which is a SIMD-accelerated library for finding bytes in bytes. + This is faster than the naive `str::find`, however it does repeat work for each needle. + There are alternatives, such as Aho-Corasick algorithm (https://github.com/BurntSushi/aho-corasick), + which uses a trie-based for finding multiple needles in a haystack in a single pass. + This is a good candidate for future work. + +Given we serialize some number types as integers, and some others as strings, we have to search for both options. +This approach means: + 1. It's impossible to specify _where_ in the JSON the key/value pair is. I.e a search for a key of "address" + and value of "0x5" will match both `{"address": "0x5"}`, and `{"inner": {"address": "0x5"}}`. + 2. The above means false positives are clearly possible. Depending on ecosystem feedback and overall performance, + we may change this, and offer full json support. There are SIMD accelerated JSON parsers available, such as + https://github.com/cloudwego/sonic-rs or https://github.com/simd-lite/simd-json . Benchmarks will be needed to + determine if this is worth it. +*/ + +pub const MIN_KEY_TERM_LENGTH: usize = 3; + +#[derive(Error, Debug)] +pub enum JSONSearchError { + #[error("The json type `{0}` is not supported in searches")] + UnsupportedJsonType(&'static str), + #[error( + "The key name is too short, must be at least {} characters", + MIN_KEY_TERM_LENGTH + )] + KeyNameTooShort, + #[error("Invalid JSON key term")] + InvalidKeyTerm, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +pub struct SerializedJsonSearchTerm { + pub key: String, + pub value: serde_json::Value, +} + +impl TryInto for SerializedJsonSearchTerm { + type Error = JSONSearchError; + + fn try_into(self) -> Result { + JsonSearchTerm::new(self.key, self.value) + } +} + +impl From for SerializedJsonSearchTerm { + fn from(term: JsonSearchTerm) -> Self { + term.original_term + } +} + +// Custom serde serialization/deserialization that uses above tryinto and from implementations +impl Serialize for JsonSearchTerm { + fn serialize(&self, serializer: S) -> Result + where + S: serde::ser::Serializer, + { + SerializedJsonSearchTerm::from(self.clone()).serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for JsonSearchTerm { + fn deserialize(deserializer: D) -> Result + where + D: serde::de::Deserializer<'de>, + { + let term = SerializedJsonSearchTerm::deserialize(deserializer)?; + term.try_into().map_err(serde::de::Error::custom) + } +} + +#[derive(Clone)] +pub struct JsonSearchTerm { + original_term: SerializedJsonSearchTerm, + // We use an owned finder; while this clones the string once, this is only done once at instantiation + // Because we serialize some numbers as strings, and some as integers, we need to search for both + key_finders: Vec>, +} + +impl PartialEq for JsonSearchTerm { + fn eq(&self, other: &Self) -> bool { + self.original_term == other.original_term + } +} + +impl std::fmt::Debug for JsonSearchTerm { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let keys = &self + .key_finders + .iter() + .map(|f| String::from_utf8_lossy(f.needle())); + f.debug_struct("JsonSearchTerm") + .field("key_finders", keys) + .field("original_term", &self.original_term) + .finish() + } +} + +impl JsonSearchTerm { + pub fn new(key: String, value: serde_json::Value) -> Result { + if key.len() < MIN_KEY_TERM_LENGTH { + Err(JSONSearchError::KeyNameTooShort)?; + } + let values = match value.clone() { + // We use to_string here to we get the quoted JSON value + serde_json::Value::String(s) => vec![double_encode_json_string(s)], + // For numbers, we need to search for both the string and the number. + // TODO: There is probably a better way to do this + serde_json::Value::Number(n) => vec![n.to_string(), format!("\\\"{}\\\"", n)], + serde_json::Value::Bool(b) => vec![b.to_string()], + serde_json::Value::Null => vec!["null".to_string()], + // Maybe we'll support these in the future, but it's more complicated, so for now we don't + // TODO: reconsider supporting arrays and/or other types + serde_json::Value::Array(_) => Err(JSONSearchError::UnsupportedJsonType("Array"))?, + serde_json::Value::Object(_) => Err(JSONSearchError::UnsupportedJsonType("Object"))?, + }; + + // We need to account for the fact that the key is quoted in the JSON: so we double quote. + let encoded_key = double_encode_json_string(key.clone()); + // And we append the `:` to the key, and the value + let key_finders = values + .iter() + .map(|v| Finder::new(&format!("{}:{}", encoded_key, v)).into_owned()) + .collect(); + Ok(Self { + key_finders, + original_term: SerializedJsonSearchTerm { key, value }, + }) + } + + pub fn from_json(json: serde_json::Value) -> Result { + let key = json["key"] + .as_str() + .ok_or(JSONSearchError::InvalidKeyTerm)?; + let value = json["value"].clone(); + Self::new(key.to_string(), value) + } + + pub fn find(&self, haystack: &str) -> bool { + let haystack_bytes = haystack.as_bytes(); + self.key_finders + .iter() + .any(|finder| finder.find(haystack_bytes).is_some()) + } +} + +impl Filterable for JsonSearchTerm { + fn validate_state(&self) -> Result<(), FilterError> { + // Validation is performed elsewhere + Ok(()) + } + + fn is_allowed(&self, item: &String) -> bool { + self.find(item) + } +} + +#[derive(Debug, Deserialize, PartialEq, Serialize)] +#[serde(deny_unknown_fields)] +#[serde(untagged)] +pub enum JsonOrStringSearch { + Json(JsonSearchTerm), + String(String), +} + +impl Filterable for JsonOrStringSearch { + fn validate_state(&self) -> Result<(), FilterError> { + match self { + JsonOrStringSearch::Json(json) => json.is_valid(), + JsonOrStringSearch::String(_) => Ok(()), + } + } + + fn is_allowed(&self, item: &String) -> bool { + match self { + JsonOrStringSearch::Json(json) => json.is_allowed(item), + JsonOrStringSearch::String(s) => item == s, + } + } +} + +pub fn double_encode_json_string(s: String) -> String { + let s = serde_json::Value::String(s.to_string()).to_string(); + let s = serde_json::Value::String(s).to_string(); + // Then we remove the leading and trailing quotes + s[1..s.len() - 1].to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use std::{borrow::Cow, str::FromStr}; + + #[test] + fn test_double_encode_json_string() { + let s = "hello"; + let expected = "\\\"hello\\\""; + assert_eq!(expected, double_encode_json_string(s.to_string())); + } + + fn print_search_debug(needle: &JsonSearchTerm) { + let needles_str = needle + .key_finders + .iter() + .map(|f| String::from_utf8_lossy(f.needle())) + .collect::>>() + .join(", "); + println!("Needles: {}", needles_str); + } + + fn search_test_case(test_json: &serde_json::Value, key: &str, value: serde_json::Value) { + let test_json = serde_json::Value::String(test_json.to_string()).to_string(); + + println!( + "\n==== Searching for `{}: {}` in `{}` ====", + key, value, test_json + ); + + let needle = JsonSearchTerm::new(key.into(), value).unwrap(); + print_search_debug(&needle); + println!("Haystack: {}", test_json); + assert!(needle.find(&test_json), "Failed to find needle in haystack"); + } + + #[test] + fn test_json_search() { + let test_json = json!( + { + // String and nested string + "address": "0x3", + "inner": { + "b": "c", + "address": "0x5" + }, + + // Null + "nullval": null, + + // Numbers + "somenum": 5, + "bignum": "101", + + // Bools + "trueval": true, + "falseval": false, + } + ); + + // String and nested string + search_test_case( + &test_json, + "address", + serde_json::Value::String("0x3".into()), + ); + + search_test_case( + &test_json, + "address", + serde_json::Value::String("0x5".into()), + ); + + // Null + search_test_case(&test_json, "nullval", serde_json::Value::Null); + + // Numbers + search_test_case(&test_json, "somenum", serde_json::Value::Number(5.into())); + search_test_case( + &test_json, + "bignum", + serde_json::Value::String("101".into()), + ); + + // Bools + search_test_case(&test_json, "trueval", serde_json::Value::Bool(true)); + search_test_case(&test_json, "falseval", serde_json::Value::Bool(false)); + } + + /** + For searching `inner.address: 0x5` in a singly nested json, the results are: + ```text + BENCH: Memchr took 19.75µs for 1000 iters (19ns each) + BENCH: Serde Search took 1.82875ms for 1000 iters (1.828µs each) + Memchr is 96x faster than Serde + ``` + + If we double that json- i.e add an inner2 with the contents of test_json, we get: + ```text + BENCH: Memchr took 54.334µs for 1000 iters (54ns each) + BENCH: Serde Search took 14.213292ms for 1000 iters (14.213µs each) + Memchr is 263x faster than Serde + ``` + + This is excluding memory allocation, for which serde Value is [not great](https://github.com/serde-rs/json/issues/635) + + if we look at something like graffio txns, we’d be looking at more than three orders of magnitude in difference + The main problem/optimization is that memchr does a tiny bit of work on startup (few ns), but is then re-used forever; + as long as the stream remains, the per json search is relatively constant, because it’s just so fast, and our jsons are relatively small + + Serde however is not: it scales pretty linearly (and then some), and so the larger the json, the bigger the delta + + Whether we care about an extra 25-50ms per batch is a different story and this is, of course, with serde; + it’s possible using one of the more efficient json parsers I looked into we could shave, maybe, [20-30% off that time](https://github.com/serde-rs/json-benchmark) + **/ + #[test] + fn test_bench_json_search_vs_serde() { + let mut test_json = json!( + { + // String and nested string + "address": "0x3", + "inner": { + "b": "c", + "address": "0x5" + }, + + // Null + "nullval": null, + + // Numbers + "somenum": 5, + "bignum": "101", + + // Bools + "trueval": true, + "falseval": false, + } + ); + + let test_json_clone = test_json.clone(); + test_json + .get_mut("inner") + .unwrap() + .as_object_mut() + .unwrap() + .insert("inner2".to_string(), test_json_clone); + + let test_json_encoded = serde_json::Value::String(test_json.to_string()).to_string(); + + let needle = + JsonSearchTerm::new("address".into(), serde_json::Value::String("0x5".into())).unwrap(); + + let start = std::time::Instant::now(); + + const ITERATIONS: usize = 1000; + for _ in 0..ITERATIONS { + needle.find(&test_json_encoded); + } + + let elapsed = start.elapsed(); + let memchr_average = elapsed / ITERATIONS as u32; + println!( + "BENCH: Memchr took {:?} for {} iters ({:?} each)", + elapsed, ITERATIONS, memchr_average + ); + + let json_search_term = ["inner".to_string(), "address".to_string()]; + let json_search_value = serde_json::Value::String("0x5".into()); + for _ in 0..ITERATIONS { + let test_json_serval = serde_json::Value::from_str(&test_json_encoded).unwrap(); + let test_json_serval = + serde_json::Value::from_str(test_json_serval.as_str().unwrap()).unwrap(); + + if !test_json_serval.is_object() { + panic!("Expected object"); + } + let mut current = &test_json_serval; + for key in json_search_term.iter() { + if let Some(next) = current.get(key) { + current = next; + } else { + break; + } + } + + // Ensure we found the value + if current != &json_search_value { + panic!( + "Failed to find needle in haystack: \n{:} \n{:} \n<<<", + current, json_search_value + ); + } + } + let elapsed = start.elapsed(); + let serde_average = elapsed / ITERATIONS as u32; + println!( + "BENCH: Serde Search took {:?} for {} iters ({:?} each)", + elapsed, ITERATIONS, serde_average + ); + + println!( + "Memchr is {:?}x faster than Serde", + serde_average.as_nanos() / memchr_average.as_nanos() + ); + } +} diff --git a/rust/transaction-filter/src/lib.rs b/rust/transaction-filter/src/lib.rs new file mode 100644 index 000000000..a3a69e679 --- /dev/null +++ b/rust/transaction-filter/src/lib.rs @@ -0,0 +1,43 @@ +pub mod filter_operator; +pub mod filters; +pub mod json_search; +pub mod traits; + +mod errors; +#[cfg(test)] +pub mod test_lib; + +/** +The goal of transaction filtering is to be able to save resources downstream of wherever filtering is used. +For this to be true, the filtering itself must be fast, and so we do a few things: + 1. JSON fields are not parsed. Instead, we treat it as a string, and do simple matching. *This means + false positives are possible*. This may change in the future, but JSON parsing is not cheap. + 2. We avoid clones, copies, etc as much as possible + 3. We do a single pass over the transaction data + +There are four different parts of a transaction that are queryable: + 1. The "root" level. This includes: + - Transaction type + - Success + 2. Arbitrary Transaction-type-specific filters. We currently only support the "user" transaction type. + - Sender + - Payload: we only support the entry function payload + - Entry function (address, module, name) + - Entry function ID string + - Arbitrary JSON data + 3. Events. Each event has: + - Key + - Type + - Arbitrary JSON data + 4. WriteSet Changes. Each change may have: + - Type + - Address + - Arbitrary JSON data +**/ + +#[cfg(test)] +mod tests { + + #[test] + fn it_works() {} +} diff --git a/rust/transaction-filter/src/test_lib/mod.rs b/rust/transaction-filter/src/test_lib/mod.rs new file mode 100644 index 000000000..d7672730e --- /dev/null +++ b/rust/transaction-filter/src/test_lib/mod.rs @@ -0,0 +1,36 @@ +use aptos_protos::indexer::v1::TransactionsInStorage; +use prost::Message; +use std::io::Read; + +pub fn decompress_fixture(bytes: &[u8]) -> TransactionsInStorage { + let mut decompressor = lz4::Decoder::new(bytes).expect("Lz4 decompression failed."); + let mut decompressed = Vec::new(); + decompressor + .read_to_end(&mut decompressed) + .expect("Lz4 decompression failed."); + TransactionsInStorage::decode(decompressed.as_slice()).expect("Failed to parse transaction") +} + +#[allow(dead_code)] +pub fn load_taptos_fixture() -> TransactionsInStorage { + let data = include_bytes!( + "../../fixtures/compressed_files_lz4_00008bc1d5adcf862d3967c1410001fb_705101000.pb.lz4" + ); + decompress_fixture(data) +} + +#[allow(dead_code)] +pub fn load_random_april_3mb_fixture() -> TransactionsInStorage { + let data = include_bytes!( + "../../fixtures/compressed_files_lz4_0013c194ec4fdbfb8db7306170aac083_445907000.pb.lz4" + ); + decompress_fixture(data) +} + +#[allow(dead_code)] +pub fn load_graffio_fixture() -> TransactionsInStorage { + let data = include_bytes!( + "../../fixtures/compressed_files_lz4_f3d880d9700c70d71fefe71aa9218aa9_301616000.pb.lz4" + ); + decompress_fixture(data) +} diff --git a/rust/transaction-filter/src/traits.rs b/rust/transaction-filter/src/traits.rs new file mode 100644 index 000000000..9a5a402e9 --- /dev/null +++ b/rust/transaction-filter/src/traits.rs @@ -0,0 +1,191 @@ +use crate::errors::FilterError; +use serde::Serialize; +use std::fmt::Debug; + +/// Simple trait to allow for filtering of items of type T +pub trait Filterable +where + Self: Debug + Serialize, +{ + /// Whether this filter is correctly configured/initialized + /// Any call to `validate_state` is responsible for recursively checking the validity of any nested filters *by calling `is_valid`* + /// The actual public API is via `is_valid` which will call `validate_state` and return an error if it fails, but annotated with the filter type/path + fn validate_state(&self) -> Result<(), FilterError>; + + /** + * This is a convenience method to allow for the error to be annotated with the filter type/path at each level + * This is the public API for checking the validity of a filter! + * Example output looks like: + * ```text + * FilterError: This is a test error!. + * Trace Path: + * transaction_filter::traits::test::InnerStruct: {"a":"test"} + * core::option::Option: {"a":"test"} + * transaction_filter::traits::test::OuterStruct: {"inner":{"a":"test"}} + * ``` + **/ + #[inline] + fn is_valid(&self) -> Result<(), FilterError> { + // T + self.validate_state().map_err(|mut e| { + e.add_trace( + serde_json::to_string(self).unwrap(), + std::any::type_name::().to_string(), + ); + e + }) + } + + /// Whether the item is allowed by this filter + /// This is the core method that should be implemented by any filter + /// This is the method that should be called by any parent filter to determine if an item is allowed + /// *If a filter doesn't explicitly prevent an item, then it should be allowed* + /// This forces the logic of `if !child_filter.is_allowed(item) { return false; }` for any parent filter + fn is_allowed(&self, item: &T) -> bool; + + #[inline] + fn is_allowed_vec(&self, items: &[T]) -> bool { + items.iter().all(|item| self.is_allowed(item)) + } + + #[inline] + fn is_allowed_opt(&self, item: &Option) -> bool { + match item { + Some(item) => self.is_allowed(item), + None => false, + } + } + + #[inline] + fn is_allowed_opt_vec(&self, items: &Option<&Vec>) -> bool { + match items { + Some(items) => self.is_allowed_vec(items), + None => false, + } + } +} + +/// This allows for Option to always return true: i.e if the filter is None, then all items are allowed. +impl Filterable for Option +where + F: Filterable, +{ + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + match self { + Some(filter) => filter.is_valid(), + None => Ok(()), + } + } + + #[inline] + fn is_allowed(&self, item: &T) -> bool { + match self { + Some(filter) => filter.is_allowed(item), + None => true, + } + } + + #[inline] + fn is_allowed_opt(&self, item: &Option) -> bool { + match self { + Some(filter) => filter.is_allowed_opt(item), + None => true, + } + } +} + +impl Filterable for Option { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + Ok(()) + } + + #[inline] + fn is_allowed(&self, item: &String) -> bool { + match self { + Some(filter) => filter == item, + None => true, + } + } +} + +impl Filterable for Option { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + Ok(()) + } + + #[inline] + fn is_allowed(&self, item: &i32) -> bool { + match self { + Some(filter) => filter == item, + None => true, + } + } +} + +impl Filterable for Option { + #[inline] + fn validate_state(&self) -> Result<(), FilterError> { + Ok(()) + } + + #[inline] + fn is_allowed(&self, item: &bool) -> bool { + match self { + Some(filter) => filter == item, + None => true, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use anyhow::anyhow; + + #[derive(Debug, Serialize, PartialEq)] + pub struct InnerStruct { + pub a: Option, + } + + impl Filterable for InnerStruct { + fn validate_state(&self) -> Result<(), FilterError> { + Err(anyhow!("This is a test error!").into()) + } + + fn is_allowed(&self, _item: &InnerStruct) -> bool { + true + } + } + + #[derive(Debug, PartialEq, Serialize)] + pub struct OuterStruct { + pub inner: Option, + } + + impl Filterable for OuterStruct { + fn validate_state(&self) -> Result<(), FilterError> { + self.inner.is_valid()?; + Ok(()) + } + + fn is_allowed(&self, item: &InnerStruct) -> bool { + self.inner.is_allowed(item) + } + } + + #[test] + fn test_error_prop() { + let inner = InnerStruct { + a: Some("test".to_string()), + }; + let outer = OuterStruct { inner: Some(inner) }; + + let res = outer.is_valid(); + assert!(res.is_err()); + let error = res.unwrap_err(); + assert_eq!(error.to_string(), "Filter Error: This is a test error!\nTrace Path:\ntransaction_filter::traits::test::InnerStruct: {\"a\":\"test\"}\ncore::option::Option: {\"a\":\"test\"}\ntransaction_filter::traits::test::OuterStruct: {\"inner\":{\"a\":\"test\"}}"); + } +}