Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.engine.DataFormatPlugin;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import com.parquet.parquetdataformat.bridge.RustBridge;
import com.parquet.parquetdataformat.engine.ParquetExecutionEngine;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FormatStoreDirectory;
import org.opensearch.index.store.GenericStoreDirectory;
Expand Down Expand Up @@ -78,7 +78,7 @@
* <li>Memory management via {@link com.parquet.parquetdataformat.memory} package</li>
* </ul>
*/
public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin, DataSourcePlugin {
public class ParquetDataFormatPlugin extends Plugin implements DataSourcePlugin {
private Settings settings;

public static String DEFAULT_MAX_NATIVE_ALLOCATION = "10%";
Expand Down Expand Up @@ -119,6 +119,11 @@ public DataFormat getDataFormat() {
return new ParquetDataFormat();
}

/**
 * Validates that the Parquet data format can represent the given field mapper.
 * Delegates to {@link ArrowSchemaBuilder#canCreateParquetField(Mapper)}, which throws
 * {@code IllegalStateException} when the mapper's type has no registered Parquet field
 * mapping; the resolved {@code ParquetField} is intentionally discarded here — this
 * override exists purely for validation-by-exception.
 */
@Override
public void canSupportFieldType(Mapper mapper) {
ArrowSchemaBuilder.canCreateParquetField(mapper);
}

@Override
public Optional<Map<org.opensearch.plugins.spi.vectorized.DataFormat, DataSourceCodec>> getDataSourceCodecs() {
Map<org.opensearch.plugins.spi.vectorized.DataFormat, DataSourceCodec> codecs = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ private static boolean notSupportedMetadataField(final Mapper mapper) {
* @throws IllegalStateException if the mapper type is not supported
*/
private static Field createArrowField(final Mapper mapper) {
// Resolve (and validate) the Parquet field mapping for this mapper's type, then
// wrap it into an Arrow Field carrying the mapper's full name. Validation failures
// surface as an exception from canCreateParquetField.
return new Field(mapper.name(), canCreateParquetField(mapper).getFieldType(), null);
}

public static ParquetField canCreateParquetField(Mapper mapper) {
final ParquetField parquetField = ArrowFieldRegistry.getParquetField(mapper.typeName());

if (parquetField == null) {
Expand All @@ -114,6 +119,6 @@ private static Field createArrowField(final Mapper mapper) {
);
}

return new Field(mapper.name(), parquetField.getFieldType(), null);
return parquetField;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,8 @@ private static void updateIndexMappingsAndBuildSortOrder(
if (request.dataStreamName() != null) {
MetadataCreateDataStreamService.validateTimestampFieldMapping(mapperService);
}

indexService.verifyMappings();
}

private static void validateActiveShardCount(ActiveShardCount waitForActiveShards, IndexMetadata indexMetadata) {
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.fielddata.IndexFieldDataCache;
import org.opensearch.index.fielddata.IndexFieldDataService;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.MetadataFieldMapper;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.SearchIndexNameMatcher;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
Expand Down Expand Up @@ -111,6 +112,7 @@
import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsPublisher;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.plugins.DataSourcePlugin;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.PluginsService;
import org.opensearch.plugins.SearchEnginePlugin;
Expand Down Expand Up @@ -1099,6 +1101,18 @@ public boolean updateMapping(final IndexMetadata currentIndexMetadata, final Ind
return mapperService.updateMapping(currentIndexMetadata, newIndexMetadata);
}

/**
 * Verifies that every user-defined field mapping on an optimized index is supported by
 * at least the registered {@link DataSourcePlugin}s. Each plugin's
 * {@code canSupportFieldType} is expected to throw for unsupported field types, so this
 * method surfaces mapping problems as exceptions at index-creation/mapping-update time.
 * No-op for non-optimized indices or when no document mapping exists yet.
 */
public void verifyMappings() {
if (indexSettings.isOptimizedIndex() == false) {
return;
}
if (mapperService.documentMapper() == null) {
// No mappings have been defined for this index yet; nothing to verify.
return;
}
// Hoisted out of the per-mapper loop: the set of installed plugins is loop-invariant.
final List<DataSourcePlugin> dataSourcePlugins = pluginsService.filterPlugins(DataSourcePlugin.class);
mapperService.documentMapper().mappers().forEach(mapper -> {
// Metadata fields (_id, _source, ...) are managed internally and are exempt
// from data-format support checks.
if (mapper instanceof MetadataFieldMapper) {
return;
}
dataSourcePlugins.forEach(plugin -> plugin.canSupportFieldType(mapper));
});
}

private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId;
private final Closeable[] toClose;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,9 @@ private static Tuple<Integer, ObjectMapper> getDynamicParentMapper(

// find what the dynamic setting is given the current parse context and parent
private static ObjectMapper.Dynamic dynamicOrDefault(ObjectMapper parentMapper, ParseContext context) {
if (context.indexSettings().isOptimizedIndex()) {
return ObjectMapper.Dynamic.STRICT;
}
ObjectMapper.Dynamic dynamic = parentMapper.dynamic();
while (dynamic == null) {
int lastDotNdx = parentMapper.name().lastIndexOf('.');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.CombinedDeletionPolicy;
import org.opensearch.index.engine.DataFormatPlugin;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.exec.FileMetadata;
import org.opensearch.index.engine.exec.coord.Any;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.plugins.spi.vectorized.DataSourceCodec;
Expand All @@ -38,4 +39,7 @@ FormatStoreDirectory<?> createFormatStoreDirectory(
BlobContainer createBlobContainer(BlobStore blobStore, BlobPath blobPath) throws IOException;

DataFormat getDataFormat();

/**
 * Hook for a data source plugin to validate that it can handle the given field mapper's
 * type. Implementations are expected to signal an unsupported type by throwing (see the
 * Parquet plugin's override); the default accepts every field type by doing nothing.
 *
 * NOTE(review): the name suggests a boolean query but the contract is
 * validate-or-throw with a {@code void} return — consider renaming to
 * {@code validateFieldType} in a follow-up; confirm with API owners.
 *
 * @param mapperType the field mapper whose type should be checked
 */
default void canSupportFieldType(Mapper mapperType) {
}
}
Loading