diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 1e33ce0..e38f72a 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -27,9 +27,13 @@ license.workspace = true version.workspace = true [dependencies] +rand = "0.8.5" +async-trait = "0.1.81" +lazy_static = "1.5.0" bitflags = "2.6.0" -chrono = {version = "0.4.38", features = ["serde"]} +chrono = { version = "0.4.38", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_with = "3.8.3" snafu = "0.8.3" typed-builder = "^0.18" +tokio = { version = "1.39.2", features = ["full"] } diff --git a/crates/paimon/src/fileindex/file_index_common.rs b/crates/paimon/src/fileindex/file_index_common.rs new file mode 100644 index 0000000..2e1615c --- /dev/null +++ b/crates/paimon/src/fileindex/file_index_common.rs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use crate::spec::{DataField, DataType}; + +pub fn to_map_key(map_column_name: &str, key_name: &str) -> String { + format!("{}[{}]", map_column_name, key_name) +} + +pub fn get_field_type(_fields: &HashMap, _column_name: &str) -> DataType { + todo!() +} diff --git a/crates/paimon/src/fileindex/file_index_format.rs b/crates/paimon/src/fileindex/file_index_format.rs new file mode 100644 index 0000000..86c26d3 --- /dev/null +++ b/crates/paimon/src/fileindex/file_index_format.rs @@ -0,0 +1,486 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{collections::HashMap, io::SeekFrom}; + +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; + +pub const MAGIC: u64 = 1493475289347502; +pub const EMPTY_INDEX_FLAG: i32 = -1; + +/// File index file format. Put all columns and offsets in the header. +/// +/// ``` +/// _____________________________________ _____________________ +/// | magic |version|head length | +/// |-------------------------------------| +/// | column number | +/// |-------------------------------------| +/// | column 1 | index number | +/// |-------------------------------------| +/// | index name 1 |start pos |length | +/// |-------------------------------------| +/// | index name 2 |start pos |length | +/// |-------------------------------------| +/// | index name 3 |start pos |length | +/// |-------------------------------------| HEAD +/// | column 2 | index number | +/// |-------------------------------------| +/// | index name 1 |start pos |length | +/// |-------------------------------------| +/// | index name 2 |start pos |length | +/// |-------------------------------------| +/// | index name 3 |start pos |length | +/// |-------------------------------------| +/// | ... | +/// |-------------------------------------| +/// | ... | +/// |-------------------------------------| +/// | redundant length |redundant bytes | +/// |-------------------------------------| --------------------- +/// | BODY | +/// | BODY | +/// | BODY | BODY +/// | BODY | +/// |_____________________________________| _____________________ +/// ``` +/// +/// - `magic`: 8 bytes long +/// - `version`: 4 bytes int +/// - `head length`: 4 bytes int +/// - `column number`: 4 bytes int +/// - `column x`: var bytes utf (length + bytes) +/// - `index number`: 4 bytes int (how many column items below) +/// - `index name x`: var bytes utf +/// - `start pos`: 4 bytes int +/// - `length`: 4 bytes int +/// - `redundant length`: 4 bytes int (for compatibility with later versions, in this version, content is zero) +/// - `redundant bytes`: var bytes (for compatibility with later version, in this version, is empty) +/// - `BODY`: column index bytes + column index bytes + column index bytes + ....... + +#[derive(Debug)] +pub struct FileIndexFormat; + +impl FileIndexFormat { + pub fn create_writer(writer: W) -> Writer { + Writer::new(writer) + } + + pub async fn create_reader( + reader: R, + ) -> std::io::Result> { + Reader::new(reader).await + } +} + +#[allow(dead_code)] +pub struct Writer { + writer: BufWriter, +} + +#[allow(dead_code)] +impl Writer { + pub fn new(writer: W) -> Self { + Self { + writer: BufWriter::new(writer), + } + } + + async fn write_column_indexes( + &mut self, + indexes: HashMap>>, + ) -> std::io::Result<()> { + let mut body_info: HashMap> = HashMap::new(); + let mut baos: Vec = Vec::new(); + + for (column_name, bytes_map) in indexes.into_iter() { + let inner_map = body_info.entry(column_name.clone()).or_default(); + for (index_name, data) in bytes_map { + let start_position = baos.len() as i32; + if data.is_empty() { + inner_map.insert( + index_name, + IndexInfo { + start_pos: EMPTY_INDEX_FLAG, + length: 0, + }, + ); + } else { + baos.extend(data); + inner_map.insert( + index_name, + IndexInfo { + start_pos: start_position, + length: baos.len() as i32 - start_position, + }, + ); + } + } + } + + let body = baos; + self.write_head(&body_info).await?; + self.writer.write_all(&body).await?; + Ok(()) + } + + async fn write_head( + &mut self, + body_info: &HashMap>, + ) -> std::io::Result<()> { + let head_length = self.calculate_head_length(body_info).await?; + + // write Magic + self.writer.write_u64(MAGIC).await?; + // write Version + self.writer.write_i32(Version::V1.into()).await?; + // write HeadLength + self.writer.write_i32(head_length as i32).await?; + // write ColumnSize + self.writer.write_i32(body_info.len() as i32).await?; + for (column_name, index_info) in body_info { + // write ColumnName + self.writer.write_u16(column_name.len() as u16).await?; + self.writer.write_all(column_name.as_bytes()).await?; + // write IndexTypeSize + self.writer.write_i32(index_info.len() as i32).await?; + // write ColumnInfo, offset = headLength + for (index_name, IndexInfo { start_pos, length }) in index_info { + self.writer.write_u16(index_name.len() as u16).await?; + self.writer.write_all(index_name.as_bytes()).await?; + let adjusted_start = if *start_pos == EMPTY_INDEX_FLAG { + EMPTY_INDEX_FLAG + } else { + *start_pos + head_length as i32 + }; + self.writer.write_i32(adjusted_start).await?; + self.writer.write_i32(*length).await?; + } + } + // write RedundantLength + self.writer.write_i32(0).await?; + Ok(()) + } + + async fn calculate_head_length( + &self, + body_info: &HashMap>, + ) -> std::io::Result { + let base_length = 8 + + 4 + + 4 + + 4 + + body_info.values().map(|v| v.len()).sum::() * 8 + + body_info.len() * 4 + + 4; + + let mut baos = Vec::new(); + for (column_name, index_info) in body_info { + baos.extend_from_slice(&(column_name.len() as u16).to_be_bytes()); + baos.extend_from_slice(column_name.as_bytes()); + for index_name in index_info.keys() { + baos.extend_from_slice(&(index_name.len() as u16).to_be_bytes()); + baos.extend_from_slice(index_name.as_bytes()); + } + } + + Ok(base_length + baos.len()) + } +} + +#[allow(dead_code)] +pub struct Reader { + reader: BufReader, + header: HashMap>, +} + +#[allow(dead_code)] +impl Reader { + pub async fn new(reader: R) -> std::io::Result { + let mut buf_reader = BufReader::new(reader); + let magic = buf_reader.read_u64().await?; + if magic != MAGIC { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Invalid magic number", + )); + } + + let version: Version = buf_reader.read_i32().await?.into(); + if version != Version::V1 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Unsupported file index version", + )); + } + + let head_length = buf_reader.read_i32().await?; + let mut header = HashMap::new(); + + let column_size = buf_reader.read_i32().await?; + for _ in 0..column_size { + let column_name = Self::read_string(&mut buf_reader).await?; + let index_size = buf_reader.read_i32().await?; + let mut index_info = HashMap::new(); + for _ in 0..index_size { + let index_name = Self::read_string(&mut buf_reader).await?; + let start_pos = buf_reader.read_i32().await?; + let length = buf_reader.read_i32().await?; + index_info.insert(index_name, IndexInfo { start_pos, length }); + } + header.insert(column_name, index_info); + } + + buf_reader.seek(SeekFrom::Start(head_length as u64)).await?; + + Ok(Reader { + reader: buf_reader, + header, + }) + } + + async fn read_column_index( + &self, + column_name: &str, + ) -> std::io::Result>> { + if let Some(index_info) = self.header.get(column_name) { + let mut result = HashMap::new(); + for (index_name, info) in index_info { + let bytes = self.get_bytes_with_start_and_length(info).await?; + result.insert(index_name.clone(), bytes); + } + Ok(result) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Column not found", + )) + } + } + + async fn get_bytes_with_start_and_length( + &self, + index_info: &IndexInfo, + ) -> std::io::Result> { + let mut reader = self.reader.get_ref().clone(); + let mut buffer = vec![0; index_info.length as usize]; + reader + .seek(SeekFrom::Start(index_info.start_pos as u64)) + .await?; + reader.read_exact(&mut buffer).await?; + Ok(buffer) + } + + async fn read_string(reader: &mut BufReader) -> std::io::Result { + let len = reader.read_u16().await? as usize; + let mut buffer = vec![0; len]; + reader.read_exact(&mut buffer).await?; + Ok(String::from_utf8(buffer).expect("Invalid UTF-8")) + } + + #[cfg(test)] + pub async fn get_bytes_with_name_and_type( + &self, + column_name: &str, + index_type: &str, + ) -> Option> { + if let Some(indexes) = self.header.get(column_name) { + if let Some(index_info) = indexes.get(index_type) { + return self.get_bytes_with_start_and_length(index_info).await.ok(); + } + } + None + } +} + +#[derive(Debug)] +struct IndexInfo { + start_pos: i32, + length: i32, +} + +#[derive(Debug, PartialEq, Eq)] +enum Version { + V1, +} + +impl From for Version { + fn from(version: i32) -> Self { + match version { + 1 => Version::V1, + _ => panic!("Unsupported file index version: {}", version), + } + } +} + +impl From for i32 { + fn from(version: Version) -> Self { + match version { + Version::V1 => 1, + } + } +} + +#[cfg(test)] +mod test { + use std::{collections::HashMap, io::Cursor}; + + use rand::{distributions::Uniform, Rng}; + use tokio::io::AsyncWriteExt; + + use crate::fileindex::FileIndexFormat; + + #[tokio::test] + async fn test_write_read() -> std::io::Result<()> { + let mut writer_buffer = Cursor::new(Vec::new()); + let mut writer = FileIndexFormat::create_writer(&mut writer_buffer); + + let mut indexes: HashMap>> = HashMap::new(); + indexes + .entry("column1".to_string()) + .or_default() + .insert("index1".to_string(), vec![1, 2, 3, 4]); + indexes + .entry("column1".to_string()) + .or_default() + .insert("index2".to_string(), vec![5, 6, 7, 8]); + + writer.write_column_indexes(indexes.clone()).await?; + writer.writer.flush().await?; + let index_bytes = writer_buffer.into_inner(); + + let reader = FileIndexFormat::create_reader(Cursor::new(index_bytes)).await?; + for (column, types) in indexes { + for (type_name, expected_data) in types { + let data = reader + .get_bytes_with_name_and_type(&column, &type_name) + .await + .ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::NotFound, "Data not found") + })?; + assert_eq!(data, expected_data); + } + } + + Ok(()) + } + + #[tokio::test] + async fn test_random_data() -> std::io::Result<()> { + let mut writer_buffer = Cursor::new(Vec::new()); + let mut writer = FileIndexFormat::create_writer(&mut writer_buffer); + + let mut indexes: HashMap>> = HashMap::new(); + let mut rng = rand::thread_rng(); + let char_range = Uniform::new(b'a', b'z' + 1); + let num_columns = rng.gen_range(1..=100); + for _ in 0..num_columns { + let column_name: String = (0..rng.gen_range(1..=50)) + .map(|_| char::from(rng.sample(char_range))) + .collect(); + let mut column_indexes = HashMap::new(); + let num_indexes = rng.gen_range(1..=100); + for _ in 0..num_indexes { + let index_name: String = (0..rng.gen_range(1..=20)) + .map(|_| char::from(rng.sample(char_range))) + .collect(); + let data_len = rng.gen_range(1..=1000); + let data: Vec = (0..data_len).map(|_| rng.gen()).collect(); + column_indexes.insert(index_name, data); + } + indexes.insert(column_name, column_indexes); + } + + writer.write_column_indexes(indexes.clone()).await?; + writer.writer.flush().await?; + let index_bytes = writer_buffer.into_inner(); + + let reader = FileIndexFormat::create_reader(Cursor::new(index_bytes)).await?; + for (column, types) in indexes { + for (type_name, expected_data) in types { + let data = reader + .get_bytes_with_name_and_type(&column, &type_name) + .await + .ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::NotFound, "Data not found") + })?; + assert_eq!(data, expected_data); + } + } + + Ok(()) + } + + #[tokio::test] + async fn test_empty_and_missing_data() -> std::io::Result<()> { + let mut writer_buffer = Cursor::new(Vec::new()); + let mut writer = FileIndexFormat::create_writer(&mut writer_buffer); + + let mut indexes: HashMap>> = HashMap::new(); + indexes + .entry("a".to_string()) + .or_default() + .insert("b".to_string(), Vec::new()); + indexes + .entry("a".to_string()) + .or_default() + .insert("c".to_string(), vec![1, 2, 3]); + + writer.write_column_indexes(indexes.clone()).await?; + writer.writer.flush().await?; + let index_bytes = writer_buffer.into_inner(); + + let reader = FileIndexFormat::create_reader(Cursor::new(index_bytes)).await?; + let file_index_format_list = reader.read_column_index("a").await?; + assert_eq!(file_index_format_list.len(), 2); + + let empty_data = reader + .get_bytes_with_name_and_type("a", "b") + .await + .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "Data not found"))?; + assert!(empty_data.is_empty()); + + let normal_data = reader + .get_bytes_with_name_and_type("a", "c") + .await + .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::NotFound, "Data not found"))?; + assert_eq!(normal_data, vec![1, 2, 3]); + + Ok(()) + } + + #[should_panic] + #[tokio::test] + async fn test_corrupted_data() { + let mut writer_buffer = Cursor::new(Vec::new()); + let mut writer = FileIndexFormat::create_writer(&mut writer_buffer); + + let mut indexes: HashMap>> = HashMap::new(); + indexes + .entry("column1".to_string()) + .or_default() + .insert("index1".to_string(), vec![1, 2, 3, 4]); + + assert!(writer.write_column_indexes(indexes).await.is_ok()); + assert!(writer.writer.flush().await.is_ok()); + let mut index_bytes = writer_buffer.into_inner(); + + index_bytes[10] = 0xFF; + + let _ = FileIndexFormat::create_reader(Cursor::new(index_bytes)).await; + } +} diff --git a/crates/paimon/src/fileindex/file_index_reader.rs b/crates/paimon/src/fileindex/file_index_reader.rs new file mode 100644 index 0000000..badfa99 --- /dev/null +++ b/crates/paimon/src/fileindex/file_index_reader.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::predicate::{FieldRef, FunctionVisitor}; + +use super::FileIndexResult; + +/// Read file index from serialized bytes. Return true, +/// means we need to search this file, else means needn't. +pub trait FileIndexReader: FunctionVisitor { + fn visit_is_not_null(&self, _field_ref: &FieldRef) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_is_null(&self, _field_ref: &FieldRef) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_starts_with(&self, _field_ref: &FieldRef, _literal: Self::Literal) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_ends_with(&self, _field_ref: &FieldRef, _literal: Self::Literal) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_less_than(&self, _field_ref: &FieldRef, _literal: Self::Literal) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_greater_or_equal( + &self, + _field_ref: &FieldRef, + _literal: Self::Literal, + ) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_not_equal(&self, _field_ref: &FieldRef, _literal: Self::Literal) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_less_or_equal( + &self, + _field_ref: &FieldRef, + _literal: Self::Literal, + ) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_equal(&self, _field_ref: &FieldRef, _literal: Self::Literal) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_greater_than( + &self, + _field_ref: &FieldRef, + _literal: Self::Literal, + ) -> FileIndexResult { + FileIndexResult::Remain + } + + fn visit_in(&self, _field_ref: &FieldRef, _literals: Vec) -> FileIndexResult { + let mut file_index_result = FileIndexResult::Remain; + for key in _literals { + file_index_result = match file_index_result { + FileIndexResult::Remain => FileIndexReader::visit_equal(self, _field_ref, key), + _ => file_index_result.or(FileIndexReader::visit_equal(self, _field_ref, key)), + }; + } + file_index_result + } + + fn visit_not_in( + &self, + _field_ref: &FieldRef, + _literals: Vec, + ) -> FileIndexResult { + let mut file_index_result = FileIndexResult::Remain; + for key in _literals { + file_index_result = match file_index_result { + FileIndexResult::Remain => FileIndexReader::visit_not_equal(self, _field_ref, key), + _ => file_index_result.or(FileIndexReader::visit_not_equal(self, _field_ref, key)), + }; + } + file_index_result + } + + fn visit_and(&self, _children: Vec) -> FileIndexResult { + panic!("Should not invoke this"); + } + + fn visit_or(&self, _children: Vec) -> FileIndexResult { + panic!("Should not invoke this"); + } +} diff --git a/crates/paimon/src/fileindex/file_index_result.rs b/crates/paimon/src/fileindex/file_index_result.rs new file mode 100644 index 0000000..54f8cac --- /dev/null +++ b/crates/paimon/src/fileindex/file_index_result.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FileIndexResult { + Remain, + Skip, +} + +impl FileIndexResult { + pub fn remain(&self) -> bool { + matches!(self, FileIndexResult::Remain) + } + + pub fn and(&self, other: FileIndexResult) -> FileIndexResult { + if self.remain() && other.remain() { + FileIndexResult::Remain + } else { + FileIndexResult::Skip + } + } + + pub fn or(&self, other: FileIndexResult) -> FileIndexResult { + if self.remain() || other.remain() { + FileIndexResult::Remain + } else { + FileIndexResult::Skip + } + } +} + +lazy_static::lazy_static! { + pub static ref REMAIN: Arc = Arc::new(FileIndexResult::Remain); + pub static ref SKIP: Arc = Arc::new(FileIndexResult::Skip); +} diff --git a/crates/paimon/src/fileindex/file_index_writer.rs b/crates/paimon/src/fileindex/file_index_writer.rs new file mode 100644 index 0000000..2c3bd75 --- /dev/null +++ b/crates/paimon/src/fileindex/file_index_writer.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; + +#[async_trait] +pub trait FileIndexWriter: Send + Sync { + async fn write(&mut self, key: &[u8]); + + async fn serialized_bytes(&self) -> Vec; + + async fn write_record(&mut self, key: &[u8]) { + self.set_empty(false); + self.write(key).await; + } + + fn is_empty(&self) -> bool; + + fn set_empty(&mut self, empty: bool); +} diff --git a/crates/paimon/src/fileindex/file_indexer.rs b/crates/paimon/src/fileindex/file_indexer.rs new file mode 100644 index 0000000..a63e2f5 --- /dev/null +++ b/crates/paimon/src/fileindex/file_indexer.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::{fs::SeekableInputStream, options::Options, spec::DataType}; + +use super::{FileIndexReader, FileIndexWriter}; + +pub trait FileIndexer: Send + Sync { + fn create_writer(&self) -> Arc; + fn create_reader( + &self, + input_stream: Arc, + start: usize, + length: usize, + ) -> impl FileIndexReader; + + fn create(&self, _typ: &str, _data_type: DataType, _options: Options) -> Arc { + // let file_indexer_factory = FileIndexerFactoryRegistry::load(typ); + // file_indexer_factory.create(data_type, options) + todo!() + } +} diff --git a/crates/paimon/src/fileindex/file_indexer_factory.rs b/crates/paimon/src/fileindex/file_indexer_factory.rs new file mode 100644 index 0000000..dc4f2b8 --- /dev/null +++ b/crates/paimon/src/fileindex/file_indexer_factory.rs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{options::Options, spec::DataType}; + +use super::FileIndexer; + +pub trait FileIndexerFactory: Send + Sync { + fn identifier(&self) -> String; + fn create(&self, data_type: DataType, options: Options) -> impl FileIndexer; +} + +// pub struct FileIndexerFactoryRegistry { +// factories: Mutex>>, +// } + +// lazy_static! { +// static ref EXAMPLE_FACTORY_REGISTRY: FileIndexerFactoryRegistry = +// FileIndexerFactoryRegistry { +// factories: Mutex::new(HashMap::new()), +// }; +// } + +// impl FileIndexerFactoryRegistry { +// pub fn register(&self, factory: Arc) { +// let identifier = factory.identifier().to_string(); +// let mut factories = self.factories.lock().unwrap(); +// if factories.insert(identifier.clone(), factory).is_some() { +// warn!( +// "Found multiple FileIndexer for type: {}, choose one of them", +// identifier +// ); +// } +// } + +// pub fn load(&self, type_name: &str) -> Arc { +// let factories = self.factories.lock().unwrap(); +// if let Some(factory) = factories.get(type_name) { +// Arc::clone(factory) +// } else { +// panic!("Can't find file index for type: {}", type_name); +// } +// } +// } diff --git a/crates/paimon/src/fileindex/mod.rs b/crates/paimon/src/fileindex/mod.rs new file mode 100644 index 0000000..d8bad5f --- /dev/null +++ b/crates/paimon/src/fileindex/mod.rs @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod file_index_format; +pub use file_index_format::*; + +mod file_index_writer; +pub use file_index_writer::*; + +mod file_index_reader; +pub use file_index_reader::*; + +mod file_index_result; +pub use file_index_result::*; + +mod file_index_common; +pub use file_index_common::*; + +mod file_indexer_factory; +pub use file_indexer_factory::*; + +mod file_indexer; +pub use file_indexer::*; diff --git a/crates/paimon/src/fs/mod.rs b/crates/paimon/src/fs/mod.rs new file mode 100644 index 0000000..4e105c6 --- /dev/null +++ b/crates/paimon/src/fs/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod seekable_input; +pub use seekable_input::*; diff --git a/crates/paimon/src/fs/seekable_input.rs b/crates/paimon/src/fs/seekable_input.rs new file mode 100644 index 0000000..23b7f73 --- /dev/null +++ b/crates/paimon/src/fs/seekable_input.rs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use std::io::{self, SeekFrom}; +use tokio::io::{AsyncRead, AsyncSeek}; + +#[async_trait] +pub trait SeekableInputStream: AsyncRead + AsyncSeek + Unpin { + async fn seek(&mut self, pos: SeekFrom) -> io::Result; + + async fn get_pos(&mut self) -> io::Result; + + async fn read(&mut self, buf: &mut [u8]) -> io::Result; + + async fn close(&mut self) -> io::Result<()>; +} diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index bf367f0..753a8b1 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -16,4 +16,8 @@ // under the License. mod error; +pub mod fileindex; +pub mod fs; +pub mod options; +pub mod predicate; pub mod spec; diff --git a/crates/paimon/src/options/mod.rs b/crates/paimon/src/options/mod.rs new file mode 100644 index 0000000..049e070 --- /dev/null +++ b/crates/paimon/src/options/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod option; +pub use option::*; diff --git a/crates/paimon/src/options/option.rs b/crates/paimon/src/options/option.rs new file mode 100644 index 0000000..cef610d --- /dev/null +++ b/crates/paimon/src/options/option.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// TODO:implement options +pub struct Options {} diff --git a/crates/paimon/src/predicate/field_ref.rs b/crates/paimon/src/predicate/field_ref.rs new file mode 100644 index 0000000..4bb8f23 --- /dev/null +++ b/crates/paimon/src/predicate/field_ref.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::spec::DataType; +use std::fmt::{Debug, Display, Formatter}; +use std::hash::{Hash, Hasher}; + +#[derive(Clone)] +pub struct FieldRef { + index: usize, + name: String, + data_type: DataType, +} + +impl FieldRef { + pub fn new(index: usize, name: &str, data_type: DataType) -> Self { + FieldRef { + index, + name: name.to_string(), + data_type, + } + } + + pub fn index(&self) -> usize { + self.index + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn data_type(&self) -> &DataType { + &self.data_type + } +} + +impl PartialEq for FieldRef { + fn eq(&self, other: &Self) -> bool { + self.index == other.index && self.name == other.name && self.data_type == other.data_type + } +} + +impl Eq for FieldRef {} + +impl Debug for FieldRef { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "FieldRef {{ index: {}, name: '{}', data_type: {:?} }}", + self.index, self.name, self.data_type + ) + } +} + +impl Hash for FieldRef { + fn hash(&self, state: &mut H) { + self.index.hash(state); + self.name.hash(state); + self.data_type.hash(state); + } +} + +impl Display for FieldRef { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "FieldRef {{ index: {}, name: '{}', data_type: {:?} }}", + self.index, self.name, self.data_type + ) + } +} diff --git a/crates/paimon/src/predicate/function_visitor.rs b/crates/paimon/src/predicate/function_visitor.rs new file mode 100644 index 0000000..1b3a0a2 --- /dev/null +++ b/crates/paimon/src/predicate/function_visitor.rs @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{FieldRef, PredicateVisitor}; + +pub trait FunctionVisitor: PredicateVisitor { + type Literal; + type Target; + + fn visit_is_not_null(&self, field_ref: &FieldRef) -> Self::Target; + + fn visit_is_null(&self, field_ref: &FieldRef) -> Self::Target; + + fn visit_starts_with(&self, field_ref: &FieldRef, literal: &Self::Literal) -> Self::Target; + + fn visit_ends_with(&self, field_ref: &FieldRef, literal: &Self::Literal) -> Self::Target; + + fn visit_less_than(&self, field_ref: &FieldRef, literal: &Self::Literal) -> Self::Target; + + fn visit_greater_or_equal(&self, field_ref: &FieldRef, literal: &Self::Literal) + -> Self::Target; + + fn visit_not_equal(&self, field_ref: &FieldRef, literal: &Self::Literal) -> Self::Target; + + fn visit_less_or_equal(&self, field_ref: &FieldRef, literal: &Self::Literal) -> Self::Target; + + fn visit_equal(&self, field_ref: &FieldRef, literal: &Self::Literal) -> Self::Target; + + fn visit_greater_than(&self, field_ref: &FieldRef, literal: &Self::Literal) -> Self::Target; + + fn visit_in(&self, field_ref: &FieldRef, literals: &[Self::Literal]) -> Self::Target; + + fn visit_not_in(&self, field_ref: &FieldRef, literals: &[Self::Literal]) -> Self::Target; + + fn visit_and(&self, children: Vec) -> Self::Target; + + fn visit_or(&self, children: Vec) -> Self::Target; +} diff --git a/crates/paimon/src/predicate/mod.rs b/crates/paimon/src/predicate/mod.rs new file mode 100644 index 0000000..7720ba3 --- /dev/null +++ b/crates/paimon/src/predicate/mod.rs @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod field_ref; +pub use field_ref::*; + +mod predicate_visitor; +pub use predicate_visitor::*; + +mod function_visitor; +pub use function_visitor::*; + +pub trait Predicate: Send + Sync { + fn negate(&self) -> Option>; + fn visit(&self, visitor: &impl PredicateVisitor) -> T; +} diff --git a/crates/paimon/src/predicate/predicate_visitor.rs b/crates/paimon/src/predicate/predicate_visitor.rs new file mode 100644 index 0000000..6d8b096 --- /dev/null +++ b/crates/paimon/src/predicate/predicate_visitor.rs @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub struct LeafPredicate {} + +pub struct CompoundPredicate {} + +pub trait PredicateVisitor { + fn visit_leaf(&self, predicate: &LeafPredicate) -> T; + fn visit_compound(&self, predicate: &CompoundPredicate) -> T; +}