diff --git a/src/parseable/staging/mod.rs b/src/parseable/staging/mod.rs
index 256133841..ced53ce36 100644
--- a/src/parseable/staging/mod.rs
+++ b/src/parseable/staging/mod.rs
@@ -30,6 +30,6 @@ pub enum StagingError {
     ObjectStorage(#[from] std::io::Error),
     #[error("Could not generate parquet file")]
     Create,
-    // #[error("Metadata Error: {0}")]
-    // Metadata(#[from] MetadataError),
+    #[error("Too many rows: {0}")]
+    RowLimit(usize),
 }
diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs
index 372bfa885..6def41af0 100644
--- a/src/parseable/staging/reader.rs
+++ b/src/parseable/staging/reader.rs
@@ -19,51 +19,90 @@
 use std::{
     fs::{remove_file, File},
-    io::{self, BufReader, Read, Seek, SeekFrom},
-    path::PathBuf,
+    io::BufReader,
+    path::{Path, PathBuf},
     sync::Arc,
-    vec::IntoIter,
 };
 
 use arrow_array::{RecordBatch, TimestampMillisecondArray};
-use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader};
-use arrow_schema::Schema;
-use byteorder::{LittleEndian, ReadBytesExt};
+use arrow_ipc::reader::FileReader;
+use arrow_schema::{ArrowError, Schema, SchemaRef};
 use itertools::kmerge_by;
-use tracing::{error, warn};
+use tracing::error;
 
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
     utils::arrow::{adapt_batch, reverse},
 };
 
+/// `ReverseReader` provides an iterator over the record batches in an Arrow IPC file,
+/// in reverse order (from the last batch to the first).
+///
+/// This is useful for scenarios where you need to process the most recent data first,
+/// or when implementing time-series data exploration that starts with the latest records.
+#[derive(Debug)]
+pub struct ReverseReader {
+    inner: FileReader<BufReader<File>>,
+    /// Current index for iteration (starts from the last batch)
+    idx: usize,
+}
+
+impl ReverseReader {
+    /// Creates a new `ReverseReader` from the given path.
+    pub fn try_new(path: impl AsRef<Path>) -> Result<Self, ArrowError> {
+        let inner = FileReader::try_new(BufReader::new(File::open(path).unwrap()), None)?;
+        let idx = inner.num_batches();
+
+        Ok(Self { inner, idx })
+    }
+
+    /// Returns the schema of the underlying Arrow file.
+    pub fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+}
+
+impl Iterator for ReverseReader {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    /// Returns the next record batch, in reverse order (latest first), from the arrows file.
+    ///
+    /// Returns `None` when all batches have been processed.
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.idx == 0 {
+            return None;
+        }
+
+        self.idx -= 1;
+        if let Err(e) = self.inner.set_index(self.idx) {
+            return Some(Err(e));
+        }
+
+        self.inner.next()
+    }
+}
+
 #[derive(Debug)]
 pub struct MergedRecordReader {
-    pub readers: Vec<StreamReader<BufReader<File>>>,
+    pub readers: Vec<ReverseReader>,
 }
 
 impl MergedRecordReader {
-    pub fn try_new(files: &[PathBuf]) -> Result<Self, ()> {
-        let mut readers = Vec::with_capacity(files.len());
+    pub fn new(paths: &[PathBuf]) -> Self {
+        let mut readers = Vec::with_capacity(paths.len());
 
-        for file in files {
+        for path in paths {
             //remove empty files before reading
-            if file.metadata().unwrap().len() == 0 {
-                error!("Invalid file detected, removing it: {:?}", file);
-                remove_file(file).unwrap();
+            if path.metadata().unwrap().len() == 0 {
+                error!("Invalid file detected, removing it: {path:?}");
+                remove_file(path).unwrap();
             } else {
-                let Ok(reader) =
-                    StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None)
-                else {
-                    error!("Invalid file detected, ignoring it: {:?}", file);
-                    continue;
-                };
-
+                let reader = ReverseReader::try_new(path).unwrap();
                 readers.push(reader);
             }
         }
 
-        Ok(Self { readers })
+        Self { readers }
     }
 
     pub fn merged_schema(&self) -> Schema {
@@ -74,35 +113,6 @@ impl MergedRecordReader {
         )
         .unwrap()
     }
-}
-
-#[derive(Debug)]
-pub struct MergedReverseRecordReader {
-    pub readers: Vec<StreamReader<BufReader<OffsetReader<File>>>>,
-}
-
-impl MergedReverseRecordReader {
-    pub fn try_new(file_paths: &[PathBuf]) -> Self {
-        let mut readers = Vec::with_capacity(file_paths.len());
-        for path in file_paths {
-            let Ok(file) = File::open(path) else {
-                warn!("Error when trying to read file: {path:?}");
-                continue;
-            };
-
-            let reader = match get_reverse_reader(file) {
-                Ok(r) => r,
-                Err(err) => {
-                    error!("Invalid file detected, ignoring it: {path:?}; error = {err}");
-                    continue;
-                }
-            };
-
-            readers.push(reader);
-        }
-
-        Self { readers }
-    }
 
     pub fn merged_iter(
         self,
@@ -119,15 +129,6 @@ impl MergedReverseRecordReader {
             .map(|batch| reverse(&batch))
             .map(move |batch| adapt_batch(&schema, &batch))
     }
-
-    pub fn merged_schema(&self) -> Schema {
-        Schema::try_merge(
-            self.readers
-                .iter()
-                .map(|reader| reader.schema().as_ref().clone()),
-        )
-        .unwrap()
-    }
 }
 
 fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option<String>) -> i64 {
@@ -146,6 +147,7 @@ fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option<String>) -> i64 {
         None => get_default_timestamp_millis(batch),
     }
 }
+
 fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 {
     match batch
         .column(0)
@@ -165,165 +167,12 @@ fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 {
     }
 }
 
-/// OffsetReader takes in a reader and list of offset and sizes and
-/// provides a reader over the file by reading only the offsets
-/// from start of the list to end.
-///
-/// Safety Invariant: Reader is already validated and all offset and limit are valid to read.
-///
-/// On empty list the reader returns no bytes read.
-pub struct OffsetReader<R> {
-    reader: R,
-    offset_list: IntoIter<(u64, usize)>,
-    current_offset: u64,
-    current_size: usize,
-    buffer: Vec<u8>,
-    buffer_position: usize,
-    finished: bool,
-}
-
-impl<R: Read + Seek> OffsetReader<R> {
-    fn new(reader: R, offset_list: Vec<(u64, usize)>) -> Self {
-        let mut offset_list = offset_list.into_iter();
-        let mut finished = false;
-
-        let (current_offset, current_size) = offset_list.next().unwrap_or_default();
-        if current_offset == 0 && current_size == 0 {
-            finished = true
-        }
-
-        OffsetReader {
-            reader,
-            offset_list,
-            current_offset,
-            current_size,
-            buffer: vec![0; 4096],
-            buffer_position: 0,
-            finished,
-        }
-    }
-}
-
-impl<R: Read + Seek> Read for OffsetReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        let offset = self.current_offset;
-        let size = self.current_size;
-
-        if self.finished {
-            return Ok(0);
-        }
-        // on empty buffer load current data represented by
-        // current_offset and current_size into self buffer
-        if self.buffer_position == 0 {
-            self.reader.seek(SeekFrom::Start(offset))?;
-            // resize for current message
-            if self.buffer.len() < size {
-                self.buffer.resize(size, 0)
-            }
-            self.reader.read_exact(&mut self.buffer[0..size])?;
-        }
-
-        let remaining_bytes = size - self.buffer_position;
-        let max_read = usize::min(remaining_bytes, buf.len());
-
-        // Copy data from the buffer to the provided buffer
-        let read_data = &self.buffer[self.buffer_position..self.buffer_position + max_read];
-        buf[..max_read].copy_from_slice(read_data);
-
-        self.buffer_position += max_read;
-
-        if self.buffer_position >= size {
-            // If we've read the entire section, move to the next offset
-            match self.offset_list.next() {
-                Some((offset, size)) => {
-                    self.current_offset = offset;
-                    self.current_size = size;
-                    self.buffer_position = 0;
-                }
-                None => {
-                    // iter is exhausted, no more read can be done
-                    self.finished = true
-                }
-            }
-        }
-
-        Ok(max_read)
-    }
-}
-
-pub fn get_reverse_reader<T: Read + Seek>(
-    mut reader: T,
-) -> Result<StreamReader<BufReader<OffsetReader<T>>>, io::Error> {
-    let mut offset = 0;
-    let mut messages = Vec::new();
-
-    while let Some(res) = find_limit_and_type(&mut reader).transpose() {
-        match res {
-            Ok((header, size)) => {
-                messages.push((header, offset, size));
-                offset += size;
-            }
-            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof && !messages.is_empty() => break,
-            Err(err) => return Err(err),
-        }
-    }
-
-    // reverse everything leaving the first because it has schema message.
-    messages[1..].reverse();
-    let messages = messages
-        .into_iter()
-        .map(|(_, offset, size)| (offset as u64, size))
-        .collect();
-
-    // reset reader
-    reader.rewind()?;
-
-    Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap())
-}
-
-// return limit for
-fn find_limit_and_type(
-    reader: &mut (impl Read + Seek),
-) -> Result<Option<(MessageHeader, usize)>, io::Error> {
-    let mut size = 0;
-    let marker = reader.read_u32::<LittleEndian>()?;
-    size += 4;
-
-    if marker != 0xFFFFFFFF {
-        return Err(io::Error::new(
-            io::ErrorKind::InvalidData,
-            "Invalid Continuation Marker",
-        ));
-    }
-
-    let metadata_size = reader.read_u32::<LittleEndian>()? as usize;
-    size += 4;
-
-    if metadata_size == 0x00000000 {
-        return Ok(None);
-    }
-
-    let mut message = vec![0u8; metadata_size];
-    reader.read_exact(&mut message)?;
-    size += metadata_size;
-
-    let message = unsafe { root_as_message_unchecked(&message) };
-    let header = message.header_type();
-    let message_size = message.bodyLength();
-    size += message_size as usize;
-
-    let padding = (8 - (size % 8)) % 8;
-    reader.seek(SeekFrom::Current(padding as i64 + message_size))?;
-    size += padding;
-
-    Ok(Some((header, size)))
-}
-
 #[cfg(test)]
 mod tests {
     use std::{
-        io::{self, Cursor, Read},
-        path::Path,
+        fs::File,
+        io::{self, Write},
+        path::{Path, PathBuf},
         sync::Arc,
     };
 
@@ -331,27 +180,29 @@ mod tests {
         cast::AsArray, types::Int64Type, Array, Float64Array, Int32Array, Int64Array, RecordBatch,
         StringArray,
     };
-    use arrow_ipc::writer::{
-        write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter,
-    };
-    use arrow_schema::{DataType, Field, Schema};
+    use arrow_ipc::{reader::FileReader, writer::FileWriter};
+    use arrow_schema::{ArrowError, DataType, Field, Schema};
     use chrono::Utc;
     use temp_dir::TempDir;
 
     use crate::{
         parseable::staging::{
-            reader::{MergedReverseRecordReader, OffsetReader},
+            reader::{MergedRecordReader, ReverseReader},
             writer::DiskWriter,
         },
         utils::time::TimeRange,
        OBJECT_STORE_DATA_GRANULARITY,
    };
 
-    use super::get_reverse_reader;
-
     fn rb(rows: usize) -> RecordBatch {
-        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..(rows as i64)));
-        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..rows).map(|x| x as f64)));
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..rows as i64));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..rows as i64).map(|i| {
+            if i == 0 {
+                0.0
+            } else {
+                1.0 / i as f64
+            }
+        })));
         let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter(
             (0..rows).map(|x| Some(format!("str {}", x))),
         ));
@@ -364,100 +215,77 @@ mod tests {
         .unwrap()
     }
 
-    fn write_mem(rbs: &[RecordBatch]) -> Vec<u8> {
-        let buf = Vec::new();
-        let mut writer = StreamWriter::try_new(buf, &rbs[0].schema()).unwrap();
+    fn write_file(rbs: &[RecordBatch], path: &Path) {
+        let file = File::create(path).unwrap();
+        let mut writer = FileWriter::try_new_buffered(file, &rbs[0].schema()).unwrap();
 
         for rb in rbs {
             writer.write(rb).unwrap()
         }
 
-        writer.into_inner().unwrap()
+        writer.finish().unwrap();
     }
 
     #[test]
     fn test_empty_row() {
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("test.arrows");
         let rb = rb(0);
-        let buf = write_mem(&[rb]);
-        let reader = Cursor::new(buf);
-        let mut reader = get_reverse_reader(reader).unwrap();
+        write_file(&[rb], &path);
+        let reader = File::open(path).unwrap();
+        let mut reader = FileReader::try_new_buffered(reader, None).unwrap();
         let rb = reader.next().unwrap().unwrap();
         assert_eq!(rb.num_rows(), 0);
     }
 
     #[test]
     fn test_one_row() {
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("test.arrows");
         let rb = rb(1);
-        let buf = write_mem(&[rb]);
-        let reader = Cursor::new(buf);
-        let mut reader = get_reverse_reader(reader).unwrap();
+        write_file(&[rb], &path);
+        let reader = File::open(path).unwrap();
+        let mut reader = FileReader::try_new_buffered(reader, None).unwrap();
        let rb = reader.next().unwrap().unwrap();
        assert_eq!(rb.num_rows(), 1);
     }
 
     #[test]
     fn test_multiple_row_multiple_rbs() {
-        let buf = write_mem(&[rb(1), rb(2), rb(3)]);
-        let reader = Cursor::new(buf);
-        let mut reader = get_reverse_reader(reader).unwrap();
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("test.arrows");
temp_dir.path().join("test.arrows"); + write_file(&[rb(1), rb(2), rb(3)], &path); + let reader = File::open(path).unwrap(); + let mut reader = FileReader::try_new_buffered(reader, None).unwrap(); let rb = reader.next().unwrap().unwrap(); - assert_eq!(rb.num_rows(), 3); + assert_eq!(rb.num_rows(), 1); let col1_val: Vec = rb .column(0) .as_primitive::() .iter() .flatten() .collect(); - assert_eq!(col1_val, vec![0, 1, 2]); + assert_eq!(col1_val, vec![0]); let rb = reader.next().unwrap().unwrap(); assert_eq!(rb.num_rows(), 2); + let col1_val: Vec = rb + .column(0) + .as_primitive::() + .iter() + .flatten() + .collect(); + assert_eq!(col1_val, vec![0, 1]); let rb = reader.next().unwrap().unwrap(); - assert_eq!(rb.num_rows(), 1); - } - - #[test] - fn manual_write() { - let error_on_replacement = true; - let options = IpcWriteOptions::default(); - let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement); - let data_gen = IpcDataGenerator {}; - - let mut buf = Vec::new(); - let rb1 = rb(1); - - let schema = data_gen.schema_to_bytes_with_dictionary_tracker( - &rb1.schema(), - &mut dictionary_tracker, - &options, - ); - write_message(&mut buf, schema, &options).unwrap(); - - for i in (1..=3).cycle().skip(1).take(10000) { - let (_, encoded_message) = data_gen - .encoded_batch(&rb(i), &mut dictionary_tracker, &options) - .unwrap(); - write_message(&mut buf, encoded_message, &options).unwrap(); - } - - let schema = data_gen.schema_to_bytes_with_dictionary_tracker( - &rb1.schema(), - &mut dictionary_tracker, - &options, - ); - write_message(&mut buf, schema, &options).unwrap(); - - let buf = Cursor::new(buf); - let reader = get_reverse_reader(buf).unwrap().flatten(); - - let mut sum = 0; - for rb in reader { - sum += 1; - assert!(rb.num_rows() > 0); - } - - assert_eq!(sum, 10000); + assert_eq!(rb.num_rows(), 3); + let col1_val: Vec = rb + .column(0) + .as_primitive::() + .iter() + .flatten() + .collect(); + assert_eq!(col1_val, vec![0, 1, 2]); } // Helper function to create test record batches @@ -500,33 +328,6 @@ mod tests { Ok(()) } - #[test] - fn test_offset_reader() { - // Create a simple binary file in memory - let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; - let cursor = Cursor::new(data); - - // Define offset list: (offset, size) - let offsets = vec![(2, 3), (7, 2)]; // Read bytes 2-4 (3, 4, 5) and then 7-8 (8, 9) - - let mut reader = OffsetReader::new(cursor, offsets); - let mut buffer = [0u8; 10]; - - // First read should get bytes 3, 4, 5 - let read_bytes = reader.read(&mut buffer).unwrap(); - assert_eq!(read_bytes, 3); - assert_eq!(&buffer[..read_bytes], &[3, 4, 5]); - - // Second read should get bytes 8, 9 - let read_bytes = reader.read(&mut buffer).unwrap(); - assert_eq!(read_bytes, 2); - assert_eq!(&buffer[..read_bytes], &[8, 9]); - - // No more data - let read_bytes = reader.read(&mut buffer).unwrap(); - assert_eq!(read_bytes, 0); - } - #[test] fn test_merged_reverse_record_reader() -> io::Result<()> { let dir = TempDir::new().unwrap(); @@ -545,7 +346,7 @@ mod tests { write_test_batches(&file_path, &schema, &batches)?; // Now read them back in reverse order - let mut reader = MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None); + let mut reader = MergedRecordReader::new(&[file_path]).merged_iter(schema, None); // We should get batches in reverse order: 3, 2, 1 // But first message should be schema, so we'll still read them in order @@ -589,47 +390,6 @@ mod tests { Ok(()) } - #[test] - fn test_empty_offset_list() { - // Test with empty offset list - 
let data = vec![1, 2, 3, 4, 5]; - let cursor = Cursor::new(data); - - let mut reader = OffsetReader::new(cursor, vec![]); - let mut buffer = [0u8; 10]; - - // Should return 0 bytes read - let read_bytes = reader.read(&mut buffer).unwrap(); - assert_eq!(read_bytes, 0); - } - - #[test] - fn test_partial_reads() { - // Test reading with a buffer smaller than the section size - let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; - let cursor = Cursor::new(data); - - // One offset of 5 bytes - let offsets = vec![(2, 5)]; // Read bytes 2-6 (3, 4, 5, 6, 7) - - let mut reader = OffsetReader::new(cursor, offsets); - let mut buffer = [0u8; 3]; // Buffer smaller than the 5 bytes we want to read - - // First read should get first 3 bytes: 3, 4, 5 - let read_bytes = reader.read(&mut buffer).unwrap(); - assert_eq!(read_bytes, 3); - assert_eq!(&buffer[..read_bytes], &[3, 4, 5]); - - // Second read should get remaining 2 bytes: 6, 7 - let read_bytes = reader.read(&mut buffer).unwrap(); - assert_eq!(read_bytes, 2); - assert_eq!(&buffer[..read_bytes], &[6, 7]); - - // No more data - let read_bytes = reader.read(&mut buffer).unwrap(); - assert_eq!(read_bytes, 0); - } - #[test] fn test_get_reverse_reader_single_message() -> io::Result<()> { let dir = TempDir::new().unwrap(); @@ -646,7 +406,7 @@ mod tests { // Write batch to file write_test_batches(&file_path, &schema, &[batch])?; - let mut reader = MergedReverseRecordReader::try_new(&[file_path]).merged_iter(schema, None); + let mut reader = MergedRecordReader::new(&[file_path]).merged_iter(schema, None); // Should get the batch let result_batch = reader.next().expect("Failed to read batch"); @@ -663,25 +423,146 @@ mod tests { Ok(()) } + fn create_test_arrow_file(path: &PathBuf, num_batches: usize) -> Result<(), ArrowError> { + // Create schema + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ]); + let schema_ref = std::sync::Arc::new(schema); + + // Create file and writer + let file = File::create(path)?; + let mut writer = FileWriter::try_new(file, &schema_ref)?; + + // Create and write batches + for i in 0..num_batches { + let id_array = + Int32Array::from(vec![i as i32 * 10, i as i32 * 10 + 1, i as i32 * 10 + 2]); + let name_array = StringArray::from(vec![ + format!("batch_{i}_name_0"), + format!("batch_{i}_name_1"), + format!("batch_{i}_name_2"), + ]); + + let batch = RecordBatch::try_new( + schema_ref.clone(), + vec![ + std::sync::Arc::new(id_array), + std::sync::Arc::new(name_array), + ], + )?; + + writer.write(&batch)?; + } + + writer.finish()?; + Ok(()) + } + #[test] - fn test_large_buffer_resizing() { - // Test that buffer resizes correctly for large sections - let data = vec![1; 10000]; // 10KB of data - let cursor = Cursor::new(data); + fn test_reverse_reader_creation() { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("test.arrow"); + + // Create test file with 3 batches + create_test_arrow_file(&file_path, 3).unwrap(); + + // Test successful creation + let reader = ReverseReader::try_new(&file_path); + assert!(reader.is_ok()); + + // Test schema retrieval + let reader = reader.unwrap(); + let schema = reader.schema(); + assert_eq!(schema.fields().len(), 2); + assert_eq!(schema.field(0).name(), "id"); + assert_eq!(schema.field(1).name(), "name"); + } - // One large offset (8KB) - let offsets = vec![(1000, 8000)]; + #[test] + fn test_reverse_reader_iteration() { + let temp_dir = TempDir::new().unwrap(); + let file_path = 
temp_dir.path().join("test.arrow"); - let mut reader = OffsetReader::new(cursor, offsets); - let mut buffer = [0u8; 10000]; + // Create test file with 3 batches + create_test_arrow_file(&file_path, 3).unwrap(); - // Should read 8KB - let read_bytes = reader.read(&mut buffer).unwrap(); - assert_eq!(read_bytes, 8000); + // Create reader and iterate + let reader = ReverseReader::try_new(&file_path).unwrap(); + let batches: Vec<_> = reader.collect::, _>>().unwrap(); - // All bytes should be 1 - for i in 0..read_bytes { - assert_eq!(buffer[i], 1); - } + // Verify correct number of batches + assert_eq!(batches.len(), 3); + + // Verify reverse order + // Batch 2 (last written, first read) + let batch0 = &batches[0]; + assert_eq!(batch0.num_columns(), 2); + let id_array = batch0 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_array.value(0), 20); + + // Batch 1 (middle) + let batch1 = &batches[1]; + let id_array = batch1 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_array.value(0), 10); + + // Batch 0 (first written, last read) + let batch2 = &batches[2]; + let id_array = batch2 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_array.value(0), 0); + } + + #[test] + fn test_empty_file() { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("empty.arrow"); + + // Create empty file with schema but no batches + create_test_arrow_file(&file_path, 0).unwrap(); + + let reader = ReverseReader::try_new(&file_path).unwrap(); + let batches: Vec<_> = reader.collect::, _>>().unwrap(); + + // Should be empty + assert_eq!(batches.len(), 0); + } + + #[test] + fn test_invalid_file() { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("invalid.txt"); + + // Create a non-Arrow file + let mut file = File::create(&file_path).unwrap(); + writeln!(&mut file, "This is not an Arrow file").unwrap(); + + // Attempting to create a reader should fail + let reader = ReverseReader::try_new(&file_path); + assert!(reader.is_err()); + } + + #[test] + fn test_num_batches() { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("test.arrow"); + + // Create test file with 5 batches + create_test_arrow_file(&file_path, 5).unwrap(); + + let reader = ReverseReader::try_new(&file_path).unwrap(); + assert_eq!(reader.count(), 5); } } diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 6397e13e9..dae954662 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -26,7 +26,7 @@ use std::{ }; use arrow_array::RecordBatch; -use arrow_ipc::writer::StreamWriter; +use arrow_ipc::writer::FileWriter; use arrow_schema::Schema; use arrow_select::concat::concat_batches; use chrono::Utc; @@ -46,8 +46,9 @@ pub struct Writer { pub disk: HashMap, } +/// Context regarding `.arrows` file being persisted onto disk pub struct DiskWriter { - inner: StreamWriter>, + inner: FileWriter>, path: PathBuf, range: TimeRange, } @@ -66,7 +67,7 @@ impl DiskWriter { .truncate(true) .create(true) .open(&path)?; - let inner = StreamWriter::try_new_buffered(file, schema)?; + let inner = FileWriter::try_new_buffered(file, schema)?; Ok(Self { inner, path, range }) } @@ -175,20 +176,15 @@ impl MutableBuffer { fn push(&mut self, rb: &RecordBatch) -> Option> { if self.inner.len() + rb.num_rows() >= N { let left = N - self.inner.len(); - let right = rb.num_rows() - left; let left_slice = rb.slice(0, left); - let right_slice = if left < rb.num_rows() { - 
-                Some(rb.slice(left, right))
-            } else {
-                None
-            };
             self.inner.push(left_slice);
 
             // take all records
             let src = Vec::with_capacity(self.inner.len());
             let inner = std::mem::replace(&mut self.inner, src);
-            if let Some(right_slice) = right_slice {
-                self.inner.push(right_slice);
+            if left < rb.num_rows() {
+                let right = rb.num_rows() - left;
+                self.inner.push(rb.slice(left, right));
             }
 
             Some(inner)
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index c67c60043..012a6c4af 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -60,13 +60,16 @@ use crate::{
 
 use super::{
     staging::{
-        reader::{MergedRecordReader, MergedReverseRecordReader},
+        reader::MergedRecordReader,
         writer::{DiskWriter, Writer},
         StagingError,
     },
     LogStream, ARROW_FILE_EXTENSION,
 };
 
+// ~16K rows is the default in-memory limit for each recordbatch
+const MAX_RECORD_BATCH_SIZE: usize = 16384;
+
 /// Returns the filename for parquet if provided arrows file path is valid as per our expectation
 fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option<PathBuf> {
     let filename = path.file_stem()?.to_str()?;
@@ -99,6 +102,7 @@ pub struct Stream {
     pub metadata: RwLock<LogStreamMetadata>,
     pub data_path: PathBuf,
     pub options: Arc<Options>,
+    /// Writer with a ~16K row limit per record batch, for optimal I/O performance.
     pub writer: Mutex<Writer>,
     pub ingestor_id: Option<String>,
 }
@@ -132,6 +136,11 @@ impl Stream {
         custom_partition_values: &HashMap<String, String>,
         stream_type: StreamType,
     ) -> Result<(), StagingError> {
+        let row_count = record.num_rows();
+        if row_count > MAX_RECORD_BATCH_SIZE {
+            return Err(StagingError::RowLimit(row_count));
+        }
+
         let mut guard = self.writer.lock().unwrap();
         if self.options.mode != Mode::Query || stream_type == StreamType::Internal {
             let filename =
@@ -312,7 +321,7 @@ impl Stream {
             self.stream_name
         );
 
-        let time_partition = self.get_time_partition();
+        let time_partition: Option<String> = self.get_time_partition();
         let custom_partition = self.get_custom_partition();
 
         // read arrow files on disk
@@ -329,8 +338,7 @@ impl Stream {
 
         // if yes, then merge them and save
         if let Some(mut schema) = schema {
-            let static_schema_flag = self.get_static_schema_flag();
-            if !static_schema_flag {
+            if !self.get_static_schema_flag() {
                 // schema is dynamic, read from staging and merge if present
 
                 // need to add something before .schema to make the file have an extension of type `schema`
@@ -358,8 +366,26 @@ impl Stream {
         Ok(())
     }
 
-    pub fn recordbatches_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
-        self.writer.lock().unwrap().mem.recordbatch_cloned(schema)
+    /// Returns record batches as found in staging
+    pub fn recordbatches_cloned(
+        &self,
+        schema: &Arc<Schema>,
+        time_partition: Option<String>,
+    ) -> Vec<RecordBatch> {
+        // All records found in memory
+        let mut records = self.writer.lock().unwrap().mem.recordbatch_cloned(schema);
+        // Append record batches picked up from `.arrows` files
+        let arrow_files = self.arrow_files();
+        let record_reader = MergedRecordReader::new(&arrow_files);
+        if record_reader.readers.is_empty() {
+            return records;
+        }
+        let mut from_file = record_reader
+            .merged_iter(schema.clone(), time_partition)
+            .collect();
+        records.append(&mut from_file);
+
+        records
     }
 
     pub fn clear(&self) {
@@ -467,7 +493,7 @@ impl Stream {
         // warn!("staging files-\n{staging_files:?}\n");
 
         for (parquet_path, arrow_files) in staging_files {
-            let record_reader = MergedReverseRecordReader::try_new(&arrow_files);
+            let record_reader = MergedRecordReader::new(&arrow_files);
             if record_reader.readers.is_empty() {
                 continue;
             }
@@ -533,13 +559,12 @@ impl Stream {
     pub fn updated_schema(&self, current_schema: Schema) -> Schema {
         let staging_files = self.arrow_files();
-        let record_reader = MergedRecordReader::try_new(&staging_files).unwrap();
+        let record_reader = MergedRecordReader::new(&staging_files);
         if record_reader.readers.is_empty() {
             return current_schema;
         }
 
         let schema = record_reader.merged_schema();
-
         Schema::try_merge(vec![schema, current_schema]).unwrap()
     }
diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 6b6219f91..7a86f9f96 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -234,7 +234,7 @@ impl StandardTableProvider {
         };
 
         // Staging arrow exection plan
-        let records = staging.recordbatches_cloned(&self.schema);
+        let records = staging.recordbatches_cloned(&self.schema, staging.get_time_partition());
         let arrow_exec = reversed_mem_table(records, self.schema.clone())?
             .scan(state, projection, filters, limit)
             .await?;
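
// Reviewer note (not part of the patch): the new `ReverseReader` works because the Arrow
// IPC *file* format records every batch's offset in its footer, so `FileReader::set_index`
// plus `num_batches` can walk the file backwards cheaply — the old `StreamReader` path
// needed the removed `OffsetReader` byte bookkeeping because the stream format has no such
// footer. Below is a minimal, self-contained sketch of that idea against the public
// arrow-ipc API; the temporary file name and single-column schema are made up for
// illustration and are not taken from the Parseable codebase.

use std::{fs::File, io::BufReader, sync::Arc};

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_ipc::{reader::FileReader, writer::FileWriter};
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // Write three single-column batches to a throwaway Arrow IPC file.
    let path = std::env::temp_dir().join("reverse_reader_sketch.arrow");
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let mut writer = FileWriter::try_new(File::create(&path)?, &schema)?;
    for i in 0..3 {
        let column: ArrayRef = Arc::new(Int32Array::from(vec![i]));
        let batch = RecordBatch::try_new(schema.clone(), vec![column])?;
        writer.write(&batch)?;
    }
    writer.finish()?;

    // Walk the batches newest-first: decrement the index and jump straight to it,
    // which is exactly what `ReverseReader::next` does with `set_index`.
    let mut reader = FileReader::try_new(BufReader::new(File::open(&path)?), None)?;
    let mut idx = reader.num_batches();
    while idx > 0 {
        idx -= 1;
        reader.set_index(idx)?;
        let batch = reader.next().expect("a batch exists at this index")?;
        println!("batch {idx}: {} rows", batch.num_rows());
    }
    Ok(())
}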
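
// Reviewer note (not part of the patch): a tiny standalone sketch of the row-count guard
// that `Stream::push` now applies before buffering, assuming the same 16384-row cap and a
// `StagingError::RowLimit(usize)` variant as introduced above. The helper name is
// illustrative only; the real check lives inline in `push`.

const MAX_RECORD_BATCH_SIZE: usize = 16384;

#[derive(Debug)]
enum StagingError {
    RowLimit(usize),
}

fn check_row_limit(num_rows: usize) -> Result<(), StagingError> {
    // Reject oversized batches up front so a single ingest call cannot overflow the
    // fixed-size in-memory buffer the writer is tuned for.
    if num_rows > MAX_RECORD_BATCH_SIZE {
        return Err(StagingError::RowLimit(num_rows));
    }
    Ok(())
}

fn main() {
    assert!(check_row_limit(MAX_RECORD_BATCH_SIZE).is_ok());
    assert!(matches!(
        check_row_limit(20_000),
        Err(StagingError::RowLimit(20_000))
    ));
}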