Skip to content

Commit

Permalink
feat: force snapshot under memory pressure
Browse files Browse the repository at this point in the history
- The main change is to detach wal and snapshot, in a way all 3 of the
  following things can happen
    - flush the wal buffer only (already handled, before this commit)
    - flush wal buffer and snapshot (already handled, before this commit)
    - snapshot without flushing wal buffer (introduced in this commit)
  This is achieved by introducing another method `snapshot` in
  `WalFileNotifier` trait. The main dependency between wal and snapshot
  is the `wal_file_number`, since this is tracked in `SnapshotTracker`
  separately we can switch to using `SnapshotTracker`'s
  `last_wal_file_number` instead of the one that comes through the
  `WalContents`.
- A higher level background loop is introduced that checks the overall
  table buffer size every `N` seconds and if it is greater than a
  threshold (`X`) then it calls `snapshot` method. Both `N` and `X` are
  configurable through cli. `N` defaults to 10s and `X` defaults to 70%
- Some refactoring of code so that existing methods can be reused when
  only snapshotting

closes: #25685
  • Loading branch information
praveen-influx committed Jan 2, 2025
1 parent 0cb00d8 commit d842d86
Show file tree
Hide file tree
Showing 9 changed files with 537 additions and 203 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ rand.workspace = true
secrecy.workspace = true
serde_json.workspace = true
sha2.workspace = true
sysinfo.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
Expand Down
39 changes: 36 additions & 3 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
persister::Persister,
write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs},
write_buffer::{
check_mem_and_force_snapshot_loop, persisted_files::PersistedFiles, WriteBufferImpl,
WriteBufferImplArgs,
},
WriteBuffer,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
Expand All @@ -36,7 +39,7 @@ use object_store::ObjectStore;
use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{collections::HashMap, path::Path, str::FromStr};
use std::{collections::HashMap, path::Path, str::FromStr, time::Duration};
use std::{num::NonZeroUsize, sync::Arc};
use thiserror::Error;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -293,6 +296,16 @@ pub struct Config {
action
)]
pub meta_cache_eviction_interval: humantime::Duration,

/// Threshold for internal buffer, can be either percentage or absolute value.
/// eg: 70% or 100000
#[clap(
long = "force-snapshot-mem-threshold",
env = "INFLUXDB3_FORCE_SNAPSHOT_MEM_THRESHOLD",
default_value = "70%",
action
)]
pub force_snapshot_mem_threshold: MemorySize,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -445,7 +458,7 @@ pub async fn command(config: Config) -> Result<()> {

let persister = Arc::new(Persister::new(
Arc::clone(&object_store),
config.host_identifier_prefix,
&config.host_identifier_prefix,
));
let wal_config = WalConfig {
gen1_duration: config.gen1_duration,
Expand Down Expand Up @@ -489,6 +502,13 @@ pub async fn command(config: Config) -> Result<()> {
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;

background_buffer_checker(
config.force_snapshot_mem_threshold.bytes(),
&write_buffer_impl,
)
.await;

debug!("setting up telemetry store");
let telemetry_store = setup_telemetry_store(
&config.object_store_config,
catalog.instance_id(),
Expand Down Expand Up @@ -542,6 +562,19 @@ pub async fn command(config: Config) -> Result<()> {
Ok(())
}

async fn background_buffer_checker(
mem_threshold_bytes: usize,
write_buffer_impl: &Arc<WriteBufferImpl>,
) {
debug!(mem_threshold_bytes, "setting up background buffer checker");
check_mem_and_force_snapshot_loop(
Arc::clone(write_buffer_impl),
mem_threshold_bytes,
Duration::from_secs(10),
)
.await;
}

async fn setup_telemetry_store(
object_store_config: &ObjectStoreConfig,
instance_id: Arc<str>,
Expand Down
67 changes: 49 additions & 18 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb_line_protocol::v3::SeriesValue;
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
use observability_deps::tracing::error;
use observability_deps::tracing::{debug, error};
use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
Expand Down Expand Up @@ -97,8 +97,44 @@ pub trait Wal: Debug + Send + Sync + 'static {
/// Returns the last persisted wal file sequence number
async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber;

/// Returns the snapshot info, if force snapshot is set it avoids checking
/// certain cases and returns snapshot info leaving only the last wal period
async fn snapshot_info(
&self,
force_snapshot: bool,
) -> Option<(SnapshotInfo, OwnedSemaphorePermit)>;

/// Stop all writes to the WAL and flush the buffer to a WAL file.
async fn shutdown(&self);

async fn flush_buffer_and_cleanup_snapshot(self: Arc<Self>) {
let cleanup_after_snapshot = self.flush_buffer().await;
self.cleanup_after_snapshot(cleanup_after_snapshot).await;
}

async fn cleanup_after_snapshot(
self: Arc<Self>,
cleanup_params: Option<(
oneshot::Receiver<SnapshotDetails>,
SnapshotInfo,
OwnedSemaphorePermit,
)>,
) {
// handle snapshot cleanup outside of the flush loop
if let Some((snapshot_complete, snapshot_info, snapshot_permit)) = cleanup_params {
let arcd_wal = Arc::clone(&self);
tokio::spawn(async move {
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert_eq!(snapshot_info.snapshot_details, snapshot_details);

arcd_wal
.cleanup_snapshot(snapshot_info, snapshot_permit)
.await;
});
} else {
debug!("not flushed the buffer, no snapshot");
}
}
}

/// When the WAL persists a file with buffered ops, the contents are sent to this
Expand All @@ -116,6 +152,12 @@ pub trait WalFileNotifier: Debug + Send + Sync + 'static {
snapshot_details: SnapshotDetails,
) -> oneshot::Receiver<SnapshotDetails>;

/// Snapshot only, currently used to force the snapshot
async fn snapshot(
&self,
snapshot_details: SnapshotDetails,
) -> oneshot::Receiver<SnapshotDetails>;

fn as_any(&self) -> &dyn Any;
}

Expand Down Expand Up @@ -183,6 +225,10 @@ impl Gen1Duration {
self.0.as_nanos() as i64
}

pub fn new_10s() -> Self {
Self(Duration::from_secs(10))
}

pub fn new_1m() -> Self {
Self(Duration::from_secs(60))
}
Expand Down Expand Up @@ -890,23 +936,8 @@ pub fn background_wal_flush<W: Wal>(

loop {
interval.tick().await;

let cleanup_after_snapshot = wal.flush_buffer().await;

// handle snapshot cleanup outside of the flush loop
if let Some((snapshot_complete, snapshot_info, snapshot_permit)) =
cleanup_after_snapshot
{
let snapshot_wal = Arc::clone(&wal);
tokio::spawn(async move {
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert_eq!(snapshot_info.snapshot_details, snapshot_details);

snapshot_wal
.cleanup_snapshot(snapshot_info, snapshot_permit)
.await;
});
}
let wal = Arc::clone(&wal);
wal.flush_buffer_and_cleanup_snapshot().await;
}
})
}
43 changes: 36 additions & 7 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl WalObjectStore {
let snapshot_info = {
let mut buffer = self.flush_buffer.lock().await;

match buffer.snapshot_tracker.snapshot() {
match buffer.snapshot_tracker.snapshot(false) {
None => None,
Some(info) => {
let semaphore = Arc::clone(&buffer.snapshot_semaphore);
Expand Down Expand Up @@ -280,6 +280,8 @@ impl WalObjectStore {
}
}

debug!(snapshot_info = ?wal_contents.snapshot, ">>> snapshot info");

// now that we've persisted this latest notify and start the snapshot, if set
let snapshot_response = match wal_contents.snapshot {
Some(snapshot_details) => {
Expand Down Expand Up @@ -414,6 +416,14 @@ impl Wal for WalObjectStore {
.last_snapshot_sequence_number()
}

async fn snapshot_info(
&self,
force_snapshot: bool,
) -> Option<(SnapshotInfo, OwnedSemaphorePermit)> {
let mut buff = self.flush_buffer.lock().await;
buff.get_snapshot_info(force_snapshot).await
}

async fn shutdown(&self) {
self.shutdown().await
}
Expand Down Expand Up @@ -446,6 +456,22 @@ impl FlushBuffer {
self.snapshot_tracker.add_wal_period(wal_period);
}

async fn get_snapshot_info(
&mut self,
force_snapshot: bool,
) -> Option<(SnapshotInfo, OwnedSemaphorePermit)> {
let maybe_snapshot = self.snapshot_tracker.snapshot(force_snapshot);

match maybe_snapshot {
Some(snapshot_info) => {
debug!(?snapshot_info, ">>> snapshot info");

Some((snapshot_info, self.acquire_snapshot_permit().await))
}
None => None,
}
}

/// Converts the wal_buffer into contents and resets it. Returns the channels waiting for
/// responses. If a snapshot should occur with this flush, a semaphore permit is also returned.
async fn flush_buffer_into_contents_and_responses(
Expand All @@ -463,12 +489,11 @@ impl FlushBuffer {
max_time: Timestamp::new(wal_contents.max_timestamp_ns),
});

let snapshot = match self.snapshot_tracker.snapshot() {
Some(snapshot_info) => {
wal_contents.snapshot = Some(snapshot_info.snapshot_details);

Some((snapshot_info, self.acquire_snapshot_permit().await))
}
let snapshot = match self.get_snapshot_info(false).await {
Some(info) => {
wal_contents.snapshot = Some(info.0.snapshot_details);
Some(info)
},
None => None,
};

Expand Down Expand Up @@ -1123,5 +1148,9 @@ mod tests {
fn as_any(&self) -> &dyn Any {
self
}

async fn snapshot(&self, _snapshot_details: SnapshotDetails) -> Receiver<SnapshotDetails> {
unimplemented!()
}
}
}
Loading

0 comments on commit d842d86

Please sign in to comment.