Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::Value;
Expand Down Expand Up @@ -72,6 +73,46 @@ const INDEX_TYPE: &str = "index";
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics type key for range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
/// Total memory budget shared by concurrent range-result concatenation tasks.
/// Used as the default limit of [RangeResultMemoryLimiter].
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
/// Amount of memory represented by one limiter permit; the budget is divided
/// into permits of this size (see [RangeResultMemoryLimiter::new]).
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);

/// Limits the total memory consumed by concurrent range-result tasks by
/// modeling a byte budget as semaphore permits: callers acquire permits
/// proportional to the bytes they need and release them by dropping the guard.
#[derive(Debug)]
pub(crate) struct RangeResultMemoryLimiter {
    // Semaphore whose permits each stand for `permit_bytes` bytes of budget.
    semaphore: Arc<tokio::sync::Semaphore>,
    // Number of bytes represented by a single permit; kept >= 1 by `new`.
    permit_bytes: usize,
}

impl Default for RangeResultMemoryLimiter {
    /// Builds a limiter with the crate-default memory budget and permit
    /// granularity.
    fn default() -> Self {
        let limit_bytes = RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize;
        let permit_bytes = RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize;
        Self::new(limit_bytes, permit_bytes)
    }
}

impl RangeResultMemoryLimiter {
    /// Creates a limiter that models `limit_bytes` of memory as semaphore
    /// permits, each permit representing `permit_bytes` bytes.
    ///
    /// `permit_bytes` is clamped to at least 1 to avoid division by zero, at
    /// least one permit is always created, and the permit count is capped at
    /// [tokio::sync::Semaphore::MAX_PERMITS] so `Semaphore::new` cannot panic
    /// for extreme `limit_bytes` / `permit_bytes` combinations.
    pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
        let permit_bytes = permit_bytes.max(1);
        let permits = limit_bytes
            .div_ceil(permit_bytes)
            .max(1)
            .min(tokio::sync::Semaphore::MAX_PERMITS);
        Self {
            semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
            permit_bytes,
        }
    }

    /// Returns the number of bytes represented by a single permit.
    pub(crate) fn permit_bytes(&self) -> usize {
        self.permit_bytes
    }

    /// Acquires enough permits to cover `bytes`, waiting until the memory
    /// budget has capacity. Permits are released when the returned guard is
    /// dropped.
    ///
    /// NOTE(review): a request larger than the total budget can never be
    /// satisfied and will wait indefinitely; callers are expected to keep
    /// `bytes` within the configured limit — confirm at call sites.
    pub(crate) async fn acquire(
        &self,
        bytes: usize,
    ) -> std::result::Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
        // Saturate instead of `as u32`: a plain cast silently truncates huge
        // permit counts (e.g. on 64-bit targets), which would let oversized
        // requests acquire far fewer permits than intended and bypass the
        // memory limit.
        let permits =
            u32::try_from(bytes.div_ceil(self.permit_bytes()).max(1)).unwrap_or(u32::MAX);
        self.semaphore.acquire_many(permits).await
    }
}

/// Cached SST metadata combines the parquet footer with the decoded region metadata.
///
Expand Down Expand Up @@ -373,6 +414,23 @@ impl CacheStrategy {
}
}

/// Whether the range result cache is available under this strategy.
/// Only [CacheStrategy::EnableAll] can expose it; compaction and disabled
/// strategies never do.
pub(crate) fn has_range_result_cache(&self) -> bool {
    if let CacheStrategy::EnableAll(cache_manager) = self {
        cache_manager.has_range_result_cache()
    } else {
        false
    }
}

/// Returns the shared range-result memory limiter, or `None` when the
/// strategy does not enable caching ([CacheStrategy::Compaction] /
/// [CacheStrategy::Disabled]).
pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
    if let CacheStrategy::EnableAll(cache_manager) = self {
        Some(cache_manager.range_result_memory_limiter())
    } else {
        None
    }
}

/// Calls [CacheManager::write_cache()].
/// It returns None if the strategy is [CacheStrategy::Disabled].
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
Expand Down Expand Up @@ -476,6 +534,8 @@ pub struct CacheManager {
selector_result_cache: Option<SelectorResultCache>,
/// Cache for range scan outputs in flat format.
range_result_cache: Option<RangeResultCache>,
/// Shared memory limiter for async range-result cache tasks.
range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
/// Cache for index result.
index_result_cache: Option<IndexResultCache>,
}
Expand Down Expand Up @@ -735,6 +795,15 @@ impl CacheManager {
}
}

/// Returns true if the range result cache is enabled (i.e. was configured
/// on this manager at build time).
pub(crate) fn has_range_result_cache(&self) -> bool {
    self.range_result_cache.is_some()
}

/// Returns the shared memory limiter for async range-result cache tasks.
/// Always present, even when the range result cache itself is disabled.
pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
    &self.range_result_memory_limiter
}

/// Gets the write cache.
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
self.write_cache.as_ref()
Expand Down Expand Up @@ -969,6 +1038,7 @@ impl CacheManagerBuilder {
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
range_result_cache,
range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::default()),
index_result_cache,
}
}
Expand Down
84 changes: 48 additions & 36 deletions src/mito2/src/engine/scan_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{Float64Type, TimestampMillisecondType};
use futures::TryStreamExt;
use store_api::region_engine::{PrepareRequest, RegionEngine, RegionScanner};
use store_api::region_request::RegionRequest;
Expand Down Expand Up @@ -472,43 +476,51 @@ async fn test_series_scan_flat() {
}
}

let mut check_result = |expected| {
let batches =
RecordBatches::try_new(schema.clone().unwrap(), partition_batches.remove(0)).unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
};
let schema = schema.unwrap();
let mut series_to_partition = BTreeMap::new();
let mut actual_rows = Vec::new();

for (partition, batches) in partition_batches.into_iter().enumerate() {
let batches = RecordBatches::try_new(schema.clone(), batches).unwrap();
let mut partition_series = Vec::new();

for batch in batches.iter() {
let tags = batch.column_by_name("tag_0").unwrap().as_string::<i32>();
let fields = batch
.column_by_name("field_0")
.unwrap()
.as_primitive::<Float64Type>();
let ts = batch
.column_by_name("ts")
.unwrap()
.as_primitive::<TimestampMillisecondType>();

for row in 0..batch.num_rows() {
let tag = tags.value(row).to_string();
let field = fields.value(row);
let ts = ts.value(row);
partition_series.push(tag.clone());
actual_rows.push((tag, field.to_bits(), ts));
}
}

// Output series order is 0, 1, 2, 3, 3600, 3601, 3602, 4, 5, 7200, 7201, 7202
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 7200 | 7200.0 | 1970-01-01T02:00:00 |
| 7201 | 7201.0 | 1970-01-01T02:00:01 |
| 7202 | 7202.0 | 1970-01-01T02:00:02 |
+-------+---------+---------------------+";
check_result(expected);
partition_series.sort();
partition_series.dedup();
for tag in partition_series {
let prev = series_to_partition.insert(tag.clone(), partition);
assert_eq!(
None, prev,
"series {tag} appears in multiple partitions: {prev:?} and {partition}"
);
}
}

let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 3600 | 3600.0 | 1970-01-01T01:00:00 |
| 3601 | 3601.0 | 1970-01-01T01:00:01 |
| 3602 | 3602.0 | 1970-01-01T01:00:02 |
+-------+---------+---------------------+";
check_result(expected);
let mut expected_rows = Vec::new();
for value in [0_i64, 1, 2, 3, 4, 5, 3600, 3601, 3602, 7200, 7201, 7202] {
expected_rows.push((value.to_string(), (value as f64).to_bits(), value * 1000));
}

let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
+-------+---------+---------------------+";
check_result(expected);
actual_rows.sort();
expected_rows.sort();
assert_eq!(expected_rows, actual_rows);
}
Loading
Loading