Skip to content

Commit

Permalink
Add UnboundSortOrder
Browse files Browse the repository at this point in the history
  • Loading branch information
fqaiser94 committed Dec 9, 2023
1 parent 641d058 commit e2323c3
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 30 deletions.
13 changes: 7 additions & 6 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ mod _serde {

use serde_derive::{Deserialize, Serialize};

use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec};
use iceberg::spec::{Schema, TableMetadata, UnboundPartitionSpec, UnboundSortOrder};
use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement, TableUpdate};

pub(super) const OK: u16 = 200u16;
Expand Down Expand Up @@ -695,7 +695,7 @@ mod _serde {
pub(super) location: Option<String>,
pub(super) schema: Schema,
pub(super) partition_spec: Option<UnboundPartitionSpec>,
pub(super) write_order: Option<SortOrder>,
pub(super) write_order: Option<UnboundSortOrder>,
pub(super) stage_create: Option<bool>,
pub(super) properties: Option<HashMap<String, String>>,
}
Expand All @@ -721,8 +721,8 @@ mod tests {
use iceberg::spec::ManifestListLocation::ManifestListFile;
use iceberg::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type,
UnboundPartitionField, UnboundPartitionSpec,
SnapshotLog, SortDirection, SortOrder, Summary, Transform, Type, UnboundPartitionField,
UnboundPartitionSpec, UnboundSortField, UnboundSortOrder,
};
use iceberg::transaction::Transaction;
use mockito::{Mock, Server, ServerGuard};
Expand Down Expand Up @@ -1277,9 +1277,10 @@ mod tests {
.unwrap(),
)
.sort_order(
SortOrder::builder()
UnboundSortOrder::builder()
.with_order_id(1)
.with_sort_field(
SortField::builder()
UnboundSortField::builder()
.source_id(2)
.transform(Transform::Identity)
.direction(SortDirection::Ascending)
Expand Down
16 changes: 8 additions & 8 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use serde_derive::{Deserialize, Serialize};
use urlencoding::encode;

use crate::spec::{
FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, UnboundPartitionSpec,
FormatVersion, Schema, Snapshot, SnapshotReference, UnboundPartitionSpec, UnboundSortOrder,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
Expand Down Expand Up @@ -231,7 +231,7 @@ pub struct TableCreation {
pub partition_spec: Option<UnboundPartitionSpec>,
/// The sort order of the table.
#[builder(default, setter(strip_option))]
pub sort_order: Option<SortOrder>,
pub sort_order: Option<UnboundSortOrder>,
/// The properties of the table.
#[builder(default)]
pub properties: HashMap<String, String>,
Expand Down Expand Up @@ -375,7 +375,7 @@ pub enum TableUpdate {
#[serde(rename_all = "kebab-case")]
AddSortOrder {
/// Sort order to add.
sort_order: SortOrder,
sort_order: UnboundSortOrder,
},
/// Set table's default sort order
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -432,8 +432,8 @@ mod tests {
use crate::spec::ManifestListLocation::ManifestListFile;
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
Transform, Type, UnboundPartitionField, UnboundPartitionSpec,
SnapshotReference, SnapshotRetention, SortDirection, Summary, Transform, Type,
UnboundPartitionField, UnboundPartitionSpec, UnboundSortField, UnboundSortOrder,
};
use crate::{NamespaceIdent, TableIdent, TableRequirement, TableUpdate};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -848,18 +848,18 @@ mod tests {
"#;

let update = TableUpdate::AddSortOrder {
sort_order: SortOrder::builder()
sort_order: UnboundSortOrder::builder()
.with_order_id(1)
.with_sort_field(
SortField::builder()
UnboundSortField::builder()
.source_id(2)
.direction(SortDirection::Ascending)
.null_order(NullOrder::First)
.transform(Transform::Identity)
.build(),
)
.with_sort_field(
SortField::builder()
UnboundSortField::builder()
.source_id(3)
.direction(SortDirection::Descending)
.null_order(NullOrder::Last)
Expand Down
143 changes: 134 additions & 9 deletions crates/iceberg/src/spec/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
/*!
* Sorting
*/
use crate::error::Result;
use crate::{Error, ErrorKind};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use typed_builder::TypedBuilder;

use super::transform::Transform;
use super::{schema::SchemaRef, transform::Transform};

/// Reference to [`SortOrder`].
pub type SortOrderRef = Arc<SortOrder>;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Copy, Clone)]
/// Sort direction in a partition, either ascending or descending
pub enum SortDirection {
/// Ascending
Expand All @@ -37,7 +40,7 @@ pub enum SortDirection {
Descending,
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Copy, Clone)]
/// Describes the order of null values when sorted.
pub enum NullOrder {
#[serde(rename = "nulls-first")]
Expand Down Expand Up @@ -88,15 +91,106 @@ impl SortOrder {
pub fn is_unsorted(&self) -> bool {
self.fields.is_empty()
}

/// Converts to an unbound sort order
pub fn to_unbound(&self) -> UnboundSortOrder {
UnboundSortOrder::builder()
.with_order_id(self.order_id)
.with_fields(
self.fields
.iter()
.map(|x| UnboundSortField {
source_id: x.source_id,
transform: x.transform,
direction: x.direction,
null_order: x.null_order,
})
.collect_vec(),
)
.build()
.unwrap()
}
}

/// Reference to [`UnboundSortOrder`].
pub type UnboundSortOrderRef = Arc<UnboundSortOrder>;

/// Entry for every column that is to be sorted
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
#[serde(rename_all = "kebab-case")]
pub struct UnboundSortField {
/// A source column id from the table’s schema
pub source_id: i32,
/// A transform that is used to produce values to be sorted on from the source column.
pub transform: Transform,
/// A sort direction, that can only be either asc or desc
pub direction: SortDirection,
/// A null order that describes the order of null values when sorted.
pub null_order: NullOrder,
}

/// Unbound sort order can be later bound to a schema.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder, Default)]
#[serde(rename_all = "kebab-case")]
#[builder(setter(prefix = "with"))]
#[builder(build_fn(skip))]
pub struct UnboundSortOrder {
/// Identifier for the SortOrder, order_id `0` is no sort order.
pub order_id: i64,
/// Details of the sort
#[builder(setter(each(name = "with_sort_field")))]
pub fields: Vec<UnboundSortField>,
}

impl UnboundSortOrder {
/// Create unbound sort order builder
pub fn builder() -> UnboundSortOrderBuilder {
UnboundSortOrderBuilder::default()
}

/// Create an unbound unsorted order
fn unsorted_order() -> UnboundSortOrder {
UnboundSortOrder {
order_id: 0,
fields: Vec::new(),
}
}

/// Bind unbound partition spec to a schema
pub fn bind(&self, _schema: SchemaRef) -> Result<SortOrder> {
todo!()
}
}

impl UnboundSortOrderBuilder {
/// Creates a new unbound sort order.
pub fn build(&self) -> Result<UnboundSortOrder> {
let fields = self.fields.clone().unwrap_or(Vec::new());
return match (self.order_id, fields.as_slice()) {
(Some(0) | None, []) => Ok(UnboundSortOrder::unsorted_order()),
(_, []) => Err(Error::new(
ErrorKind::Unexpected,
"Unsorted order ID must be 0",
)),
(Some(0), [..]) => Err(Error::new(
ErrorKind::Unexpected,
"Sort order ID 0 is reserved for unsorted order",
)),
(_, [..]) => Ok(UnboundSortOrder {
order_id: self.order_id.unwrap_or(1),
fields: fields.to_vec(),
}),
};
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn sort_field() {
let sort_field = r#"
fn test_sort_field() {
let spec = r#"
{
"transform": "bucket[4]",
"source-id": 3,
Expand All @@ -105,16 +199,47 @@ mod tests {
}
"#;

let field: SortField = serde_json::from_str(sort_field).unwrap();
let field: SortField = serde_json::from_str(spec).unwrap();
assert_eq!(Transform::Bucket(4), field.transform);
assert_eq!(3, field.source_id);
assert_eq!(SortDirection::Descending, field.direction);
assert_eq!(NullOrder::Last, field.null_order);
}

#[test]
fn sort_order() {
let sort_order = r#"
fn test_sort_order() {
let spec = r#"
{
"order-id": 1,
"fields": [ {
"transform": "identity",
"source-id": 2,
"direction": "asc",
"null-order": "nulls-first"
}, {
"transform": "bucket[4]",
"source-id": 3,
"direction": "desc",
"null-order": "nulls-last"
} ]
}
"#;

let order: SortOrder = serde_json::from_str(spec).unwrap();
assert_eq!(Transform::Identity, order.fields[0].transform);
assert_eq!(2, order.fields[0].source_id);
assert_eq!(SortDirection::Ascending, order.fields[0].direction);
assert_eq!(NullOrder::First, order.fields[0].null_order);

assert_eq!(Transform::Bucket(4), order.fields[1].transform);
assert_eq!(3, order.fields[1].source_id);
assert_eq!(SortDirection::Descending, order.fields[1].direction);
assert_eq!(NullOrder::Last, order.fields[1].null_order);
}

#[test]
fn test_unbound_sort_order() {
let spec = r#"
{
"order-id": 1,
"fields": [ {
Expand All @@ -131,7 +256,7 @@ mod tests {
}
"#;

let order: SortOrder = serde_json::from_str(sort_order).unwrap();
let order: UnboundSortOrder = serde_json::from_str(spec).unwrap();
assert_eq!(Transform::Identity, order.fields[0].transform);
assert_eq!(2, order.fields[0].source_id);
assert_eq!(SortDirection::Ascending, order.fields[0].direction);
Expand Down
17 changes: 10 additions & 7 deletions crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
//! This module contains transaction api.
use crate::error::Result;
use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, Transform};
use crate::spec::{
FormatVersion, NullOrder, SortDirection, Transform, UnboundSortField, UnboundSortOrder,
};
use crate::table::Table;
use crate::TableUpdate::UpgradeFormatVersion;
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
Expand Down Expand Up @@ -124,7 +126,7 @@ impl<'a> Transaction<'a> {
/// Transaction action for replacing sort order.
pub struct ReplaceSortOrderAction<'a> {
tx: Transaction<'a>,
sort_fields: Vec<SortField>,
sort_fields: Vec<UnboundSortField>,
}

impl<'a> ReplaceSortOrderAction<'a> {
Expand All @@ -140,12 +142,13 @@ impl<'a> ReplaceSortOrderAction<'a> {

/// Finished building the action and apply it to the transaction.
pub fn apply(mut self) -> Result<Transaction<'a>> {
let unbound_sort_order = UnboundSortOrder::builder()
.with_fields(self.sort_fields)
.build()?;

let updates = vec![
TableUpdate::AddSortOrder {
sort_order: SortOrder {
fields: self.sort_fields,
..SortOrder::default()
},
sort_order: unbound_sort_order,
},
TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
];
Expand Down Expand Up @@ -189,7 +192,7 @@ impl<'a> ReplaceSortOrderAction<'a> {
)
})?;

let sort_field = SortField::builder()
let sort_field = UnboundSortField::builder()
.source_id(field_id)
.transform(Transform::Identity)
.direction(sort_direction)
Expand Down

0 comments on commit e2323c3

Please sign in to comment.