Skip to content
70 changes: 70 additions & 0 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::Value;
Expand Down Expand Up @@ -72,6 +73,46 @@ const INDEX_TYPE: &str = "index";
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics type key for range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);

#[derive(Debug)]
pub(crate) struct RangeResultMemoryLimiter {
    /// Bounds the total in-flight memory; one permit represents `permit_bytes` bytes.
    semaphore: Arc<tokio::sync::Semaphore>,
    /// Size of a single permit in bytes (always >= 1).
    permit_bytes: usize,
    /// Number of permits the semaphore was created with. Requests are clamped
    /// to this value so an oversized request cannot wait forever for permits
    /// that can never become available.
    max_permits: u32,
}

impl Default for RangeResultMemoryLimiter {
    fn default() -> Self {
        Self::new(
            RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
        )
    }
}

impl RangeResultMemoryLimiter {
    /// Creates a limiter that allows roughly `limit_bytes` bytes of concurrent
    /// usage, accounted in units of `permit_bytes`.
    pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
        // Guard against a zero permit size to avoid division by zero below.
        let permit_bytes = permit_bytes.max(1);
        // At least one permit so `acquire()` can always make progress, and at
        // most what tokio's semaphore (and `acquire_many`'s `u32` argument)
        // can represent; an unclamped value could panic in `Semaphore::new`.
        let permits = limit_bytes
            .div_ceil(permit_bytes)
            .clamp(1, tokio::sync::Semaphore::MAX_PERMITS.min(u32::MAX as usize));
        Self {
            semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
            permit_bytes,
            max_permits: permits as u32,
        }
    }

    /// Returns the number of bytes represented by a single permit.
    pub(crate) fn permit_bytes(&self) -> usize {
        self.permit_bytes
    }

    /// Acquires enough permits to cover `bytes`, waiting until they are
    /// available. A request larger than the whole budget is clamped to the
    /// total permit count, degrading to "acquire everything" instead of
    /// stalling forever on permits that can never exist.
    ///
    /// # Errors
    /// Returns [`tokio::sync::AcquireError`] if the semaphore has been closed.
    pub(crate) async fn acquire(
        &self,
        bytes: usize,
    ) -> std::result::Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
        let permits = bytes.div_ceil(self.permit_bytes()).max(1);
        // Clamp before the u32 cast: the bare `as u32` could truncate a huge
        // request, and any request above `max_permits` would never be granted.
        let permits = permits.min(self.max_permits as usize) as u32;
        self.semaphore.acquire_many(permits).await
    }
}

/// Cached SST metadata combines the parquet footer with the decoded region metadata.
///
Expand Down Expand Up @@ -373,6 +414,23 @@ impl CacheStrategy {
}
}

/// Returns true if the range result cache is enabled.
pub(crate) fn has_range_result_cache(&self) -> bool {
    // Only the all-caches-enabled strategy can carry a range result cache;
    // compaction and disabled strategies never do.
    if let CacheStrategy::EnableAll(cache_manager) = self {
        cache_manager.has_range_result_cache()
    } else {
        false
    }
}

/// Returns the shared memory limiter for range-result cache tasks, or `None`
/// for the compaction and disabled strategies.
pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
    if let CacheStrategy::EnableAll(cache_manager) = self {
        Some(cache_manager.range_result_memory_limiter())
    } else {
        None
    }
}

/// Calls [CacheManager::write_cache()].
/// It returns None if the strategy is [CacheStrategy::Disabled].
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
Expand Down Expand Up @@ -476,6 +534,8 @@ pub struct CacheManager {
selector_result_cache: Option<SelectorResultCache>,
/// Cache for range scan outputs in flat format.
range_result_cache: Option<RangeResultCache>,
/// Shared memory limiter for async range-result cache tasks.
range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
/// Cache for index result.
index_result_cache: Option<IndexResultCache>,
}
Expand Down Expand Up @@ -735,6 +795,15 @@ impl CacheManager {
}
}

/// Returns true if the range result cache is enabled.
pub(crate) fn has_range_result_cache(&self) -> bool {
    // Enabled iff the builder configured a range result cache.
    matches!(self.range_result_cache, Some(_))
}

/// Returns the shared memory limiter used to bound the memory of concurrent
/// range-result cache tasks. Always present, even when the range result
/// cache itself is disabled.
pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
    &self.range_result_memory_limiter
}

/// Gets the write cache.
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
self.write_cache.as_ref()
Expand Down Expand Up @@ -969,6 +1038,7 @@ impl CacheManagerBuilder {
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
range_result_cache,
range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::default()),
index_result_cache,
}
}
Expand Down
96 changes: 96 additions & 0 deletions src/mito2/src/engine/scan_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,99 @@ fn collect_and_assert_partition_rows(
actual_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2)));
actual_rows
}

/// Tests series scan with multiple partition ranges (each with multiple overlapping sources)
/// and small semaphore permits (controlled by num_partitions).
#[tokio::test]
async fn test_series_scan_flat_small_permits() {
    let mut env = TestEnv::with_prefix("test_series_scan_small_permits").await;
    let engine = env
        .create_engine(MitoConfig {
            default_flat_format: true,
            ..Default::default()
        })
        .await;

    let region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new()
        .insert_option("compaction.type", "twcs")
        .insert_option("compaction.twcs.time_window", "1h")
        .build();
    let column_schemas = test_util::rows_schema(&create_request);

    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();

    // Writes rows [start, end) and flushes immediately, producing one SST per
    // call so each time window below ends up with several overlapping files.
    let write_and_flush = async |start, end| {
        let rows = Rows {
            schema: column_schemas.clone(),
            rows: test_util::build_rows(start, end),
        };
        test_util::put_rows(&engine, region_id, rows).await;
        test_util::flush_region(&engine, region_id, None).await;
    };
    // Window 0 (0s-999s): 3 overlapping SSTs
    write_and_flush(0, 3).await;
    write_and_flush(1, 5).await;
    write_and_flush(3, 7).await;
    // Window 1 (3600s-4599s): 2 overlapping SSTs
    write_and_flush(3600, 3603).await;
    write_and_flush(3601, 3605).await;
    // Window 2 (7200s-8199s): 2 overlapping SSTs
    write_and_flush(7200, 7203).await;
    write_and_flush(7201, 7204).await;

    // Expected rows, sorted the same way the collected output is sorted:
    // lexicographically by key string, then by timestamp.
    let mut expected_rows: Vec<_> = [
        0_i64, 1, 2, 3, 4, 5, 6, 3600, 3601, 3602, 3603, 3604, 7200, 7201, 7202, 7203,
    ]
    .into_iter()
    .map(|value| (value.to_string(), value as f64, value * 1000))
    .collect();
    expected_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2)));

    // Test with different semaphore sizes (num_partitions controls Semaphore::new(num_partitions)).
    for num_partitions in [1, 2] {
        let request = ScanRequest {
            distribution: Some(TimeSeriesDistribution::PerSeries),
            ..Default::default()
        };
        let Scanner::Series(mut scanner) = engine.scanner(region_id, request).await.unwrap()
        else {
            panic!("Scanner should be series scan");
        };

        // Flatten every partition range, then deal them out round-robin into
        // `num_partitions` partitions.
        let raw_ranges: Vec<_> = scanner
            .properties()
            .partitions
            .iter()
            .flatten()
            .cloned()
            .collect();
        assert!(
            raw_ranges.len() >= 3,
            "expected at least 3 partition ranges, got {}",
            raw_ranges.len()
        );

        let mut new_ranges = vec![vec![]; num_partitions];
        for (i, range) in raw_ranges.into_iter().enumerate() {
            new_ranges[i % num_partitions].push(range);
        }
        scanner
            .prepare(PrepareRequest {
                ranges: Some(new_ranges),
                ..Default::default()
            })
            .unwrap();

        let actual_rows = collect_partition_rows_round_robin(&scanner, num_partitions).await;
        assert_eq!(
            expected_rows, actual_rows,
            "mismatch with num_partitions={num_partitions}"
        );
    }
}
Loading
Loading