Skip to content

Commit

Permalink
feat: support UnboundPartitionSpec (#106)
Browse files Browse the repository at this point in the history
* Implement unbound partition spec.

* little update

* Update in tablecreate & addspec

* fixup: add some tests.

* Put comments before derive
  • Loading branch information
my-vegetable-has-exploded authored Dec 7, 2023
1 parent eb33e90 commit d206c1d
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 35 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
/target
/Cargo.lock
.idea
.vscode
**/.DS_Store
16 changes: 7 additions & 9 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ mod _serde {

use serde_derive::{Deserialize, Serialize};

use iceberg::spec::{PartitionSpec, Schema, SortOrder, TableMetadata};
use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement, TableUpdate};

pub(super) const OK: u16 = 200u16;
Expand Down Expand Up @@ -660,7 +660,7 @@ mod _serde {
pub(super) name: String,
pub(super) location: Option<String>,
pub(super) schema: Schema,
pub(super) partition_spec: Option<PartitionSpec>,
pub(super) partition_spec: Option<UnboundPartitionSpec>,
pub(super) write_order: Option<SortOrder>,
pub(super) stage_create: Option<bool>,
pub(super) properties: Option<HashMap<String, String>>,
Expand All @@ -686,9 +686,9 @@ mod tests {
use chrono::{TimeZone, Utc};
use iceberg::spec::ManifestListLocation::ManifestListFile;
use iceberg::spec::{
FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary,
Transform, Type,
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
UnboundPartitionField, UnboundPartitionSpec,
};
use iceberg::transaction::Transaction;
use mockito::{Mock, Server, ServerGuard};
Expand Down Expand Up @@ -1233,14 +1233,12 @@ mod tests {
)
.properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
.partition_spec(
PartitionSpec::builder()
.with_fields(vec![PartitionField::builder()
UnboundPartitionSpec::builder()
.with_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.field_id(1000)
.transform(Transform::Truncate(3))
.name("id".to_string())
.build()])
.with_spec_id(1)
.build()
.unwrap(),
)
Expand Down
36 changes: 15 additions & 21 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
use serde_derive::{Deserialize, Serialize};
use urlencoding::encode;

use crate::spec::{FormatVersion, PartitionSpec, Schema, Snapshot, SnapshotReference, SortOrder};
use crate::spec::{
FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, UnboundPartitionSpec,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
use async_trait::async_trait;
Expand Down Expand Up @@ -226,7 +228,7 @@ pub struct TableCreation {
pub schema: Schema,
/// The partition spec of the table, could be None.
#[builder(default, setter(strip_option))]
pub partition_spec: Option<PartitionSpec>,
pub partition_spec: Option<UnboundPartitionSpec>,
/// The sort order of the table.
#[builder(default, setter(strip_option))]
pub sort_order: Option<SortOrder>,
Expand Down Expand Up @@ -361,7 +363,7 @@ pub enum TableUpdate {
/// Add a new partition spec to the table
AddSpec {
/// The partition spec to add.
spec: PartitionSpec,
spec: UnboundPartitionSpec,
},
/// Set table's default spec
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -429,9 +431,9 @@ pub enum TableUpdate {
mod tests {
use crate::spec::ManifestListLocation::ManifestListFile;
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec,
PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection,
SortField, SortOrder, Summary, Transform, Type,
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
Transform, Type, UnboundPartitionField, UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableIdent, TableRequirement, TableUpdate};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -758,23 +760,19 @@ mod tests {
{
"action": "add-spec",
"spec": {
"spec-id": 1,
"fields": [
{
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
},
{
"source-id": 1,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
},
{
"source-id": 2,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
}
Expand All @@ -783,28 +781,24 @@ mod tests {
}
"#,
TableUpdate::AddSpec {
spec: PartitionSpec::builder()
.with_spec_id(1)
.with_partition_field(
PartitionField::builder()
spec: UnboundPartitionSpec::builder()
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(4)
.field_id(1000)
.name("ts_day".to_string())
.transform(Transform::Day)
.build(),
)
.with_partition_field(
PartitionField::builder()
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(1)
.field_id(1001)
.name("id_bucket".to_string())
.transform(Transform::Bucket(16))
.build(),
)
.with_partition_field(
PartitionField::builder()
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(2)
.field_id(1002)
.name("id_truncate".to_string())
.transform(Transform::Truncate(4))
.build(),
Expand Down
108 changes: 103 additions & 5 deletions crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ use super::transform::Transform;

/// Reference to [`PartitionSpec`].
pub type PartitionSpecRef = Arc<PartitionSpec>;
/// Partition fields capture the transform from table data to partition values.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
/// Partition fields capture the transform from table data to partition values.
pub struct PartitionField {
/// A source column id from the table’s schema
pub source_id: i32,
Expand All @@ -41,10 +41,10 @@ pub struct PartitionField {
pub transform: Transform,
}

/// Partition spec that defines how to produce a tuple of partition values from a record.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(rename_all = "kebab-case")]
#[builder(setter(prefix = "with"))]
/// Partition spec that defines how to produce a tuple of partition values from a record.
pub struct PartitionSpec {
/// Identifier for PartitionSpec
pub spec_id: i32,
Expand All @@ -60,13 +60,51 @@ impl PartitionSpec {
}
}

/// Shared, reference-counted ([`Arc`]) handle to an [`UnboundPartitionSpec`].
pub type UnboundPartitionSpecRef = Arc<UnboundPartitionSpec>;
/// Unbound partition field can be built without a schema and later bound to a schema.
///
/// Serialized with kebab-case keys (`source-id`, `partition-id`, `name`, `transform`).
/// Unlike [`PartitionField`], the partition field id is optional here.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundPartitionField {
/// A source column id from the table’s schema
pub source_id: i32,
/// A partition field id that is used to identify a partition field and is unique within a partition spec.
/// In v2 table metadata, it is unique across all partition specs.
///
/// Optional: deserializes to `None` when the `partition-id` key is absent, and the
/// builder leaves it as `None` unless the setter is called.
#[builder(default, setter(strip_option))]
pub partition_id: Option<i32>,
/// A partition name.
pub name: String,
/// A transform that is applied to the source column to produce a partition value.
pub transform: Transform,
}

/// Unbound partition spec can be built without a schema and later bound to a schema.
///
/// Serialized with kebab-case keys (`spec-id`, `fields`). Builder setters carry a
/// `with` prefix (e.g. `with_spec_id`, `with_fields`).
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)]
#[serde(rename_all = "kebab-case")]
#[builder(setter(prefix = "with"))]
pub struct UnboundPartitionSpec {
/// Identifier for PartitionSpec
///
/// Optional: deserializes to `None` when the `spec-id` key is absent, and the
/// builder defaults it to `None`.
#[builder(default, setter(strip_option))]
pub spec_id: Option<i32>,
/// Details of the partition spec
///
/// Besides the whole-vector setter, the builder exposes
/// `with_unbound_partition_field` to append one field at a time.
#[builder(setter(each(name = "with_unbound_partition_field")))]
pub fields: Vec<UnboundPartitionField>,
}

impl UnboundPartitionSpec {
/// Create unbound partition spec builer
pub fn builder() -> UnboundPartitionSpecBuilder {
UnboundPartitionSpecBuilder::default()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn partition_spec() {
let sort_order = r#"
fn test_partition_spec() {
let spec = r#"
{
"spec-id": 1,
"fields": [ {
Expand All @@ -88,7 +126,7 @@ mod tests {
}
"#;

let partition_spec: PartitionSpec = serde_json::from_str(sort_order).unwrap();
let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap();
assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(1000, partition_spec.fields[0].field_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
Expand All @@ -104,4 +142,64 @@ mod tests {
assert_eq!("id_truncate", partition_spec.fields[2].name);
assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);
}

#[test]
fn test_unbound_partition_spec() {
    // Case 1: the document carries both the spec id and every partition id.
    let full_json = r#"
{
"spec-id": 1,
"fields": [ {
"source-id": 4,
"partition-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"partition-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"partition-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
}
"#;

    let parsed = serde_json::from_str::<UnboundPartitionSpec>(full_json).unwrap();
    assert_eq!(Some(1), parsed.spec_id);

    // Expected (source id, partition id, name, transform) per field, in document order.
    let expected = [
        (4, Some(1000), "ts_day", Transform::Day),
        (1, Some(1001), "id_bucket", Transform::Bucket(16)),
        (2, Some(1002), "id_truncate", Transform::Truncate(4)),
    ];
    for (idx, (source_id, partition_id, name, transform)) in expected.iter().enumerate() {
        // Index directly so a missing field still fails the test with a panic.
        let field = &parsed.fields[idx];
        assert_eq!(*source_id, field.source_id);
        assert_eq!(*partition_id, field.partition_id);
        assert_eq!(*name, field.name);
        assert_eq!(*transform, field.transform);
    }

    // Case 2: the optional ids are omitted and must come back as `None`.
    let minimal_json = r#"
{
"fields": [ {
"source-id": 4,
"name": "ts_day",
"transform": "day"
} ]
}
"#;
    let parsed = serde_json::from_str::<UnboundPartitionSpec>(minimal_json).unwrap();
    assert_eq!(None, parsed.spec_id);

    let only_field = &parsed.fields[0];
    assert_eq!(4, only_field.source_id);
    assert_eq!(None, only_field.partition_id);
    assert_eq!("ts_day", only_field.name);
    assert_eq!(Transform::Day, only_field.transform);
}
}

0 comments on commit d206c1d

Please sign in to comment.