Skip to content

Commit

Permalink
feat: force snapshot under memory pressure
Browse files Browse the repository at this point in the history
closes: #25685
  • Loading branch information
praveen-influx committed Dec 31, 2024
1 parent 0cb00d8 commit a727836
Show file tree
Hide file tree
Showing 8 changed files with 488 additions and 194 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ rand.workspace = true
secrecy.workspace = true
serde_json.workspace = true
sha2.workspace = true
sysinfo.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
Expand Down
48 changes: 46 additions & 2 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
persister::Persister,
write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs},
write_buffer::{
check_mem_and_force_snapshot_loop, persisted_files::PersistedFiles, WriteBufferImpl,
WriteBufferImplArgs,
},
WriteBuffer,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
Expand Down Expand Up @@ -293,6 +296,25 @@ pub struct Config {
action
)]
pub meta_cache_eviction_interval: humantime::Duration,

/// Threshold for internal buffer, can be either percentage or absolute value.
/// eg: 70% or 100000
#[clap(
long = "force-snapshot-mem-threshold",
env = "INFLUXDB3_FORCE_SNAPSHOT_MEM_THRESHOLD",
default_value = "70%",
action
)]
pub force_snapshot_mem_threshold: MemorySize,

/// Interval to check buffer size (and compare with `force_snapshot_mem_threshold`)
#[clap(
long = "force-snapshot-interval",
env = "INFLUXDB3_FORCE_SNAPSHOT_INTERVAL",
default_value = "10s",
action
)]
pub force_snapshot_interval: humantime::Duration,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -445,7 +467,7 @@ pub async fn command(config: Config) -> Result<()> {

let persister = Arc::new(Persister::new(
Arc::clone(&object_store),
config.host_identifier_prefix,
&config.host_identifier_prefix,
));
let wal_config = WalConfig {
gen1_duration: config.gen1_duration,
Expand Down Expand Up @@ -489,6 +511,14 @@ pub async fn command(config: Config) -> Result<()> {
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;

background_buffer_checker(
config.force_snapshot_mem_threshold.bytes(),
config.force_snapshot_interval,
&write_buffer_impl,
)
.await;

debug!("setting up telemetry store");
let telemetry_store = setup_telemetry_store(
&config.object_store_config,
catalog.instance_id(),
Expand Down Expand Up @@ -542,6 +572,20 @@ pub async fn command(config: Config) -> Result<()> {
Ok(())
}

async fn background_buffer_checker(
mem_threshold_bytes: usize,
check_interval: humantime::Duration,
write_buffer_impl: &Arc<WriteBufferImpl>,
) {
debug!(mem_threshold_bytes, "setting up background buffer checker");
check_mem_and_force_snapshot_loop(
Arc::clone(write_buffer_impl),
mem_threshold_bytes,
check_interval.into(),
)
.await;
}

async fn setup_telemetry_store(
object_store_config: &ObjectStoreConfig,
instance_id: Arc<str>,
Expand Down
72 changes: 54 additions & 18 deletions influxdb3_wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb_line_protocol::v3::SeriesValue;
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
use observability_deps::tracing::error;
use observability_deps::tracing::{debug, error};
use schema::{InfluxColumnType, InfluxFieldType};
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
Expand Down Expand Up @@ -84,6 +84,15 @@ pub trait Wal: Debug + Send + Sync + 'static {
OwnedSemaphorePermit,
)>;

/// Forces the flush buffer
async fn force_flush_buffer(
&self,
) -> Option<(
oneshot::Receiver<SnapshotDetails>,
SnapshotInfo,
OwnedSemaphorePermit,
)>;

/// Removes any snapshot wal files
async fn cleanup_snapshot(
&self,
Expand All @@ -99,6 +108,40 @@ pub trait Wal: Debug + Send + Sync + 'static {

/// Stop all writes to the WAL and flush the buffer to a WAL file.
async fn shutdown(&self);

async fn flush_buffer_and_cleanup_snapshot(self: Arc<Self>) {
let cleanup_after_snapshot = self.flush_buffer().await;
self.cleanup_after_snapshot(cleanup_after_snapshot).await;
}

async fn force_flush_buffer_and_cleanup_snapshot(self: Arc<Self>) {
let cleanup_after_snapshot = self.force_flush_buffer().await;
self.cleanup_after_snapshot(cleanup_after_snapshot).await;
}

async fn cleanup_after_snapshot(
self: Arc<Self>,
cleanup_params: Option<(
oneshot::Receiver<SnapshotDetails>,
SnapshotInfo,
OwnedSemaphorePermit,
)>,
) {
// handle snapshot cleanup outside of the flush loop
if let Some((snapshot_complete, snapshot_info, snapshot_permit)) = cleanup_params {
let arcd_wal = Arc::clone(&self);
tokio::spawn(async move {
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert_eq!(snapshot_info.snapshot_details, snapshot_details);

arcd_wal
.cleanup_snapshot(snapshot_info, snapshot_permit)
.await;
});
} else {
debug!("not flushed the buffer, no snapshot");
}
}
}

/// When the WAL persists a file with buffered ops, the contents are sent to this
Expand Down Expand Up @@ -183,6 +226,10 @@ impl Gen1Duration {
self.0.as_nanos() as i64
}

pub fn new_10s() -> Self {
Self(Duration::from_secs(10))
}

pub fn new_1m() -> Self {
Self(Duration::from_secs(60))
}
Expand Down Expand Up @@ -809,6 +856,10 @@ impl WalContents {
pub fn is_empty(&self) -> bool {
self.ops.is_empty() && self.snapshot.is_none()
}

pub fn is_default(&self) -> bool {
self.min_timestamp_ns == i64::MAX && self.max_timestamp_ns == i64::MIN
}
}

#[derive(
Expand Down Expand Up @@ -890,23 +941,8 @@ pub fn background_wal_flush<W: Wal>(

loop {
interval.tick().await;

let cleanup_after_snapshot = wal.flush_buffer().await;

// handle snapshot cleanup outside of the flush loop
if let Some((snapshot_complete, snapshot_info, snapshot_permit)) =
cleanup_after_snapshot
{
let snapshot_wal = Arc::clone(&wal);
tokio::spawn(async move {
let snapshot_details = snapshot_complete.await.expect("snapshot failed");
assert_eq!(snapshot_info.snapshot_details, snapshot_details);

snapshot_wal
.cleanup_snapshot(snapshot_info, snapshot_permit)
.await;
});
}
let wal = Arc::clone(&wal);
wal.flush_buffer_and_cleanup_snapshot().await;
}
})
}
52 changes: 36 additions & 16 deletions influxdb3_wal/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl WalObjectStore {
let snapshot_info = {
let mut buffer = self.flush_buffer.lock().await;

match buffer.snapshot_tracker.snapshot() {
match buffer.snapshot_tracker.snapshot(false) {
None => None,
Some(info) => {
let semaphore = Arc::clone(&buffer.snapshot_semaphore);
Expand Down Expand Up @@ -178,7 +178,9 @@ impl WalObjectStore {
self.flush_buffer.lock().await.wal_buffer.is_shutdown = true;

// do the flush and wait for the snapshot if that's running
if let Some((snapshot_done, snapshot_info, snapshot_permit)) = self.flush_buffer().await {
if let Some((snapshot_done, snapshot_info, snapshot_permit)) =
self.flush_buffer(false).await
{
let snapshot_details = snapshot_done.await.expect("snapshot should complete");
assert_eq!(snapshot_info.snapshot_details, snapshot_details);
self.remove_snapshot_wal_files(snapshot_info, snapshot_permit)
Expand Down Expand Up @@ -216,18 +218,20 @@ impl WalObjectStore {

async fn flush_buffer(
&self,
force_snapshot: bool,
) -> Option<(
oneshot::Receiver<SnapshotDetails>,
SnapshotInfo,
OwnedSemaphorePermit,
)> {
let (wal_contents, responses, snapshot) = {
let mut flush_buffer = self.flush_buffer.lock().await;
if flush_buffer.wal_buffer.is_empty() {
debug!(wal_buffer_empty = ?flush_buffer.wal_buffer.is_empty(), ">>> flushing buffer");
if !force_snapshot && flush_buffer.wal_buffer.is_empty() {
return None;
}
flush_buffer
.flush_buffer_into_contents_and_responses()
.flush_buffer_into_contents_and_responses(force_snapshot)
.await
};
info!(
Expand Down Expand Up @@ -280,6 +284,8 @@ impl WalObjectStore {
}
}

debug!(snapshot_info = ?wal_contents.snapshot, ">>> snapshot info");

// now that we've persisted this latest notify and start the snapshot, if set
let snapshot_response = match wal_contents.snapshot {
Some(snapshot_details) => {
Expand Down Expand Up @@ -386,7 +392,17 @@ impl Wal for WalObjectStore {
SnapshotInfo,
OwnedSemaphorePermit,
)> {
self.flush_buffer().await
self.flush_buffer(false).await
}

async fn force_flush_buffer(
&self,
) -> Option<(
oneshot::Receiver<SnapshotDetails>,
SnapshotInfo,
OwnedSemaphorePermit,
)> {
self.flush_buffer(true).await
}

async fn cleanup_snapshot(
Expand Down Expand Up @@ -450,21 +466,24 @@ impl FlushBuffer {
/// responses. If a snapshot should occur with this flush, a semaphore permit is also returned.
async fn flush_buffer_into_contents_and_responses(
&mut self,
force_snapshot: bool,
) -> (
WalContents,
Vec<oneshot::Sender<WriteResult>>,
Option<(SnapshotInfo, OwnedSemaphorePermit)>,
) {
// convert into wal contents and responses and capture if a snapshot should be taken
let (mut wal_contents, responses) = self.flush_buffer_with_responses();
self.snapshot_tracker.add_wal_period(WalPeriod {
wal_file_number: wal_contents.wal_file_number,
min_time: Timestamp::new(wal_contents.min_timestamp_ns),
max_time: Timestamp::new(wal_contents.max_timestamp_ns),
});

let snapshot = match self.snapshot_tracker.snapshot() {
if !force_snapshot {
self.snapshot_tracker.add_wal_period(WalPeriod {
wal_file_number: wal_contents.wal_file_number,
min_time: Timestamp::new(wal_contents.min_timestamp_ns),
max_time: Timestamp::new(wal_contents.max_timestamp_ns),
});
}
let snapshot = match self.snapshot_tracker.snapshot(force_snapshot) {
Some(snapshot_info) => {
debug!(?snapshot_info, ">>> snapshot info");
wal_contents.snapshot = Some(snapshot_info.snapshot_details);

Some((snapshot_info, self.acquire_snapshot_permit().await))
Expand Down Expand Up @@ -767,7 +786,7 @@ mod tests {
wal.buffer_op_unconfirmed(op2.clone()).await.unwrap();

// create wal file 1
let ret = wal.flush_buffer().await;
let ret = wal.flush_buffer(false).await;
assert!(ret.is_none());
let file_1_contents = create::wal_contents(
(1, 62_000_000_000, 1),
Expand Down Expand Up @@ -835,7 +854,7 @@ mod tests {

// create wal file 2
wal.buffer_op_unconfirmed(op2.clone()).await.unwrap();
assert!(wal.flush_buffer().await.is_none());
assert!(wal.flush_buffer(false).await.is_none());

let file_2_contents = create::wal_contents(
(62_000_000_000, 62_000_000_000, 2),
Expand Down Expand Up @@ -941,7 +960,8 @@ mod tests {
});
wal.buffer_op_unconfirmed(op3.clone()).await.unwrap();

let (snapshot_done, snapshot_info, snapshot_permit) = wal.flush_buffer().await.unwrap();
let (snapshot_done, snapshot_info, snapshot_permit) =
wal.flush_buffer(false).await.unwrap();
let expected_info = SnapshotInfo {
snapshot_details: SnapshotDetails {
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
Expand Down Expand Up @@ -1066,7 +1086,7 @@ mod tests {
None,
);

assert!(wal.flush_buffer().await.is_none());
assert!(wal.flush_buffer(false).await.is_none());
let notifier = notifier.as_any().downcast_ref::<TestNotifier>().unwrap();
assert!(notifier.notified_writes.lock().is_empty());

Expand Down
Loading

0 comments on commit a727836

Please sign in to comment.