Skip to content

Commit f18a032

Browse files
committed
[Merge] Adding MergeStats support for Composite Engine
Signed-off-by: Sagar Darji <darjisagar7@gmail.com>
1 parent 8557d79 commit f18a032

11 files changed

Lines changed: 183 additions & 49 deletions

File tree

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/NativeParquetWriter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,21 @@ public void flush() throws IOException {
4747
RustBridge.flushToDisk(filePath);
4848
}
4949

50+
private ParquetFileMetadata metadata;
51+
5052
@Override
5153
public void close() {
5254
try {
53-
RustBridge.closeWriter(filePath);
55+
metadata = RustBridge.closeWriter(filePath);
5456
} catch (IOException e) {
5557
throw new RuntimeException("Failed to close Parquet writer for " + filePath, e);
5658
}
5759
}
5860

61+
public ParquetFileMetadata getMetadata() {
62+
return metadata;
63+
}
64+
5965
public String getFilePath() {
6066
return filePath;
6167
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class VSRManager implements AutoCloseable {
4646
private final String fileName;
4747
private final VSRPool vsrPool;
4848
private NativeParquetWriter writer;
49+
private volatile boolean writerClosed = false;
4950

5051

5152
public VSRManager(String fileName, Schema schema, ArrowBufferPool arrowBufferPool) {
@@ -108,7 +109,7 @@ public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOExcep
108109
}
109110
}
110111

111-
public String flush(FlushIn flushIn) throws IOException {
112+
public ParquetFileMetadata flush(FlushIn flushIn) throws IOException {
112113
ManagedVSR currentVSR = managedVSR.get();
113114
logger.info("Flush called for {}, row count: {}", fileName, currentVSR.getRowCount());
114115
try {
@@ -121,15 +122,18 @@ public String flush(FlushIn flushIn) throws IOException {
121122
// Transition VSR to FROZEN state before flushing
122123
currentVSR.moveToFrozen();
123124
logger.info("Flushing {} rows for {}", currentVSR.getRowCount(), fileName);
125+
ParquetFileMetadata metadata;
124126

125127
// Write through native writer handle
126128
try (ArrowExport export = currentVSR.exportToArrow()) {
127-
ParquetFileMetadata fileMetadata = writer.write(export.getArrayAddress(), export.getSchemaAddress());
129+
writer.write(export.getArrayAddress(), export.getSchemaAddress());
128130
writer.close();
131+
writerClosed = true;
132+
metadata = writer.getMetadata();
129133
}
130-
logger.info("Successfully flushed data for {}", fileName);
134+
logger.debug("Successfully flushed data for {} with metadata: {}", fileName, metadata);
131135

132-
return fileName;
136+
return metadata;
133137
} catch (Exception e) {
134138
logger.error("Error in flush for {}: {}", fileName, e.getMessage(), e);
135139
throw new IOException("Failed to flush data: " + e.getMessage(), e);
@@ -139,9 +143,10 @@ public String flush(FlushIn flushIn) throws IOException {
139143
@Override
140144
public void close() {
141145
try {
142-
if (writer != null) {
146+
if (writer != null && !writerClosed) {
143147
writer.flush();
144148
writer.close();
149+
writerClosed = true;
145150
}
146151

147152
// Close VSR Pool - handle IllegalStateException specially

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/writer/ParquetWriter.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.parquet.parquetdataformat.writer;
22

3+
import com.parquet.parquetdataformat.bridge.ParquetFileMetadata;
34
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
45
import com.parquet.parquetdataformat.vsr.VSRManager;
56
import org.apache.arrow.vector.types.pojo.Schema;
@@ -58,16 +59,17 @@ public WriteResult addDoc(ParquetDocumentInput d) throws IOException {
5859

5960
@Override
6061
public FileInfos flush(FlushIn flushIn) throws IOException {
61-
String fileName = vsrManager.flush(flushIn);
62+
ParquetFileMetadata parquetFileMetadata = vsrManager.flush(flushIn);
6263
// no data flushed
63-
if (fileName == null) {
64+
if (file == null) {
6465
return FileInfos.empty();
6566
}
66-
Path file = Path.of(fileName);
67+
Path filePath = Path.of(file);
6768
WriterFileSet writerFileSet = WriterFileSet.builder()
68-
.directory(file.getParent())
69+
.directory(filePath.getParent())
6970
.writerGeneration(writerGeneration)
70-
.addFile(file.getFileName().toString())
71+
.addFile(filePath.getFileName().toString())
72+
.addNumRows(parquetFileMetadata.numRows())
7173
.build();
7274
return FileInfos.builder().putWriterFileSet(PARQUET_DATA_FORMAT, writerFileSet).build();
7375
}

modules/parquet-data-format/src/main/rust/src/lib.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,6 @@ use parquet::basic::{Compression, ZstdLevel};
1010
use parquet::file::properties::WriterProperties;
1111
use std::fs::File;
1212
use std::sync::{Arc, Mutex};
13-
use arrow::ffi::{FFI_ArrowSchema, FFI_ArrowArray};
14-
use std::fs::OpenOptions;
15-
use std::io::Write;
16-
use chrono::Utc;
1713
use parquet::format::FileMetaData as FormatFileMetaData;
1814
use parquet::file::metadata::FileMetaData as FileFileMetaData;
1915
use parquet::file::reader::{FileReader, SerializedFileReader};
@@ -329,8 +325,7 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_clos
329325
Ok(java_obj) => java_obj.into_raw(),
330326
Err(e) => {
331327
let error_msg = format!("[RUST] ERROR: Failed to create Java metadata object: {:?}\n", e);
332-
println!("{}", error_msg.trim());
333-
NativeParquetWriter::log_to_file(&error_msg);
328+
log_error!("{}", error_msg.trim());
334329
// Throw IOException to Java
335330
let _ = env.throw_new("java/io/IOException", "Failed to create metadata object");
336331
JObject::null().into_raw()
@@ -345,9 +340,7 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_clos
345340
}
346341
}
347342
Err(e) => {
348-
let error_msg = format!("[RUST] ERROR: Failed to close writer: {:?}\n", e);
349-
println!("{}", error_msg.trim());
350-
NativeParquetWriter::log_to_file(&error_msg);
343+
log_error!("[RUST] ERROR: Failed to close writer: {:?}\n", e);
351344
// Throw IOException to Java
352345
let _ = env.throw_new("java/io/IOException", &format!("Failed to close writer: {}", e));
353346
JObject::null().into_raw()
@@ -382,7 +375,7 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_getF
382375
Err(e) => {
383376
let error_msg = format!("[RUST] ERROR: Failed to create Java metadata object: {:?}\n", e);
384377
println!("{}", error_msg.trim());
385-
NativeParquetWriter::log_to_file(&error_msg);
378+
log_error!("{}", error_msg);
386379
// Throw IOException to Java
387380
let _ = env.throw_new("java/io/IOException", "Failed to create metadata object");
388381
JObject::null().into_raw()
@@ -392,14 +385,27 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_getF
392385
Err(e) => {
393386
let error_msg = format!("[RUST] ERROR: Failed to read file metadata: {:?}\n", e);
394387
println!("{}", error_msg.trim());
395-
NativeParquetWriter::log_to_file(&error_msg);
388+
log_error!("{}", error_msg);
396389
// Throw IOException to Java
397390
let _ = env.throw_new("java/io/IOException", &format!("Failed to read file metadata: {}", e));
398391
JObject::null().into_raw()
399392
}
400393
}
401394
}
402395

396+
#[unsafe(no_mangle)]
397+
pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_getFilteredNativeBytesUsed(
398+
mut env: JNIEnv,
399+
_class: JClass,
400+
path_prefix: JString
401+
) -> jlong {
402+
let prefix: String = env.get_string(&path_prefix).expect("Couldn't get java string!").into();
403+
match NativeParquetWriter::get_filtered_writer_memory_usage(prefix) {
404+
Ok(memory) => memory as jlong,
405+
Err(_) => 0,
406+
}
407+
}
408+
403409
#[cfg(test)]
404410
mod tests {
405411
use super::*;

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReaderManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class DatafusionReaderManager implements EngineReaderManager<DatafusionRe
4040
// private final List<ReferenceManager.RefreshListener> refreshListeners = new CopyOnWriteArrayList();
4141

4242
public DatafusionReaderManager(String path, Collection<FileMetadata> files, String dataFormat) throws IOException {
43-
WriterFileSet writerFileSet = new WriterFileSet(Path.of(URI.create("file:///" + path)), 1);
43+
WriterFileSet writerFileSet = new WriterFileSet(Path.of(URI.create("file:///" + path)), 1, 0);
4444
files.forEach(fileMetadata -> writerFileSet.add(fileMetadata.file()));
4545
this.current = new DatafusionReader(path, null, List.of(writerFileSet));
4646
this.path = path;

server/src/main/java/org/opensearch/index/engine/exec/WriterFileSet.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ public class WriterFileSet implements Serializable, Writeable {
2424
private final String directory;
2525
private final long writerGeneration;
2626
private final Set<String> files;
27+
private final long numRows;
2728

28-
public WriterFileSet(Path directory, long writerGeneration) {
29+
public WriterFileSet(Path directory, long writerGeneration, long numRows) {
30+
this.numRows = numRows;
2931
this.files = new HashSet<>();
3032
this.writerGeneration = writerGeneration;
3133
this.directory = directory.toString();
@@ -34,6 +36,7 @@ public WriterFileSet(Path directory, long writerGeneration) {
3436
public WriterFileSet(StreamInput in) throws IOException {
3537
this.directory = in.readString();
3638
this.writerGeneration = in.readLong();
39+
this.numRows = in.readVInt();
3740

3841
int fileCount = in.readVInt();
3942
this.files = new HashSet<>(fileCount);
@@ -75,6 +78,16 @@ public String getDirectory() {
7578
return directory;
7679
}
7780

81+
public long getNumRows() {
82+
return numRows;
83+
}
84+
85+
public long getTotalSize() {
86+
return files.stream()
87+
.mapToLong(file -> files.size())
88+
.sum();
89+
}
90+
7891
public long getWriterGeneration() {
7992
return writerGeneration;
8093
}
@@ -106,6 +119,7 @@ public static Builder builder() {
106119
public static class Builder {
107120
private Path directory;
108121
private Long writerGeneration;
122+
private long numRows;
109123
private final Set<String> files = new HashSet<>();
110124

111125
public Builder directory(Path directory) {
@@ -128,6 +142,11 @@ public Builder addFiles(Set<String> files) {
128142
return this;
129143
}
130144

145+
public Builder addNumRows(long numRows) {
146+
this.numRows = numRows;
147+
return this;
148+
}
149+
131150
public WriterFileSet build() {
132151
if (directory == null) {
133152
throw new IllegalStateException("directory must be set");
@@ -137,7 +156,7 @@ public WriterFileSet build() {
137156
throw new IllegalStateException("writerGeneration must be set");
138157
}
139158

140-
WriterFileSet fileSet = new WriterFileSet(directory, writerGeneration);
159+
WriterFileSet fileSet = new WriterFileSet(directory, writerGeneration, numRows);
141160
fileSet.files.addAll(this.files);
142161
return fileSet;
143162
}

server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1015,7 +1015,7 @@ public PollingIngestStats pollingIngestStats() {
10151015

10161016
@Override
10171017
public MergeStats getMergeStats() {
1018-
return null;
1018+
return mergeScheduler.stats();
10191019
}
10201020

10211021
@Override

server/src/main/java/org/opensearch/index/engine/exec/merge/CompositeMergePolicy.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
2525

2626
import java.io.IOException;
27-
import java.nio.file.Path;
2827
import java.nio.file.Paths;
2928
import java.util.ArrayList;
3029
import java.util.Collection;
@@ -107,7 +106,7 @@ private SegmentInfos convertToSegmentInfos(
107106
SegmentInfos segmentInfos = new SegmentInfos(Version.LATEST.major);
108107

109108
for (CatalogSnapshot.Segment segment : segments) {
110-
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateSegmentSize(segment));
109+
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateTotalSize(segment), calculateNumDocs(segment));
111110
segmentInfos.add(wrapper);
112111
segmentMap.put(wrapper, segment);
113112
}
@@ -154,28 +153,36 @@ public Set<SegmentCommitInfo> getMergingSegments() {
154153
return Collections.unmodifiableSet(mergingSegments);
155154
}
156155

157-
private long calculateSegmentSize(CatalogSnapshot.Segment segment) {
158-
long totalSize = 0;
156+
private long calculateNumDocs(CatalogSnapshot.Segment segment) {
159157
try {
160-
for (WriterFileSet writerFileSet : segment.getDFGroupedSearchableFiles().values()) {
161-
for (String fileName : writerFileSet.getFiles()) {
162-
Path filePath = Path.of(writerFileSet.getDirectory(), fileName);
163-
if (java.nio.file.Files.exists(filePath)) {
164-
totalSize += java.nio.file.Files.size(filePath);
165-
}
166-
}
167-
}
158+
return segment.getDFGroupedSearchableFiles().values()
159+
.stream()
160+
.mapToLong(WriterFileSet::getNumRows)
161+
.sum();
168162
} catch (Exception e) {
169163
// Log error but continue with a document count of 0
170164
logger.warn(() -> new ParameterizedMessage("Error calculating segment size", e));
171165
}
172-
return totalSize;
166+
return 0;
167+
}
168+
169+
private long calculateTotalSize(CatalogSnapshot.Segment segment) {
170+
try {
171+
return segment.getDFGroupedSearchableFiles().values()
172+
.stream()
173+
.mapToLong(WriterFileSet::getTotalSize)
174+
.sum();
175+
} catch (Exception e) {
176+
// Log error but continue with 0 size
177+
logger.warn(() -> new ParameterizedMessage("Error calculating segment size", e));
178+
}
179+
return 0;
173180
}
174181

175182
public synchronized void addMergingSegment(Collection<CatalogSnapshot.Segment> segments) {
176183
try {
177184
for (CatalogSnapshot.Segment segment : segments) {
178-
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateSegmentSize(segment));
185+
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateTotalSize(segment), calculateNumDocs(segment));
179186
mergingSegments.add(wrapper);
180187
}
181188
} catch (Exception e) {
@@ -189,7 +196,7 @@ public synchronized void removeMergingSegment(Collection<CatalogSnapshot.Segment
189196
try {
190197

191198
for (CatalogSnapshot.Segment segment : segments) {
192-
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateSegmentSize(segment));
199+
SegmentWrapper wrapper = new SegmentWrapper(segment, calculateTotalSize(segment), calculateNumDocs(segment));
193200
segmentToRemove.add(wrapper);
194201
}
195202
segmentToRemove.forEach(mergingSegments::remove);
@@ -202,7 +209,7 @@ public synchronized void removeMergingSegment(Collection<CatalogSnapshot.Segment
202209
private static class SegmentWrapper extends SegmentCommitInfo {
203210
private final long totalSizeBytes;
204211

205-
public SegmentWrapper(CatalogSnapshot.Segment segment, long totalSizeBytes) throws IOException {
212+
public SegmentWrapper(CatalogSnapshot.Segment segment, long totalSizeBytes, long totalNumDocs) throws IOException {
206213
super(
207214
// SegmentInfo
208215
new org.apache.lucene.index.SegmentInfo(
@@ -215,8 +222,7 @@ public SegmentWrapper(CatalogSnapshot.Segment segment, long totalSizeBytes) thro
215222
// segment name
216223
"segment_" + segment.getGeneration(),
217224
// maxDoc - total document count across all files in segment
218-
// TODO: Get correct total doc from catalogSnaoshot or Segment
219-
(int)(totalSizeBytes / 1000),
225+
(int)(totalNumDocs),
220226
// isCompound - false as we don't need compound file format
221227
false,
222228
// has block

0 commit comments

Comments (0)