Skip to content

Commit dd22f7a

Browse files
committed
[Data Service] Implement simple upstream transaction filtering
There are two types of transaction filtering we will support in the future: 1. Per stream configuration: The downstream declares what txns they want to receive. 2. Global configuration: At the data service level we refuse to include full txns for all streams. This PR implements the second of these, using @CapCap's work here: aptos-labs/aptos-indexer-processors#398. Rather than not sending txns at all if they match the blocklist filters, we just omit the writesets and events. Not sending the txns entirely would cause issues with processors, which today assume that they will receive all txns.
1 parent c51f147 commit dd22f7a

File tree

4 files changed

+75
-47
lines changed

4 files changed

+75
-47
lines changed

Cargo.lock

Lines changed: 7 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ aptos-indexer-grpc-utils = { workspace = true }
1919
aptos-metrics-core = { workspace = true }
2020
aptos-moving-average = { workspace = true }
2121
aptos-protos = { workspace = true }
22+
aptos-transaction-filter = { workspace = true }
2223
async-trait = { workspace = true }
2324
clap = { workspace = true }
2425
futures = { workspace = true }

ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ use aptos_protos::{
1313
transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET,
1414
util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET,
1515
};
16+
use aptos_transaction_filter::BooleanTransactionFilter;
1617
use serde::{Deserialize, Serialize};
17-
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
18+
use std::{net::SocketAddr, sync::Arc};
1819
use tonic::{codec::CompressionEncoding, transport::Server};
1920

2021
pub const SERVER_NAME: &str = "idxdatasvc";
@@ -69,9 +70,18 @@ pub struct IndexerGrpcDataServiceConfig {
6970
pub enable_cache_compression: bool,
7071
#[serde(default)]
7172
pub in_memory_cache_config: InMemoryCacheConfig,
72-
/// Sender addresses to ignore. Transactions from these addresses will not be indexed.
73-
#[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")]
74-
pub sender_addresses_to_ignore: Vec<String>,
73+
/// Any transaction that matches this filter will be stripped. This means we remove
74+
/// the payload, signature, events, and writesets from it before sending it
75+
/// downstream. This should only be used in an emergency situation, e.g. when txns
76+
/// related to a certain module are too large and are causing issues for the data
77+
/// service. Learn more here:
78+
///
79+
/// https://www.notion.so/aptoslabs/Runbook-c006a37259394ac2ba904d6b54d180fa?pvs=4#171c210964ec42a89574fc80154f9e85
80+
///
81+
/// Generally you will want to start this filter with an OR, and then list out
82+
/// separate filters that describe each type of txn we want to strip.
83+
#[serde(default = "IndexerGrpcDataServiceConfig::default_txns_to_strip_filter")]
84+
pub txns_to_strip_filter: BooleanTransactionFilter,
7585
}
7686

7787
impl IndexerGrpcDataServiceConfig {
@@ -84,7 +94,7 @@ impl IndexerGrpcDataServiceConfig {
8494
redis_read_replica_address: RedisUrl,
8595
enable_cache_compression: bool,
8696
in_memory_cache_config: InMemoryCacheConfig,
87-
sender_addresses_to_ignore: Vec<String>,
97+
txns_to_strip_filter: BooleanTransactionFilter,
8898
) -> Self {
8999
Self {
90100
data_service_grpc_tls_config,
@@ -97,7 +107,7 @@ impl IndexerGrpcDataServiceConfig {
97107
redis_read_replica_address,
98108
enable_cache_compression,
99109
in_memory_cache_config,
100-
sender_addresses_to_ignore,
110+
txns_to_strip_filter,
101111
}
102112
}
103113

@@ -109,8 +119,9 @@ impl IndexerGrpcDataServiceConfig {
109119
false
110120
}
111121

112-
pub const fn default_sender_addresses_to_ignore() -> Vec<String> {
113-
vec![]
122+
pub fn default_txns_to_strip_filter() -> BooleanTransactionFilter {
123+
// This filter matches no txns.
124+
BooleanTransactionFilter::new_or(vec![])
114125
}
115126
}
116127

@@ -170,10 +181,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
170181
self.redis_read_replica_address.clone(),
171182
self.file_store_config.clone(),
172183
self.data_service_response_channel_size,
173-
self.sender_addresses_to_ignore
174-
.clone()
175-
.into_iter()
176-
.collect::<HashSet<_>>(),
184+
self.txns_to_strip_filter.clone(),
177185
cache_storage_format,
178186
Arc::new(in_memory_cache),
179187
)?;

ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs

Lines changed: 47 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ use aptos_protos::{
2828
indexer::v1::{raw_data_server::RawData, GetTransactionsRequest, TransactionsResponse},
2929
transaction::v1::{transaction::TxnData, Transaction},
3030
};
31+
use aptos_transaction_filter::{BooleanTransactionFilter, Filterable};
3132
use futures::Stream;
3233
use prost::Message;
3334
use redis::Client;
3435
use std::{
35-
collections::{HashMap, HashSet},
36+
collections::HashMap,
3637
pin::Pin,
3738
str::FromStr,
3839
sync::Arc,
@@ -77,7 +78,7 @@ pub struct RawDataServerWrapper {
7778
pub redis_client: Arc<redis::Client>,
7879
pub file_store_config: IndexerGrpcFileStoreConfig,
7980
pub data_service_response_channel_size: usize,
80-
pub sender_addresses_to_ignore: HashSet<String>,
81+
pub txns_to_strip_filter: BooleanTransactionFilter,
8182
pub cache_storage_format: StorageFormat,
8283
in_memory_cache: Arc<InMemoryCache>,
8384
}
@@ -92,10 +93,7 @@ impl std::fmt::Debug for RawDataServerWrapper {
9293
"data_service_response_channel_size",
9394
&self.data_service_response_channel_size,
9495
)
95-
.field(
96-
"sender_addresses_to_ignore",
97-
&self.sender_addresses_to_ignore,
98-
)
96+
.field("txns_to_strip_filter", &self.txns_to_strip_filter)
9997
.field("cache_storage_format", &self.cache_storage_format)
10098
.finish()
10199
}
@@ -106,7 +104,7 @@ impl RawDataServerWrapper {
106104
redis_address: RedisUrl,
107105
file_store_config: IndexerGrpcFileStoreConfig,
108106
data_service_response_channel_size: usize,
109-
sender_addresses_to_ignore: HashSet<String>,
107+
txns_to_strip_filter: BooleanTransactionFilter,
110108
cache_storage_format: StorageFormat,
111109
in_memory_cache: Arc<InMemoryCache>,
112110
) -> anyhow::Result<Self> {
@@ -118,7 +116,7 @@ impl RawDataServerWrapper {
118116
),
119117
file_store_config,
120118
data_service_response_channel_size,
121-
sender_addresses_to_ignore,
119+
txns_to_strip_filter,
122120
cache_storage_format,
123121
in_memory_cache,
124122
})
@@ -194,7 +192,7 @@ impl RawData for RawDataServerWrapper {
194192
let redis_client = self.redis_client.clone();
195193
let cache_storage_format = self.cache_storage_format;
196194
let request_metadata = Arc::new(request_metadata);
197-
let sender_addresses_to_ignore = self.sender_addresses_to_ignore.clone();
195+
let txns_to_strip_filter = self.txns_to_strip_filter.clone();
198196
let in_memory_cache = self.in_memory_cache.clone();
199197
tokio::spawn({
200198
let request_metadata = request_metadata.clone();
@@ -206,7 +204,7 @@ impl RawData for RawDataServerWrapper {
206204
request_metadata,
207205
transactions_count,
208206
tx,
209-
sender_addresses_to_ignore,
207+
txns_to_strip_filter,
210208
current_version,
211209
in_memory_cache,
212210
)
@@ -394,7 +392,7 @@ async fn data_fetcher_task(
394392
request_metadata: Arc<IndexerGrpcRequestMetadata>,
395393
transactions_count: Option<u64>,
396394
tx: tokio::sync::mpsc::Sender<Result<TransactionsResponse, Status>>,
397-
sender_addresses_to_ignore: HashSet<String>,
395+
txns_to_strip_filter: BooleanTransactionFilter,
398396
mut current_version: u64,
399397
in_memory_cache: Arc<InMemoryCache>,
400398
) {
@@ -532,7 +530,7 @@ async fn data_fetcher_task(
532530
let resp_items = get_transactions_responses_builder(
533531
transaction_data,
534532
chain_id as u32,
535-
&sender_addresses_to_ignore,
533+
&txns_to_strip_filter,
536534
);
537535
let data_latency_in_secs = resp_items
538536
.last()
@@ -676,11 +674,10 @@ fn ensure_sequential_transactions(mut batches: Vec<Vec<Transaction>>) -> Vec<Tra
676674
fn get_transactions_responses_builder(
677675
transactions: Vec<Transaction>,
678676
chain_id: u32,
679-
sender_addresses_to_ignore: &HashSet<String>,
677+
txns_to_strip_filter: &BooleanTransactionFilter,
680678
) -> Vec<TransactionsResponse> {
681-
let filtered_transactions =
682-
filter_transactions_for_sender_addresses(transactions, sender_addresses_to_ignore);
683-
let chunks = chunk_transactions(filtered_transactions, MESSAGE_SIZE_LIMIT);
679+
let stripped_transactions = strip_transactions(transactions, txns_to_strip_filter);
680+
let chunks = chunk_transactions(stripped_transactions, MESSAGE_SIZE_LIMIT);
684681
chunks
685682
.into_iter()
686683
.map(|chunk| TransactionsResponse {
@@ -956,21 +953,28 @@ async fn channel_send_multiple_with_timeout(
956953
Ok(())
957954
}
958955

959-
fn filter_transactions_for_sender_addresses(
956+
/// This function strips transactions that match the given filter. Stripping means we
957+
/// remove the payload, signature, events, and writesets. Note, the filter can be
958+
/// composed of many conditions, see `BooleanTransactionFilter` for more.
959+
fn strip_transactions(
960960
transactions: Vec<Transaction>,
961-
sender_addresses_to_ignore: &HashSet<String>,
961+
txns_to_strip_filter: &BooleanTransactionFilter,
962962
) -> Vec<Transaction> {
963963
transactions
964964
.into_iter()
965965
.map(|mut txn| {
966-
if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() {
967-
if let Some(utr) = user_transaction.request.as_mut() {
968-
if sender_addresses_to_ignore.contains(&utr.sender) {
966+
// Note: `is_allowed` means the txn matches the filter, in which case
967+
// we strip it.
968+
if txns_to_strip_filter.is_allowed(&txn) {
969+
if let Some(info) = txn.info.as_mut() {
970+
info.changes = vec![];
971+
}
972+
if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() {
973+
user_transaction.events = vec![];
974+
if let Some(utr) = user_transaction.request.as_mut() {
969975
// Wipe the payload and signature.
970976
utr.payload = None;
971977
utr.signature = None;
972-
user_transaction.events = vec![];
973-
txn.info.as_mut().unwrap().changes = vec![];
974978
}
975979
}
976980
}
@@ -981,12 +985,14 @@ fn filter_transactions_for_sender_addresses(
981985

982986
#[cfg(test)]
983987
mod tests {
984-
use super::{ensure_sequential_transactions, filter_transactions_for_sender_addresses};
988+
use super::*;
985989
use aptos_protos::transaction::v1::{
986990
transaction::TxnData, Event, Signature, Transaction, TransactionInfo, TransactionPayload,
987991
UserTransaction, UserTransactionRequest, WriteSetChange,
988992
};
989-
use std::collections::HashSet;
993+
use aptos_transaction_filter::{
994+
boolean_transaction_filter::APIFilter, filters::UserTransactionFilterBuilder,
995+
};
990996

991997
#[test]
992998
fn test_ensure_sequential_transactions_merges_and_sorts() {
@@ -1034,7 +1040,7 @@ mod tests {
10341040
}
10351041

10361042
#[test]
1037-
fn test_transactions_are_filter_correctly() {
1043+
fn test_transactions_are_stripped_correctly_sender_addresses() {
10381044
let sender_address = "0x1234".to_string();
10391045
// Create a transaction with a user transaction
10401046
let txn = Transaction {
@@ -1054,10 +1060,22 @@ mod tests {
10541060
}),
10551061
..Default::default()
10561062
};
1057-
// create ignore list.
1058-
let ignore_hash_set: HashSet<String> = vec![sender_address].into_iter().collect();
10591063

1060-
let filtered_txn = filter_transactions_for_sender_addresses(vec![txn], &ignore_hash_set);
1064+
// Create filter for senders to ignore.
1065+
let sender_filters = vec![sender_address]
1066+
.into_iter()
1067+
.map(|address| {
1068+
BooleanTransactionFilter::from(APIFilter::UserTransactionFilter(
1069+
UserTransactionFilterBuilder::default()
1070+
.sender(address)
1071+
.build()
1072+
.unwrap(),
1073+
))
1074+
})
1075+
.collect();
1076+
let filter = BooleanTransactionFilter::new_or(sender_filters);
1077+
1078+
let filtered_txn = strip_transactions(vec![txn], &filter);
10611079
assert_eq!(filtered_txn.len(), 1);
10621080
let txn = filtered_txn.first().unwrap();
10631081
let user_transaction = match &txn.txn_data {

0 commit comments

Comments
 (0)