diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/ParquetDataFormatPlugin.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/ParquetDataFormatPlugin.java index 0e2621ccce9d7..668fe3bf2f7cc 100644 --- a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/ParquetDataFormatPlugin.java +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/ParquetDataFormatPlugin.java @@ -23,11 +23,11 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; -import org.opensearch.index.engine.DataFormatPlugin; import org.opensearch.index.engine.exec.DataFormat; import org.opensearch.index.engine.exec.IndexingExecutionEngine; import com.parquet.parquetdataformat.bridge.RustBridge; import com.parquet.parquetdataformat.engine.ParquetExecutionEngine; +import org.opensearch.index.mapper.Mapper; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.FormatStoreDirectory; import org.opensearch.index.store.GenericStoreDirectory; @@ -78,7 +78,7 @@ *
  • Memory management via {@link com.parquet.parquetdataformat.memory} package
  • * */ -public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin, DataSourcePlugin { +public class ParquetDataFormatPlugin extends Plugin implements DataSourcePlugin { private Settings settings; public static String DEFAULT_MAX_NATIVE_ALLOCATION = "10%"; @@ -119,6 +119,11 @@ public DataFormat getDataFormat() { return new ParquetDataFormat(); } + @Override + public void canSupportFieldType(Mapper mapper) { + ArrowSchemaBuilder.canCreateParquetField(mapper); + } + @Override public Optional> getDataSourceCodecs() { Map codecs = new HashMap<>(); diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/ArrowSchemaBuilder.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/ArrowSchemaBuilder.java index 5430b7fa03101..8b8f8893455df 100644 --- a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/ArrowSchemaBuilder.java +++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/fields/ArrowSchemaBuilder.java @@ -105,6 +105,11 @@ private static boolean notSupportedMetadataField(final Mapper mapper) { * @throws IllegalStateException if the mapper type is not supported */ private static Field createArrowField(final Mapper mapper) { + final ParquetField parquetField = canCreateParquetField(mapper); + return new Field(mapper.name(), parquetField.getFieldType(), null); + } + + public static ParquetField canCreateParquetField(Mapper mapper) { final ParquetField parquetField = ArrowFieldRegistry.getParquetField(mapper.typeName()); if (parquetField == null) { @@ -114,6 +119,6 @@ private static Field createArrowField(final Mapper mapper) { ); } - return new Field(mapper.name(), parquetField.getFieldType(), null); + return parquetField; } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index a889091140d12..c3cf10a24cf7d 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -1450,6 +1450,8 @@ private static void updateIndexMappingsAndBuildSortOrder( if (request.dataStreamName() != null) { MetadataCreateDataStreamService.validateTimestampFieldMapping(mapperService); } + + indexService.verifyMappings(); } private static void validateActiveShardCount(ActiveShardCount waitForActiveShards, IndexMetadata indexMetadata) { diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 972b1c54d300f..345cc259bf5e8 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -81,6 +81,7 @@ import org.opensearch.index.fielddata.IndexFieldDataCache; import org.opensearch.index.fielddata.IndexFieldDataService; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.MetadataFieldMapper; import org.opensearch.index.query.QueryShardContext; import org.opensearch.index.query.SearchIndexNameMatcher; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; @@ -111,6 +112,7 @@ import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsPublisher; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; +import org.opensearch.plugins.DataSourcePlugin; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.plugins.PluginsService; import org.opensearch.plugins.SearchEnginePlugin; @@ -1099,6 +1101,18 @@ public boolean updateMapping(final IndexMetadata currentIndexMetadata, final Ind return mapperService.updateMapping(currentIndexMetadata, newIndexMetadata); } + public void verifyMappings() { + if (indexSettings.isOptimizedIndex()) { + mapperService.documentMapper().mappers().forEach(mapper -> { + if (mapper instanceof MetadataFieldMapper) { + return; + } + pluginsService.filterPlugins(DataSourcePlugin.class) + .forEach(plugin -> plugin.canSupportFieldType(mapper)); + }); + } + } + private class StoreCloseListener implements Store.OnClose { private final ShardId shardId; private final Closeable[] toClose; diff --git a/server/src/main/java/org/opensearch/index/engine/DataFormatPlugin.java b/server/src/main/java/org/opensearch/index/engine/DataFormatPlugin.java deleted file mode 100644 index 2bb09a50dee52..0000000000000 --- a/server/src/main/java/org/opensearch/index/engine/DataFormatPlugin.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.engine; - -import org.opensearch.index.engine.exec.DataFormat; -import org.opensearch.index.engine.exec.IndexingExecutionEngine; -import org.opensearch.index.mapper.MapperService; -import org.opensearch.index.shard.ShardPath; - -public interface DataFormatPlugin { - - IndexingExecutionEngine indexingEngine(MapperService mapperService, ShardPath shardPath); - - DataFormat getDataFormat(); -} diff --git a/server/src/main/java/org/opensearch/index/mapper/DocumentParser.java b/server/src/main/java/org/opensearch/index/mapper/DocumentParser.java index b81b3dfde7951..232f7bf0e869a 100644 --- a/server/src/main/java/org/opensearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/opensearch/index/mapper/DocumentParser.java @@ -1065,6 +1065,9 @@ private static Tuple getDynamicParentMapper( // find what the dynamic setting is given the current parse context and parent private static ObjectMapper.Dynamic dynamicOrDefault(ObjectMapper parentMapper, ParseContext context) { + if (context.indexSettings().isOptimizedIndex()) { + return ObjectMapper.Dynamic.STRICT; + } ObjectMapper.Dynamic dynamic = parentMapper.dynamic(); while (dynamic == null) { int lastDotNdx = parentMapper.name().lastIndexOf('.'); diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 9e2cbbd177f3f..8cdd2d749a901 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -91,7 +91,6 @@ import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.CombinedDeletionPolicy; -import org.opensearch.index.engine.DataFormatPlugin; import org.opensearch.index.engine.Engine; import org.opensearch.index.engine.exec.FileMetadata; import org.opensearch.index.engine.exec.coord.Any; diff --git a/server/src/main/java/org/opensearch/plugins/DataSourcePlugin.java b/server/src/main/java/org/opensearch/plugins/DataSourcePlugin.java index 378645b06b012..8eb269adeaec6 100644 --- a/server/src/main/java/org/opensearch/plugins/DataSourcePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/DataSourcePlugin.java @@ -14,6 +14,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.exec.DataFormat; import org.opensearch.index.engine.exec.IndexingExecutionEngine; +import org.opensearch.index.mapper.Mapper; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.shard.ShardPath; import org.opensearch.plugins.spi.vectorized.DataSourceCodec; @@ -38,4 +39,7 @@ FormatStoreDirectory createFormatStoreDirectory( BlobContainer createBlobContainer(BlobStore blobStore, BlobPath blobPath) throws IOException; DataFormat getDataFormat(); + + default void canSupportFieldType(Mapper mapperType) { + } }