@@ -10,12 +10,14 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Type-safe handle for native Parquet writer with lifecycle management.
*/
public class NativeParquetWriter implements Closeable {

private final AtomicBoolean writerClosed = new AtomicBoolean(false);
private final String filePath;

/**
@@ -47,15 +49,23 @@ public void flush() throws IOException {
RustBridge.flushToDisk(filePath);
}

private ParquetFileMetadata metadata;

@Override
public void close() {
try {
RustBridge.closeWriter(filePath);
} catch (IOException e) {
throw new RuntimeException("Failed to close Parquet writer for " + filePath, e);
if (writerClosed.compareAndSet(false, true)) {
try {
metadata = RustBridge.closeWriter(filePath);
} catch (IOException e) {
throw new RuntimeException("Failed to close Parquet writer for " + filePath, e);
}
}
}

public ParquetFileMetadata getMetadata() {
return metadata;
}

public String getFilePath() {
return filePath;
}
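For reference, the guarded close() above makes the writer close-once: only the first call reaches the native bridge and captures the file metadata. A minimal caller sketch, not part of this change (the helper class and its use are illustrative):

import com.parquet.parquetdataformat.bridge.NativeParquetWriter;
import com.parquet.parquetdataformat.bridge.ParquetFileMetadata;

import java.io.IOException;

final class WriterCloseExample {
    // Shows the close-once contract: the AtomicBoolean guard makes the second
    // close() a no-op, and getMetadata() returns what the first close captured.
    static ParquetFileMetadata finish(NativeParquetWriter writer) throws IOException {
        writer.flush();   // RustBridge.flushToDisk under the hood
        writer.close();   // closes the native writer and stores the metadata
        writer.close();   // safe: compareAndSet(false, true) has already flipped
        return writer.getMetadata();
    }
}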
@@ -0,0 +1,78 @@
package com.parquet.parquetdataformat.bridge;

/**
* Represents metadata information for a Parquet file.
* <p>
* This class holds the essential metadata extracted from a Parquet file
* when the writer is closed, providing visibility into the file's characteristics.
*/
public record ParquetFileMetadata(int version, long numRows, String createdBy) {
/**
* Constructs a new ParquetFileMetadata instance.
*
* @param version the Parquet format version used
* @param numRows the total number of rows in the file
* @param createdBy the application/library that created the file (can be null)
*/
public ParquetFileMetadata {
}

/**
* Gets the Parquet format version.
*
* @return the version number
*/
@Override
public int version() {
return version;
}

/**
* Gets the total number of rows in the Parquet file.
*
* @return the number of rows
*/
@Override
public long numRows() {
return numRows;
}

/**
* Gets information about what created this Parquet file.
*
* @return the creator information, or null if not available
*/
@Override
public String createdBy() {
return createdBy;
}

@Override
public String toString() {
return "ParquetFileMetadata{" +
"version=" + version +
", numRows=" + numRows +
", createdBy='" + createdBy + '\'' +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ParquetFileMetadata that = (ParquetFileMetadata) o;

if (version != that.version) return false;
if (numRows != that.numRows) return false;
return createdBy != null ? createdBy.equals(that.createdBy) : that.createdBy == null;
}

@Override
public int hashCode() {
int result = version;
result = 31 * result + (int) (numRows ^ (numRows >>> 32));
result = 31 * result + (createdBy != null ? createdBy.hashCode() : 0);
return result;
}
}
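Since ParquetFileMetadata is a record, instances are plain immutable values on top of the explicit overrides above. A self-contained sketch of reading one back (the values are made up; real instances come from RustBridge.closeWriter):

import com.parquet.parquetdataformat.bridge.ParquetFileMetadata;

final class MetadataExample {
    public static void main(String[] args) {
        // Illustrative values; real instances are produced by the native bridge.
        ParquetFileMetadata meta = new ParquetFileMetadata(2, 1_000L, "parquet-rs");
        System.out.println(meta.numRows() + " rows, format v" + meta.version()
                + ", created by " + meta.createdBy());
    }
}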
@@ -31,8 +31,9 @@ public class RustBridge
// Enhanced native methods that handle validation and provide better error reporting
public static native void createWriter(String file, long schemaAddress) throws IOException;
public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
public static native void closeWriter(String file) throws IOException;
public static native ParquetFileMetadata closeWriter(String file) throws IOException;
public static native void flushToDisk(String file) throws IOException;
public static native ParquetFileMetadata getFileMetadata(String file) throws IOException;

public static native long getFilteredNativeBytesUsed(String pathPrefix);
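Alongside the changed closeWriter signature, the new getFileMetadata entry point exposes the same footer information for an already-written file. A hedged usage sketch (class name and path are illustrative):

import com.parquet.parquetdataformat.bridge.ParquetFileMetadata;
import com.parquet.parquetdataformat.bridge.RustBridge;

import java.io.IOException;

final class BridgeMetadataExample {
    static void printStats() throws IOException {
        // The path is illustrative; any file produced by this writer would work.
        ParquetFileMetadata meta = RustBridge.getFileMetadata("/tmp/segment_0.parquet");
        System.out.printf("Parquet v%d, %d rows, created by %s%n",
                meta.version(), meta.numRows(), meta.createdBy());
    }
}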

@@ -10,19 +10,16 @@

import com.parquet.parquetdataformat.bridge.ArrowExport;
import com.parquet.parquetdataformat.bridge.NativeParquetWriter;
import com.parquet.parquetdataformat.bridge.ParquetFileMetadata;
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.exec.FlushIn;
import org.opensearch.index.engine.exec.WriteResult;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
@@ -107,7 +104,7 @@ public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOException {
}
}

public String flush(FlushIn flushIn) throws IOException {
public ParquetFileMetadata flush(FlushIn flushIn) throws IOException {
ManagedVSR currentVSR = managedVSR.get();
logger.info("Flush called for {}, row count: {}", fileName, currentVSR.getRowCount());
try {
@@ -120,15 +117,17 @@ public String flush(FlushIn flushIn) throws IOException {
// Transition VSR to FROZEN state before flushing
currentVSR.moveToFrozen();
logger.info("Flushing {} rows for {}", currentVSR.getRowCount(), fileName);
ParquetFileMetadata metadata;

// Write through native writer handle
try (ArrowExport export = currentVSR.exportToArrow()) {
writer.write(export.getArrayAddress(), export.getSchemaAddress());
writer.close();
metadata = writer.getMetadata();
}
logger.info("Successfully flushed data for {}", fileName);
logger.debug("Successfully flushed data for {} with metadata: {}", fileName, metadata);

return fileName;
return metadata;
} catch (Exception e) {
logger.error("Error in flush for {}: {}", fileName, e.getMessage(), e);
throw new IOException("Failed to flush data: " + e.getMessage(), e);
@@ -1,5 +1,6 @@
package com.parquet.parquetdataformat.writer;

import com.parquet.parquetdataformat.bridge.ParquetFileMetadata;
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
import com.parquet.parquetdataformat.vsr.VSRManager;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -58,16 +59,17 @@ public WriteResult addDoc(ParquetDocumentInput d) throws IOException {

@Override
public FileInfos flush(FlushIn flushIn) throws IOException {
String fileName = vsrManager.flush(flushIn);
ParquetFileMetadata parquetFileMetadata = vsrManager.flush(flushIn);
// no data flushed
if (fileName == null) {
if (file == null) {
return FileInfos.empty();
}
Path file = Path.of(fileName);
Path filePath = Path.of(file);
WriterFileSet writerFileSet = WriterFileSet.builder()
.directory(file.getParent())
.directory(filePath.getParent())
.writerGeneration(writerGeneration)
.addFile(file.getFileName().toString())
.addFile(filePath.getFileName().toString())
.addNumRows(parquetFileMetadata.numRows())
Contributor: Why not add the entire metadata? We can reuse it later to store checksums as well?

Author: ParquetFileMetadata is created in plugin and this code is in core.

.build();
return FileInfos.builder().putWriterFileSet(PARQUET_DATA_FORMAT, writerFileSet).build();
}