Skip to content

Commit

Permalink
feat: force snapshot under memory pressure
Browse files Browse the repository at this point in the history
- The main change is to decouple the wal flush from snapshotting, so that
  all 3 of the following things can happen:
    - flush the wal buffer only (already handled, before this commit)
    - flush wal buffer and snapshot (already handled, before this commit)
    - snapshot without flushing wal buffer (introduced in this commit)
  This is achieved by introducing another method, `snapshot`, in the
  `WalFileNotifier` trait. The main dependency between wal and snapshot
  is the `wal_file_number`; since this is tracked separately in
  `SnapshotTracker`, we can switch to using `SnapshotTracker`'s
  `last_wal_file_number` instead of the one that comes through the
  `WalContents`.
- A higher-level background loop is introduced that checks the overall
  table buffer size every `N` seconds and, if it is greater than a
  threshold (`X`), calls the `snapshot` method. Both `N` and `X` are
  configurable through the CLI (`--force-snapshot-interval` and
  `--force-snapshot-mem-threshold`). `N` defaults to 10s and `X` defaults to 70%.
- Some refactoring of code so that existing methods can be reused when
  only snapshotting

closes: #25685
  • Loading branch information
praveen-influx committed Jan 2, 2025
1 parent 0cb00d8 commit 240d5a1
Show file tree
Hide file tree
Showing 9 changed files with 539 additions and 197 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ rand.workspace = true
secrecy.workspace = true
serde_json.workspace = true
sha2.workspace = true
sysinfo.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
Expand Down
48 changes: 46 additions & 2 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
persister::Persister,
write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs},
write_buffer::{
check_mem_and_force_snapshot_loop, persisted_files::PersistedFiles, WriteBufferImpl,
WriteBufferImplArgs,
},
WriteBuffer,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
Expand Down Expand Up @@ -293,6 +296,25 @@ pub struct Config {
action
)]
pub meta_cache_eviction_interval: humantime::Duration,

/// Threshold for internal buffer, can be either percentage or absolute value.
/// eg: 70% or 100000
#[clap(
long = "force-snapshot-mem-threshold",
env = "INFLUXDB3_FORCE_SNAPSHOT_MEM_THRESHOLD",
default_value = "70%",
action
)]
pub force_snapshot_mem_threshold: MemorySize,

/// Interval to check buffer size (and compare with `force_snapshot_mem_threshold`)
#[clap(
long = "force-snapshot-interval",
env = "INFLUXDB3_FORCE_SNAPSHOT_INTERVAL",
default_value = "10s",
action
)]
pub force_snapshot_interval: humantime::Duration,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -445,7 +467,7 @@ pub async fn command(config: Config) -> Result<()> {

let persister = Arc::new(Persister::new(
Arc::clone(&object_store),
config.host_identifier_prefix,
&config.host_identifier_prefix,
));
let wal_config = WalConfig {
gen1_duration: config.gen1_duration,
Expand Down Expand Up @@ -489,6 +511,14 @@ pub async fn command(config: Config) -> Result<()> {
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;

background_buffer_checker(
config.force_snapshot_mem_threshold.bytes(),
config.force_snapshot_interval,
&write_buffer_impl,
)
.await;

debug!("setting up telemetry store");
let telemetry_store = setup_telemetry_store(
&config.object_store_config,
catalog.instance_id(),
Expand Down Expand Up @@ -542,6 +572,20 @@ pub async fn command(config: Config) -> Result<()> {
Ok(())
}

/// Sets up the background check that forces a snapshot once the write buffer
/// exceeds `mem_threshold_bytes`.
///
/// Delegates to `check_mem_and_force_snapshot_loop`, passing it a new `Arc`
/// handle to the write buffer, the byte threshold, and `check_interval`
/// converted with `Into`. The value produced by awaiting that call is dropped.
///
/// NOTE(review): this is awaited inline during server start-up; presumably the
/// callee spawns the loop and returns right away — confirm.
async fn background_buffer_checker(
    mem_threshold_bytes: usize,
    check_interval: humantime::Duration,
    write_buffer_impl: &Arc<WriteBufferImpl>,
) {
    debug!(mem_threshold_bytes, "setting up background buffer checker");

    let write_buffer = Arc::clone(write_buffer_impl);
    let interval = check_interval.into();
    check_mem_and_force_snapshot_loop(write_buffer, mem_threshold_bytes, interval).await;
}

async fn setup_telemetry_store(
object_store_config: &ObjectStoreConfig,
instance_id: Arc<str>,
Expand Down
71 changes: 53 additions & 18 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb_line_protocol::v3::SeriesValue;
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
use observability_deps::tracing::error;
use observability_deps::tracing::{debug, error};
use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
Expand Down Expand Up @@ -97,8 +97,44 @@ pub trait Wal: Debug + Send + Sync + 'static {
/// Returns the sequence number of the last snapshot
async fn last_snapshot_sequence_number(&self) -> SnapshotSequenceNumber;

/// Returns the snapshot info together with a snapshot permit, or `None` when
/// no snapshot is to be taken.
///
/// When `force_snapshot` is set, certain checks are skipped and the returned
/// snapshot info leaves only the last wal period.
async fn snapshot_info(
&self,
force_snapshot: bool,
) -> Option<(SnapshotInfo, OwnedSemaphorePermit)>;

/// Stop all writes to the WAL and flush the buffer to a WAL file.
async fn shutdown(&self);

/// Flushes the wal buffer, then passes whatever the flush returned to
/// `cleanup_after_snapshot` so that any snapshot it started gets cleaned up.
async fn flush_buffer_and_cleanup_snapshot(self: Arc<Self>) {
    // The flush only borrows `self`; the cleanup call below consumes the `Arc`.
    let pending_cleanup = self.flush_buffer().await;
    self.cleanup_after_snapshot(pending_cleanup).await;
}

/// Runs the post-snapshot cleanup on a spawned task.
///
/// `cleanup_params` holds the completion receiver, the snapshot info and the
/// snapshot permit. When it is `None`, only a debug line is logged and nothing
/// is spawned.
///
/// # Panics
///
/// The spawned task panics if awaiting the completion receiver returns an
/// error, or if the received details differ from
/// `snapshot_info.snapshot_details`.
async fn cleanup_after_snapshot(
    self: Arc<Self>,
    cleanup_params: Option<(
        oneshot::Receiver<SnapshotDetails>,
        SnapshotInfo,
        OwnedSemaphorePermit,
    )>,
) {
    let Some((snapshot_complete, snapshot_info, snapshot_permit)) = cleanup_params else {
        debug!("not flushed the buffer, no snapshot");
        return;
    };

    // handle snapshot cleanup outside of the flush loop
    let wal = Arc::clone(&self);
    tokio::spawn(async move {
        let reported_details = snapshot_complete.await.expect("snapshot failed");
        assert_eq!(snapshot_info.snapshot_details, reported_details);

        wal.cleanup_snapshot(snapshot_info, snapshot_permit).await;
    });
}
}

/// When the WAL persists a file with buffered ops, the contents are sent to this
Expand All @@ -116,6 +152,12 @@ pub trait WalFileNotifier: Debug + Send + Sync + 'static {
snapshot_details: SnapshotDetails,
) -> oneshot::Receiver<SnapshotDetails>;

/// Snapshot only (no wal contents are passed), currently used to force the snapshot.
///
/// Takes the `SnapshotDetails` to snapshot and returns a receiver yielding
/// `SnapshotDetails`.
/// NOTE(review): presumably the receiver resolves once the snapshot has
/// completed — confirm against the implementors.
async fn snapshot(
&self,
snapshot_details: SnapshotDetails,
) -> oneshot::Receiver<SnapshotDetails>;

fn as_any(&self) -> &dyn Any;
}

Expand Down Expand Up @@ -183,6 +225,10 @@ impl Gen1Duration {
self.0.as_nanos() as i64
}

/// Creates a `Gen1Duration` of 10 seconds.
pub fn new_10s() -> Self {
    let ten_seconds = Duration::from_secs(10);
    Self(ten_seconds)
}

/// Creates a `Gen1Duration` of one minute (60 seconds).
pub fn new_1m() -> Self {
    let one_minute = Duration::from_secs(60);
    Self(one_minute)
}
Expand Down Expand Up @@ -809,6 +855,10 @@ impl WalContents {
/// Returns `true` when there are no ops and no snapshot is set.
pub fn is_empty(&self) -> bool {
    let no_ops = self.ops.is_empty();
    let no_snapshot = self.snapshot.is_none();
    no_ops && no_snapshot
}

/// Returns `true` when both timestamp bounds hold their sentinel values:
/// `min_timestamp_ns == i64::MAX` and `max_timestamp_ns == i64::MIN`.
///
/// NOTE(review): presumably this is the state of contents that never had a
/// timestamp recorded — confirm against where these fields are initialised.
pub fn is_default(&self) -> bool {
    matches!(
        (self.min_timestamp_ns, self.max_timestamp_ns),
        (i64::MAX, i64::MIN)
    )
}
}

#[derive(
Expand Down Expand Up @@ -890,23 +940,8 @@ pub fn background_wal_flush<W: Wal>(

loop {
interval.tick().await;

let cleanup_after_snapshot = wal.flush_buffer().await;

// handle snapshot cleanup outside of the flush loop
if let Some((snapshot_complete, snapshot_info, snapshot_permit)) =
cleanup_after_snapshot
{
let snapshot_wal = Arc::clone(&wal);
tokio::spawn(async move {
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert_eq!(snapshot_info.snapshot_details, snapshot_details);

snapshot_wal
.cleanup_snapshot(snapshot_info, snapshot_permit)
.await;
});
}
let wal = Arc::clone(&wal);
wal.flush_buffer_and_cleanup_snapshot().await;
}
})
}
30 changes: 27 additions & 3 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl WalObjectStore {
let snapshot_info = {
let mut buffer = self.flush_buffer.lock().await;

match buffer.snapshot_tracker.snapshot() {
match buffer.snapshot_tracker.snapshot(false) {
None => None,
Some(info) => {
let semaphore = Arc::clone(&buffer.snapshot_semaphore);
Expand Down Expand Up @@ -280,6 +280,8 @@ impl WalObjectStore {
}
}

debug!(snapshot_info = ?wal_contents.snapshot, ">>> snapshot info");

// now that we've persisted this latest notify and start the snapshot, if set
let snapshot_response = match wal_contents.snapshot {
Some(snapshot_details) => {
Expand Down Expand Up @@ -414,6 +416,24 @@ impl Wal for WalObjectStore {
.last_snapshot_sequence_number()
}

/// Asks the snapshot tracker for a snapshot while holding the flush buffer
/// lock.
///
/// Returns `None` when the tracker yields no snapshot. Otherwise logs the
/// snapshot info at debug level, acquires a snapshot permit from the flush
/// buffer and returns both. `force_snapshot` is forwarded to the tracker
/// unchanged.
async fn snapshot_info(
    &self,
    force_snapshot: bool,
) -> Option<(SnapshotInfo, OwnedSemaphorePermit)> {
    let mut flush_buffer = self.flush_buffer.lock().await;

    // Bail out early when the tracker has nothing to snapshot.
    let snapshot_info = flush_buffer.snapshot_tracker.snapshot(force_snapshot)?;
    debug!(?snapshot_info, ">>> snapshot info");

    // The permit is acquired while the flush buffer lock is still held.
    let permit = flush_buffer.acquire_snapshot_permit().await;
    Some((snapshot_info, permit))
}

// Forwards to whichever `shutdown` method resolution picks on `self`.
// NOTE(review): presumably an inherent `WalObjectStore::shutdown` exists and
// takes precedence over this trait method; without one this call would
// recurse into itself — confirm.
async fn shutdown(&self) {
self.shutdown().await
}
Expand Down Expand Up @@ -462,9 +482,9 @@ impl FlushBuffer {
min_time: Timestamp::new(wal_contents.min_timestamp_ns),
max_time: Timestamp::new(wal_contents.max_timestamp_ns),
});

let snapshot = match self.snapshot_tracker.snapshot() {
let snapshot = match self.snapshot_tracker.snapshot(false) {
Some(snapshot_info) => {
debug!(?snapshot_info, ">>> snapshot info");
wal_contents.snapshot = Some(snapshot_info.snapshot_details);

Some((snapshot_info, self.acquire_snapshot_permit().await))
Expand Down Expand Up @@ -1123,5 +1143,9 @@ mod tests {
// Returns `self` as a `&dyn Any`.
fn as_any(&self) -> &dyn Any {
self
}

// Stub implementation: panics via `unimplemented!()` if it is ever called.
// NOTE(review): presumably none of the tests in this module trigger a
// snapshot-only call on this notifier — confirm.
async fn snapshot(&self, _snapshot_details: SnapshotDetails) -> Receiver<SnapshotDetails> {
unimplemented!()
}
}
}
Loading

0 comments on commit 240d5a1

Please sign in to comment.