Skip to content

Commit 153133b

Browse files
authored
[Data Service] Implement simple upstream transaction filtering (#13699)
There are two types of transaction filtering we will support in the future: 1. Per stream configuration: The downstream declares what txns they want to receive. 2. Global configuration: At the data service level we refuse to include full txns for all streams. This PR implements the second of these, using @CapCap's work here: aptos-labs/aptos-indexer-processors#398. Rather than not sending txns at all if they match the blocklist filters, we just omit the writesets and events. Not sending the txns entirely would cause issues with processors, which today assume that they will receive all txns.
1 parent 43d709c commit 153133b

File tree

5 files changed

+162
-63
lines changed

5 files changed

+162
-63
lines changed

Cargo.lock

Lines changed: 7 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ aptos-indexer-grpc-utils = { workspace = true }
1919
aptos-metrics-core = { workspace = true }
2020
aptos-moving-average = { workspace = true }
2121
aptos-protos = { workspace = true }
22+
aptos-transaction-filter = { workspace = true }
2223
async-trait = { workspace = true }
2324
clap = { workspace = true }
2425
futures = { workspace = true }

ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ use aptos_protos::{
1313
transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET,
1414
util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET,
1515
};
16+
use aptos_transaction_filter::BooleanTransactionFilter;
1617
use serde::{Deserialize, Serialize};
17-
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
18+
use std::{net::SocketAddr, sync::Arc};
1819
use tonic::{codec::CompressionEncoding, transport::Server};
1920

2021
pub const SERVER_NAME: &str = "idxdatasvc";
@@ -69,9 +70,18 @@ pub struct IndexerGrpcDataServiceConfig {
6970
pub enable_cache_compression: bool,
7071
#[serde(default)]
7172
pub in_memory_cache_config: InMemoryCacheConfig,
72-
/// Sender addresses to ignore. Transactions from these addresses will not be indexed.
73-
#[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")]
74-
pub sender_addresses_to_ignore: Vec<String>,
73+
/// Any transaction that matches this filter will be stripped. This means we remove
74+
/// the payload, signature, events, and writesets from it before sending it
75+
/// downstream. This should only be used in an emergency situation, e.g. when txns
76+
/// related to a certain module are too large and are causing issues for the data
77+
/// service. Learn more here:
78+
///
79+
/// https://www.notion.so/aptoslabs/Runbook-c006a37259394ac2ba904d6b54d180fa?pvs=4#171c210964ec42a89574fc80154f9e85
80+
///
81+
/// Generally you will want to start with this with an OR, and then list out
82+
/// separate filters that describe each type of txn we want to strip.
83+
#[serde(default = "IndexerGrpcDataServiceConfig::default_txns_to_strip_filter")]
84+
pub txns_to_strip_filter: BooleanTransactionFilter,
7585
}
7686

7787
impl IndexerGrpcDataServiceConfig {
@@ -84,7 +94,7 @@ impl IndexerGrpcDataServiceConfig {
8494
redis_read_replica_address: RedisUrl,
8595
enable_cache_compression: bool,
8696
in_memory_cache_config: InMemoryCacheConfig,
87-
sender_addresses_to_ignore: Vec<String>,
97+
txns_to_strip_filter: BooleanTransactionFilter,
8898
) -> Self {
8999
Self {
90100
data_service_grpc_tls_config,
@@ -97,7 +107,7 @@ impl IndexerGrpcDataServiceConfig {
97107
redis_read_replica_address,
98108
enable_cache_compression,
99109
in_memory_cache_config,
100-
sender_addresses_to_ignore,
110+
txns_to_strip_filter,
101111
}
102112
}
103113

@@ -109,8 +119,9 @@ impl IndexerGrpcDataServiceConfig {
109119
false
110120
}
111121

112-
pub const fn default_sender_addresses_to_ignore() -> Vec<String> {
113-
vec![]
122+
pub fn default_txns_to_strip_filter() -> BooleanTransactionFilter {
123+
// This filter matches no txns.
124+
BooleanTransactionFilter::new_or(vec![])
114125
}
115126
}
116127

@@ -170,10 +181,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig {
170181
self.redis_read_replica_address.clone(),
171182
self.file_store_config.clone(),
172183
self.data_service_response_channel_size,
173-
self.sender_addresses_to_ignore
174-
.clone()
175-
.into_iter()
176-
.collect::<HashSet<_>>(),
184+
self.txns_to_strip_filter.clone(),
177185
cache_storage_format,
178186
Arc::new(in_memory_cache),
179187
)?;

ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,58 @@ pub static SHORT_CONNECTION_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
100100
.unwrap()
101101
});
102102

103-
/// Count of bytes transfered to the client. This only represents the bytes prepared and ready
104-
/// to send to the client. It does not represent the bytes actually sent to the client.
103+
/// Count of bytes transfered to the client. This only represents the bytes prepared and
104+
/// ready to send to the client. This only t It does not represent the bytes actually
105+
/// sent to the client.
106+
///
107+
/// This is pre stripping, so it may include bytes for transactions that were later
108+
/// stripped. See BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING for post
109+
/// stirpping.
105110
pub static BYTES_READY_TO_TRANSFER_FROM_SERVER: Lazy<IntCounterVec> = Lazy::new(|| {
106111
register_int_counter_vec!(
107112
"indexer_grpc_data_service_bytes_ready_to_transfer_from_server",
108-
"Count of bytes ready to transfer to the client",
113+
"Count of bytes ready to transfer to the client (pre stripping)",
114+
&[
115+
"identifier_type",
116+
"identifier",
117+
"email",
118+
"application_name",
119+
"processor"
120+
],
121+
)
122+
.unwrap()
123+
});
124+
125+
/// Count of bytes transfered to the client. This only represents the bytes prepared and
126+
/// ready to send to the client. This only t It does not represent the bytes actually
127+
/// sent to the client.
128+
///
129+
/// This is post stripping, meaning some transactions may have been stripped (removing
130+
/// things such as events, writesets, payload, signature). Compare this with
131+
/// BYTES_READY_TO_TRANSFER_FROM_SERVER to see how many bytes were stripped.
132+
pub static BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING: Lazy<IntCounterVec> =
133+
Lazy::new(|| {
134+
register_int_counter_vec!(
135+
"indexer_grpc_data_service_bytes_ready_to_transfer_from_server_after_stripping",
136+
"Count of bytes ready to transfer to the client (post stripping)",
137+
&[
138+
"identifier_type",
139+
"identifier",
140+
"email",
141+
"application_name",
142+
"processor"
143+
],
144+
)
145+
.unwrap()
146+
});
147+
148+
/// The number of transactions that had data (such as events, writesets, payload,
149+
/// signature) stripped from them due to the `txns_to_strip_filter`. See
150+
/// `strip_transactions` for more.
151+
pub static NUM_TRANSACTIONS_STRIPPED: Lazy<IntCounterVec> = Lazy::new(|| {
152+
register_int_counter_vec!(
153+
"indexer_grpc_data_service_num_transactions_stripped",
154+
"Number of transactions that had data (such as events, writesets, payload, signature) stripped from them",
109155
&[
110156
"identifier_type",
111157
"identifier",

0 commit comments

Comments
 (0)