diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index fd8b2667259f5..7703d201aaea9 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -26,7 +26,7 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode},
 };
 use datafusion_expr::ColumnarValue;
-use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
+use datafusion_physical_expr_common::physical_expr::DynHash;
 
 /// State of a dynamic filter, tracking both updates and completion.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -103,8 +103,11 @@ impl Inner {
 
 impl Hash for DynamicFilterPhysicalExpr {
     fn hash<H: Hasher>(&self, state: &mut H) {
-        let inner = self.current().expect("Failed to get current expression");
-        inner.dyn_hash(state);
+        // Use pointer identity of the inner Arc for stable hashing.
+        // This is stable across update() calls and consistent with Eq.
+        // See issue #19641 for details on why content-based hashing violates
+        // the Hash/Eq contract when the underlying expression can change.
+        Arc::as_ptr(&self.inner).hash(state);
         self.children.dyn_hash(state);
         self.remapped_children.dyn_hash(state);
     }
@@ -112,11 +115,13 @@ impl Hash for DynamicFilterPhysicalExpr {
 
 impl PartialEq for DynamicFilterPhysicalExpr {
     fn eq(&self, other: &Self) -> bool {
-        let inner = self.current().expect("Failed to get current expression");
-        let our_children = self.remapped_children.as_ref().unwrap_or(&self.children);
-        let other_children = other.remapped_children.as_ref().unwrap_or(&other.children);
-        let other = other.current().expect("Failed to get current expression");
-        inner.dyn_eq(other.as_any()) && our_children == other_children
+        // Two dynamic filters are equal if they share the same inner source
+        // AND have the same children configuration.
+        // This is consistent with Hash using Arc::as_ptr.
+        // See issue #19641 for details on the Hash/Eq contract violation fix.
+        Arc::ptr_eq(&self.inner, &other.inner)
+            && self.children == other.children
+            && self.remapped_children == other.remapped_children
     }
 }
 
@@ -753,4 +758,106 @@ mod test {
             "Filter should still be used with multiple consumers"
         );
     }
+
+    /// Test that verifies the Hash/Eq contract is now satisfied (issue #19641 fix).
+    ///
+    /// After the fix, Hash uses Arc::as_ptr(&self.inner) which is stable across
+    /// update() calls, fixing the HashMap key instability issue.
+    #[test]
+    fn test_hash_stable_after_update() {
+        use std::collections::hash_map::DefaultHasher;
+        use std::hash::{Hash, Hasher};
+
+        // Create filter with initial value
+        let filter =
+            DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
+
+        // Compute hash BEFORE update
+        let mut hasher_before = DefaultHasher::new();
+        filter.hash(&mut hasher_before);
+        let hash_before = hasher_before.finish();
+
+        // Update changes the underlying expression
+        filter
+            .update(lit(false) as Arc<dyn PhysicalExpr>)
+            .expect("Update should succeed");
+
+        // Compute hash AFTER update
+        let mut hasher_after = DefaultHasher::new();
+        filter.hash(&mut hasher_after);
+        let hash_after = hasher_after.finish();
+
+        // FIXED: Hash should now be STABLE after update() because we use
+        // Arc::as_ptr for identity-based hashing instead of expression content.
+        assert_eq!(
+            hash_before, hash_after,
+            "Hash should be stable after update() - fix for issue #19641"
+        );
+
+        // Self-equality should still hold
+        assert!(filter.eq(&filter), "Self-equality should hold");
+    }
+
+    /// Test that verifies separate DynamicFilterPhysicalExpr instances
+    /// with the same expression are NOT equal (identity-based comparison).
+    #[test]
+    fn test_identity_based_equality() {
+        // Create two separate filters with identical initial expressions
+        let filter1 =
+            DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
+        let filter2 =
+            DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
+
+        // Different instances should NOT be equal even with same expression
+        // because they have independent inner Arcs (different update lifecycles)
+        assert!(
+            !filter1.eq(&filter2),
+            "Different instances should not be equal (identity-based)"
+        );
+
+        // Self-equality should hold
+        assert!(filter1.eq(&filter1), "Self-equality should hold");
+    }
+
+    /// Test that hash is stable for the same filter instance.
+    /// After the fix, hash uses Arc::as_ptr which is pointer-based.
+    #[test]
+    fn test_hash_stable_for_same_instance() {
+        use std::collections::hash_map::DefaultHasher;
+        use std::hash::{Hash, Hasher};
+
+        let filter =
+            DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn PhysicalExpr>);
+
+        // Compute hash twice for the same instance
+        let hash1 = {
+            let mut h = DefaultHasher::new();
+            filter.hash(&mut h);
+            h.finish()
+        };
+        let hash2 = {
+            let mut h = DefaultHasher::new();
+            filter.hash(&mut h);
+            h.finish()
+        };
+
+        assert_eq!(hash1, hash2, "Same instance should have stable hash");
+
+        // Update the expression
+        filter
+            .update(lit(false) as Arc<dyn PhysicalExpr>)
+            .expect("Update should succeed");
+
+        // Hash should STILL be the same (identity-based)
+        let hash3 = {
+            let mut h = DefaultHasher::new();
+            filter.hash(&mut h);
+            h.finish()
+        };
+
+        assert_eq!(
+            hash1, hash3,
+            "Hash should be stable after update (identity-based)"
+        );
+    }
 }