@@ -60,7 +60,7 @@ public void setup() throws IOException {
writerCreationBenchmarkData = generator.generate("simple", fieldCount, 0);
writerWriteBenchmarkData = generator.generate("simple", fieldCount, recordCount);
filePath = generateTempFilePath();
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress());
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress(), true);
RustBridge.write(filePath, writerWriteBenchmarkData.getArrowArray().memoryAddress(), writerWriteBenchmarkData.getArrowSchema().memoryAddress());
}

@@ -81,7 +81,7 @@ public void tearDown() throws IOException {
@Benchmark
public void benchmarkCreate() throws IOException {
// This is what we're benchmarking - just writer creation
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress());
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress(), true);
}

private String generateTempFilePath() {
@@ -39,7 +39,7 @@ public void setup() throws IOException {
writerCreationBenchmarkData = generator.generate("simple", fieldCount, 0);
writerWriteBenchmarkData = generator.generate("simple", fieldCount, recordCount);
filePath = generateTempFilePath();
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress());
RustBridge.createWriter(filePath, writerCreationBenchmarkData.getArrowSchema().memoryAddress(), true);
}

@Benchmark
@@ -92,8 +92,8 @@ public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin,

@Override
@SuppressWarnings("unchecked")
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath) {
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath);
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) {
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath, indexSettings);
}

@Override
@@ -8,6 +8,8 @@

package com.parquet.parquetdataformat.bridge;

import org.opensearch.index.IndexSettings;

import java.io.Closeable;
import java.io.IOException;

@@ -24,9 +26,10 @@ public class NativeParquetWriter implements Closeable {
* @param schemaAddress Arrow C Data Interface schema pointer
* @throws IOException if writer creation fails
*/
public NativeParquetWriter(String filePath, long schemaAddress) throws IOException {
public NativeParquetWriter(String filePath, long schemaAddress, IndexSettings indexSettings) throws IOException {
this.filePath = filePath;
RustBridge.createWriter(filePath, schemaAddress);
final boolean isCompressionEnabled = indexSettings.isCompressionEnabled();
RustBridge.createWriter(filePath, schemaAddress, isCompressionEnabled);
}

/**
@@ -29,7 +29,7 @@ public class RustBridge {
public static native void initLogger();

// Enhanced native methods that handle validation and provide better error reporting
public static native void createWriter(String file, long schemaAddress) throws IOException;
public static native void createWriter(String file, long schemaAddress, boolean isCompressionEnabled) throws IOException;
Member:
The change makes this very specific to compression. Can we instead pass a map here, with attribute names that can be read on the other side? Also, there are certain settings we may want to apply only to certain columns (some may be handled via mappings as well). Can you look into whether the argument passed to Rust can be made general enough to handle index- and field-level settings?
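As a rough illustration of this map-based suggestion (not part of the PR — the method name, the flat String-to-String map, and the key names are assumptions, loosely modeled on the Iceberg properties quoted in the next comment), the Java side could look roughly like this:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class RustBridgeSketch {
    // Hypothetical alternative to the boolean flag: hand the Rust side a map of
    // writer options it can iterate over, so new settings don't change the signature.
    public static native void createWriter(String file, long schemaAddress, Map<String, String> writerOptions) throws IOException;

    // Example of how a caller such as NativeParquetWriter might assemble the map.
    static Map<String, String> writerOptions(boolean compressionEnabled) {
        Map<String, String> options = new HashMap<>();
        options.put("write.parquet.compression-codec", compressionEnabled ? "zstd" : "uncompressed");
        // Column-scoped keys could carry field-level settings, e.g. a bloom filter hint for one column.
        options.put("write.parquet.bloom-filter-enabled.column.col1", "true");
        return options;
    }
}
```

On the Rust side the JNI entry point would then receive the map as an object and read it (for example via the jni crate's JMap wrapper), so adding a new option would not require another signature change.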

Contributor:
Iceberg configuration, for reference:

| Property | Default | Description |
|---|---|---|
| write.format.default | parquet | Default file format for the table; parquet, avro, or orc |
| write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc |
| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
| write.parquet.page-row-limit | 20000 | Parquet page row limit |
| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
| write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
| write.parquet.compression-level | null | Parquet compression level |
| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Hint to Parquet to write a bloom filter for the column 'col1' |
| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | Maximum number of bytes for a bloom filter bitset |
| write.parquet.bloom-filter-fpp.column.col1 | 0.01 | False positive probability for a bloom filter applied to 'col1' (must be > 0.0 and < 1.0) |

public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
public static native void closeWriter(String file) throws IOException;
public static native void flushToDisk(String file) throws IOException;
@@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import org.opensearch.index.engine.exec.Merger;
@@ -71,11 +72,18 @@ public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDa
private final ShardPath shardPath;
private final ParquetMerger parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH);
private final ArrowBufferPool arrowBufferPool;

public ParquetExecutionEngine(Settings settings, Supplier<Schema> schema, ShardPath shardPath) {
private final IndexSettings indexSettings;

public ParquetExecutionEngine(
Settings settings,
Supplier<Schema> schema,
ShardPath shardPath,
IndexSettings indexSettings
) {
this.schema = schema;
this.shardPath = shardPath;
this.arrowBufferPool = new ArrowBufferPool(settings);
this.indexSettings = indexSettings;
}

@Override
@@ -108,7 +116,7 @@ public List<String> supportedFieldTypes() {
@Override
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) {
String fileName = Path.of(shardPath.getDataPath().toString(), getDataFormat().name(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool);
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool, indexSettings);
}

@Override
@@ -12,17 +12,14 @@
import com.parquet.parquetdataformat.bridge.NativeParquetWriter;
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.FlushIn;
import org.opensearch.index.engine.exec.WriteResult;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
@@ -45,9 +42,10 @@ public class VSRManager implements AutoCloseable {
private final String fileName;
private final VSRPool vsrPool;
private NativeParquetWriter writer;
private final IndexSettings indexSettings;


public VSRManager(String fileName, Schema schema, ArrowBufferPool arrowBufferPool) {
public VSRManager(String fileName, Schema schema, ArrowBufferPool arrowBufferPool, IndexSettings indexSettings) {
this.fileName = fileName;
this.schema = schema;

@@ -57,14 +55,16 @@ public VSRManager(String fileName, Schema schema, ArrowBufferPool arrowBufferPoo
// Get active VSR from pool
this.managedVSR.set(vsrPool.getActiveVSR());

this.indexSettings = indexSettings;

// Initialize writer lazily to avoid crashes
initializeWriter();
}

private void initializeWriter() {
try {
try (ArrowExport export = managedVSR.get().exportSchema()) {
writer = new NativeParquetWriter(fileName, export.getSchemaAddress());
writer = new NativeParquetWriter(fileName, export.getSchemaAddress(), indexSettings);
}
} catch (Exception e) {
throw new RuntimeException("Failed to initialize Parquet writer: " + e.getMessage(), e);
@@ -5,6 +5,7 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.FileInfos;
import org.opensearch.index.engine.exec.FlushIn;
import org.opensearch.index.engine.exec.WriteResult;
@@ -44,10 +45,16 @@ public class ParquetWriter implements Writer<ParquetDocumentInput> {
private final VSRManager vsrManager;
private final long writerGeneration;

public ParquetWriter(String file, Schema schema, long writerGeneration, ArrowBufferPool arrowBufferPool) {
public ParquetWriter(
String file,
Schema schema,
long writerGeneration,
ArrowBufferPool arrowBufferPool,
IndexSettings indexSettings
) {
this.file = file;
this.schema = schema;
this.vsrManager = new VSRManager(file, schema, arrowBufferPool);
this.vsrManager = new VSRManager(file, schema, arrowBufferPool, indexSettings);
this.writerGeneration = writerGeneration;
}

28 changes: 16 additions & 12 deletions modules/parquet-data-format/src/main/rust/src/lib.rs
@@ -29,7 +29,7 @@ struct NativeParquetWriter;

impl NativeParquetWriter {

fn create_writer(filename: String, schema_address: i64) -> Result<(), Box<dyn std::error::Error>> {
fn create_writer(filename: String, schema_address: i64, is_compression_enabled: bool) -> Result<(), Box<dyn std::error::Error>> {
log_info!("[RUST] create_writer called for file: {}, schema_address: {}", filename, schema_address);

if (schema_address as *mut u8).is_null() {
@@ -54,10 +54,13 @@ impl NativeParquetWriter {
let file = File::create(&filename)?;
let file_clone = file.try_clone()?;
FILE_MANAGER.insert(filename.clone(), file_clone);
let props = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()))
.build();
let writer = ArrowWriter::try_new(file, schema, Some(props))?;
let mut props_builder = WriterProperties::builder();

if is_compression_enabled {
props_builder = props_builder.set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap()));
}

let writer = ArrowWriter::try_new(file, schema, Some(props_builder.build()))?;
WRITER_MANAGER.insert(filename, Arc::new(Mutex::new(writer)));
Ok(())
}
@@ -211,10 +214,11 @@ pub extern "system" fn Java_com_parquet_parquetdataformat_bridge_RustBridge_crea
mut env: JNIEnv,
_class: JClass,
file: JString,
schema_address: jlong
schema_address: jlong,
is_compression_enabled: bool
) -> jint {
let filename: String = env.get_string(&file).expect("Couldn't get java string!").into();
match NativeParquetWriter::create_writer(filename, schema_address as i64) {
match NativeParquetWriter::create_writer(filename, schema_address as i64, is_compression_enabled) {
Ok(_) => 0,
Err(_) => -1,
}
@@ -351,7 +355,7 @@ mod tests {

fn create_writer_and_assert_success(filename: &str) -> (Arc<Schema>, i64) {
let (schema, schema_ptr) = create_test_ffi_schema();
let result = NativeParquetWriter::create_writer(filename.to_string(), schema_ptr);
let result = NativeParquetWriter::create_writer(filename.to_string(), schema_ptr, true);
assert!(result.is_ok());
(schema, schema_ptr)
}
@@ -384,7 +388,7 @@ mod tests {
let invalid_path = "/invalid/path/that/does/not/exist/test.parquet";
let (_schema, schema_ptr) = create_test_ffi_schema();

let result = NativeParquetWriter::create_writer(invalid_path.to_string(), schema_ptr);
let result = NativeParquetWriter::create_writer(invalid_path.to_string(), schema_ptr, true);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("No such file or directory"));

@@ -396,7 +400,7 @@
let (_temp_dir, filename) = get_temp_file_path("invalid_schema.parquet");

// Test with null schema pointer
let result = NativeParquetWriter::create_writer(filename, 0);
let result = NativeParquetWriter::create_writer(filename, 0, true);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Invalid schema address"));
}
@@ -407,7 +411,7 @@
let (_schema, schema_ptr) = create_writer_and_assert_success(&filename);

// Second writer creation for same file should fail
let result2 = NativeParquetWriter::create_writer(filename.clone(), schema_ptr);
let result2 = NativeParquetWriter::create_writer(filename.clone(), schema_ptr, true);
assert!(result2.is_err());
assert!(result2.unwrap_err().to_string().contains("Writer already exists"));

@@ -580,7 +584,7 @@ mod tests {
let filename = file_path.to_string_lossy().to_string();
let (_schema, schema_ptr) = create_test_ffi_schema();

if NativeParquetWriter::create_writer(filename.clone(), schema_ptr).is_ok() {
if NativeParquetWriter::create_writer(filename.clone(), schema_ptr, true).is_ok() {
success_count.fetch_add(1, Ordering::SeqCst);
let _ = NativeParquetWriter::close_writer(filename);
}
@@ -292,6 +292,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_SEARCH_QUERY_PLAN_EXPLAIN_SETTING,
IndexSettings.OPTIMIZED_INDEX_ENABLED_SETTING,

IndexSettings.INDEX_COMPRESSION_ENABLED_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -829,6 +829,13 @@ private void setSearchQueryPlanExplainEnabled(Boolean searchQueryPlaneExplainEna
Property.Final
);

public static final Setting<Boolean> INDEX_COMPRESSION_ENABLED_SETTING = Setting.boolSetting(
"index.compression.enabled",
Member:
Should we use the existing codec setting here? We can validate which values to support in the Parquet plugin itself.
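A minimal sketch of what reusing the existing index.codec setting might look like inside the plugin; the accepted values and the mapping to a Parquet compression decision below are assumptions, not something this PR defines:

```java
import org.opensearch.index.IndexSettings;

final class CodecSettingSketch {
    // Hypothetical: derive the Parquet compression choice from the existing "index.codec"
    // value and validate supported values in the plugin, instead of adding a new flag.
    static boolean compressionEnabledFrom(IndexSettings indexSettings) {
        String codec = indexSettings.getSettings().get("index.codec", "default");
        switch (codec) {
            case "default":
            case "best_compression":
                return true;         // assumed mapping: Lucene codecs imply a compressed Parquet writer
            case "uncompressed":     // assumed plugin-specific value, not a standard codec name
                return false;
            default:
                throw new IllegalArgumentException("Codec not supported by the Parquet data format: " + codec);
        }
    }
}
```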

Member:
Also, some settings may come from Parquet directly. If a setting is specific to a data format, it should be declared in the plugin itself:

    public List<Setting<?>> getSettings() {
        return Collections.emptyList();
    }
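For illustration, a sketch of the same compression toggle declared in the Parquet plugin and exposed through getSettings(), so core would not need the hard-coded entry in IndexScopedSettings; the plugin-scoped key name is an assumption:

```java
import java.util.List;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;

public class ParquetSettingsSketch {
    // Hypothetical plugin-owned setting; mirrors the boolSetting added in this PR,
    // but with a plugin-scoped key instead of a core "index.compression.enabled" entry.
    public static final Setting<Boolean> PARQUET_COMPRESSION_ENABLED_SETTING = Setting.boolSetting(
        "index.parquet.compression.enabled",
        true,
        Property.IndexScope,
        Property.Final
    );

    // Returned from the plugin's getSettings() so core registers it without knowing about it.
    public List<Setting<?>> getSettings() {
        return List.of(PARQUET_COMPRESSION_ENABLED_SETTING);
    }
}
```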

true,
Property.IndexScope,
Property.Final
);

private final Index index;
private final Version version;
private final Logger logger;
@@ -970,6 +977,11 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
*/
private final boolean isCompositeIndex;

/**
* Denotes whether the Compression is enabled for this index
*/
private final boolean isCompressionEnabled;

/**
* Denotes whether search via star tree index is enabled for this index
*/
@@ -1142,6 +1154,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
isCompositeIndex = scopedSettings.get(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING);
isCompressionEnabled = scopedSettings.get(INDEX_COMPRESSION_ENABLED_SETTING);
isStarTreeIndexEnabled = scopedSettings.get(StarTreeIndexSettings.STAR_TREE_SEARCH_ENABLED_SETTING);
isOptimizedIndex = scopedSettings.get(OPTIMIZED_INDEX_ENABLED_SETTING);

@@ -1429,6 +1442,10 @@ public boolean isCompositeIndex() {
return isCompositeIndex;
}

public boolean isCompressionEnabled() {
return isCompressionEnabled;
}

/**
* Returns true if segment replication is enabled on the index.
* <p>
@@ -8,14 +8,15 @@

package org.opensearch.index.engine;

import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.ShardPath;

public interface DataFormatPlugin {

<T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath);
<T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings);

DataFormat getDataFormat();
}
@@ -13,6 +13,7 @@
import java.util.concurrent.atomic.AtomicLong;

import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.FileInfos;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
@@ -47,7 +48,8 @@ public CompositeIndexingExecutionEngine(
MapperService mapperService,
PluginsService pluginsService,
ShardPath shardPath,
long initialWriterGeneration
long initialWriterGeneration,
IndexSettings indexSettings
) {
this.writerGeneration = new AtomicLong(initialWriterGeneration);
List<DataFormat> dataFormats = new ArrayList<>();
@@ -57,7 +59,7 @@
.findAny()
.orElseThrow(() -> new IllegalArgumentException("dataformat [" + DataFormat.TEXT + "] is not registered."));
dataFormats.add(plugin.getDataFormat());
delegates.add(plugin.indexingEngine(mapperService, shardPath));
delegates.add(plugin.indexingEngine(mapperService, shardPath, indexSettings));
} catch (NullPointerException e) {
delegates.add(new TextEngine());
}
@@ -267,7 +267,8 @@ public void onFailure(String reason, Exception ex) {
mapperService,
pluginsService,
shardPath,
lastCommittedWriterGeneration.incrementAndGet()
lastCommittedWriterGeneration.incrementAndGet(),
indexSettings
);
//Initialize CatalogSnapshotManager before loadWriterFiles to ensure stale files are cleaned up before loading
this.catalogSnapshotManager = new CatalogSnapshotManager(this, committerRef, shardPath);
@@ -48,7 +48,7 @@ public void configureStore() {
}

@Override
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath) {
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings) {
return (IndexingExecutionEngine<T>) new TextEngine();
}

@@ -28,7 +28,7 @@ default Optional<Map<org.opensearch.plugins.spi.vectorized.DataFormat, DataSourc
return Optional.empty();
}

<T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath);
<T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath, IndexSettings indexSettings);

FormatStoreDirectory<?> createFormatStoreDirectory(
IndexSettings indexSettings,