Skip to content

Commit f3a571d

Browse files
authored
feat: Add integration test and support append DataFile (#349)
* support append data file and add e2e test * fix typos * refine append action * fix cargo sort * add consistency check for partition value * generate unique snapshot id * avoid setting snapshot id for v2 * refine test * fix unit test * export ports * fix None case for parent_snapshot_id * fix parquet schema check * refactor append action of transaction * refine * refine e2e test * refine commit uuid * fix file format field to uppercase in manifest * refine SnapshotProduceAction * rename e2e_test to integration_tests * fix * use docker-compose.yaml from rest catalog * fix check --------- Co-authored-by: ZENOTME <[email protected]>
1 parent 4e5187b commit f3a571d

File tree

17 files changed

+1130
-53
lines changed

17 files changed

+1130
-53
lines changed

Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
[workspace]
1919
resolver = "2"
2020
members = [
21-
"crates/catalog/*",
22-
"crates/examples",
23-
"crates/iceberg",
24-
"crates/integrations/*",
25-
"crates/test_utils",
21+
"crates/catalog/*",
22+
"crates/examples",
23+
"crates/iceberg",
24+
"crates/integration_tests",
25+
"crates/integrations/*",
26+
"crates/test_utils",
2627
]
2728
exclude = ["bindings/python"]
2829

crates/catalog/rest/src/catalog.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,7 @@ mod tests {
13761376
.with_schema_id(0)
13771377
.with_summary(Summary {
13781378
operation: Operation::Append,
1379-
other: HashMap::from_iter([
1379+
additional_properties: HashMap::from_iter([
13801380
("spark.app.id", "local-1646787004168"),
13811381
("added-data-files", "1"),
13821382
("added-records", "1"),

crates/iceberg/src/catalog/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,7 +1423,7 @@ mod tests {
14231423
.with_schema_id(1)
14241424
.with_summary(Summary {
14251425
operation: Operation::Append,
1426-
other: HashMap::default(),
1426+
additional_properties: HashMap::default(),
14271427
})
14281428
.build(),
14291429
};
@@ -1457,7 +1457,7 @@ mod tests {
14571457
.with_manifest_list("s3://a/b/2.avro")
14581458
.with_summary(Summary {
14591459
operation: Operation::Append,
1460-
other: HashMap::default(),
1460+
additional_properties: HashMap::default(),
14611461
})
14621462
.build(),
14631463
};

crates/iceberg/src/io/object_cache.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ mod tests {
185185
use crate::spec::{
186186
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
187187
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
188-
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
188+
ManifestWriter, Struct, TableMetadata,
189189
};
190190
use crate::table::Table;
191191
use crate::TableIdent;
@@ -293,9 +293,7 @@ mod tests {
293293
.new_output(current_snapshot.manifest_list())
294294
.unwrap(),
295295
current_snapshot.snapshot_id(),
296-
current_snapshot
297-
.parent_snapshot_id()
298-
.unwrap_or(EMPTY_SNAPSHOT_ID),
296+
current_snapshot.parent_snapshot_id(),
299297
current_snapshot.sequence_number(),
300298
);
301299
manifest_list_write

crates/iceberg/src/scan.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -977,7 +977,6 @@ mod tests {
977977
DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest,
978978
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
979979
ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type,
980-
EMPTY_SNAPSHOT_ID,
981980
};
982981
use crate::table::Table;
983982
use crate::TableIdent;
@@ -1124,9 +1123,7 @@ mod tests {
11241123
.new_output(current_snapshot.manifest_list())
11251124
.unwrap(),
11261125
current_snapshot.snapshot_id(),
1127-
current_snapshot
1128-
.parent_snapshot_id()
1129-
.unwrap_or(EMPTY_SNAPSHOT_ID),
1126+
current_snapshot.parent_snapshot_id(),
11301127
current_snapshot.sequence_number(),
11311128
);
11321129
manifest_list_write

crates/iceberg/src/spec/manifest.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -900,6 +900,12 @@ impl ManifestEntry {
900900
}
901901
}
902902

903+
/// Snapshot id
904+
#[inline]
905+
pub fn snapshot_id(&self) -> Option<i64> {
906+
self.snapshot_id
907+
}
908+
903909
/// Data sequence number.
904910
#[inline]
905911
pub fn sequence_number(&self) -> Option<i64> {
@@ -1328,7 +1334,7 @@ mod _serde {
13281334
Ok(Self {
13291335
content: value.content as i32,
13301336
file_path: value.file_path,
1331-
file_format: value.file_format.to_string(),
1337+
file_format: value.file_format.to_string().to_ascii_uppercase(),
13321338
partition: RawLiteral::try_from(
13331339
Literal::Struct(value.partition),
13341340
&Type::Struct(partition_type.clone()),

crates/iceberg/src/spec/manifest_list.rs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -106,34 +106,38 @@ impl std::fmt::Debug for ManifestListWriter {
106106

107107
impl ManifestListWriter {
108108
/// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
109-
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
110-
let metadata = HashMap::from_iter([
109+
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
110+
let mut metadata = HashMap::from_iter([
111111
("snapshot-id".to_string(), snapshot_id.to_string()),
112-
(
113-
"parent-snapshot-id".to_string(),
114-
parent_snapshot_id.to_string(),
115-
),
116112
("format-version".to_string(), "1".to_string()),
117113
]);
114+
if let Some(parent_snapshot_id) = parent_snapshot_id {
115+
metadata.insert(
116+
"parent-snapshot-id".to_string(),
117+
parent_snapshot_id.to_string(),
118+
);
119+
}
118120
Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
119121
}
120122

121123
/// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
122124
pub fn v2(
123125
output_file: OutputFile,
124126
snapshot_id: i64,
125-
parent_snapshot_id: i64,
127+
parent_snapshot_id: Option<i64>,
126128
sequence_number: i64,
127129
) -> Self {
128-
let metadata = HashMap::from_iter([
130+
let mut metadata = HashMap::from_iter([
129131
("snapshot-id".to_string(), snapshot_id.to_string()),
130-
(
131-
"parent-snapshot-id".to_string(),
132-
parent_snapshot_id.to_string(),
133-
),
134132
("sequence-number".to_string(), sequence_number.to_string()),
135133
("format-version".to_string(), "2".to_string()),
136134
]);
135+
metadata.insert(
136+
"parent-snapshot-id".to_string(),
137+
parent_snapshot_id
138+
.map(|v| v.to_string())
139+
.unwrap_or("null".to_string()),
140+
);
137141
Self::new(
138142
FormatVersion::V2,
139143
output_file,
@@ -580,6 +584,18 @@ pub struct ManifestFile {
580584
pub key_metadata: Vec<u8>,
581585
}
582586

587+
impl ManifestFile {
588+
/// Checks if the manifest file has any added files.
589+
pub fn has_added_files(&self) -> bool {
590+
self.added_files_count.is_none() || self.added_files_count.unwrap() > 0
591+
}
592+
593+
/// Checks if the manifest file has any existed files.
594+
pub fn has_existing_files(&self) -> bool {
595+
self.existing_files_count.is_none() || self.existing_files_count.unwrap() > 0
596+
}
597+
}
598+
583599
/// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
584600
#[derive(Debug, PartialEq, Clone, Eq)]
585601
pub enum ManifestContentType {
@@ -1146,7 +1162,7 @@ mod test {
11461162
let mut writer = ManifestListWriter::v1(
11471163
file_io.new_output(full_path.clone()).unwrap(),
11481164
1646658105718557341,
1149-
1646658105718557341,
1165+
Some(1646658105718557341),
11501166
);
11511167

11521168
writer
@@ -1213,7 +1229,7 @@ mod test {
12131229
let mut writer = ManifestListWriter::v2(
12141230
file_io.new_output(full_path.clone()).unwrap(),
12151231
1646658105718557341,
1216-
1646658105718557341,
1232+
Some(1646658105718557341),
12171233
1,
12181234
);
12191235

@@ -1335,7 +1351,7 @@ mod test {
13351351
let io = FileIOBuilder::new_fs_io().build().unwrap();
13361352
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
13371353

1338-
let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, 0);
1354+
let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
13391355
writer
13401356
.add_manifests(expected_manifest_list.entries.clone().into_iter())
13411357
.unwrap();
@@ -1391,7 +1407,7 @@ mod test {
13911407
let io = FileIOBuilder::new_fs_io().build().unwrap();
13921408
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
13931409

1394-
let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 0, seq_num);
1410+
let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
13951411
writer
13961412
.add_manifests(expected_manifest_list.entries.clone().into_iter())
13971413
.unwrap();
@@ -1445,7 +1461,7 @@ mod test {
14451461
let io = FileIOBuilder::new_fs_io().build().unwrap();
14461462
let output_file = io.new_output(path.to_str().unwrap()).unwrap();
14471463

1448-
let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1);
1464+
let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, Some(0), 1);
14491465
writer
14501466
.add_manifests(expected_manifest_list.entries.clone().into_iter())
14511467
.unwrap();

crates/iceberg/src/spec/snapshot.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct Summary {
5959
pub operation: Operation,
6060
/// Other summary data.
6161
#[serde(flatten)]
62-
pub other: HashMap<String, String>,
62+
pub additional_properties: HashMap<String, String>,
6363
}
6464

6565
impl Default for Operation {
@@ -291,7 +291,7 @@ pub(super) mod _serde {
291291
},
292292
summary: v1.summary.unwrap_or(Summary {
293293
operation: Operation::default(),
294-
other: HashMap::new(),
294+
additional_properties: HashMap::new(),
295295
}),
296296
schema_id: v1.schema_id,
297297
})
@@ -372,6 +372,21 @@ pub enum SnapshotRetention {
372372
},
373373
}
374374

375+
impl SnapshotRetention {
376+
/// Create a new branch retention policy
377+
pub fn branch(
378+
min_snapshots_to_keep: Option<i32>,
379+
max_snapshot_age_ms: Option<i64>,
380+
max_ref_age_ms: Option<i64>,
381+
) -> Self {
382+
SnapshotRetention::Branch {
383+
min_snapshots_to_keep,
384+
max_snapshot_age_ms,
385+
max_ref_age_ms,
386+
}
387+
}
388+
}
389+
375390
#[cfg(test)]
376391
mod tests {
377392
use std::collections::HashMap;
@@ -408,7 +423,7 @@ mod tests {
408423
assert_eq!(
409424
Summary {
410425
operation: Operation::Append,
411-
other: HashMap::new()
426+
additional_properties: HashMap::new()
412427
},
413428
*result.summary()
414429
);

crates/iceberg/src/spec/table_metadata.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,18 @@ impl TableMetadata {
201201
self.last_sequence_number
202202
}
203203

204+
/// Returns the next sequence number for the table.
205+
///
206+
/// For format version 1, it always returns the initial sequence number.
207+
/// For other versions, it returns the last sequence number incremented by 1.
208+
#[inline]
209+
pub fn next_sequence_number(&self) -> i64 {
210+
match self.format_version {
211+
FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
212+
_ => self.last_sequence_number + 1,
213+
}
214+
}
215+
204216
/// Returns last updated time.
205217
#[inline]
206218
pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
@@ -1476,7 +1488,7 @@ mod tests {
14761488
.with_sequence_number(0)
14771489
.with_schema_id(0)
14781490
.with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
1479-
.with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
1491+
.with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
14801492
.build();
14811493

14821494
let expected = TableMetadata {
@@ -1895,7 +1907,7 @@ mod tests {
18951907
.with_manifest_list("s3://a/b/1.avro")
18961908
.with_summary(Summary {
18971909
operation: Operation::Append,
1898-
other: HashMap::new(),
1910+
additional_properties: HashMap::new(),
18991911
})
19001912
.build();
19011913

@@ -1908,7 +1920,7 @@ mod tests {
19081920
.with_manifest_list("s3://a/b/2.avro")
19091921
.with_summary(Summary {
19101922
operation: Operation::Append,
1911-
other: HashMap::new(),
1923+
additional_properties: HashMap::new(),
19121924
})
19131925
.build();
19141926

crates/iceberg/src/spec/table_metadata_builder.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1818,7 +1818,7 @@ mod tests {
18181818
.with_manifest_list("/snap-1.avro")
18191819
.with_summary(Summary {
18201820
operation: Operation::Append,
1821-
other: HashMap::from_iter(vec![
1821+
additional_properties: HashMap::from_iter(vec![
18221822
(
18231823
"spark.app.id".to_string(),
18241824
"local-1662532784305".to_string(),
@@ -1881,7 +1881,7 @@ mod tests {
18811881
.with_manifest_list("/snap-1.avro")
18821882
.with_summary(Summary {
18831883
operation: Operation::Append,
1884-
other: HashMap::from_iter(vec![
1884+
additional_properties: HashMap::from_iter(vec![
18851885
(
18861886
"spark.app.id".to_string(),
18871887
"local-1662532784305".to_string(),
@@ -1901,7 +1901,7 @@ mod tests {
19011901
.with_manifest_list("/snap-1.avro")
19021902
.with_summary(Summary {
19031903
operation: Operation::Append,
1904-
other: HashMap::from_iter(vec![
1904+
additional_properties: HashMap::from_iter(vec![
19051905
(
19061906
"spark.app.id".to_string(),
19071907
"local-1662532784305".to_string(),
@@ -1949,7 +1949,7 @@ mod tests {
19491949
.with_manifest_list("/snap-1.avro")
19501950
.with_summary(Summary {
19511951
operation: Operation::Append,
1952-
other: HashMap::new(),
1952+
additional_properties: HashMap::new(),
19531953
})
19541954
.build();
19551955

@@ -1994,7 +1994,7 @@ mod tests {
19941994
.with_manifest_list("/snap-1.avro")
19951995
.with_summary(Summary {
19961996
operation: Operation::Append,
1997-
other: HashMap::from_iter(vec![
1997+
additional_properties: HashMap::from_iter(vec![
19981998
(
19991999
"spark.app.id".to_string(),
20002000
"local-1662532784305".to_string(),
@@ -2114,7 +2114,7 @@ mod tests {
21142114
.with_manifest_list("/snap-1")
21152115
.with_summary(Summary {
21162116
operation: Operation::Append,
2117-
other: HashMap::new(),
2117+
additional_properties: HashMap::new(),
21182118
})
21192119
.build();
21202120

@@ -2140,7 +2140,7 @@ mod tests {
21402140
.with_parent_snapshot_id(Some(1))
21412141
.with_summary(Summary {
21422142
operation: Operation::Append,
2143-
other: HashMap::new(),
2143+
additional_properties: HashMap::new(),
21442144
})
21452145
.build();
21462146

0 commit comments

Comments
 (0)