Skip to content

Commit bc17e73

Browse files
committed
WaitUntil & Segrep & cluster config tweaks.
Signed-off-by: Marc Handalian <[email protected]>
1 parent 6a7a9a1 commit bc17e73

File tree

13 files changed

+96
-25
lines changed

13 files changed

+96
-25
lines changed

.github/workflows/precommit.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
name: Gradle Precommit
2-
on: [pull_request]
3-
2+
on: [workflow_dispatch]
3+
44
jobs:
55
precommit:
66
runs-on: ${{ matrix.os }}
77
strategy:
88
matrix:
9-
os: [ubuntu-latest, windows-latest, macos-latest]
9+
os: [windows-latest]
1010
steps:
1111
- uses: actions/checkout@v2
1212
- name: Set up JDK 11
@@ -16,4 +16,4 @@ jobs:
1616
distribution: adopt
1717
- name: Run Gradle
1818
run: |
19-
./gradlew javadoc precommit --parallel
19+
./gradlew :server:test -x internalClusterTest

buildSrc/src/main/groovy/org/opensearch/gradle/test/ClusterConfiguration.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ class ClusterConfiguration {
103103
boolean autoSetHostsProvider = true
104104

105105
@Input
106-
String jvmArgs = "-Xms" + System.getProperty('tests.heap.size', '512m') +
107-
" " + "-Xmx" + System.getProperty('tests.heap.size', '512m') +
106+
String jvmArgs = "-Xms" + System.getProperty('tests.heap.size', '1g') +
107+
" " + "-Xmx" + System.getProperty('tests.heap.size', '6g') +
108108
" " + System.getProperty('tests.jvm.argline', '')
109109

110110
/**

buildSrc/src/main/java/org/opensearch/gradle/OpenSearchTestBasePlugin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ public void execute(Task t) {
127127
test.exclude("**/*$*.class");
128128

129129
test.jvmArgs(
130-
"-Xmx" + System.getProperty("tests.heap.size", "512m"),
131-
"-Xms" + System.getProperty("tests.heap.size", "512m"),
130+
"-Xmx" + System.getProperty("tests.heap.size", "6g"),
131+
"-Xms" + System.getProperty("tests.heap.size", "1g"),
132132
"-XX:+HeapDumpOnOutOfMemoryError"
133133
);
134134

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
1616
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
1717
import org.opensearch.action.admin.indices.segments.ShardSegments;
18+
import org.opensearch.action.index.IndexResponse;
1819
import org.opensearch.action.support.WriteRequest;
1920
import org.opensearch.action.update.UpdateResponse;
2021
import org.opensearch.client.Requests;
@@ -38,6 +39,7 @@
3839
import org.opensearch.indices.recovery.FileChunkRequest;
3940
import org.opensearch.indices.replication.common.ReplicationType;
4041
import org.opensearch.plugins.Plugin;
42+
import org.opensearch.rest.RestStatus;
4143
import org.opensearch.test.BackgroundIndexer;
4244
import org.opensearch.test.InternalTestCluster;
4345
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -291,14 +293,30 @@ public void testAddNewReplicaFailure() throws Exception {
291293
assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
292294
}
293295

294-
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
296+
public void testWaitUntil() {
297+
final String nodeA = internalCluster().startNode(featureFlagSettings());
298+
final String nodeB = internalCluster().startNode(featureFlagSettings());
299+
createIndex(INDEX_NAME);
300+
ensureGreen(INDEX_NAME);
301+
IndexResponse index = client().prepareIndex(INDEX_NAME)
302+
.setId("1")
303+
.setSource("foo", "bar")
304+
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
305+
.get();
306+
assertEquals(RestStatus.CREATED, index.status());
307+
assertFalse("request shouldn't have forced a refresh", index.forcedRefresh());
308+
assertSearchHits(client(nodeA).prepareSearch(INDEX_NAME).setPreference("_only_local").setQuery(matchQuery("foo", "bar")).get(), "1");
309+
assertSearchHits(client(nodeB).prepareSearch(INDEX_NAME).setPreference("_only_local").setQuery(matchQuery("foo", "bar")).get(), "1");
310+
}
311+
312+
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
295313
public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
296314
final String nodeA = internalCluster().startNode(featureFlagSettings());
297315
final String nodeB = internalCluster().startNode(featureFlagSettings());
298316
createIndex(INDEX_NAME);
299317
ensureGreen(INDEX_NAME);
300318

301-
final int initialDocCount = scaledRandomIntBetween(0, 200);
319+
final int initialDocCount = 200;
302320
try (
303321
BackgroundIndexer indexer = new BackgroundIndexer(
304322
INDEX_NAME,
@@ -313,7 +331,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
313331
indexer.start(initialDocCount);
314332
waitForDocs(initialDocCount, indexer);
315333
refresh(INDEX_NAME);
316-
waitForReplicaUpdate();
334+
// waitForReplicaUpdate();
317335

318336
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
319337
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
@@ -324,7 +342,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
324342
waitForDocs(expectedHitCount, indexer);
325343

326344
flushAndRefresh(INDEX_NAME);
327-
waitForReplicaUpdate();
345+
// waitForReplicaUpdate();
328346
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
329347
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
330348

@@ -553,9 +571,9 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
553571
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
554572
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
555573

556-
client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();
574+
client().prepareIndex(INDEX_NAME).setId("3").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).setSource("foo", "bar").get();
557575
refresh(INDEX_NAME);
558-
waitForReplicaUpdate();
576+
// waitForReplicaUpdate();
559577
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
560578
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
561579
assertSegmentStats(REPLICA_COUNT);

server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,6 +1201,7 @@ protected void acquireReplicaOperationPermit(
12011201
final long globalCheckpoint,
12021202
final long maxSeqNoOfUpdatesOrDeletes
12031203
) {
1204+
logger.info("acquireReplicaOperationPermit - GLOBAL CHECKPOINT {} MAXSeqNoOfUpdatesOrDeletes {}", globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
12041205
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request);
12051206
}
12061207

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
126126
ensureOpen();
127127
try (ReleasableLock lock = writeLock.acquire()) {
128128
final long incomingGeneration = infos.getGeneration();
129+
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
129130
readerManager.updateSegments(infos);
130131

131132
// Commit and roll the translog when we receive a different generation than what was last received.
@@ -136,7 +137,6 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
136137
translogManager.rollTranslogGeneration();
137138
}
138139
lastReceivedGen = incomingGeneration;
139-
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
140140
}
141141
}
142142

@@ -305,7 +305,9 @@ public List<Segment> segments(boolean verbose) {
305305
}
306306

307307
@Override
308-
public void refresh(String source) throws EngineException {}
308+
public void refresh(String source) throws EngineException {
309+
logger.info("Refresh invoked on Engine");
310+
}
309311

310312
@Override
311313
public boolean maybeRefresh(String source) throws EngineException {

server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
6464
logger.trace(
6565
() -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader)
6666
);
67+
logger.info("Returning from refreshIfNeeded");
6768
return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
6869
}
6970

@@ -78,6 +79,7 @@ public synchronized void updateSegments(SegmentInfos infos) throws IOException {
7879
// is always increased.
7980
infos.updateGeneration(currentInfos);
8081
currentInfos = infos;
82+
logger.info("Invoking maybeRefresh");
8183
maybeRefresh();
8284
}
8385

server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointP
3535

3636
@Override
3737
public void beforeRefresh() throws IOException {
38+
logger.info("Before Refresh invoked?");
3839
// Do nothing
3940
}
4041

4142
@Override
4243
public void afterRefresh(boolean didRefresh) throws IOException {
44+
logger.info("After Refresh invoked: {}", didRefresh);
4345
if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) {
4446
publisher.publish(shard);
4547
}

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1502,6 +1502,7 @@ public boolean isSegmentReplicationAllowed() {
15021502
* @return true if checkpoint should be processed
15031503
*/
15041504
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
1505+
logger.info("Checking if segrep is allowed on received checkpoint {}", requestCheckpoint);
15051506
if (isSegmentReplicationAllowed() == false) {
15061507
return false;
15071508
}
@@ -3669,6 +3670,7 @@ private void innerAcquireReplicaOperationPermit(
36693670
final boolean allowCombineOperationWithPrimaryTermUpdate,
36703671
final Consumer<ActionListener<Releasable>> operationExecutor
36713672
) {
3673+
logger.info("innerAcquireReplicaOperationPermit - GLOBAL CHECKPOINT {} - CURRENT {}", globalCheckpoint, replicationTracker.getGlobalCheckpoint());
36723674
verifyNotClosed();
36733675

36743676
// This listener is used for the execution of the operation. If the operation requires all the permits for its
@@ -3816,6 +3818,26 @@ public void sync() throws IOException {
38163818
getEngine().translogManager().syncTranslog();
38173819
}
38183820

3821+
public final boolean isOperationIndexed(Translog.Location location) {
3822+
if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) {
3823+
final Translog.Operation operation;
3824+
try {
3825+
operation = getEngine().translogManager().readOperation(location);
3826+
} catch (IOException e) {
3827+
logger.error("Error fetching operation from xlog?", e);
3828+
return false;
3829+
}
3830+
logger.info("Operation read from xlog - {} - {}", location, operation);
3831+
final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint();
3832+
logger.info("Latest repl checkpoint {}", latestReplicationCheckpoint);
3833+
final boolean b = operation.seqNo() <= latestReplicationCheckpoint.getSeqNo();
3834+
logger.info("IS OP IN INDEX: {}", b);
3835+
return b;
3836+
// return latestReplicationCheckpoint.getSeqNo() >= replicationTracker.getGlobalCheckpoint();
3837+
}
3838+
return true;
3839+
}
3840+
38193841
/**
38203842
* Checks if the underlying storage sync is required.
38213843
*/
@@ -3909,7 +3931,8 @@ private RefreshListeners buildRefreshListeners() {
39093931
() -> refresh("too_many_listeners"),
39103932
logger,
39113933
threadPool.getThreadContext(),
3912-
externalRefreshMetric
3934+
externalRefreshMetric,
3935+
this::isOperationIndexed
39133936
);
39143937
}
39153938

@@ -4081,8 +4104,10 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
40814104
}
40824105
}
40834106
if (readAllowed) {
4084-
refreshListeners.addOrNotify(location, listener);
4107+
final boolean b = refreshListeners.addOrNotify(location, listener);
4108+
logger.info("Register listener to call later? {}", b);
40854109
} else {
4110+
logger.info("readAllowed is false");
40864111
// we're not yet ready fo ready for reads, just ignore refresh cycles
40874112
listener.accept(false);
40884113
}

server/src/main/java/org/opensearch/index/shard/RefreshListeners.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.ArrayList;
4747
import java.util.List;
4848
import java.util.function.Consumer;
49+
import java.util.function.Function;
4950
import java.util.function.IntSupplier;
5051
import java.util.function.Supplier;
5152

@@ -65,6 +66,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
6566
private final Logger logger;
6667
private final ThreadContext threadContext;
6768
private final MeanMetric refreshMetric;
69+
private final Function<Translog.Location, Boolean> isOperationIndexed;
6870

6971
/**
7072
* Time in nanosecond when beforeRefresh() is called. Used for calculating refresh metrics.
@@ -107,6 +109,16 @@ public RefreshListeners(
107109
this.logger = logger;
108110
this.threadContext = threadContext;
109111
this.refreshMetric = refreshMetric;
112+
isOperationIndexed = null;
113+
}
114+
115+
public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable too_many_listeners, Logger logger, ThreadContext threadContext, MeanMetric externalRefreshMetric, Function<Translog.Location, Boolean> isOperationIndexed) {
116+
this.getMaxRefreshListeners = getMaxRefreshListeners;
117+
this.forceRefresh = too_many_listeners;
118+
this.logger = logger;
119+
this.threadContext = threadContext;
120+
this.refreshMetric = externalRefreshMetric;
121+
this.isOperationIndexed = isOperationIndexed;
110122
}
111123

112124
/**
@@ -148,8 +160,9 @@ public boolean addOrNotify(Translog.Location location, Consumer<Boolean> listene
148160
requireNonNull(listener, "listener cannot be null");
149161
requireNonNull(location, "location cannot be null");
150162

151-
if (lastRefreshedLocation != null && lastRefreshedLocation.compareTo(location) >= 0) {
163+
if (lastRefreshedLocation != null && lastRefreshedLocation.compareTo(location) >= 0 && this.isOperationIndexed.apply(currentRefreshLocation)) {
152164
// Location already visible, just call the listener
165+
logger.info("lastRefreshedLocation? {}", lastRefreshedLocation);
153166
listener.accept(false);
154167
return true;
155168
}
@@ -172,12 +185,14 @@ public boolean addOrNotify(Translog.Location location, Consumer<Boolean> listene
172185
}
173186
// We have a free slot so register the listener
174187
listeners.add(new Tuple<>(location, contextPreservingListener));
188+
logger.info("Set listeners - lastRefreshedLocation? {}", lastRefreshedLocation);
175189
refreshListeners = listeners;
176190
return false;
177191
}
178192
}
179193
// No free slot so force a refresh and call the listener in this thread
180194
forceRefresh.run();
195+
logger.info("End - lastRefreshedLocation? {}", lastRefreshedLocation);
181196
listener.accept(true);
182197
return true;
183198
}
@@ -270,7 +285,8 @@ public void afterRefresh(boolean didRefresh) throws IOException {
270285
List<Tuple<Translog.Location, Consumer<Boolean>>> preservedListeners = null;
271286
for (Tuple<Translog.Location, Consumer<Boolean>> tuple : candidates) {
272287
Translog.Location location = tuple.v1();
273-
if (location.compareTo(currentRefreshLocation) <= 0) {
288+
if (location.compareTo(currentRefreshLocation) <= 0
289+
&& (this.isOperationIndexed.apply(location))) {
274290
if (listenersToFire == null) {
275291
listenersToFire = new ArrayList<>();
276292
}

server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,7 @@ private void forceSegmentReplication(
817817
new SegmentReplicationTargetService.SegmentReplicationListener() {
818818
@Override
819819
public void onReplicationDone(SegmentReplicationState state) {
820-
logger.trace(
820+
logger.info(
821821
() -> new ParameterizedMessage(
822822
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
823823
indexShard.shardId().getId(),
@@ -834,7 +834,7 @@ public void onReplicationFailure(
834834
ReplicationFailedException e,
835835
boolean sendShardFailure
836836
) {
837-
logger.trace(
837+
logger.info(
838838
() -> new ParameterizedMessage(
839839
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
840840
indexShard.shardId().getId(),

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ public SegmentReplicationTarget startReplication(
237237
final IndexShard indexShard,
238238
final SegmentReplicationListener listener
239239
) {
240+
logger.info("Starting replication from {}", indexShard.getLatestReplicationCheckpoint());
240241
final SegmentReplicationTarget target = new SegmentReplicationTarget(
241242
checkpoint,
242243
indexShard,

test/framework/src/main/java/org/opensearch/test/BackgroundIndexer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.action.bulk.BulkResponse;
4545
import org.opensearch.action.bulk.BulkShardRequest;
4646
import org.opensearch.action.index.IndexResponse;
47+
import org.opensearch.action.support.WriteRequest;
4748
import org.opensearch.client.Client;
4849
import org.opensearch.common.unit.TimeValue;
4950
import org.opensearch.common.util.concurrent.ConcurrentCollections;
@@ -145,7 +146,7 @@ public BackgroundIndexer(
145146
logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs);
146147
for (int i = 0; i < writers.length; i++) {
147148
final int indexerId = i;
148-
final boolean batch = random.nextBoolean();
149+
final boolean batch = true;
149150
final Random threadRandom = new Random(random.nextLong());
150151
writers[i] = new Thread() {
151152
@Override
@@ -170,12 +171,15 @@ public void run() {
170171
for (int i = 0; i < batchSize; i++) {
171172
id = idGenerator.incrementAndGet();
172173
if (useAutoGeneratedIDs) {
173-
bulkRequest.add(client.prepareIndex(index).setSource(generateSource(id, threadRandom)));
174+
bulkRequest.add(client.prepareIndex(index)
175+
.setSource(generateSource(id, threadRandom)));
174176
} else {
175177
bulkRequest.add(
176-
client.prepareIndex(index).setId(Long.toString(id)).setSource(generateSource(id, threadRandom))
178+
client.prepareIndex(index)
179+
.setId(Long.toString(id)).setSource(generateSource(id, threadRandom))
177180
);
178181
}
182+
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
179183
}
180184
try {
181185
BulkResponse bulkResponse = bulkRequest.get();

0 commit comments

Comments
 (0)