-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Remove child level directory on refresh for CompositeIndexWriter #20326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,8 @@ | |
| import org.apache.lucene.index.Term; | ||
| import org.apache.lucene.store.AlreadyClosedException; | ||
| import org.apache.lucene.store.Directory; | ||
| import org.apache.lucene.store.FSDirectory; | ||
| import org.apache.lucene.store.FilterDirectory; | ||
| import org.apache.lucene.util.BytesRef; | ||
| import org.opensearch.OpenSearchException; | ||
| import org.opensearch.common.CheckedBiFunction; | ||
|
|
@@ -39,6 +41,7 @@ | |
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.nio.file.Path; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
|
|
@@ -540,12 +543,28 @@ private void refreshDocumentsForParentDirectory(CriteriaBasedIndexWriterLookup o | |
|
|
||
| if (!directoryToCombine.isEmpty()) { | ||
| accumulatingIndexWriter.addIndexes(directoryToCombine.toArray(new Directory[0])); | ||
| Path[] childDirectoryPath = directoryToCombine.stream() | ||
| .map(directory -> getLocalFSDirectory(directory).getDirectory()) | ||
| .toArray(Path[]::new); | ||
| IOUtils.closeWhileHandlingException(directoryToCombine); | ||
| IOUtils.rm(childDirectoryPath); | ||
| } | ||
|
|
||
| deleteDummyTombstoneEntry(); | ||
| } | ||
|
|
||
| private FSDirectory getLocalFSDirectory(Directory localDirectory) { | ||
| FSDirectory localFSDirectory; | ||
| if (localDirectory instanceof FSDirectory) { | ||
| localFSDirectory = (FSDirectory) localDirectory; | ||
| } else { | ||
| // In this case it should be a FilterDirectory wrapped over FSDirectory as per above validation. | ||
| localFSDirectory = (FSDirectory) (((FilterDirectory) localDirectory).getDelegate()); | ||
| } | ||
|
|
||
| return localFSDirectory; | ||
| } | ||
|
Comment on lines
+556
to
+566
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add defensive checks to prevent ClassCastException. The method assumes the directory is either
The comment mentions "as per above validation" but no such validation exists. 🔎 Proposed defensive implementation private FSDirectory getLocalFSDirectory(Directory localDirectory) {
+ if (localDirectory == null) {
+ throw new IllegalArgumentException("localDirectory cannot be null");
+ }
+
FSDirectory localFSDirectory;
if (localDirectory instanceof FSDirectory) {
localFSDirectory = (FSDirectory) localDirectory;
- } else {
- // In this case it should be a FilterDirectory wrapped over FSDirectory as per above validation.
+ } else if (localDirectory instanceof FilterDirectory) {
+ // Unwrap FilterDirectory to get the delegate
+ Directory delegate = ((FilterDirectory) localDirectory).getDelegate();
+ // Recursively unwrap in case of multiple FilterDirectory layers
+ if (delegate instanceof FilterDirectory || delegate instanceof FSDirectory) {
+ localFSDirectory = getLocalFSDirectory(delegate);
+ } else {
+ throw new IllegalArgumentException(
+ "Expected FSDirectory but FilterDirectory wraps: " + delegate.getClass().getName()
+ );
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Expected FSDirectory or FilterDirectory but got: " + localDirectory.getClass().getName()
+ );
- localFSDirectory = (FSDirectory) (((FilterDirectory) localDirectory).getDelegate());
}
return localFSDirectory;
}🤖 Prompt for AI Agents |
||
|
|
||
| private void deleteDummyTombstoneEntry() throws IOException { | ||
| Term uid = new Term(IdFieldMapper.NAME, DUMMY_TOMBSTONE_DOC_ID); | ||
| accumulatingIndexWriter.deleteDocuments(uid); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,8 @@ | |
| import org.opensearch.index.mapper.ParsedDocument; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.file.Files; | ||
| import java.nio.file.Path; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.UUID; | ||
|
|
@@ -187,8 +189,49 @@ public void testUnableToObtainLockOnActiveLookupWhenWriteLockDuringIndexing() th | |
| writer.join(); | ||
| } | ||
|
|
||
| public void testConcurrentIndexingDuringRefresh() throws IOException, InterruptedException { | ||
| public void testChildDirectoryDeletedPostRefresh() throws IOException, InterruptedException { | ||
| final EngineConfig engineConfig = config(); | ||
| Path dataPath = engineConfig.getStore().shardPath().resolveIndex(); | ||
| CompositeIndexWriter compositeIndexWriter = null; | ||
|
|
||
| try { | ||
| compositeIndexWriter = new CompositeIndexWriter( | ||
| engineConfig, | ||
| createWriter(), | ||
| newSoftDeletesPolicy(), | ||
| softDeletesField, | ||
| indexWriterFactory | ||
| ); | ||
|
|
||
| Engine.Index operation = indexForDoc(createParsedDoc("id", null, DEFAULT_CRITERIA)); | ||
| try (Releasable ignore1 = compositeIndexWriter.acquireLock(operation.uid().bytes())) { | ||
| compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); | ||
| } | ||
|
|
||
| operation = indexForDoc(createParsedDoc("id2", null, "testingNewCriteria")); | ||
| try (Releasable ignore1 = compositeIndexWriter.acquireLock(operation.uid().bytes())) { | ||
| compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); | ||
| } | ||
|
|
||
| compositeIndexWriter.beforeRefresh(); | ||
| compositeIndexWriter.afterRefresh(true); | ||
|
|
||
| long directoryCount = Files.find( | ||
| dataPath, | ||
| 1, | ||
| (path, attributes) -> attributes.isDirectory() == true && path.endsWith("extra0") == false | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] Add a comment explaining why extra0 needs to be removed [ExtrasFS] |
||
| ).count() - 1; | ||
| // Ensure no child directory is pending here. | ||
| assertEquals(0, directoryCount); | ||
| } finally { | ||
| if (compositeIndexWriter != null) { | ||
| IOUtils.closeWhileHandlingException(compositeIndexWriter); | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| public void testConcurrentIndexingDuringRefresh() throws IOException, InterruptedException { | ||
| CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( | ||
| config(), | ||
| createWriter(), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should assert that it is always an FSDirectory since we use createFSDirectory method from store