diff --git a/src/query/storages/fuse/fuse/src/operations/mutation/compact/compact_transform.rs b/src/query/storages/fuse/fuse/src/operations/mutation/compact/compact_transform.rs index 1307805924a41..d0d459cedc3c8 100644 --- a/src/query/storages/fuse/fuse/src/operations/mutation/compact/compact_transform.rs +++ b/src/query/storages/fuse/fuse/src/operations/mutation/compact/compact_transform.rs @@ -37,6 +37,7 @@ use opendal::Operator; use super::compact_meta::CompactSourceMeta; use super::compact_part::CompactTask; use super::CompactSinkMeta; +use crate::io::write_data; use crate::io::BlockReader; use crate::io::TableMetaLocationGenerator; use crate::operations::mutation::AbortOperation; @@ -111,7 +112,7 @@ impl CompactTransform { thresholds: BlockCompactThresholds, ) -> Result { let settings = ctx.get_settings(); - let max_memory_usage = settings.get_max_memory_usage()?; + let max_memory_usage = (settings.get_max_memory_usage()? as f64 * 0.95) as u64; let max_threads = settings.get_max_threads()?; let max_memory = max_memory_usage / max_threads; Ok(ProcessorPtr::create(Box::new(CompactTransform { @@ -361,13 +362,9 @@ impl Processor for CompactTransform { while let Some(state) = serialize_states.pop() { handles.push(async move { // write block data. - dal.object(&state.block_location) - .write(state.block_data) - .await?; + write_data(&state.block_data, dal, &state.block_location).await?; // write index data. - dal.object(&state.index_location) - .write(state.index_data) - .await + write_data(&state.index_data, dal, &state.index_location).await }); } futures::future::try_join_all(handles).await?;