Commit 2bb93ec

Author: Devdutt Shenoi
revert: all writes into a single file
1 parent: 9c10041

File tree

2 files changed: +10 -52 lines changed

src/parseable/staging/writer.rs

Lines changed: 8 additions & 49 deletions
@@ -29,6 +29,7 @@ use arrow_ipc::writer::FileWriter;
 use arrow_schema::Schema;
 use arrow_select::concat::concat_batches;
 use itertools::Itertools;
+use tracing::trace;

 use crate::parseable::{ARROW_FILE_EXTENSION, ARROW_PART_FILE_EXTENSION};
 use crate::utils::arrow::adapt_batch;
@@ -40,17 +41,12 @@ pub struct DiskWriter<const N: usize> {
     inner: FileWriter<BufWriter<File>>,
     /// Used to ensure un"finish"ed arrow files are renamed on "finish"
     path_prefix: String,
-    /// Number of rows written onto disk
-    count: usize,
-    /// Denotes distinct files created with similar schema during the same minute by the same ingestor
-    file_id: usize,
 }

 impl<const N: usize> DiskWriter<N> {
     pub fn new(path_prefix: String, schema: &Schema) -> Result<Self, StagingError> {
-        let file_id = 0;
         // Live writes happen into partfile
-        let partfile_path = format!("{path_prefix}.{file_id}.{ARROW_PART_FILE_EXTENSION}");
+        let partfile_path = format!("{path_prefix}.{ARROW_PART_FILE_EXTENSION}");
         let file = OpenOptions::new()
             .create(true)
             .append(true)
@@ -60,29 +56,12 @@ impl<const N: usize> DiskWriter<N> {
             inner: FileWriter::try_new_buffered(file, schema)
                 .expect("File and RecordBatch both are checked"),
             path_prefix,
-            count: 0,
-            file_id,
         })
     }

-    /// Appends records into an `{file_id}.part.arrows` files,
-    /// flushing onto disk and increments count on breaching row limit.
+    /// Appends records into a `.part.arrows` file
     pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
-        if self.count + rb.num_rows() >= N {
-            let left = N - self.count;
-            let left_slice = rb.slice(0, left);
-            self.inner.write(&left_slice)?;
-            self.finish()?;
-
-            // Write leftover records into new files until all have been written
-            if left < rb.num_rows() {
-                let right = rb.num_rows() - left;
-                self.write(&rb.slice(left, right))?;
-            }
-        } else {
-            self.inner.write(rb)?;
-            self.count += rb.num_rows();
-        }
+        self.inner.write(rb)?;

         Ok(())
     }
@@ -91,32 +70,12 @@ impl<const N: usize> DiskWriter<N> {
     pub fn finish(&mut self) -> Result<(), StagingError> {
         self.inner.finish()?;

-        let partfile_path = format!(
-            "{}.{}.{ARROW_PART_FILE_EXTENSION}",
-            self.path_prefix, self.file_id
-        );
-        let arrows_path = format!(
-            "{}.{}.{ARROW_FILE_EXTENSION}",
-            self.path_prefix, self.file_id
-        );
+        let partfile_path = format!("{}.{ARROW_PART_FILE_EXTENSION}", self.path_prefix);
+        let arrows_path = format!("{}.{ARROW_FILE_EXTENSION}", self.path_prefix);

         // Rename from part file to finished arrows file
-        std::fs::rename(partfile_path, arrows_path)?;
-
-        self.file_id += 1;
-        self.count = 0;
-
-        let partfile_path = format!(
-            "{}.{}.{ARROW_PART_FILE_EXTENSION}",
-            self.path_prefix, self.file_id
-        );
-        let file = OpenOptions::new()
-            .create(true)
-            .append(true)
-            .open(partfile_path)?;
-
-        self.inner = FileWriter::try_new_buffered(file, self.inner.schema())
-            .expect("File and RecordBatch both are checked");
+        std::fs::rename(partfile_path, &arrows_path)?;
+        trace!("Finished arrows file: {arrows_path}");

         Ok(())
     }
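
The net effect of the revert is that DiskWriter no longer tracks a row count or rotates through numbered part files: every batch for a given prefix is appended to one `.part.arrows` file, which is renamed to its final arrows name only when finish runs. Below is a minimal, self-contained sketch of that flow using the arrow crates directly; the "demo" prefix, the literal "part.arrows"/"data.arrows" extensions (inferred from the doc comments and regex in this commit), and the one-column schema are illustrative assumptions, not Parseable's actual constants or types.

// Sketch of the reverted single-file staging flow, outside the Parseable codebase.
use std::fs::OpenOptions;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_ipc::writer::FileWriter;
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

    // Live writes happen into a single part file for the prefix.
    let prefix = "demo"; // hypothetical prefix for illustration
    let partfile_path = format!("{prefix}.part.arrows");
    let file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&partfile_path)?;
    let mut writer = FileWriter::try_new_buffered(file, &schema)?;

    // Every batch is appended to the same writer; no row counting, no file rotation.
    let col: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![col])?;
    writer.write(&batch)?;
    writer.write(&batch)?;

    // "finish" writes the IPC footer, then the part file is renamed to its
    // final arrows name, mirroring what DiskWriter::finish does after this commit.
    writer.finish()?;
    let arrows_path = format!("{prefix}.data.arrows");
    std::fs::rename(&partfile_path, &arrows_path)?;
    println!("Finished arrows file: {arrows_path}");

    Ok(())
}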

src/parseable/streams.rs

Lines changed: 2 additions & 3 deletions
@@ -69,13 +69,12 @@ use super::{
 /// Regex pattern for parsing arrow file names.
 ///
 /// # Format
-/// The expected format is: `<schema_key>.<front_part>.<file_id>.data.arrows`
+/// The expected format is: `<schema_key>.<front_part>.data.arrows`
 /// where:
 /// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value
 ///   pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2")
 /// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition
 ///   as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76")
-/// - file_id: Numeric id for individual arrows files
 ///
 /// # Limitations
 /// - Partition keys and values must only contain alphanumeric characters
@@ -85,7 +84,7 @@ use super::{
 /// Valid: "key1=value1,key2=value2"
 /// Invalid: "key1=special!value,key2=special#value"
 static ARROWS_NAME_STRUCTURE: Lazy<Regex> = Lazy::new(|| {
-    Regex::new(r"^[a-zA-Z0-9&=]+\.(?P<front>\S+)\.\d+\.data\.arrows$").expect("Validated regex")
+    Regex::new(r"^[a-zA-Z0-9&=]+\.(?P<front>\S+)\.data\.arrows$").expect("Validated regex")
 });

 /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
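
With the `\.\d+` file_id segment removed, ARROWS_NAME_STRUCTURE only has to match `<schema_key>.<front_part>.data.arrows`. The snippet below is a standalone check of the updated pattern against a file name assembled from the schema_key and front_part examples in the doc comment above; it is an illustrative test, not repository code.

// Standalone check of the updated naming regex using the regex crate.
use regex::Regex;

fn main() {
    let re = Regex::new(r"^[a-zA-Z0-9&=]+\.(?P<front>\S+)\.data\.arrows$")
        .expect("Validated regex");

    // <schema_key>.<front_part>.data.arrows, with no numeric file_id segment;
    // pieces taken from the examples in the doc comment.
    let name = concat!(
        "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2.",
        "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76",
        ".data.arrows"
    );

    let caps = re.captures(name).expect("matches the new naming scheme");
    // The captured `front` part is what downstream parquet naming relies on.
    println!("front = {}", &caps["front"]);
}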
