Skip to content

Commit f713e09

Browse files
committed
[Merge] Adding MergeStats support for Composite Engine
Signed-off-by: Sagar Darji <darjisagar7@gmail.com>
1 parent 8557d79 commit f713e09

10 files changed

Lines changed: 184 additions & 54 deletions

File tree

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/NativeParquetWriter.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,14 @@
1010

1111
import java.io.Closeable;
1212
import java.io.IOException;
13+
import java.util.concurrent.atomic.AtomicBoolean;
1314

1415
/**
1516
* Type-safe handle for native Parquet writer with lifecycle management.
1617
*/
1718
public class NativeParquetWriter implements Closeable {
1819

20+
private volatile AtomicBoolean writerClosed = new AtomicBoolean(false);
1921
private final String filePath;
2022

2123
/**
@@ -47,15 +49,24 @@ public void flush() throws IOException {
4749
RustBridge.flushToDisk(filePath);
4850
}
4951

52+
private ParquetFileMetadata metadata;
53+
5054
@Override
5155
public void close() {
52-
try {
53-
RustBridge.closeWriter(filePath);
54-
} catch (IOException e) {
55-
throw new RuntimeException("Failed to close Parquet writer for " + filePath, e);
56+
if (!writerClosed.get()) {
57+
try {
58+
metadata = RustBridge.closeWriter(filePath);
59+
writerClosed.set(true);
60+
} catch (IOException e) {
61+
throw new RuntimeException("Failed to close Parquet writer for " + filePath, e);
62+
}
5663
}
5764
}
5865

66+
public ParquetFileMetadata getMetadata() {
67+
return metadata;
68+
}
69+
5970
public String getFilePath() {
6071
return filePath;
6172
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,13 @@
1313
import com.parquet.parquetdataformat.bridge.ParquetFileMetadata;
1414
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
1515
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
16-
import org.apache.arrow.vector.FieldVector;
1716
import org.apache.arrow.vector.types.pojo.Schema;
1817
import org.apache.logging.log4j.LogManager;
1918
import org.apache.logging.log4j.Logger;
2019
import org.opensearch.index.engine.exec.FlushIn;
2120
import org.opensearch.index.engine.exec.WriteResult;
2221

23-
import java.io.Closeable;
2422
import java.io.IOException;
25-
import java.util.HashMap;
26-
import java.util.Map;
2723
import java.util.concurrent.atomic.AtomicReference;
2824

2925
/**
@@ -108,7 +104,7 @@ public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOExcep
108104
}
109105
}
110106

111-
public String flush(FlushIn flushIn) throws IOException {
107+
public ParquetFileMetadata flush(FlushIn flushIn) throws IOException {
112108
ManagedVSR currentVSR = managedVSR.get();
113109
logger.info("Flush called for {}, row count: {}", fileName, currentVSR.getRowCount());
114110
try {
@@ -121,15 +117,17 @@ public String flush(FlushIn flushIn) throws IOException {
121117
// Transition VSR to FROZEN state before flushing
122118
currentVSR.moveToFrozen();
123119
logger.info("Flushing {} rows for {}", currentVSR.getRowCount(), fileName);
120+
ParquetFileMetadata metadata;
124121

125122
// Write through native writer handle
126123
try (ArrowExport export = currentVSR.exportToArrow()) {
127-
ParquetFileMetadata fileMetadata = writer.write(export.getArrayAddress(), export.getSchemaAddress());
124+
writer.write(export.getArrayAddress(), export.getSchemaAddress());
128125
writer.close();
126+
metadata = writer.getMetadata();
129127
}
130-
logger.info("Successfully flushed data for {}", fileName);
128+
logger.debug("Successfully flushed data for {} with metadata: {}", fileName, metadata);
131129

132-
return fileName;
130+
return metadata;
133131
} catch (Exception e) {
134132
logger.error("Error in flush for {}: {}", fileName, e.getMessage(), e);
135133
throw new IOException("Failed to flush data: " + e.getMessage(), e);

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/writer/ParquetWriter.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.parquet.parquetdataformat.writer;
22

3+
import com.parquet.parquetdataformat.bridge.ParquetFileMetadata;
34
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
45
import com.parquet.parquetdataformat.vsr.VSRManager;
56
import org.apache.arrow.vector.types.pojo.Schema;
@@ -58,16 +59,17 @@ public WriteResult addDoc(ParquetDocumentInput d) throws IOException {
5859

5960
@Override
6061
public FileInfos flush(FlushIn flushIn) throws IOException {
61-
String fileName = vsrManager.flush(flushIn);
62+
ParquetFileMetadata parquetFileMetadata = vsrManager.flush(flushIn);
6263
// no data flushed
63-
if (fileName == null) {
64+
if (file == null) {
6465
return FileInfos.empty();
6566
}
66-
Path file = Path.of(fileName);
67+
Path filePath = Path.of(file);
6768
WriterFileSet writerFileSet = WriterFileSet.builder()
68-
.directory(file.getParent())
69+
.directory(filePath.getParent())
6970
.writerGeneration(writerGeneration)
70-
.addFile(file.getFileName().toString())
71+
.addFile(filePath.getFileName().toString())
72+
.addNumRows(parquetFileMetadata.numRows())
7173
.build();
7274
return FileInfos.builder().putWriterFileSet(PARQUET_DATA_FORMAT, writerFileSet).build();
7375
}

modules/parquet-data-format/src/main/rust/src/lib.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,6 @@ use parquet::basic::{Compression, ZstdLevel};
1010
use parquet::file::properties::WriterProperties;
1111
use std::fs::File;
1212
use std::sync::{Arc, Mutex};
13-
use arrow::ffi::{FFI_ArrowSchema, FFI_ArrowArray};
14-
use std::fs::OpenOptions;
15-
use std::io::Write;
16-
use chrono::Utc;
1713
use parquet::format::FileMetaData as FormatFileMetaData;
1814
use parquet::file::metadata::FileMetaData as FileFileMetaData;
1915
use parquet::file::reader::{FileReader, SerializedFileReader};
@@ -329,8 +325,7 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_clos
329325
Ok(java_obj) => java_obj.into_raw(),
330326
Err(e) => {
331327
let error_msg = format!("[RUST] ERROR: Failed to create Java metadata object: {:?}\n", e);
332-
println!("{}", error_msg.trim());
333-
NativeParquetWriter::log_to_file(&error_msg);
328+
log_error!("{}", error_msg.trim());
334329
// Throw IOException to Java
335330
let _ = env.throw_new("java/io/IOException", "Failed to create metadata object");
336331
JObject::null().into_raw()
@@ -345,9 +340,7 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_clos
345340
}
346341
}
347342
Err(e) => {
348-
let error_msg = format!("[RUST] ERROR: Failed to close writer: {:?}\n", e);
349-
println!("{}", error_msg.trim());
350-
NativeParquetWriter::log_to_file(&error_msg);
343+
log_error!("[RUST] ERROR: Failed to close writer: {:?}\n", e);
351344
// Throw IOException to Java
352345
let _ = env.throw_new("java/io/IOException", &format!("Failed to close writer: {}", e));
353346
JObject::null().into_raw()
@@ -382,7 +375,7 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_getF
382375
Err(e) => {
383376
let error_msg = format!("[RUST] ERROR: Failed to create Java metadata object: {:?}\n", e);
384377
println!("{}", error_msg.trim());
385-
NativeParquetWriter::log_to_file(&error_msg);
378+
log_error!("{}", error_msg);
386379
// Throw IOException to Java
387380
let _ = env.throw_new("java/io/IOException", "Failed to create metadata object");
388381
JObject::null().into_raw()
@@ -392,14 +385,27 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_getF
392385
Err(e) => {
393386
let error_msg = format!("[RUST] ERROR: Failed to read file metadata: {:?}\n", e);
394387
println!("{}", error_msg.trim());
395-
NativeParquetWriter::log_to_file(&error_msg);
388+
log_error!("{}", error_msg);
396389
// Throw IOException to Java
397390
let _ = env.throw_new("java/io/IOException", &format!("Failed to read file metadata: {}", e));
398391
JObject::null().into_raw()
399392
}
400393
}
401394
}
402395

396+
#[unsafe(no_mangle)]
397+
pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_getFilteredNativeBytesUsed(
398+
mut env: JNIEnv,
399+
_class: JClass,
400+
path_prefix: JString
401+
) -> jlong {
402+
let prefix: String = env.get_string(&path_prefix).expect("Couldn't get java string!").into();
403+
match NativeParquetWriter::get_filtered_writer_memory_usage(prefix) {
404+
Ok(memory) => memory as jlong,
405+
Err(_) => 0,
406+
}
407+
}
408+
403409
#[cfg(test)]
404410
mod tests {
405411
use super::*;

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class DatafusionReaderManager implements EngineReaderManager<DatafusionRe
4040
// private final List<ReferenceManager.RefreshListener> refreshListeners = new CopyOnWriteArrayList();
4141

4242
public DatafusionReaderManager(String path, Collection<FileMetadata> files, String dataFormat) throws IOException {
43-
WriterFileSet writerFileSet = new WriterFileSet(Path.of(URI.create("file:///" + path)), 1);
43+
WriterFileSet writerFileSet = new WriterFileSet(Path.of(URI.create("file:///" + path)), 1, 0);
4444
files.forEach(fileMetadata -> writerFileSet.add(fileMetadata.file()));
4545
this.current = new DatafusionReader(path, null, List.of(writerFileSet));
4646
this.path = path;

server/src/main/java/org/opensearch/index/engine/exec/WriterFileSet.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ public class WriterFileSet implements Serializable, Writeable {
2424
private final String directory;
2525
private final long writerGeneration;
2626
private final Set<String> files;
27+
private final long numRows;
2728

28-
public WriterFileSet(Path directory, long writerGeneration) {
29+
public WriterFileSet(Path directory, long writerGeneration, long numRows) {
30+
this.numRows = numRows;
2931
this.files = new HashSet<>();
3032
this.writerGeneration = writerGeneration;
3133
this.directory = directory.toString();
@@ -34,6 +36,7 @@ public WriterFileSet(Path directory, long writerGeneration) {
3436
public WriterFileSet(StreamInput in) throws IOException {
3537
this.directory = in.readString();
3638
this.writerGeneration = in.readLong();
39+
this.numRows = in.readVInt();
3740

3841
int fileCount = in.readVInt();
3942
this.files = new HashSet<>(fileCount);
@@ -75,6 +78,22 @@ public String getDirectory() {
7578
return directory;
7679
}
7780

81+
public long getNumRows() {
82+
return numRows;
83+
}
84+
85+
public long getTotalSize() {
86+
return files.stream()
87+
.mapToLong(file -> {
88+
try {
89+
return java.nio.file.Files.size(Path.of(directory, file));
90+
} catch (IOException e) {
91+
return 0;
92+
}
93+
})
94+
.sum();
95+
}
96+
7897
public long getWriterGeneration() {
7998
return writerGeneration;
8099
}
@@ -106,6 +125,7 @@ public static Builder builder() {
106125
public static class Builder {
107126
private Path directory;
108127
private Long writerGeneration;
128+
private long numRows;
109129
private final Set<String> files = new HashSet<>();
110130

111131
public Builder directory(Path directory) {
@@ -128,6 +148,11 @@ public Builder addFiles(Set<String> files) {
128148
return this;
129149
}
130150

151+
public Builder addNumRows(long numRows) {
152+
this.numRows = numRows;
153+
return this;
154+
}
155+
131156
public WriterFileSet build() {
132157
if (directory == null) {
133158
throw new IllegalStateException("directory must be set");
@@ -137,7 +162,7 @@ public WriterFileSet build() {
137162
throw new IllegalStateException("writerGeneration must be set");
138163
}
139164

140-
WriterFileSet fileSet = new WriterFileSet(directory, writerGeneration);
165+
WriterFileSet fileSet = new WriterFileSet(directory, writerGeneration, numRows);
141166
fileSet.files.addAll(this.files);
142167
return fileSet;
143168
}

server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1015,7 +1015,7 @@ public PollingIngestStats pollingIngestStats() {
10151015

10161016
@Override
10171017
public MergeStats getMergeStats() {
1018-
return null;
1018+
return mergeScheduler.stats();
10191019
}
10201020

10211021
@Override

server/src/main/java/org/opensearch/index/engine/exec/merge/CompositeMergePolicy.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
2525

2626
import java.io.IOException;
27-
import java.nio.file.Path;
2827
import java.nio.file.Paths;
2928
import java.util.ArrayList;
3029
import java.util.Collection;
@@ -107,7 +106,7 @@ private SegmentInfos convertToSegmentInfos(
107106
SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
108107

109108
for (CatalogSnapshot.Segment segment : segments) {
110-
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateSegmentSize(segment));
109+
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateTotalSize(segment), calculateNumDocs(segment));
111110
segmentInfos.add(wrapper);
112111
segmentMap.put(wrapper, segment);
113112
}
@@ -154,28 +153,36 @@ public Set<SegmentCommitInfo> getMergingSegments() {
154153
return Collections.unmodifiableSet(mergingSegments);
155154
}
156155

157-
private long calculateSegmentSize(CatalogSnapshot.Segment segment) {
158-
long totalSize = 0;
156+
private long calculateNumDocs(CatalogSnapshot.Segment segment) {
159157
try {
160-
for (WriterFileSet writerFileSet : segment.getDFGroupedSearchableFiles().values()) {
161-
for (String fileName : writerFileSet.getFiles()) {
162-
Path filePath = Path.of(writerFileSet.getDirectory(), fileName);
163-
if (java.nio.file.Files.exists(filePath)) {
164-
totalSize += java.nio.file.Files.size(filePath);
165-
}
166-
}
167-
}
158+
return segment.getDFGroupedSearchableFiles().values()
159+
.stream()
160+
.mapToLong(WriterFileSet::getNumRows)
161+
.sum();
168162
} catch (Exception e) {
169163
// Log error but continue with 0 size
170164
logger.warn(() -> new ParameterizedMessage("Error calculating segment size", e));
171165
}
172-
return totalSize;
166+
return 0;
167+
}
168+
169+
private long calculateTotalSize(CatalogSnapshot.Segment segment) {
170+
try {
171+
return segment.getDFGroupedSearchableFiles().values()
172+
.stream()
173+
.mapToLong(WriterFileSet::getTotalSize)
174+
.sum();
175+
} catch (Exception e) {
176+
// Log error but continue with 0 size
177+
logger.warn(() -> new ParameterizedMessage("Error calculating segment size", e));
178+
}
179+
return 0;
173180
}
174181

175182
public synchronized void addMergingSegment(Collection<CatalogSnapshot.Segment> segments) {
176183
try {
177184
for (CatalogSnapshot.Segment segment : segments) {
178-
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateSegmentSize(segment));
185+
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateTotalSize(segment), calculateNumDocs(segment));
179186
mergingSegments.add(wrapper);
180187
}
181188
} catch (Exception e) {
@@ -189,7 +196,7 @@ public synchronized void removeMergingSegment(Collection<CatalogSnapshot.Segment
189196
try {
190197

191198
for (CatalogSnapshot.Segment segment : segments) {
192-
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateSegmentSize(segment));
199+
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateTotalSize(segment), calculateNumDocs(segment));
193200
segmentToRemove.add(wrapper);
194201
}
195202
segmentToRemove.forEach(mergingSegments::remove);
@@ -202,7 +209,7 @@ public synchronized void removeMergingSegment(Collection<CatalogSnapshot.Segment
202209
private static class SegmentWrapper extends SegmentCommitInfo {
203210
private final long totalSizeBytes;
204211

205-
public SegmentWrapper(CatalogSnapshot.Segment segment, long totalSizeBytes) throws IOException {
212+
public SegmentWrapper(CatalogSnapshot.Segment segment, long totalSizeBytes, long totalNumDocs) throws IOException {
206213
super(
207214
// SegmentInfo
208215
new org.apache.lucene.index.SegmentInfo(
@@ -215,8 +222,7 @@ public SegmentWrapper(CatalogSnapshot.Segment segment, long totalSizeBytes) thro
215222
// segment name
216223
"segment_" + segment.getGeneration(),
217224
// maxDoc - total document count across all files in segment
218-
// TODO: Get correct total doc from catalogSnaoshot or Segment
219-
(int)(totalSizeBytes / 1000),
225+
(int)(totalNumDocs),
220226
// isCompound - false as we don't need compound file format
221227
false,
222228
// has block

0 commit comments

Comments
 (0)