Skip to content

Commit

Permalink
feat(spec): impl file index format and interface (apache#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
devillove084 committed Aug 4, 2024
2 parents 8105945 + 65a8e74 commit 42fe0af
Show file tree
Hide file tree
Showing 12 changed files with 882 additions and 418 deletions.
2 changes: 2 additions & 0 deletions crates/paimon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ bitflags = "2.6.0"
chrono = { version = "0.4.38", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_with = "3.8.3"
serde_bytes = "0.11.15"
snafu = "0.8.3"
typed-builder = "^0.18"
opendal = "0.48"
tokio = { version = "1.39.2", features = ["full"] }
16 changes: 13 additions & 3 deletions crates/paimon/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,26 @@
// specific language governing permissions and limitations
// under the License.

use snafu::Snafu;
use snafu::prelude::*;

/// Result type used in paimon.
///
/// The error parameter defaults to [`Error`], so `Result<T>` is shorthand for
/// `std::result::Result<T, Error>`.
pub type Result<T, E = Error> = std::result::Result<T, E>;

/// Error type for paimon.
///
/// Each variant carries a human-readable `message` plus the underlying `source` error;
/// both are rendered by the variant's `display` format string.
// NOTE(review): no reason is recorded for allowing dead code on this enum — confirm it is
// still needed once every variant is constructed somewhere.
#[allow(dead_code)]
#[derive(Debug, Snafu)]
pub enum Error {
// NOTE(review): two `display` attributes follow for the same variant (lowercase and
// capitalised "Paimon"). This looks like the old and new line of a diff shown together;
// presumably only one should remain — confirm against the real file.
#[snafu(display("paimon data invalid for {}: {:?}", message, source))]
#[snafu(display("Paimon data invalid for {}: {:?}", message, source))]
DataInvalid {
/// Description of what was invalid; first placeholder of the display string.
message: String,
/// Underlying error; formatted with `{:?}` in the display string.
#[snafu(backtrace)]
source: snafu::Whatever,
},
// `visibility(pub(crate))` asks snafu to make the generated context selector for this
// variant visible throughout the crate.
#[snafu(
visibility(pub(crate)),
display("Paimon hitting unexpected error {}: {:?}", message, source)
)]
IoUnexpected {
/// Description of the operation that failed; first placeholder of the display string.
message: String,
/// The opendal error that caused the failure.
source: opendal::Error,
},
}
186 changes: 186 additions & 0 deletions crates/paimon/src/io/file_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::*;
use std::collections::HashMap;

use opendal::services::MemoryConfig;
use opendal::{Metakey, Operator};
use snafu::ResultExt;

/// File-system style facade over an opendal [`Operator`].
///
/// Every method delegates to the wrapped operator and converts an opendal failure into
/// [`Error::IoUnexpected`] with a short message naming the operation that failed.
#[derive(Clone, Debug)]
pub struct FileIO {
    /// Operator that all file operations are forwarded to.
    op: Operator,
}

impl FileIO {
    /// Create a new FileIO.
    ///
    /// The input HashMap is paimon-java's [`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
    ///
    /// The options are currently ignored: the operator is always built from
    /// `MemoryConfig::default()`.
    ///
    /// TODO: Support building Operator from HashMap via options.
    ///
    /// # Errors
    ///
    /// Returns [`Error::IoUnexpected`] if the operator cannot be built.
    pub fn new(_: HashMap<String, String>) -> Result<Self> {
        // The context selector accepts anything `Into<String>`, so passing a `&str` literal
        // defers the `String` allocation to the error path.
        let op = Operator::from_config(MemoryConfig::default())
            .context(IoUnexpectedSnafu {
                message: "Failed to create operator",
            })?
            .finish();
        Ok(Self { op })
    }

    /// Create a new input file to read data.
    ///
    /// The returned handle owns a clone of this FileIO's operator and a copy of `path`.
    ///
    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
    pub fn new_input(&self, path: &str) -> InputFile {
        InputFile {
            _op: self.op.clone(),
            path: path.to_string(),
        }
    }

    /// Create a new output file to write data.
    ///
    /// The returned handle owns a clone of this FileIO's operator and a copy of `path`.
    ///
    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L87>
    pub fn new_output(&self, path: &str) -> OutputFile {
        OutputFile {
            _op: self.op.clone(),
            path: path.to_string(),
        }
    }

    /// Return a file status object that represents the path.
    ///
    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L97>
    ///
    /// # Errors
    ///
    /// Returns [`Error::IoUnexpected`] if the operator fails to stat `path`.
    pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
        let meta = self.op.stat(path).await.context(IoUnexpectedSnafu {
            message: "Failed to get file status",
        })?;

        Ok(FileStatus {
            size: meta.content_length(),
        })
    }

    /// List the statuses of the files/directories in the given path if the path is a directory.
    ///
    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L105>
    ///
    /// FIXME: how to handle large dir? Better to return a stream instead?
    ///
    /// # Errors
    ///
    /// Returns [`Error::IoUnexpected`] if the operator fails to list `path`.
    pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
        // Ask the operator to populate the content length of each entry, since that is the
        // only metadata `FileStatus` carries.
        let entries = self
            .op
            .list_with(path)
            .metakey(Metakey::ContentLength)
            .await
            .context(IoUnexpectedSnafu {
                message: "Failed to list file status",
            })?;

        Ok(entries
            .into_iter()
            .map(|entry| FileStatus {
                size: entry.metadata().content_length(),
            })
            .collect())
    }

    /// Check if exists.
    ///
    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L128>
    ///
    /// # Errors
    ///
    /// Returns [`Error::IoUnexpected`] if the existence check itself fails.
    pub async fn exists(&self, path: &str) -> Result<bool> {
        self.op.is_exist(path).await.context(IoUnexpectedSnafu {
            message: "Failed to check file existence",
        })
    }

    /// Delete a file.
    ///
    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
    ///
    /// # Errors
    ///
    /// Returns [`Error::IoUnexpected`] if the operator fails to delete `path`.
    pub async fn delete_file(&self, path: &str) -> Result<()> {
        self.op.delete(path).await.context(IoUnexpectedSnafu {
            message: "Failed to delete file",
        })
    }

    /// Delete a dir recursively.
    ///
    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
    ///
    /// # Errors
    ///
    /// Returns [`Error::IoUnexpected`] if the operator fails to remove `path`.
    pub async fn delete_dir(&self, path: &str) -> Result<()> {
        self.op.remove_all(path).await.context(IoUnexpectedSnafu {
            message: "Failed to delete dir",
        })
    }

    /// Make the given file and all non-existent parents into directories.
    ///
    /// Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
    ///
    /// `path` may be given with or without a trailing `/`; one is appended when missing.
    ///
    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L150>
    ///
    /// # Errors
    ///
    /// Returns [`Error::IoUnexpected`] if the operator fails to create the directory.
    pub async fn mkdirs(&self, path: &str) -> Result<()> {
        // opendal marks directories with a trailing `/` and `create_dir` rejects any other
        // path, so normalise the path instead of surfacing that error to callers.
        let created = if path.ends_with('/') {
            self.op.create_dir(path).await
        } else {
            self.op.create_dir(&format!("{path}/")).await
        };

        created.context(IoUnexpectedSnafu {
            message: "Failed to create dir",
        })
    }

    /// Renames the file/directory src to dst.
    ///
    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L159>
    ///
    /// # Errors
    ///
    /// Returns [`Error::IoUnexpected`] if the operator fails to rename `src` to `dst`.
    pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
        self.op.rename(src, dst).await.context(IoUnexpectedSnafu {
            message: "Failed to rename file",
        })
    }
}

/// FileStatus represents the status of a file.
///
/// Values are produced by `FileIO::get_status` and `FileIO::list_status`.
#[derive(Clone, Debug)]
pub struct FileStatus {
/// Content length that the operator's metadata reports for the entry.
pub size: u64,
}

/// Handle to a file that can be read from.
///
/// Instances are created by `FileIO::new_input`.
#[derive(Clone, Debug)]
pub struct InputFile {
    /// Operator cloned from the creating `FileIO`; no method in this file reads it yet.
    _op: Operator,
    /// Path this handle was created for.
    path: String,
}

impl InputFile {
    /// Borrow the path this input file was created with.
    pub fn path(&self) -> &str {
        self.path.as_str()
    }
}

/// Handle to a file that can be written to.
///
/// Instances are created by `FileIO::new_output`.
#[derive(Clone, Debug)]
pub struct OutputFile {
    /// Operator cloned from the creating `FileIO`; no method in this file reads it yet.
    _op: Operator,
    /// Path this handle was created for.
    path: String,
}

impl OutputFile {
    /// Borrow the path this output file was created with.
    pub fn path(&self) -> &str {
        self.path.as_str()
    }
}
19 changes: 19 additions & 0 deletions crates/paimon/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod file_io;
pub use file_io::*;
4 changes: 4 additions & 0 deletions crates/paimon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
// under the License.

mod error;
pub use error::Error;
pub use error::Result;

pub mod fileindex;
pub mod fs;
pub mod io;
pub mod options;
pub mod predicate;
pub mod spec;
30 changes: 5 additions & 25 deletions crates/paimon/src/spec/data_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use super::schema::DataField;
use crate::spec::RowType;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};

/// A row type: the data type of an ordered sequence of fields.
///
/// Every field has a name, a type, and an optional description. A table row is most
/// specifically typed by a row type, where each column maps to the field at the same
/// ordinal position. Unlike the SQL standard, fields may carry an optional description,
/// which simplifies working with complex structures.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/db8bcd7fdd9c2705435d2ab1d2341c52d1f67ee5/paimon-common/src/main/java/org/apache/paimon/types/RowType.java>
///
/// TODO: make RowType extend DataType.
/// TODO: move me to a better place.
pub struct RowType {
    /// Fields of the row, in the order they were passed to [`RowType::new`].
    _fields: Vec<DataField>,
}

impl RowType {
    /// Build a row type that takes ownership of the given field list.
    pub const fn new(list: Vec<DataField>) -> Self {
        RowType { _fields: list }
    }
}

/// Shared constant row built with `BinaryRow::new(0)`.
// NOTE(review): presumably the argument is the field count, making this a row with no
// fields — confirm against `BinaryRow::new`.
pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0);

/// An implementation of InternalRow.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/db8bcd7fdd9c2705435d2ab1d2341c52d1f67ee5/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java>
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java>
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BinaryRow {
Expand All @@ -71,13 +51,13 @@ impl BinaryRow {
/// TODO: implement me.
/// The statistics for columns. Currently a placeholder aliased to the unit type `()`.
///
/// Impl References: <https://github.com/apache/paimon/blob/db8bcd7fdd9c2705435d2ab1d2341c52d1f67ee5/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStats.java>
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStats.java>
type SimpleStats = ();

/// The Source of a file.
/// TODO: move me to the manifest module.
///
/// Impl References: <https://github.com/apache/paimon/blob/db8bcd7fdd9c2705435d2ab1d2341c52d1f67ee5/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java>
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java>
#[repr(u8)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand All @@ -88,7 +68,7 @@ pub enum FileSource {

/// Metadata of a data file.
///
/// Impl References: <https://github.com/apache/paimon/blob/db8bcd7fdd9c2705435d2ab1d2341c52d1f67ee5/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java>
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java>
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DataFileMeta {
Expand Down
Loading

0 comments on commit 42fe0af

Please sign in to comment.