Skip to content

Commit b84e0d2

Browse files
refactor: Split scan module (#1120)
## Which issue does this PR close? - Closes #. ## What changes are included in this PR? Split Scan module, i was going through it and all the information for scan was on one page (too long) Co-authored-by: Renjie Liu <[email protected]>
1 parent c5d61c6 commit b84e0d2

File tree

4 files changed

+639
-553
lines changed

4 files changed

+639
-553
lines changed

crates/iceberg/src/scan/cache.rs

+237
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::sync::{Arc, RwLock};
20+
21+
use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
22+
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
23+
use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
24+
use crate::expr::{Bind, BoundPredicate};
25+
use crate::spec::{Schema, TableMetadataRef};
26+
use crate::{Error, ErrorKind, Result};
27+
28+
/// Manages the caching of [`BoundPredicate`] objects
29+
/// for [`PartitionSpec`]s based on partition spec id.
30+
#[derive(Debug)]
31+
pub(crate) struct PartitionFilterCache(RwLock<HashMap<i32, Arc<BoundPredicate>>>);
32+
33+
impl PartitionFilterCache {
34+
/// Creates a new [`PartitionFilterCache`]
35+
/// with an empty internal HashMap.
36+
pub(crate) fn new() -> Self {
37+
Self(RwLock::new(HashMap::new()))
38+
}
39+
40+
/// Retrieves a [`BoundPredicate`] from the cache
41+
/// or computes it if not present.
42+
pub(crate) fn get(
43+
&self,
44+
spec_id: i32,
45+
table_metadata: &TableMetadataRef,
46+
schema: &Schema,
47+
case_sensitive: bool,
48+
filter: BoundPredicate,
49+
) -> Result<Arc<BoundPredicate>> {
50+
// we need a block here to ensure that the `read()` gets dropped before we hit the `write()`
51+
// below, otherwise we hit deadlock
52+
{
53+
let read = self.0.read().map_err(|_| {
54+
Error::new(
55+
ErrorKind::Unexpected,
56+
"PartitionFilterCache RwLock was poisoned",
57+
)
58+
})?;
59+
60+
if read.contains_key(&spec_id) {
61+
return Ok(read.get(&spec_id).unwrap().clone());
62+
}
63+
}
64+
65+
let partition_spec = table_metadata
66+
.partition_spec_by_id(spec_id)
67+
.ok_or(Error::new(
68+
ErrorKind::Unexpected,
69+
format!("Could not find partition spec for id {}", spec_id),
70+
))?;
71+
72+
let partition_type = partition_spec.partition_type(schema)?;
73+
let partition_fields = partition_type.fields().to_owned();
74+
let partition_schema = Arc::new(
75+
Schema::builder()
76+
.with_schema_id(partition_spec.spec_id())
77+
.with_fields(partition_fields)
78+
.build()?,
79+
);
80+
81+
let mut inclusive_projection = InclusiveProjection::new(partition_spec.clone());
82+
83+
let partition_filter = inclusive_projection
84+
.project(&filter)?
85+
.rewrite_not()
86+
.bind(partition_schema.clone(), case_sensitive)?;
87+
88+
self.0
89+
.write()
90+
.map_err(|_| {
91+
Error::new(
92+
ErrorKind::Unexpected,
93+
"PartitionFilterCache RwLock was poisoned",
94+
)
95+
})?
96+
.insert(spec_id, Arc::new(partition_filter));
97+
98+
let read = self.0.read().map_err(|_| {
99+
Error::new(
100+
ErrorKind::Unexpected,
101+
"PartitionFilterCache RwLock was poisoned",
102+
)
103+
})?;
104+
105+
Ok(read.get(&spec_id).unwrap().clone())
106+
}
107+
}
108+
109+
/// Manages the caching of [`ManifestEvaluator`] objects
110+
/// for [`PartitionSpec`]s based on partition spec id.
111+
#[derive(Debug)]
112+
pub(crate) struct ManifestEvaluatorCache(RwLock<HashMap<i32, Arc<ManifestEvaluator>>>);
113+
114+
impl ManifestEvaluatorCache {
115+
/// Creates a new [`ManifestEvaluatorCache`]
116+
/// with an empty internal HashMap.
117+
pub(crate) fn new() -> Self {
118+
Self(RwLock::new(HashMap::new()))
119+
}
120+
121+
/// Retrieves a [`ManifestEvaluator`] from the cache
122+
/// or computes it if not present.
123+
pub(crate) fn get(
124+
&self,
125+
spec_id: i32,
126+
partition_filter: Arc<BoundPredicate>,
127+
) -> Arc<ManifestEvaluator> {
128+
// we need a block here to ensure that the `read()` gets dropped before we hit the `write()`
129+
// below, otherwise we hit deadlock
130+
{
131+
let read = self
132+
.0
133+
.read()
134+
.map_err(|_| {
135+
Error::new(
136+
ErrorKind::Unexpected,
137+
"ManifestEvaluatorCache RwLock was poisoned",
138+
)
139+
})
140+
.unwrap();
141+
142+
if read.contains_key(&spec_id) {
143+
return read.get(&spec_id).unwrap().clone();
144+
}
145+
}
146+
147+
self.0
148+
.write()
149+
.map_err(|_| {
150+
Error::new(
151+
ErrorKind::Unexpected,
152+
"ManifestEvaluatorCache RwLock was poisoned",
153+
)
154+
})
155+
.unwrap()
156+
.insert(
157+
spec_id,
158+
Arc::new(ManifestEvaluator::new(partition_filter.as_ref().clone())),
159+
);
160+
161+
let read = self
162+
.0
163+
.read()
164+
.map_err(|_| {
165+
Error::new(
166+
ErrorKind::Unexpected,
167+
"ManifestEvaluatorCache RwLock was poisoned",
168+
)
169+
})
170+
.unwrap();
171+
172+
read.get(&spec_id).unwrap().clone()
173+
}
174+
}
175+
176+
/// Manages the caching of [`ExpressionEvaluator`] objects
177+
/// for [`PartitionSpec`]s based on partition spec id.
178+
#[derive(Debug)]
179+
pub(crate) struct ExpressionEvaluatorCache(RwLock<HashMap<i32, Arc<ExpressionEvaluator>>>);
180+
181+
impl ExpressionEvaluatorCache {
182+
/// Creates a new [`ExpressionEvaluatorCache`]
183+
/// with an empty internal HashMap.
184+
pub(crate) fn new() -> Self {
185+
Self(RwLock::new(HashMap::new()))
186+
}
187+
188+
/// Retrieves a [`ExpressionEvaluator`] from the cache
189+
/// or computes it if not present.
190+
pub(crate) fn get(
191+
&self,
192+
spec_id: i32,
193+
partition_filter: &BoundPredicate,
194+
) -> Result<Arc<ExpressionEvaluator>> {
195+
// we need a block here to ensure that the `read()` gets dropped before we hit the `write()`
196+
// below, otherwise we hit deadlock
197+
{
198+
let read = self.0.read().map_err(|_| {
199+
Error::new(
200+
ErrorKind::Unexpected,
201+
"PartitionFilterCache RwLock was poisoned",
202+
)
203+
})?;
204+
205+
if read.contains_key(&spec_id) {
206+
return Ok(read.get(&spec_id).unwrap().clone());
207+
}
208+
}
209+
210+
self.0
211+
.write()
212+
.map_err(|_| {
213+
Error::new(
214+
ErrorKind::Unexpected,
215+
"ManifestEvaluatorCache RwLock was poisoned",
216+
)
217+
})
218+
.unwrap()
219+
.insert(
220+
spec_id,
221+
Arc::new(ExpressionEvaluator::new(partition_filter.clone())),
222+
);
223+
224+
let read = self
225+
.0
226+
.read()
227+
.map_err(|_| {
228+
Error::new(
229+
ErrorKind::Unexpected,
230+
"ManifestEvaluatorCache RwLock was poisoned",
231+
)
232+
})
233+
.unwrap();
234+
235+
Ok(read.get(&spec_id).unwrap().clone())
236+
}
237+
}

0 commit comments

Comments
 (0)