diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/NativeParquetWriter.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/NativeParquetWriter.java index 1ffa170c0ad98..bc8932c3c8833 100644 --- a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/NativeParquetWriter.java +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/NativeParquetWriter.java @@ -10,12 +10,14 @@ import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; /** * Type-safe handle for native Parquet writer with lifecycle management. */ public class NativeParquetWriter implements Closeable { + private final AtomicBoolean writerClosed = new AtomicBoolean(false); private final String filePath; /** @@ -47,15 +49,23 @@ public void flush() throws IOException { RustBridge.flushToDisk(filePath); } + private ParquetFileMetadata metadata; + @Override public void close() { - try { - RustBridge.closeWriter(filePath); - } catch (IOException e) { - throw new RuntimeException("Failed to close Parquet writer for " + filePath, e); + if (writerClosed.compareAndSet(false, true)) { + try { + metadata = RustBridge.closeWriter(filePath); + } catch (IOException e) { + throw new RuntimeException("Failed to close Parquet writer for " + filePath, e); + } } } + public ParquetFileMetadata getMetadata() { + return metadata; + } + public String getFilePath() { return filePath; } diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/ParquetFileMetadata.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/ParquetFileMetadata.java new file mode 100644 index 0000000000000..fc309857be290 --- /dev/null +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/ParquetFileMetadata.java @@ -0,0 +1,78 @@ +package com.parquet.parquetdataformat.bridge; + +/** + * 
Represents metadata information for a Parquet file. + *

+ * This class holds the essential metadata extracted from a Parquet file + * when the writer is closed, providing visibility into the file's characteristics. + */ +public record ParquetFileMetadata(int version, long numRows, String createdBy) { + /** + * Constructs a new ParquetFileMetadata instance. + * + * @param version the Parquet format version used + * @param numRows the total number of rows in the file + * @param createdBy the application/library that created the file (can be null) + */ + public ParquetFileMetadata { + } + + /** + * Gets the Parquet format version. + * + * @return the version number + */ + @Override + public int version() { + return version; + } + + /** + * Gets the total number of rows in the Parquet file. + * + * @return the number of rows + */ + @Override + public long numRows() { + return numRows; + } + + /** + * Gets information about what created this Parquet file. + * + * @return the creator information, or null if not available + */ + @Override + public String createdBy() { + return createdBy; + } + + @Override + public String toString() { + return "ParquetFileMetadata{" + + "version=" + version + + ", numRows=" + numRows + + ", createdBy='" + createdBy + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ParquetFileMetadata that = (ParquetFileMetadata) o; + + if (version != that.version) return false; + if (numRows != that.numRows) return false; + return createdBy != null ? createdBy.equals(that.createdBy) : that.createdBy == null; + } + + @Override + public int hashCode() { + int result = version; + result = 31 * result + (int) (numRows ^ (numRows >>> 32)); + result = 31 * result + (createdBy != null ? 
createdBy.hashCode() : 0); + return result; + } +} diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java index 408ef74ea44f7..ebc7af2f7a2bd 100644 --- a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java @@ -31,8 +31,9 @@ public class RustBridge { // Enhanced native methods that handle validation and provide better error reporting public static native void createWriter(String file, long schemaAddress) throws IOException; public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException; - public static native void closeWriter(String file) throws IOException; + public static native ParquetFileMetadata closeWriter(String file) throws IOException; public static native void flushToDisk(String file) throws IOException; + public static native ParquetFileMetadata getFileMetadata(String file) throws IOException; public static native long getFilteredNativeBytesUsed(String pathPrefix); diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java index 8f900a4084821..5c404ce0ff586 100644 --- a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java @@ -10,19 +10,16 @@ import com.parquet.parquetdataformat.bridge.ArrowExport; import com.parquet.parquetdataformat.bridge.NativeParquetWriter; +import com.parquet.parquetdataformat.bridge.ParquetFileMetadata; import com.parquet.parquetdataformat.memory.ArrowBufferPool; import com.parquet.parquetdataformat.writer.ParquetDocumentInput; 
-import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.index.engine.exec.FlushIn; import org.opensearch.index.engine.exec.WriteResult; -import java.io.Closeable; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; /** @@ -107,7 +104,7 @@ public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOExcep } } - public String flush(FlushIn flushIn) throws IOException { + public ParquetFileMetadata flush(FlushIn flushIn) throws IOException { ManagedVSR currentVSR = managedVSR.get(); logger.info("Flush called for {}, row count: {}", fileName, currentVSR.getRowCount()); try { @@ -120,15 +117,17 @@ public String flush(FlushIn flushIn) throws IOException { // Transition VSR to FROZEN state before flushing currentVSR.moveToFrozen(); logger.info("Flushing {} rows for {}", currentVSR.getRowCount(), fileName); + ParquetFileMetadata metadata; // Write through native writer handle try (ArrowExport export = currentVSR.exportToArrow()) { writer.write(export.getArrayAddress(), export.getSchemaAddress()); writer.close(); + metadata = writer.getMetadata(); } - logger.info("Successfully flushed data for {}", fileName); + logger.debug("Successfully flushed data for {} with metadata: {}", fileName, metadata); - return fileName; + return metadata; } catch (Exception e) { logger.error("Error in flush for {}: {}", fileName, e.getMessage(), e); throw new IOException("Failed to flush data: " + e.getMessage(), e); diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/writer/ParquetWriter.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/writer/ParquetWriter.java index 84df70879e550..d6820d4df5aec 100644 --- 
a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/writer/ParquetWriter.java +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/writer/ParquetWriter.java @@ -1,5 +1,6 @@ package com.parquet.parquetdataformat.writer; +import com.parquet.parquetdataformat.bridge.ParquetFileMetadata; import com.parquet.parquetdataformat.memory.ArrowBufferPool; import com.parquet.parquetdataformat.vsr.VSRManager; import org.apache.arrow.vector.types.pojo.Schema; @@ -58,16 +59,17 @@ public WriteResult addDoc(ParquetDocumentInput d) throws IOException { @Override public FileInfos flush(FlushIn flushIn) throws IOException { - String fileName = vsrManager.flush(flushIn); + ParquetFileMetadata parquetFileMetadata = vsrManager.flush(flushIn); // no data flushed - if (fileName == null) { + if (parquetFileMetadata == null) { return FileInfos.empty(); } - Path file = Path.of(fileName); + Path filePath = Path.of(vsrManager.getFileName()); // NOTE(review): flush() no longer returns the file name — confirm VSRManager exposes it via a getter WriterFileSet writerFileSet = WriterFileSet.builder() - .directory(file.getParent()) + .directory(filePath.getParent()) .writerGeneration(writerGeneration) - .addFile(file.getFileName().toString()) + .addFile(filePath.getFileName().toString()) + .addNumRows(parquetFileMetadata.numRows()) .build(); return FileInfos.builder().putWriterFileSet(PARQUET_DATA_FORMAT, writerFileSet).build(); } diff --git a/modules/parquet-data-format/src/main/rust/src/lib.rs b/modules/parquet-data-format/src/main/rust/src/lib.rs index 1ef5e7c40da6e..8e1e8f19e323d 100644 --- a/modules/parquet-data-format/src/main/rust/src/lib.rs +++ b/modules/parquet-data-format/src/main/rust/src/lib.rs @@ -1,8 +1,8 @@ use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema}; use arrow::record_batch::RecordBatch; use dashmap::DashMap; -use jni::objects::{JClass, JString}; -use jni::sys::{jint, jlong}; +use jni::objects::{JClass, JString, JObject}; +use jni::sys::{jint, jlong, jobject}; use jni::JNIEnv; use lazy_static::lazy_static; use parquet::arrow::ArrowWriter; @@ -10,6 +10,9 @@ use
parquet::basic::{Compression, ZstdLevel}; use parquet::file::properties::WriterProperties; use std::fs::File; use std::sync::{Arc, Mutex}; +use parquet::format::FileMetaData as FormatFileMetaData; +use parquet::file::metadata::FileMetaData as FileFileMetaData; +use parquet::file::reader::{FileReader, SerializedFileReader}; pub mod logger; pub mod parquet_merge; @@ -118,7 +121,7 @@ impl NativeParquetWriter { } } - fn close_writer(filename: String) -> Result<(), Box> { + fn close_writer(filename: String) -> Result, Box> { log_info!("[RUST] close_writer called for file: {}", filename); if let Some((_, writer_arc)) = WRITER_MANAGER.remove(&filename) { @@ -126,9 +129,10 @@ impl NativeParquetWriter { Ok(mutex) => { let writer = mutex.into_inner().unwrap(); match writer.close() { - Ok(_) => { - log_info!("[RUST] Successfully closed writer for file: {}", filename); - Ok(()) + Ok(file_metadata) => { + log_info!("[RUST] Successfully closed writer for file: {}, metadata: version={}, num_rows={}\n", + filename, file_metadata.version, file_metadata.num_rows); + Ok(Some(file_metadata)) } Err(e) => { log_error!("[RUST] ERROR: Failed to close writer for file: {}", filename); @@ -195,6 +199,77 @@ impl NativeParquetWriter { Ok(total_memory) } + + fn get_file_metadata(filename: String) -> Result> { + log_debug!("[RUST] get_file_metadata called for file: {}\n", filename); + + // Open the Parquet file + let file = match File::open(&filename) { + Ok(f) => f, + Err(e) => { + log_error!("[RUST] ERROR: Failed to open file {}: {:?}", filename, e); + return Err(format!("File not found: {}", filename).into()); + } + }; + + // Create SerializedFileReader + let reader = match SerializedFileReader::new(file) { + Ok(r) => r, + Err(e) => { + log_error!("[RUST] ERROR: Failed to create Parquet reader for {}: {:?}", filename, e); + return Err(format!("Invalid Parquet file format: {}", e).into()); + } + }; + + // Get metadata from the reader + let parquet_metadata = reader.metadata(); + let 
file_metadata = parquet_metadata.file_metadata().clone(); + + log_debug!("[RUST] Successfully read metadata from file: {}, version={}, num_rows={}\n", + filename, file_metadata.version(), file_metadata.num_rows()); + + Ok(file_metadata) + } + + fn create_java_metadata<'local>(env: &mut JNIEnv<'local>, metadata: &FormatFileMetaData) -> Result, Box> { + // Find the ParquetFileMetadata class + let class = env.find_class("com/parquet/parquetdataformat/bridge/ParquetFileMetadata")?; + + // Create Java String for created_by (handle None case) + let created_by_jstring = match &metadata.created_by { + Some(created_by) => env.new_string(created_by)?, + None => JObject::null().into(), + }; + + // Create the Java object using new_object with signature + let java_metadata = env.new_object(&class, "(IJLjava/lang/String;)V", &[ + (metadata.version).into(), + (metadata.num_rows).into(), + (&created_by_jstring).into(), + ])?; + + Ok(java_metadata) + } + + fn create_java_metadata_from_file<'local>(env: &mut JNIEnv<'local>, metadata: &FileFileMetaData) -> Result, Box> { + // Find the ParquetFileMetadata class + let class = env.find_class("com/parquet/parquetdataformat/bridge/ParquetFileMetadata")?; + + // Create Java String for created_by (handle None case) + let created_by_jstring = match metadata.created_by() { + Some(created_by) => env.new_string(created_by)?, + None => JObject::null().into(), + }; + + // Create the Java object using new_object with signature + let java_metadata = env.new_object(&class, "(IJLjava/lang/String;)V", &[ + (metadata.version()).into(), + (metadata.num_rows()).into(), + (&created_by_jstring).into(), + ])?; + + Ok(java_metadata) + } } #[unsafe(no_mangle)] @@ -240,11 +315,36 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_clos mut env: JNIEnv, _class: JClass, file: JString -) -> jint { +) -> jobject { let filename: String = env.get_string(&file).expect("Couldn't get java string!").into(); match 
NativeParquetWriter::close_writer(filename) { - Ok(_) => 0, - Err(_) => -1, + Ok(maybe_metadata) => { + match maybe_metadata { + Some(metadata) => { + match NativeParquetWriter::create_java_metadata(&mut env, &metadata) { + Ok(java_obj) => java_obj.into_raw(), + Err(e) => { + let error_msg = format!("[RUST] ERROR: Failed to create Java metadata object: {:?}\n", e); + log_error!("{}", error_msg.trim()); + // Throw IOException to Java + let _ = env.throw_new("java/io/IOException", "Failed to create metadata object"); + JObject::null().into_raw() + } + } + } + None => { + // No writer was found, but this is not necessarily an error + // Return null to indicate no metadata available + JObject::null().into_raw() + } + } + } + Err(e) => { + log_error!("[RUST] ERROR: Failed to close writer: {:?}\n", e); + // Throw IOException to Java + let _ = env.throw_new("java/io/IOException", &format!("Failed to close writer: {}", e)); + JObject::null().into_raw() + } } } @@ -261,6 +361,38 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_flus } } +#[unsafe(no_mangle)] +pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_getFileMetadata( + mut env: JNIEnv, + _class: JClass, + file: JString +) -> jobject { + let filename: String = env.get_string(&file).expect("Couldn't get java string!").into(); + match NativeParquetWriter::get_file_metadata(filename) { + Ok(metadata) => { + match NativeParquetWriter::create_java_metadata_from_file(&mut env, &metadata) { + Ok(java_obj) => java_obj.into_raw(), + Err(e) => { + let error_msg = format!("[RUST] ERROR: Failed to create Java metadata object: {:?}\n", e); + println!("{}", error_msg.trim()); + log_error!("{}", error_msg); + // Throw IOException to Java + let _ = env.throw_new("java/io/IOException", "Failed to create metadata object"); + JObject::null().into_raw() + } + } + } + Err(e) => { + let error_msg = format!("[RUST] ERROR: Failed to read file metadata: {:?}\n", e); + println!("{}", 
error_msg.trim()); + log_error!("{}", error_msg); + // Throw IOException to Java + let _ = env.throw_new("java/io/IOException", &format!("Failed to read file metadata: {}", e)); + JObject::null().into_raw() + } + } +} + #[unsafe(no_mangle)] pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_getFilteredNativeBytesUsed( mut env: JNIEnv, @@ -269,7 +401,7 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_getF ) -> jlong { let prefix: String = env.get_string(&path_prefix).expect("Couldn't get java string!").into(); match NativeParquetWriter::get_filtered_writer_memory_usage(prefix) { - Ok(memory_usage) => memory_usage as jlong, + Ok(memory) => memory as jlong, Err(_) => 0, } } diff --git a/modules/parquet-data-format/src/test/java/com/parquet/parquetdataformat/bridge/RustBridgeTests.java b/modules/parquet-data-format/src/test/java/com/parquet/parquetdataformat/bridge/RustBridgeTests.java new file mode 100644 index 0000000000000..bc0994774117d --- /dev/null +++ b/modules/parquet-data-format/src/test/java/com/parquet/parquetdataformat/bridge/RustBridgeTests.java @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package com.parquet.parquetdataformat.bridge; + +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Path; +import java.util.Objects; + +public class RustBridgeTests extends OpenSearchTestCase { + + private String getTestFilePath(String fileName) { + // Get the absolute path to test resources + URL resource = getClass().getClassLoader().getResource(Path.of("parquetTestFiles", fileName).toString()); + return Objects.requireNonNull(resource).getPath(); + } + + public void testGetFileMetadata() throws IOException { + try { + String filePath = getTestFilePath("large_file1.parquet"); + System.out.println("DEBUG" + filePath); + ParquetFileMetadata metadata = RustBridge.getFileMetadata(filePath); + + assertNotNull("Metadata should not be null", metadata); + assertTrue("Version should be positive", metadata.version() > 0); + assertTrue("Number of rows should be non-negative", metadata.numRows() >= 0); + + // Log the metadata for verification + logger.info("Small file 1 metadata - Version: {}, NumRows: {}, CreatedBy: {}", + metadata.version(), metadata.numRows(), metadata.createdBy()); + + } catch (UnsatisfiedLinkError e) { + logger.warn("Native library not loaded, skipping test: " + e.getMessage()); + assumeFalse("Native library not available: " + e.getMessage(), true); + } + } + + public void testGetFileMetadataWithNonExistentFile() { + try { + String filePath = "non_existent_file.parquet"; + logger.info("[DEBUG] " + filePath); + IOException exception = expectThrows(IOException.class, () -> { + RustBridge.getFileMetadata(filePath); + }); + + assertNotNull("Exception should not be null", exception); + assertTrue("Exception message should contain relevant error info", + exception.getMessage().contains("Failed to read file metadata") || + exception.getMessage().contains("File not found")); + + } catch (UnsatisfiedLinkError e) { + logger.warn("Native library not loaded, skipping test: " + 
e.getMessage()); + assumeFalse("Native library not available: " + e.getMessage(), true); + } + } +// + public void testGetFileMetadataWithInvalidFile() throws IOException { + try { + // Create a temporary invalid file + java.nio.file.Path tempFile = java.nio.file.Files.createTempFile("invalid", ".parquet"); + logger.info("[DEBUG] " + tempFile); + java.nio.file.Files.write(tempFile, "This is not a valid parquet file".getBytes()); + + try { + IOException exception = expectThrows(IOException.class, () -> { + RustBridge.getFileMetadata(tempFile.toString()); + }); + + assertNotNull("Exception should not be null", exception); + assertTrue("Exception message should contain relevant error info", + exception.getMessage().contains("Failed to read file metadata") || + exception.getMessage().contains("Invalid Parquet file format")); + + } finally { + // Clean up temp file + java.nio.file.Files.deleteIfExists(tempFile); + } + + } catch (UnsatisfiedLinkError e) { + logger.warn("Native library not loaded, skipping test: " + e.getMessage()); + assumeFalse("Native library not available: " + e.getMessage(), true); + } + } + + public void testGetFileMetadataWithEmptyPath() { + try { + IOException exception = expectThrows(IOException.class, () -> { + RustBridge.getFileMetadata(""); + }); + + assertNotNull("Exception should not be null", exception); + + } catch (UnsatisfiedLinkError e) { + logger.warn("Native library not loaded, skipping test: " + e.getMessage()); + assumeFalse("Native library not available: " + e.getMessage(), true); + } + } +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java index 8e0f62f58a1a5..9d60ee01ec006 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java +++ 
b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java @@ -40,7 +40,7 @@ public class DatafusionReaderManager implements EngineReaderManager refreshListeners = new CopyOnWriteArrayList(); public DatafusionReaderManager(String path, Collection files, String dataFormat) throws IOException { - WriterFileSet writerFileSet = new WriterFileSet(Path.of(URI.create("file:///" + path)), 1); + WriterFileSet writerFileSet = new WriterFileSet(Path.of(URI.create("file:///" + path)), 1, 0); files.forEach(fileMetadata -> writerFileSet.add(fileMetadata.file())); this.current = new DatafusionReader(path, null, List.of(writerFileSet)); this.path = path; diff --git a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionReaderManagerTests.java b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionReaderManagerTests.java index 3ecc5dc458804..284cd5bc5e857 100644 --- a/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionReaderManagerTests.java +++ b/plugins/engine-datafusion/src/test/java/org/opensearch/datafusion/DataFusionReaderManagerTests.java @@ -104,7 +104,7 @@ public void testInitialReaderCreation() throws IOException { Path parquetDir = shardPath.getDataPath().resolve("parquet"); CatalogSnapshot.Segment segment = new CatalogSnapshot.Segment(1); - WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 1); + WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 1, 4); writerFileSet.add(parquetDir + "/parquet_file_generation_0.parquet"); writerFileSet.add(parquetDir + "/parquet_file_generation_1.parquet"); segment.addSearchableFiles(getMockDataFormat().name(), writerFileSet); @@ -135,7 +135,7 @@ public void testMultipleSearchersShareSameReader() throws IOException { Path parquetDir = shardPath.getDataPath().resolve("parquet"); CatalogSnapshot.Segment segment = new CatalogSnapshot.Segment(1); - WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 
1); + WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 1, 2); writerFileSet.add(parquetDir + "/parquet_file_generation_0.parquet"); segment.addSearchableFiles(getMockDataFormat().name(), writerFileSet); @@ -166,7 +166,7 @@ public void testReaderSurvivesPartialSearcherClose() throws IOException { Path parquetDir = shardPath.getDataPath().resolve("parquet"); CatalogSnapshot.Segment segment = new CatalogSnapshot.Segment(1); - WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 1); + WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 1, 2); writerFileSet.add(parquetDir + "/parquet_file_generation_0.parquet"); segment.addSearchableFiles(getMockDataFormat().name(), writerFileSet); @@ -198,7 +198,7 @@ public void testRefreshCreatesNewReader() throws IOException { // Initial refresh CatalogSnapshot.Segment segment1 = new CatalogSnapshot.Segment(1); - WriterFileSet writerFileSet1 = new WriterFileSet(parquetDir, 1); + WriterFileSet writerFileSet1 = new WriterFileSet(parquetDir, 1, 2); addFilesToShardPath(shardPath, "parquet_file_generation_0.parquet"); writerFileSet1.add(parquetDir + "/parquet_file_generation_0.parquet"); segment1.addSearchableFiles(getMockDataFormat().name(), writerFileSet1); @@ -213,7 +213,7 @@ public void testRefreshCreatesNewReader() throws IOException { // Add new file and refresh addFilesToShardPath(shardPath, "parquet_file_generation_1.parquet"); CatalogSnapshot.Segment segment2 = new CatalogSnapshot.Segment(2); - WriterFileSet writerFileSet2 = new WriterFileSet(parquetDir, 2); + WriterFileSet writerFileSet2 = new WriterFileSet(parquetDir, 2, 4); writerFileSet2.add(parquetDir + "/parquet_file_generation_0.parquet"); writerFileSet2.add(parquetDir + "/parquet_file_generation_1.parquet"); segment2.addSearchableFiles(getMockDataFormat().name(), writerFileSet2); @@ -247,7 +247,7 @@ public void testDecRefAfterCloseThrowsException() throws IOException { Path parquetDir = shardPath.getDataPath().resolve("parquet"); 
CatalogSnapshot.Segment segment = new CatalogSnapshot.Segment(1); - WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 1); + WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 1, 4); writerFileSet.add(parquetDir + "/parquet_file_generation_2.parquet"); segment.addSearchableFiles(getMockDataFormat().name(), writerFileSet); @@ -277,7 +277,7 @@ public void testReaderClosesAfterSearchRelease() throws IOException { Path parquetDir = shardPath.getDataPath().resolve("parquet"); CatalogSnapshot.Segment segment = new CatalogSnapshot.Segment(1); - WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 1); + WriterFileSet writerFileSet = new WriterFileSet(parquetDir, 1, 6); writerFileSet.add(parquetDir + "/parquet_file_generation_2.parquet"); writerFileSet.add(parquetDir + "/parquet_file_generation_1.parquet"); segment.addSearchableFiles(getMockDataFormat().name(), writerFileSet); @@ -300,7 +300,7 @@ public void testReaderClosesAfterSearchRelease() throws IOException { addFilesToShardPath(shardPath, "parquet_file_generation_0.parquet"); // now trigger refresh to have new Reader with F2, F3 CatalogSnapshot.Segment segment2 = new CatalogSnapshot.Segment(2); - WriterFileSet writerFileSet2 = new WriterFileSet(parquetDir, 2); + WriterFileSet writerFileSet2 = new WriterFileSet(parquetDir, 2, 4); writerFileSet2.add(parquetDir + "/parquet_file_generation_1.parquet"); writerFileSet2.add(parquetDir + "/parquet_file_generation_0.parquet"); segment2.addSearchableFiles(getMockDataFormat().name(), writerFileSet2); @@ -346,7 +346,7 @@ public void testSearch() throws Exception { // Initial refresh - files are in the parquet subdirectory Path parquetDir = shardPath.getDataPath().resolve("parquet"); CatalogSnapshot.Segment segment1 = new CatalogSnapshot.Segment(0); - WriterFileSet writerFileSet1 = new WriterFileSet(parquetDir, 0); + WriterFileSet writerFileSet1 = new WriterFileSet(parquetDir, 0, 2); writerFileSet1.add(parquetDir + "/parquet_file_generation_0.parquet"); 
segment1.addSearchableFiles(getMockDataFormat().name(), writerFileSet1); @@ -376,7 +376,7 @@ public void testSearch() throws Exception { addFilesToShardPath(shardPath, "parquet_file_generation_1.parquet"); CatalogSnapshot.Segment segment2 = new CatalogSnapshot.Segment(1); - WriterFileSet writerFileSet2 = new WriterFileSet(parquetDir, 1); + WriterFileSet writerFileSet2 = new WriterFileSet(parquetDir, 1, 2); writerFileSet2.add(parquetDir + "/parquet_file_generation_1.parquet"); segment2.addSearchableFiles(getMockDataFormat().name(), writerFileSet2); diff --git a/server/src/main/java/org/opensearch/index/engine/exec/WriterFileSet.java b/server/src/main/java/org/opensearch/index/engine/exec/WriterFileSet.java index a638c26d5034b..6996ca35305ba 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/WriterFileSet.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/WriterFileSet.java @@ -24,8 +24,10 @@ public class WriterFileSet implements Serializable, Writeable { private final String directory; private final long writerGeneration; private final Set files; + private final long numRows; - public WriterFileSet(Path directory, long writerGeneration) { + public WriterFileSet(Path directory, long writerGeneration, long numRows) { + this.numRows = numRows; this.files = new HashSet<>(); this.writerGeneration = writerGeneration; this.directory = directory.toString(); @@ -34,6 +36,7 @@ public WriterFileSet(Path directory, long writerGeneration) { public WriterFileSet(StreamInput in) throws IOException { this.directory = in.readString(); this.writerGeneration = in.readLong(); + this.numRows = in.readVLong(); // NOTE(review): numRows is long — readVLong, not readVInt; also confirm writeTo(StreamOutput) writes numRows symmetrically (this patch does not update it) int fileCount = in.readVInt(); this.files = new HashSet<>(fileCount); @@ -75,6 +78,22 @@ public String getDirectory() { return directory; } + public long getNumRows() { + return numRows; + } + + public long getTotalSize() { + return files.stream() + .mapToLong(file -> { + try { + return java.nio.file.Files.size(Path.of(directory, file)); + } catch
(IOException e) { + return 0; + } + }) + .sum(); + } + public long getWriterGeneration() { return writerGeneration; } @@ -106,6 +125,7 @@ public static Builder builder() { public static class Builder { private Path directory; private Long writerGeneration; + private long numRows; private final Set files = new HashSet<>(); public Builder directory(Path directory) { @@ -128,6 +148,11 @@ public Builder addFiles(Set files) { return this; } + public Builder addNumRows(long numRows) { + this.numRows = numRows; + return this; + } + public WriterFileSet build() { if (directory == null) { throw new IllegalStateException("directory must be set"); @@ -137,7 +162,7 @@ public WriterFileSet build() { throw new IllegalStateException("writerGeneration must be set"); } - WriterFileSet fileSet = new WriterFileSet(directory, writerGeneration); + WriterFileSet fileSet = new WriterFileSet(directory, writerGeneration, numRows); fileSet.files.addAll(this.files); return fileSet; } diff --git a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java index cf5c34a012f1f..ab731ba79863f 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java @@ -1015,7 +1015,7 @@ public PollingIngestStats pollingIngestStats() { @Override public MergeStats getMergeStats() { - return null; + return mergeScheduler.stats(); } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/exec/merge/CompositeMergePolicy.java b/server/src/main/java/org/opensearch/index/engine/exec/merge/CompositeMergePolicy.java index f36cdd0a9ab15..c6f0c88a1ab88 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/merge/CompositeMergePolicy.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/merge/CompositeMergePolicy.java @@ -24,7 +24,6 @@ import 
org.opensearch.index.engine.exec.coord.CatalogSnapshot; import java.io.IOException; -import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; @@ -107,7 +106,7 @@ private SegmentInfos convertToSegmentInfos( SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major); for (CatalogSnapshot.Segment segment : segments) { - SegmentWrapper wrapper = new SegmentWrapper(segment, calculateSegmentSize(segment)); + SegmentWrapper wrapper = new SegmentWrapper(segment, calculateTotalSize(segment), calculateNumDocs(segment)); segmentInfos.add(wrapper); segmentMap.put(wrapper, segment); } @@ -154,28 +153,36 @@ public Set getMergingSegments() { return Collections.unmodifiableSet(mergingSegments); } - private long calculateSegmentSize(CatalogSnapshot.Segment segment) { - long totalSize = 0; + private long calculateNumDocs(CatalogSnapshot.Segment segment) { try { - for (WriterFileSet writerFileSet : segment.getDFGroupedSearchableFiles().values()) { - for (String fileName : writerFileSet.getFiles()) { - Path filePath = Path.of(writerFileSet.getDirectory(), fileName); - if (java.nio.file.Files.exists(filePath)) { - totalSize += java.nio.file.Files.size(filePath); - } - } - } + return segment.getDFGroupedSearchableFiles().values() + .stream() + .mapToLong(WriterFileSet::getNumRows) + .sum(); } catch (Exception e) { // Log error but continue with 0 size logger.warn(() -> new ParameterizedMessage("Error calculating segment size", e)); } - return totalSize; + return 0; + } + + private long calculateTotalSize(CatalogSnapshot.Segment segment) { + try { + return segment.getDFGroupedSearchableFiles().values() + .stream() + .mapToLong(WriterFileSet::getTotalSize) + .sum(); + } catch (Exception e) { + // Log error but continue with 0 size + logger.warn(() -> new ParameterizedMessage("Error calculating segment size", e)); + } + return 0; } public synchronized void addMergingSegment(Collection segments) { try { for 
(CatalogSnapshot.Segment segment : segments) { - SegmentWrapper wrapper = new SegmentWrapper(segment, calculateSegmentSize(segment)); + SegmentWrapper wrapper = new SegmentWrapper(segment, calculateTotalSize(segment), calculateNumDocs(segment)); mergingSegments.add(wrapper); } } catch (Exception e) { @@ -189,7 +196,7 @@ public synchronized void removeMergingSegment(Collection segmentsToMerge; + private final long totalSize; + private final long totalNumDocs; public OneMerge(List segmentsToMerge) { - this.segmentsToMerge = segmentsToMerge; + this.segmentsToMerge = Collections.unmodifiableList(segmentsToMerge); + this.totalSize = calculateTotalSizeInBytes(); + this.totalNumDocs = calculateTotalNumDocs(); } public List getSegmentsToMerge() { return segmentsToMerge; } + public long getTotalSizeInBytes() { + return totalSize; + } + + public long getTotalNumDocs() { + return totalNumDocs; + } + + private long calculateTotalSizeInBytes() { + return segmentsToMerge.stream() + .flatMap(segment -> segment.getDFGroupedSearchableFiles().values().stream()) + .mapToLong(WriterFileSet::getTotalSize) + .sum(); + } + + private long calculateTotalNumDocs() { + return segmentsToMerge.stream() + .flatMap(segment -> segment.getDFGroupedSearchableFiles().values().stream()) + .mapToLong(WriterFileSet::getNumRows) + .sum(); + } + + @Override public String toString() { return "Merge [SegmentsToMerge=" + segmentsToMerge + "] "; }