diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e1d096868dea..daaa7568f9d4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix Netty deprecation warnings in transport-netty4 module ([#20233](https://github.com/opensearch-project/OpenSearch/pull/20233)) - Fix snapshot restore when an index sort is present ([#20284](https://github.com/opensearch-project/OpenSearch/pull/20284)) - Fix SearchPhaseExecutionException to properly initCause ([#20320](https://github.com/opensearch-project/OpenSearch/pull/20320)) +- Remove child level directory on refresh for CompositeIndexWriter ([#20326](https://github.com/opensearch-project/OpenSearch/pull/20326)) ### Dependencies - Bump `com.google.auth:google-auth-library-oauth2-http` from 1.38.0 to 1.41.0 ([#20183](https://github.com/opensearch-project/OpenSearch/pull/20183)) diff --git a/server/src/main/java/org/opensearch/index/engine/CompositeIndexWriter.java b/server/src/main/java/org/opensearch/index/engine/CompositeIndexWriter.java index e35c5dd1145d4..f6b7b101fd0b5 100644 --- a/server/src/main/java/org/opensearch/index/engine/CompositeIndexWriter.java +++ b/server/src/main/java/org/opensearch/index/engine/CompositeIndexWriter.java @@ -19,6 +19,8 @@ import org.apache.lucene.index.Term; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.util.BytesRef; import org.opensearch.OpenSearchException; import org.opensearch.common.CheckedBiFunction; @@ -39,6 +41,7 @@ import java.io.Closeable; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -540,12 +543,28 @@ private void refreshDocumentsForParentDirectory(CriteriaBasedIndexWriterLookup o if (!directoryToCombine.isEmpty()) { accumulatingIndexWriter.addIndexes(directoryToCombine.toArray(new Directory[0])); + Path[] childDirectoryPath = directoryToCombine.stream() + .map(directory -> getLocalFSDirectory(directory).getDirectory()) + .toArray(Path[]::new); IOUtils.closeWhileHandlingException(directoryToCombine); + IOUtils.rm(childDirectoryPath); } deleteDummyTombstoneEntry(); } + private FSDirectory getLocalFSDirectory(Directory localDirectory) { + FSDirectory localFSDirectory; + if (localDirectory instanceof FSDirectory) { + localFSDirectory = (FSDirectory) localDirectory; + } else { + // In this case it should be a FilterDirectory wrapped over FSDirectory as per above validation. + localFSDirectory = (FSDirectory) (((FilterDirectory) localDirectory).getDelegate()); + } + + return localFSDirectory; + } + private void deleteDummyTombstoneEntry() throws IOException { Term uid = new Term(IdFieldMapper.NAME, DUMMY_TOMBSTONE_DOC_ID); accumulatingIndexWriter.deleteDocuments(uid); diff --git a/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForAppendTests.java b/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForAppendTests.java index 7625bc5945e5a..15c9002db7a50 100644 --- a/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForAppendTests.java +++ b/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForAppendTests.java @@ -29,6 +29,8 @@ import org.opensearch.index.mapper.ParsedDocument; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -187,8 +189,49 @@ public void testUnableToObtainLockOnActiveLookupWhenWriteLockDuringIndexing() th writer.join(); } - public void testConcurrentIndexingDuringRefresh() throws IOException, InterruptedException { + public void testChildDirectoryDeletedPostRefresh() throws IOException, InterruptedException { + final EngineConfig engineConfig = config(); + Path dataPath = engineConfig.getStore().shardPath().resolveIndex(); + CompositeIndexWriter compositeIndexWriter = null; + + try { + compositeIndexWriter = new CompositeIndexWriter( + engineConfig, + createWriter(), + newSoftDeletesPolicy(), + softDeletesField, + indexWriterFactory + ); + + Engine.Index operation = indexForDoc(createParsedDoc("id", null, DEFAULT_CRITERIA)); + try (Releasable ignore1 = compositeIndexWriter.acquireLock(operation.uid().bytes())) { + compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); + } + operation = indexForDoc(createParsedDoc("id2", null, "testingNewCriteria")); + try (Releasable ignore1 = compositeIndexWriter.acquireLock(operation.uid().bytes())) { + compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); + } + + compositeIndexWriter.beforeRefresh(); + compositeIndexWriter.afterRefresh(true); + + long directoryCount = Files.find( + dataPath, + 1, + (path, attributes) -> attributes.isDirectory() == true && path.endsWith("extra0") == false + ).count() - 1; + // Ensure no child directory is pending here. + assertEquals(0, directoryCount); + } finally { + if (compositeIndexWriter != null) { + IOUtils.closeWhileHandlingException(compositeIndexWriter); + } + } + + } + + public void testConcurrentIndexingDuringRefresh() throws IOException, InterruptedException { CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( config(), createWriter(),