Skip to content

Commit 4275c6b

Browse files
authored
feat(spec): Impl the ManifestFile read functionality (#69)
1 parent f727960 commit 4275c6b

File tree

9 files changed

+403
-188
lines changed

9 files changed

+403
-188
lines changed

crates/paimon/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ serde = { version = "1", features = ["derive"] }
4444
serde_bytes = "0.11.15"
4545
serde_json = "1.0.120"
4646
serde_with = "3.9.0"
47+
serde_repr = "0.1"
4748
snafu = "0.8.3"
4849
typed-builder = "^0.19"
4950
opendal = { version = "0.49", features = ["services-fs"] }

crates/paimon/src/spec/data_file.rs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::spec::RowType;
18+
use crate::spec::stats::BinaryTableStats;
19+
use chrono::serde::ts_milliseconds::deserialize as from_millis;
20+
use chrono::serde::ts_milliseconds::serialize as to_millis;
1921
use chrono::{DateTime, Utc};
2022
use serde::{Deserialize, Serialize};
2123
use std::fmt::{Display, Formatter};
@@ -48,12 +50,6 @@ impl BinaryRow {
4850
}
4951
}
5052

51-
/// TODO: implement me.
52-
/// The statistics for columns, supports the following stats.
53-
///
54-
/// Impl References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStats.java>
55-
type SimpleStats = ();
56-
5753
/// The Source of a file.
5854
/// TODO: move me to the manifest module.
5955
///
@@ -72,25 +68,43 @@ pub enum FileSource {
7268
#[derive(Debug, Eq, PartialEq, Serialize, Deserialize)]
7369
#[serde(rename_all = "camelCase")]
7470
pub struct DataFileMeta {
71+
#[serde(rename = "_FILE_NAME")]
7572
pub file_name: String,
73+
#[serde(rename = "_FILE_SIZE")]
7674
pub file_size: i64,
7775
// row_count tells the total number of rows (including add & delete) in this file.
76+
#[serde(rename = "_ROW_COUNT")]
7877
pub row_count: i64,
79-
pub min_key: BinaryRow,
80-
pub max_key: BinaryRow,
81-
pub key_stats: SimpleStats,
82-
pub value_stats: SimpleStats,
78+
#[serde(rename = "_MIN_KEY", with = "serde_bytes")]
79+
pub min_key: Vec<u8>,
80+
#[serde(rename = "_MAX_KEY", with = "serde_bytes")]
81+
pub max_key: Vec<u8>,
82+
#[serde(rename = "_KEY_STATS")]
83+
pub key_stats: BinaryTableStats,
84+
#[serde(rename = "_VALUE_STATS")]
85+
pub value_stats: BinaryTableStats,
86+
#[serde(rename = "_MIN_SEQUENCE_NUMBER")]
8387
pub min_sequence_number: i64,
88+
#[serde(rename = "_MAX_SEQUENCE_NUMBER")]
8489
pub max_sequence_number: i64,
90+
#[serde(rename = "_SCHEMA_ID")]
8591
pub schema_id: i64,
92+
#[serde(rename = "_LEVEL")]
8693
pub level: i32,
94+
#[serde(rename = "_EXTRA_FILES")]
8795
pub extra_files: Vec<String>,
96+
#[serde(
97+
rename = "_CREATION_TIME",
98+
serialize_with = "to_millis",
99+
deserialize_with = "from_millis"
100+
)]
88101
pub creation_time: DateTime<Utc>,
102+
#[serde(rename = "_DELETE_ROW_COUNT")]
89103
// row_count = add_row_count + delete_row_count.
90104
pub delete_row_count: Option<i64>,
91105
// File index filter bytes; when small, they are stored inline in the data file meta.
106+
#[serde(rename = "_EMBEDDED_FILE_INDEX", with = "serde_bytes")]
92107
pub embedded_index: Option<Vec<u8>>,
93-
pub file_source: Option<FileSource>,
94108
}
95109

96110
impl Display for DataFileMeta {
@@ -99,7 +113,5 @@ impl Display for DataFileMeta {
99113
}
100114
}
101115

102-
impl DataFileMeta {
103-
// TODO: implement me
104-
pub const SCHEMA: RowType = RowType::new(vec![]);
105-
}
116+
#[allow(dead_code)]
117+
impl DataFileMeta {}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::spec::DataFileMeta;
19+
use serde::Deserialize;
20+
use serde_repr::{Deserialize_repr, Serialize_repr};
21+
use serde_with::serde_derive::Serialize;
22+
23+
/// The same [`Identifier`] indicates that the [`ManifestEntry`] refers to the same data file.
24+
///
25+
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java#L58>
26+
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
27+
pub struct Identifier {
28+
pub partition: Vec<u8>,
29+
pub bucket: i32,
30+
pub level: i32,
31+
pub file_name: String,
32+
}
33+
34+
/// Kind of a file.
35+
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/FileKind.java>
36+
#[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)]
37+
#[repr(u8)]
38+
pub enum FileKind {
39+
Add = 0,
40+
Delete = 1,
41+
}
42+
43+
/// Entry of a manifest file, representing an addition / deletion of a data file.
44+
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java>
45+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
46+
pub struct ManifestEntry {
47+
#[serde(rename = "_KIND")]
48+
kind: FileKind,
49+
50+
#[serde(rename = "_PARTITION", with = "serde_bytes")]
51+
partition: Vec<u8>,
52+
53+
#[serde(rename = "_BUCKET")]
54+
bucket: i32,
55+
56+
#[serde(rename = "_TOTAL_BUCKETS")]
57+
total_buckets: i32,
58+
59+
#[serde(rename = "_FILE")]
60+
file: DataFileMeta,
61+
62+
#[serde(rename = "_VERSION")]
63+
version: i32,
64+
}
65+
66+
#[allow(dead_code)]
67+
impl ManifestEntry {
68+
fn kind(&self) -> &FileKind {
69+
&self.kind
70+
}
71+
72+
fn partition(&self) -> &Vec<u8> {
73+
&self.partition
74+
}
75+
76+
fn bucket(&self) -> i32 {
77+
self.bucket
78+
}
79+
80+
fn level(&self) -> i32 {
81+
self.file.level
82+
}
83+
84+
fn file_name(&self) -> &str {
85+
&self.file.file_name
86+
}
87+
88+
fn min_key(&self) -> &Vec<u8> {
89+
&self.file.min_key
90+
}
91+
92+
fn max_key(&self) -> &Vec<u8> {
93+
&self.file.max_key
94+
}
95+
96+
fn identifier(&self) -> Identifier {
97+
Identifier {
98+
partition: self.partition.clone(),
99+
bucket: self.bucket,
100+
level: self.file.level,
101+
file_name: self.file.file_name.clone(),
102+
}
103+
}
104+
105+
pub fn total_buckets(&self) -> i32 {
106+
self.total_buckets
107+
}
108+
109+
pub fn file(&self) -> &DataFileMeta {
110+
&self.file
111+
}
112+
113+
pub fn new(
114+
kind: FileKind,
115+
partition: Vec<u8>,
116+
bucket: i32,
117+
total_buckets: i32,
118+
file: DataFileMeta,
119+
version: i32,
120+
) -> Self {
121+
ManifestEntry {
122+
kind,
123+
partition,
124+
bucket,
125+
total_buckets,
126+
file,
127+
version,
128+
}
129+
}
130+
}

crates/paimon/src/spec/manifest_file_meta.rs

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::spec::stats::BinaryTableStats;
1819
use serde::{Deserialize, Serialize};
1920
use std::fmt::{Display, Formatter};
2021

@@ -128,61 +129,3 @@ impl Display for ManifestFileMeta {
128129
)
129130
}
130131
}
131-
132-
/// The statistics for columns, supports the following stats.
133-
///
134-
/// All statistics are stored in the form of a Binary, which can significantly reduce its memory consumption, but the cost is that the column type needs to be known when getting.
135-
///
136-
/// Impl Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-core/src/main/java/org/apache/paimon/stats/FieldStatsArraySerializer.java#L111>
137-
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
138-
pub struct BinaryTableStats {
139-
/// the minimum values of the columns
140-
#[serde(rename = "_MIN_VALUES", with = "serde_bytes")]
141-
min_values: Vec<u8>,
142-
143-
/// the maximum values of the columns
144-
#[serde(rename = "_MAX_VALUES", with = "serde_bytes")]
145-
max_values: Vec<u8>,
146-
147-
/// the number of nulls of the columns
148-
#[serde(rename = "_NULL_COUNTS")]
149-
null_counts: Vec<i64>,
150-
}
151-
152-
impl BinaryTableStats {
153-
/// Get the minimum values of the columns
154-
#[inline]
155-
pub fn min_values(&self) -> &[u8] {
156-
&self.min_values
157-
}
158-
159-
/// Get the maximum values of the columns
160-
#[inline]
161-
pub fn max_values(&self) -> &[u8] {
162-
&self.max_values
163-
}
164-
165-
/// Get the number of nulls of the columns
166-
#[inline]
167-
pub fn null_counts(&self) -> &Vec<i64> {
168-
&self.null_counts
169-
}
170-
171-
pub fn new(
172-
min_values: Vec<u8>,
173-
max_values: Vec<u8>,
174-
null_counts: Vec<i64>,
175-
) -> BinaryTableStats {
176-
Self {
177-
min_values,
178-
max_values,
179-
null_counts,
180-
}
181-
}
182-
}
183-
184-
impl Display for BinaryTableStats {
185-
fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result {
186-
todo!()
187-
}
188-
}

0 commit comments

Comments
 (0)