Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 0 additions & 75 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,46 +117,6 @@ pub enum DiskManagerMode {
Disabled,
}

/// Configuration for temporary disk access
#[deprecated(since = "48.0.0", note = "Use DiskManagerBuilder instead")]
#[derive(Debug, Clone, Default)]
#[allow(clippy::allow_attributes)]
#[allow(deprecated)]
pub enum DiskManagerConfig {
/// Use the provided [DiskManager] instance
Existing(Arc<DiskManager>),

/// Create a new [DiskManager] that creates temporary files within
/// a temporary directory chosen by the OS
#[default]
NewOs,

/// Create a new [DiskManager] that creates temporary files within
/// the specified directories
NewSpecified(Vec<PathBuf>),

/// Disable disk manager, attempts to create temporary files will error
Disabled,
}

#[expect(deprecated)]
impl DiskManagerConfig {
/// Create temporary files in a temporary directory chosen by the OS
pub fn new() -> Self {
Self::default()
}

/// Create temporary files using the provided disk manager
pub fn new_existing(existing: Arc<DiskManager>) -> Self {
Self::Existing(existing)
}

/// Create temporary files in the specified directories
pub fn new_specified(paths: Vec<PathBuf>) -> Self {
Self::NewSpecified(paths)
}
}

/// Manages files generated during query execution, e.g. spill files generated
/// while processing dataset larger than available memory.
#[derive(Debug)]
Expand Down Expand Up @@ -192,41 +152,6 @@ impl DiskManager {
DiskManagerBuilder::default()
}

/// Create a DiskManager given the configuration
#[expect(deprecated)]
#[deprecated(since = "48.0.0", note = "Use DiskManager::builder() instead")]
pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
match config {
DiskManagerConfig::Existing(manager) => Ok(manager),
DiskManagerConfig::NewOs => Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(&conf_dirs)?;
debug!(
"Created local dirs {local_dirs:?} as DataFusion working directory"
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: AtomicU64::new(
DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: AtomicU64::new(DEFAULT_MAX_TEMP_DIRECTORY_SIZE),
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
}
}

/// Atomically set the max temp directory size at runtime.
///
/// Takes `&self`, so it works through `Arc<DiskManager>` without requiring
Expand Down
32 changes: 10 additions & 22 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
//! Execution [`RuntimeEnv`] environment that manages access to object
//! store, memory manager, disk manager.

#[expect(deprecated)]
use crate::disk_manager::{DiskManagerConfig, SpillingProgress};
use crate::disk_manager::SpillingProgress;
use crate::{
disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
memory_pool::{
Expand Down Expand Up @@ -333,9 +332,8 @@ impl Default for RuntimeEnv {
/// See example on [`RuntimeEnv`]
#[derive(Clone)]
pub struct RuntimeEnvBuilder {
#[expect(deprecated)]
/// DiskManager to manage temporary disk file usage
pub disk_manager: DiskManagerConfig,
pub disk_manager: Option<Arc<DiskManager>>,
/// DiskManager builder to manager temporary disk file usage
pub disk_manager_builder: Option<DiskManagerBuilder>,
/// [`MemoryPool`] from which to allocate memory
Expand Down Expand Up @@ -371,14 +369,6 @@ impl RuntimeEnvBuilder {
}
}

#[expect(deprecated)]
#[deprecated(since = "48.0.0", note = "Use with_disk_manager_builder instead")]
/// Customize disk manager
pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) -> Self {
self.disk_manager = disk_manager;
self
}

/// Customize the disk manager builder
pub fn with_disk_manager_builder(mut self, disk_manager: DiskManagerBuilder) -> Self {
self.disk_manager_builder = Some(disk_manager);
Expand Down Expand Up @@ -472,14 +462,15 @@ impl RuntimeEnvBuilder {
let memory_pool =
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));

let disk_manager: Arc<DiskManager> = match (disk_manager, disk_manager_builder) {
(_, Some(builder)) => Arc::new(builder.build()?),
(Some(manager), None) => manager,
(None, None) => Arc::new(DiskManagerBuilder::default().build()?),
};

Ok(RuntimeEnv {
memory_pool,
disk_manager: if let Some(builder) = disk_manager_builder {
Arc::new(builder.build()?)
} else {
#[expect(deprecated)]
DiskManager::try_new(disk_manager)?
},
disk_manager,
cache_manager: CacheManager::try_new(&cache_manager)?,
object_store_registry,
#[cfg(feature = "parquet_encryption")]
Expand Down Expand Up @@ -511,10 +502,7 @@ impl RuntimeEnvBuilder {
};

Self {
#[expect(deprecated)]
disk_manager: DiskManagerConfig::Existing(Arc::clone(
&runtime_env.disk_manager,
)),
disk_manager: Some(Arc::clone(&runtime_env.disk_manager)),
disk_manager_builder: None,
memory_pool: Some(Arc::clone(&runtime_env.memory_pool)),
cache_manager: cache_config,
Expand Down
Loading